http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
deleted file mode 100644
index 80ae294..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ /dev/null
@@ -1,991 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests for {@link RollingSink}. These
- * tests test the different output methods as well as the rolling feature 
using a manual clock
- * that increases time in lockstep with element computation using latches.
- *
- * <p>
- * This only tests the rolling behaviour of the sink. There is a separate 
ITCase that verifies
- * exactly once behaviour.
- *
- * @deprecated should be removed with the {@link RollingSink}.
- */
-@Deprecated
-public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(RollingSinkITCase.class);
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       protected static MiniDFSCluster hdfsCluster;
-       protected static org.apache.hadoop.fs.FileSystem dfs;
-       protected static String hdfsURI;
-       protected static Configuration conf = new Configuration();
-
-       protected static File dataDir;
-
-       @BeforeClass
-       public static void createHDFS() throws IOException {
-
-               LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
-
-               dataDir = tempFolder.newFolder();
-
-               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
-               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
-               hdfsCluster = builder.build();
-
-               dfs = hdfsCluster.getFileSystem();
-
-               hdfsURI = "hdfs://"
-                               + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
-                               + "/";
-       }
-
-       @AfterClass
-       public static void destroyHDFS() {
-               LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
-               hdfsCluster.shutdown();
-       }
-
-       /**
-        * This tests {@link StringWriter} with
-        * non-rolling output.
-        */
-       @Test
-       public void testNonRollingStringWriter() throws Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + "/string-non-rolling-out";
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
-                               .broadcast()
-                               .filter(new OddEvenFilter());
-
-               RollingSink<String> sink = new RollingSink<String>(outPath)
-                               .setBucketer(new NonRollingBucketer())
-                               .setPartPrefix("part")
-                               .setPendingPrefix("")
-                               .setPendingSuffix("");
-
-               source
-                               .map(new MapFunction<Tuple2<Integer,String>, 
String>() {
-                                       private static final long 
serialVersionUID = 1L;
-                                       @Override
-                                       public String map(Tuple2<Integer, 
String> value) throws Exception {
-                                               return value.f1;
-                                       }
-                               })
-                               .addSink(sink);
-
-               env.execute("RollingSink String Write Test");
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
-
-               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-                       String line = br.readLine();
-                       Assert.assertEquals("message #" + i, line);
-               }
-
-               inStream.close();
-
-               inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-               br = new BufferedReader(new InputStreamReader(inStream));
-
-               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-                       String line = br.readLine();
-                       Assert.assertEquals("message #" + i, line);
-               }
-
-               inStream.close();
-       }
-
-       /**
-        * This tests {@link SequenceFileWriter}
-        * with non-rolling output and without compression.
-        */
-       @Test
-       public void testNonRollingSequenceFileWithoutCompressionWriter() throws 
Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
-                               .broadcast()
-                               .filter(new OddEvenFilter());
-
-               DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new 
MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public Tuple2<IntWritable, Text> map(Tuple2<Integer, 
String> value) throws Exception {
-                               return Tuple2.of(new IntWritable(value.f0), new 
Text(value.f1));
-                       }
-               });
-
-
-               RollingSink<Tuple2<IntWritable, Text>> sink = new 
RollingSink<Tuple2<IntWritable, Text>>(outPath)
-                               .setWriter(new SequenceFileWriter<IntWritable, 
Text>())
-                               .setBucketer(new NonRollingBucketer())
-                               .setPartPrefix("part")
-                               .setPendingPrefix("")
-                               .setPendingSuffix("");
-
-               mapped.addSink(sink);
-
-               env.execute("RollingSink String Write Test");
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
-
-               SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
-                               1000,
-                               0,
-                               100000,
-                               new Configuration());
-
-               IntWritable intWritable = new IntWritable();
-               Text txt = new Text();
-
-               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-                       reader.next(intWritable, txt);
-                       Assert.assertEquals(i, intWritable.get());
-                       Assert.assertEquals("message #" + i, txt.toString());
-               }
-
-               reader.close();
-               inStream.close();
-
-               inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-               reader = new SequenceFile.Reader(inStream,
-                               1000,
-                               0,
-                               100000,
-                               new Configuration());
-
-               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-                       reader.next(intWritable, txt);
-                       Assert.assertEquals(i, intWritable.get());
-                       Assert.assertEquals("message #" + i, txt.toString());
-               }
-
-               reader.close();
-               inStream.close();
-       }
-
-       /**
-        * This tests {@link SequenceFileWriter}
-        * with non-rolling output but with compression.
-        */
-       @Test
-       public void testNonRollingSequenceFileWithCompressionWriter() throws 
Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + "/seq-non-rolling-out";
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
-                               .broadcast()
-                               .filter(new OddEvenFilter());
-
-               DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new 
MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public Tuple2<IntWritable, Text> map(Tuple2<Integer, 
String> value) throws Exception {
-                               return Tuple2.of(new IntWritable(value.f0), new 
Text(value.f1));
-                       }
-               });
-
-
-               RollingSink<Tuple2<IntWritable, Text>> sink = new 
RollingSink<Tuple2<IntWritable, Text>>(outPath)
-                               .setWriter(new SequenceFileWriter<IntWritable, 
Text>("Default", SequenceFile.CompressionType.BLOCK))
-                               .setBucketer(new NonRollingBucketer())
-                               .setPartPrefix("part")
-                               .setPendingPrefix("")
-                               .setPendingSuffix("");
-
-               mapped.addSink(sink);
-
-               env.execute("RollingSink String Write Test");
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
-
-               SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
-                               1000,
-                               0,
-                               100000,
-                               new Configuration());
-
-               IntWritable intWritable = new IntWritable();
-               Text txt = new Text();
-
-               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-                       reader.next(intWritable, txt);
-                       Assert.assertEquals(i, intWritable.get());
-                       Assert.assertEquals("message #" + i, txt.toString());
-               }
-
-               reader.close();
-               inStream.close();
-
-               inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-               reader = new SequenceFile.Reader(inStream,
-                               1000,
-                               0,
-                               100000,
-                               new Configuration());
-
-               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-                       reader.next(intWritable, txt);
-                       Assert.assertEquals(i, intWritable.get());
-                       Assert.assertEquals("message #" + i, txt.toString());
-               }
-
-               reader.close();
-               inStream.close();
-       }
-       
-       
-       /**
-        * This tests {@link AvroKeyValueSinkWriter}
-        * with non-rolling output and without compression.
-        */
-       @Test
-       public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws 
Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + 
"/avro-kv-no-comp-non-rolling-out";
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
-                               .broadcast()
-                               .filter(new OddEvenFilter());
-
-
-               Map<String, String> properties = new HashMap<>();
-               Schema keySchema = Schema.create(Type.INT);
-               Schema valueSchema = Schema.create(Type.STRING);
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
-               RollingSink<Tuple2<Integer, String>> sink = new 
RollingSink<Tuple2<Integer, String>>(outPath)
-                               .setWriter(new AvroKeyValueSinkWriter<Integer, 
String>(properties))
-                               .setBucketer(new NonRollingBucketer())
-                               .setPartPrefix("part")
-                               .setPendingPrefix("")
-                               .setPendingSuffix("");
-
-               source.addSink(sink);
-
-               env.execute("RollingSink Avro KeyValue Writer Test");
-
-               GenericData.setStringType(valueSchema, StringType.String);
-               Schema elementSchema = AvroKeyValue.getSchema(keySchema, 
valueSchema);
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
-               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<GenericRecord>(elementSchema);
-               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<GenericRecord>(inStream, elementReader);
-               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
-                       int key = wrappedEntry.getKey().intValue();
-                       Assert.assertEquals(i, key);
-                       String value = wrappedEntry.getValue();
-                       Assert.assertEquals("message #" + i, value);
-               }
-
-               dataFileStream.close();
-               inStream.close();
-
-               inStream = dfs.open(new Path(outPath + "/part-1-0"));
-               dataFileStream = new DataFileStream<GenericRecord>(inStream, 
elementReader);
-
-               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
-                       int key = wrappedEntry.getKey().intValue();
-                       Assert.assertEquals(i, key);
-                       String value = wrappedEntry.getValue();
-                       Assert.assertEquals("message #" + i, value);
-               }
-
-               dataFileStream.close();
-               inStream.close();
-       }
-       
-       /**
-        * This tests {@link AvroKeyValueSinkWriter}
-        * with non-rolling output and with compression.
-        */
-       @Test
-       public void testNonRollingAvroKeyValueWithCompressionWriter() throws 
Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + 
"/avro-kv-no-comp-non-rolling-out";
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
-                               .broadcast()
-                               .filter(new OddEvenFilter());
-
-
-               Map<String, String> properties = new HashMap<>();
-               Schema keySchema = Schema.create(Type.INT);
-               Schema valueSchema = Schema.create(Type.STRING);
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
String.valueOf(true));
-               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
-               RollingSink<Tuple2<Integer, String>> sink = new 
RollingSink<Tuple2<Integer, String>>(outPath)
-                               .setWriter(new AvroKeyValueSinkWriter<Integer, 
String>(properties))
-                               .setBucketer(new NonRollingBucketer())
-                               .setPartPrefix("part")
-                               .setPendingPrefix("")
-                               .setPendingSuffix("");
-
-               source.addSink(sink);
-
-               env.execute("RollingSink Avro KeyValue Writer Test");
-
-               GenericData.setStringType(valueSchema, StringType.String);
-               Schema elementSchema = AvroKeyValue.getSchema(keySchema, 
valueSchema);
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
-               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<GenericRecord>(elementSchema);
-               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<GenericRecord>(inStream, elementReader);
-               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
-                       int key = wrappedEntry.getKey().intValue();
-                       Assert.assertEquals(i, key);
-                       String value = wrappedEntry.getValue();
-                       Assert.assertEquals("message #" + i, value);
-               }
-
-               dataFileStream.close();
-               inStream.close();
-
-               inStream = dfs.open(new Path(outPath + "/part-1-0"));
-               dataFileStream = new DataFileStream<GenericRecord>(inStream, 
elementReader);
-
-               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
-                       int key = wrappedEntry.getKey().intValue();
-                       Assert.assertEquals(i, key);
-                       String value = wrappedEntry.getValue();
-                       Assert.assertEquals("message #" + i, value);
-               }
-
-               dataFileStream.close();
-               inStream.close();
-       }
-
-
-       /**
-        * This tests user defined hdfs configuration
-        * @throws Exception
-     */
-       @Test
-       public void testUserDefinedConfiguration() throws Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + 
"/string-non-rolling-with-config";
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
-                       .broadcast()
-                       .filter(new OddEvenFilter());
-
-               Configuration conf = new Configuration();
-               conf.set("io.file.buffer.size", "40960");
-               RollingSink<String> sink = new RollingSink<String>(outPath)
-                       .setFSConfig(conf)
-                       .setWriter(new 
StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960"))
-                       .setBucketer(new NonRollingBucketer())
-                       .setPartPrefix("part")
-                       .setPendingPrefix("")
-                       .setPendingSuffix("");
-
-               source
-                       .map(new MapFunction<Tuple2<Integer,String>, String>() {
-                               private static final long serialVersionUID = 1L;
-                               @Override
-                               public String map(Tuple2<Integer, String> 
value) throws Exception {
-                                       return value.f1;
-                               }
-                       })
-                       .addSink(sink);
-
-               env.execute("RollingSink with configuration Test");
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
-
-               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-                       String line = br.readLine();
-                       Assert.assertEquals("message #" + i, line);
-               }
-
-               inStream.close();
-
-               inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-               br = new BufferedReader(new InputStreamReader(inStream));
-
-               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-                       String line = br.readLine();
-                       Assert.assertEquals("message #" + i, line);
-               }
-
-               inStream.close();
-       }
-
-       // we use this to synchronize the clock changes to elements being 
processed
-       final static MultiShotLatch latch1 = new MultiShotLatch();
-       final static MultiShotLatch latch2 = new MultiShotLatch();
-
-       /**
-        * This uses {@link 
org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
-        * produce rolling files. The clock of DateTimeBucketer is set to
-        * {@link ModifyableClock} to keep the time in lockstep with the 
processing of elements using
-        * latches.
-        */
-       @Test
-       public void testDateTimeRollingStringWriter() throws Exception {
-               final int NUM_ELEMENTS = 20;
-               final int PARALLELISM = 2;
-               final String outPath = hdfsURI + "/rolling-out";
-               DateTimeBucketer.setClock(new ModifyableClock());
-               ModifyableClock.setCurrentTime(0);
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(PARALLELISM);
-
-
-
-               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
WaitingTestSourceFunction(
-                               NUM_ELEMENTS))
-                               .broadcast();
-
-               // the parallel flatMap is chained to the sink, so when it has 
seen 5 elements it can
-               // fire the latch
-               DataStream<String> mapped = source
-                               .flatMap(new 
RichFlatMapFunction<Tuple2<Integer, String>, String>() {
-                                       private static final long 
serialVersionUID = 1L;
-
-                                       int count = 0;
-                                       @Override
-                                       public void flatMap(Tuple2<Integer, 
String> value,
-                                                       Collector<String> out) 
throws Exception {
-                                               out.collect(value.f1);
-                                               count++;
-                                               if (count >= 5) {
-                                                       if 
(getRuntimeContext().getIndexOfThisSubtask() == 0) {
-                                                               
latch1.trigger();
-                                                       } else {
-                                                               
latch2.trigger();
-                                                       }
-                                                       count = 0;
-                                               }
-                                       }
-
-                               });
-
-               RollingSink<String> sink = new RollingSink<String>(outPath)
-                               .setBucketer(new DateTimeBucketer("ss"))
-                               .setPartPrefix("part")
-                               .setPendingPrefix("")
-                               .setPendingSuffix("");
-
-               mapped.addSink(sink);
-
-               env.execute("RollingSink String Write Test");
-
-               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(outPath), true);
-
-               // we should have 8 rolling files, 4 time intervals and 
parallelism of 2
-               int numFiles = 0;
-               while (files.hasNext()) {
-                       LocatedFileStatus file = files.next();
-                       numFiles++;
-                       if 
(file.getPath().toString().contains("rolling-out/00")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 0; i < 5; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else if 
(file.getPath().toString().contains("rolling-out/05")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 5; i < 10; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else if 
(file.getPath().toString().contains("rolling-out/10")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 10; i < 15; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else if 
(file.getPath().toString().contains("rolling-out/15")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 15; i < 20; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else {
-                               Assert.fail("File " + file + " does not match 
any expected roll pattern.");
-                       }
-               }
-
-               Assert.assertEquals(8, numFiles);
-       }
-
-       private static final String PART_PREFIX = "part";
-       private static final String PENDING_SUFFIX = ".pending";
-       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-       private static final String VALID_LENGTH_SUFFIX = ".valid";
-
-       @Test
-       public void testBucketStateTransitions() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createRescalingTestSink(outDir, 1, 0);
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(0L);
-
-               // we have a bucket size of 5 bytes, so each record will get 
its own bucket,
-               // i.e. the bucket should roll after every record.
-
-               testHarness.processElement(new StreamRecord<>("test1", 1L));
-               testHarness.processElement(new StreamRecord<>("test2", 1L));
-               checkFs(outDir, 1, 1 ,0, 0);
-
-               testHarness.processElement(new StreamRecord<>("test3", 1L));
-               checkFs(outDir, 1, 2, 0, 0);
-
-               testHarness.snapshot(0, 0);
-               checkFs(outDir, 1, 2, 0, 0);
-
-               testHarness.notifyOfCompletedCheckpoint(0);
-               checkFs(outDir, 1, 0, 2, 0);
-
-               OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
-
-               testHarness.close();
-               checkFs(outDir, 0, 1, 2, 0);
-
-               testHarness = createRescalingTestSink(outDir, 1, 0);
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.open();
-               checkFs(outDir, 0, 0, 3, 1);
-
-               snapshot = testHarness.snapshot(2, 0);
-
-               testHarness.processElement(new StreamRecord<>("test4", 10));
-               checkFs(outDir, 1, 0, 3, 1);
-
-               testHarness = createRescalingTestSink(outDir, 1, 0);
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.open();
-
-               // the in-progress file remains as we do not clean up now
-               checkFs(outDir, 1, 0, 3, 1);
-
-               testHarness.close();
-
-               // at close it is not moved to final because it is not part
-               // of the current task's state, it was just a not cleaned up 
leftover.
-               checkFs(outDir, 1, 0, 3, 1);
-       }
-
-       @Test
-       public void testScalingDown() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 3, 0);
-               testHarness1.setup();
-               testHarness1.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 3, 1);
-               testHarness2.setup();
-               testHarness2.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2);
-               testHarness3.setup();
-               testHarness3.open();
-
-               testHarness1.processElement(new StreamRecord<>("test1", 0L));
-               checkFs(outDir, 1, 0, 0, 0);
-
-               testHarness2.processElement(new StreamRecord<>("test2", 0L));
-               testHarness2.processElement(new StreamRecord<>("test3", 0L));
-               testHarness2.processElement(new StreamRecord<>("test4", 0L));
-               testHarness2.processElement(new StreamRecord<>("test5", 0L));
-               testHarness2.processElement(new StreamRecord<>("test6", 0L));
-               checkFs(outDir, 2, 4, 0, 0);
-
-               testHarness3.processElement(new StreamRecord<>("test7", 0L));
-               testHarness3.processElement(new StreamRecord<>("test8", 0L));
-               checkFs(outDir, 3, 5, 0, 0);
-
-               // intentionally we snapshot them in a not ascending order so 
that the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
-                       testHarness3.snapshot(0, 0),
-                       testHarness1.snapshot(0, 0),
-                       testHarness2.snapshot(0, 0)
-               );
-
-               // with the above state reshuffling, we expect testHarness4 to 
take the
-               // state of the previous testHarness3 and testHarness1 while 
testHarness5
-               // will take that of the previous testHarness1
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness4 
= createRescalingTestSink(outDir, 2, 0);
-               testHarness4.setup();
-               testHarness4.initializeState(mergedSnapshot);
-               testHarness4.open();
-
-               // we do not have a length file for part-2-0 because bucket 
part-2-0
-               // was not "in-progress", but "pending" (its full content is 
valid).
-               checkFs(outDir, 1, 4, 3, 2);
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness5 
= createRescalingTestSink(outDir, 2, 1);
-               testHarness5.setup();
-               testHarness5.initializeState(mergedSnapshot);
-               testHarness5.open();
-
-               checkFs(outDir, 0, 0, 8, 3);
-       }
-
-       @Test
-       public void testScalingUp() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 2, 0);
-               testHarness1.setup();
-               testHarness1.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 2, 0);
-               testHarness2.setup();
-               testHarness2.open();
-
-               testHarness1.processElement(new StreamRecord<>("test1", 0L));
-               testHarness1.processElement(new StreamRecord<>("test2", 0L));
-
-               checkFs(outDir, 1, 1, 0, 0);
-
-               testHarness2.processElement(new StreamRecord<>("test3", 0L));
-               testHarness2.processElement(new StreamRecord<>("test4", 0L));
-               testHarness2.processElement(new StreamRecord<>("test5", 0L));
-
-               checkFs(outDir, 2, 3, 0, 0);
-
-               // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
-                       testHarness2.snapshot(0, 0),
-                       testHarness1.snapshot(0, 0)
-               );
-
-               testHarness1 = createRescalingTestSink(outDir, 3, 0);
-               testHarness1.setup();
-               testHarness1.initializeState(mergedSnapshot);
-               testHarness1.open();
-
-               checkFs(outDir, 1, 1, 3, 1);
-
-               testHarness2 = createRescalingTestSink(outDir, 3, 1);
-               testHarness2.setup();
-               testHarness2.initializeState(mergedSnapshot);
-               testHarness2.open();
-
-               checkFs(outDir, 0, 0, 5, 2);
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2);
-               testHarness3.setup();
-               testHarness3.initializeState(mergedSnapshot);
-               testHarness3.open();
-
-               checkFs(outDir, 0, 0, 5, 2);
-
-               testHarness1.processElement(new StreamRecord<>("test6", 0));
-               testHarness2.processElement(new StreamRecord<>("test6", 0));
-               testHarness3.processElement(new StreamRecord<>("test6", 0));
-
-               // 3 for the different tasks
-               checkFs(outDir, 3, 0, 5, 2);
-
-               testHarness1.snapshot(1, 0);
-               testHarness2.snapshot(1, 0);
-               testHarness3.snapshot(1, 0);
-
-               testHarness1.close();
-               testHarness2.close();
-               testHarness3.close();
-
-               checkFs(outDir, 0, 3, 5, 2);
-       }
-
-       private OneInputStreamOperatorTestHarness<String, Object> 
createRescalingTestSink(
-               File outDir, int totalParallelism, int taskIdx) throws 
Exception {
-
-               RollingSink<String> sink = new 
RollingSink<String>(outDir.getAbsolutePath())
-                       .setWriter(new StringWriter<String>())
-                       .setBatchSize(5)
-                       .setPartPrefix(PART_PREFIX)
-                       .setInProgressPrefix("")
-                       .setPendingPrefix("")
-                       .setValidLengthPrefix("")
-                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
-                       .setPendingSuffix(PENDING_SUFFIX)
-                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-               return createTestSink(sink, totalParallelism, taskIdx);
-       }
-
-       private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
-               RollingSink<T> sink, int totalParallelism, int taskIdx) throws 
Exception {
-               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), 10, totalParallelism, taskIdx);
-       }
-
-       private void checkFs(File outDir, int inprogress, int pending, int 
completed, int valid) throws IOException {
-               int inProg = 0;
-               int pend = 0;
-               int compl = 0;
-               int val = 0;
-
-               for (File file: FileUtils.listFiles(outDir, null, true)) {
-                       if (file.getAbsolutePath().endsWith("crc")) {
-                               continue;
-                       }
-                       String path = file.getPath();
-                       if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-                               inProg++;
-                       } else if (path.endsWith(PENDING_SUFFIX)) {
-                               pend++;
-                       } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-                               val++;
-                       } else if (path.contains(PART_PREFIX)) {
-                               compl++;
-                       }
-               }
-
-               Assert.assertEquals(inprogress, inProg);
-               Assert.assertEquals(pending, pend);
-               Assert.assertEquals(completed, compl);
-               Assert.assertEquals(valid, val);
-       }
-
-       private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
-
-               private volatile boolean running = true;
-
-               private final int numElements;
-
-               public TestSourceFunction(int numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
-                       for (int i = 0; i < numElements && running; i++) {
-                               ctx.collect(Tuple2.of(i, "message #" + i));
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-       }
-
-       /**
-        * This waits on the two multi-shot latches. The latches are triggered 
in a parallel
-        * flatMap inside the test topology.
-        */
-       private static class WaitingTestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
-
-               private volatile boolean running = true;
-
-               private final int numElements;
-
-               public WaitingTestSourceFunction(int numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Integer, String>> ctx) 
throws Exception {
-                       for (int i = 0; i < numElements && running; i++) {
-                               if (i % 5 == 0 && i > 0) {
-                                       // update the clock after "five 
seconds", so we get 20 seconds in total
-                                       // with 5 elements in each time window
-                                       latch1.await();
-                                       latch2.await();
-                                       ModifyableClock.setCurrentTime(i * 
1000);
-                               }
-                               ctx.collect(Tuple2.of(i, "message #" + i));
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       running = false;
-               }
-       }
-
-
-       private static class StreamWriterWithConfigCheck<T> extends 
StringWriter<T> {
-               private String key;
-               private String expect;
-               public StreamWriterWithConfigCheck(String key, String expect) {
-                       this.key = key;
-                       this.expect = expect;
-               }
-
-               @Override
-               public void open(FileSystem fs, Path path) throws IOException {
-                       super.open(fs, path);
-                       Assert.assertEquals(expect, fs.getConf().get(key));
-               }
-
-               @Override
-               public Writer<T> duplicate() {
-                       return new StreamWriterWithConfigCheck<>(key, expect);
-               }
-       }
-
-       public static class OddEvenFilter extends 
RichFilterFunction<Tuple2<Integer, String>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public boolean filter(Tuple2<Integer, String> value) throws 
Exception {
-                       if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-                               return value.f0 % 2 == 0;
-                       } else {
-                               return value.f0 % 2 == 1;
-                       }
-               }
-       }
-
-       public static class ModifyableClock implements Clock {
-
-               private static volatile long currentTime = 0;
-
-               public static void setCurrentTime(long currentTime) {
-                       ModifyableClock.currentTime = currentTime;
-               }
-
-               @Override
-               public long currentTimeMillis() {
-                       return currentTime;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
deleted file mode 100644
index eb12d07..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.apache.flink.test.util.TestingSecurityContext;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.VersionInfo;
-import org.junit.AfterClass;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
-import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
-
-/**
- * Tests for running {@link RollingSinkSecuredITCase} which is an extension of 
{@link RollingSink} in secure environment
- * Note: only executed for Hadoop version > 3.x.x
- */
-public class RollingSinkSecuredITCase extends RollingSinkITCase {
-
-       protected static final Logger LOG = 
LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
-
-       /**
-        * Skips all tests if the Hadoop version doesn't match.
-        * We can't run this test class until HDFS-9213 is fixed which allows a 
secure DataNode
-        * to bind to non-privileged ports for testing.
-        * For now, we skip this test class until Hadoop version 3.x.x.
-        */
-       private static void skipIfHadoopVersionIsNotAppropriate() {
-               // Skips all tests if the Hadoop version doesn't match
-               String hadoopVersionString = VersionInfo.getVersion();
-               String[] split = hadoopVersionString.split("\\.");
-               if (split.length != 3) {
-                       throw new IllegalStateException("Hadoop version was not 
of format 'X.X.X': " + hadoopVersionString);
-               }
-               Assume.assumeTrue(
-                       // check whether we're running Hadoop version >= 3.x.x
-                       Integer.parseInt(split[0]) >= 3
-               );
-       }
-
-       /*
-        * override super class static methods to avoid creating MiniDFS and 
MiniFlink with wrong configurations
-        * and out-of-order sequence for secure cluster
-        */
-       @BeforeClass
-       public static void setup() throws Exception {}
-
-       @AfterClass
-       public static void teardown() throws Exception {}
-
-       @BeforeClass
-       public static void createHDFS() throws IOException {}
-
-       @AfterClass
-       public static void destroyHDFS() {}
-
-       @BeforeClass
-       public static void startSecureCluster() throws Exception {
-
-               skipIfHadoopVersionIsNotAppropriate();
-
-               LOG.info("starting secure cluster environment for testing");
-
-               dataDir = tempFolder.newFolder();
-
-               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
-
-               SecureTestEnvironment.prepare(tempFolder);
-
-               populateSecureConfigurations();
-
-               Configuration flinkConfig = new Configuration();
-               flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
-                               SecureTestEnvironment.getTestKeytab());
-               flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
-                               
SecureTestEnvironment.getHadoopServicePrincipal());
-
-               SecurityUtils.SecurityConfiguration ctx = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
-               ctx.setHadoopConfiguration(conf);
-               try {
-                       TestingSecurityContext.install(ctx, 
SecureTestEnvironment.getClientSecurityConfigurationMap());
-               } catch (Exception e) {
-                       throw new RuntimeException("Exception occurred while 
setting up secure test context. Reason: {}", e);
-               }
-
-               File hdfsSiteXML = new File(dataDir.getAbsolutePath() + 
"/hdfs-site.xml");
-
-               FileWriter writer = new FileWriter(hdfsSiteXML);
-               conf.writeXml(writer);
-               writer.flush();
-               writer.close();
-
-               Map<String, String> map = new HashMap<String, 
String>(System.getenv());
-               map.put("HADOOP_CONF_DIR", 
hdfsSiteXML.getParentFile().getAbsolutePath());
-               TestBaseUtils.setEnv(map);
-
-
-               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
-               builder.checkDataNodeAddrConfig(true);
-               builder.checkDataNodeHostConfig(true);
-               hdfsCluster = builder.build();
-
-               dfs = hdfsCluster.getFileSystem();
-
-               hdfsURI = "hdfs://"
-                               + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
-                               + "/";
-
-               startSecureFlinkClusterWithRecoveryModeEnabled();
-       }
-
-       @AfterClass
-       public static void teardownSecureCluster() throws Exception {
-               LOG.info("tearing down secure cluster environment");
-
-               TestStreamEnvironment.unsetAsContext();
-               stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-
-               if (hdfsCluster != null) {
-                       hdfsCluster.shutdown();
-               }
-
-               SecureTestEnvironment.cleanup();
-       }
-
-       private static void populateSecureConfigurations() {
-
-               String dataTransferProtection = "authentication";
-
-               
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS,
 conf);
-               conf.set(DFS_NAMENODE_USER_NAME_KEY, 
SecureTestEnvironment.getHadoopServicePrincipal());
-               conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, 
SecureTestEnvironment.getTestKeytab());
-               conf.set(DFS_DATANODE_USER_NAME_KEY, 
SecureTestEnvironment.getHadoopServicePrincipal());
-               conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, 
SecureTestEnvironment.getTestKeytab());
-               conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, 
SecureTestEnvironment.getHadoopServicePrincipal());
-
-               conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-
-               conf.set("dfs.data.transfer.protection", 
dataTransferProtection);
-
-               conf.set(DFS_HTTP_POLICY_KEY, 
HttpConfig.Policy.HTTP_ONLY.name());
-
-               conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
-
-               conf.setInt("dfs.datanode.socket.write.timeout", 0);
-
-               /*
-                * We ae setting the port number to privileged port - see 
HDFS-9213
-                * This requires the user to have root privilege to bind to the 
port
-                * Use below command (ubuntu) to set privilege to java process 
for the
-                * bind() to work if the java process is not running as root.
-                * setcap 'cap_net_bind_service=+ep' /path/to/java
-                */
-               conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
-               conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
-               conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
-       }
-
-       private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
-               try {
-                       LOG.info("Starting Flink and ZK in secure mode");
-
-                       dfs.mkdirs(new Path("/flink/checkpoints"));
-                       dfs.mkdirs(new Path("/flink/recovery"));
-
-                       org.apache.flink.configuration.Configuration config = 
new org.apache.flink.configuration.Configuration();
-
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
DEFAULT_PARALLELISM);
-                       
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-                       config.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-                       
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + 
"/flink/checkpoints");
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + 
"/flink/recovery");
-                       config.setString("state.backend.fs.checkpointdir", 
hdfsURI + "/flink/checkpoints");
-
-                       
SecureTestEnvironment.populateFlinkSecureConfigurations(config);
-
-                       cluster = TestBaseUtils.startCluster(config, false);
-                       TestStreamEnvironment.setAsContext(cluster, 
DEFAULT_PARALLELISM);
-
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       /* For secure cluster testing, it is enough to run only one test and 
override below test methods
-        * to keep the overall build time minimal
-        */
-       @Override
-       public void testNonRollingSequenceFileWithoutCompressionWriter() throws 
Exception {}
-
-       @Override
-       public void testNonRollingSequenceFileWithCompressionWriter() throws 
Exception {}
-
-       @Override
-       public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws 
Exception {}
-
-       @Override
-       public void testNonRollingAvroKeyValueWithCompressionWriter() throws 
Exception {}
-
-       @Override
-       public void testDateTimeRollingStringWriter() throws Exception {}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
deleted file mode 100644
index 54703a3..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.BufferedReader;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link BucketingSink}.
- *
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another 
test tests the
- * rolling behaviour.
- */
-public class BucketingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBase {
-
-       final long NUM_STRINGS = 16_000;
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       private static MiniDFSCluster hdfsCluster;
-       private static org.apache.hadoop.fs.FileSystem dfs;
-
-       private static String outPath;
-
-       private static final String PENDING_SUFFIX = ".pending";
-       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-
-       @BeforeClass
-       public static void createHDFS() throws IOException {
-               Configuration conf = new Configuration();
-
-               File dataDir = tempFolder.newFolder();
-
-               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
-               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
-               hdfsCluster = builder.build();
-
-               dfs = hdfsCluster.getFileSystem();
-
-               outPath = "hdfs://"
-                               + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
-                               + "/string-non-rolling-out";
-       }
-
-       @AfterClass
-       public static void destroyHDFS() {
-               if (hdfsCluster != null) {
-                       hdfsCluster.shutdown();
-               }
-       }
-
-       @Override
-       public void testProgram(StreamExecutionEnvironment env) {
-               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-               int PARALLELISM = 12;
-
-               env.enableCheckpointing(20);
-               env.setParallelism(PARALLELISM);
-               env.disableOperatorChaining();
-
-               DataStream<String> stream = env.addSource(new 
StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
-               DataStream<String> mapped = stream
-                               .map(new 
OnceFailingIdentityMapper(NUM_STRINGS));
-
-               BucketingSink<String> sink = new BucketingSink<String>(outPath)
-                               .setBucketer(new BasePathBucketer<String>())
-                               .setBatchSize(10000)
-                               .setValidLengthPrefix("")
-                               .setPendingPrefix("")
-                               .setPendingSuffix(PENDING_SUFFIX)
-                               .setInProgressSuffix(IN_PROGRESS_SUFFIX);
-
-               mapped.addSink(sink);
-
-       }
-
-       @Override
-       public void postSubmit() throws Exception {
-               // We read the files and verify that we have read all the 
strings. If a valid-length
-               // file exists we only read the file to that point. (This test 
should work with
-               // FileSystems that support truncate() and with others as well.)
-
-               Pattern messageRegex = Pattern.compile("message (\\d*)");
-
-               // Keep a set of the message IDs that we read. The size must 
equal the read count and
-               // the NUM_STRINGS. If numRead is bigger than the size of the 
set we have seen some
-               // elements twice.
-               Set<Integer> readNumbers = Sets.newHashSet();
-
-               HashSet<String> uniqMessagesRead = new HashSet<>();
-               HashSet<String> messagesInCommittedFiles = new HashSet<>();
-
-               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(
-                               outPath), true);
-
-               while (files.hasNext()) {
-                       LocatedFileStatus file = files.next();
-
-                       if 
(!file.getPath().toString().endsWith(".valid-length")) {
-                               int validLength = (int) file.getLen();
-                               if 
(dfs.exists(file.getPath().suffix(".valid-length"))) {
-                                       FSDataInputStream inStream = 
dfs.open(file.getPath().suffix(".valid-length"));
-                                       String validLengthString = 
inStream.readUTF();
-                                       validLength = 
Integer.parseInt(validLengthString);
-                                       System.out.println("VALID LENGTH: " + 
validLength);
-                               }
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-                               byte[] buffer = new byte[validLength];
-                               inStream.readFully(0, buffer, 0, validLength);
-                               inStream.close();
-
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer);
-
-                               InputStreamReader inStreamReader = new 
InputStreamReader(bais);
-                               BufferedReader br = new 
BufferedReader(inStreamReader);
-
-                               String line = br.readLine();
-                               while (line != null) {
-                                       Matcher matcher = 
messageRegex.matcher(line);
-                                       if (matcher.matches()) {
-                                               uniqMessagesRead.add(line);
-
-                                               // check that in the committed 
files there are no duplicates
-                                               if 
(!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && 
!file.getPath().toString().endsWith(PENDING_SUFFIX)) {
-                                                       if 
(!messagesInCommittedFiles.add(line)) {
-                                                               
Assert.fail("Duplicate entry in committed bucket.");
-                                                       }
-                                               }
-
-                                               int messageId = 
Integer.parseInt(matcher.group(1));
-                                               readNumbers.add(messageId);
-                                       } else {
-                                               Assert.fail("Read line does not 
match expected pattern.");
-                                       }
-                                       line = br.readLine();
-                               }
-                               br.close();
-                               inStreamReader.close();
-                               bais.close();
-                       }
-               }
-
-               // Verify that we read all strings (at-least-once)
-               Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
-               // Verify that we don't have duplicates (boom!, exactly-once)
-               Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
-       }
-
-       private static class OnceFailingIdentityMapper extends 
RichMapFunction<String, String> {
-               private static final long serialVersionUID = 1L;
-
-               private static volatile boolean hasFailed = false;
-
-               private final long numElements;
-
-               private long failurePos;
-               private long count;
-
-
-               OnceFailingIdentityMapper(long numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void open(org.apache.flink.configuration.Configuration 
parameters) throws IOException {
-                       long failurePosMin = (long) (0.7 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
-                       long failurePosMax = (long) (0.9 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
-
-                       failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
-                       count = 0;
-               }
-
-               @Override
-               public String map(String value) throws Exception {
-                       count++;
-                       if (!hasFailed && count >= failurePos) {
-                               hasFailed = true;
-                               throw new Exception("Test Failure");
-                       }
-
-                       return value;
-               }
-       }
-
-       private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String>
-                       implements CheckpointedAsynchronously<Integer> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final long numElements;
-
-               private int index;
-
-               private volatile boolean isRunning = true;
-
-
-               StringGeneratingSourceFunction(long numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void run(SourceContext<String> ctx) throws Exception {
-                       final Object lockingObject = ctx.getCheckpointLock();
-
-                       final int step = 
getRuntimeContext().getNumberOfParallelSubtasks();
-
-                       if (index == 0) {
-                               index = 
getRuntimeContext().getIndexOfThisSubtask();
-                       }
-
-                       while (isRunning && index < numElements) {
-
-                               Thread.sleep(1);
-                               synchronized (lockingObject) {
-                                       ctx.collect("message " + index);
-                                       index += step;
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               private static String randomString(StringBuilder bld, Random 
rnd) {
-                       final int len = rnd.nextInt(10) + 5;
-
-                       for (int i = 0; i < len; i++) {
-                               char next = (char) (rnd.nextInt(20000) + 33);
-                               bld.append(next);
-                       }
-
-                       return bld.toString();
-               }
-
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
-               }
-
-               @Override
-               public void restoreState(Integer state) {
-                       index = state;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
deleted file mode 100644
index d671874..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
-import org.apache.flink.streaming.connectors.fs.Clock;
-import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
-import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.connectors.fs.Writer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.Assert;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-
-public class BucketingSinkTest {
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       private static MiniDFSCluster hdfsCluster;
-       private static org.apache.hadoop.fs.FileSystem dfs;
-       private static String hdfsURI;
-
-       private static final String PART_PREFIX = "part";
-       private static final String PENDING_SUFFIX = ".pending";
-       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-       private static final String VALID_LENGTH_SUFFIX = ".valid";
-
-       private OneInputStreamOperatorTestHarness<String, Object> 
createRescalingTestSink(
-               File outDir, int totalParallelism, int taskIdx, long 
inactivityInterval) throws Exception {
-
-               BucketingSink<String> sink = new 
BucketingSink<String>(outDir.getAbsolutePath())
-                       .setBucketer(new Bucketer<String>() {
-                               private static final long serialVersionUID = 1L;
-
-                               @Override
-                               public Path getBucketPath(Clock clock, Path 
basePath, String element) {
-                                       return new Path(basePath, element);
-                               }
-                       })
-                       .setWriter(new StringWriter<String>())
-                       .setInactiveBucketCheckInterval(inactivityInterval)
-                       .setInactiveBucketThreshold(inactivityInterval)
-                       .setPartPrefix(PART_PREFIX)
-                       .setInProgressPrefix("")
-                       .setPendingPrefix("")
-                       .setValidLengthPrefix("")
-                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
-                       .setPendingSuffix(PENDING_SUFFIX)
-                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-               return createTestSink(sink, totalParallelism, taskIdx);
-       }
-
-       private OneInputStreamOperatorTestHarness<String, Object> 
createTestSink(File dataDir, int totalParallelism, int taskIdx) throws 
Exception {
-               BucketingSink<String> sink = new 
BucketingSink<String>(dataDir.getAbsolutePath())
-                       .setBucketer(new Bucketer<String>() {
-                               private static final long serialVersionUID = 1L;
-
-                               @Override
-                               public Path getBucketPath(Clock clock, Path 
basePath, String element) {
-                                       return new Path(basePath, element);
-                               }
-                       })
-                       .setWriter(new StringWriter<String>())
-                       .setPartPrefix(PART_PREFIX)
-                       .setPendingPrefix("")
-                       .setInactiveBucketCheckInterval(5*60*1000L)
-                       .setInactiveBucketThreshold(5*60*1000L)
-                       .setPendingSuffix(PENDING_SUFFIX)
-                       .setInProgressSuffix(IN_PROGRESS_SUFFIX);
-
-               return createTestSink(sink, totalParallelism, taskIdx);
-       }
-
-       private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
-                       BucketingSink<T> sink, int totalParallelism, int 
taskIdx) throws Exception {
-               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), 10, totalParallelism, taskIdx);
-       }
-
-       @BeforeClass
-       public static void createHDFS() throws IOException {
-               Configuration conf = new Configuration();
-
-               File dataDir = tempFolder.newFolder();
-
-               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
-               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
-               hdfsCluster = builder.build();
-
-               dfs = hdfsCluster.getFileSystem();
-
-               hdfsURI = "hdfs://"
-                       + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), 
hdfsCluster.getNameNodePort())
-                       + "/";
-       }
-
-       @AfterClass
-       public static void destroyHDFS() {
-               hdfsCluster.shutdown();
-       }
-
-       @Test
-       public void testInactivityPeriodWithLateNotify() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.processElement(new StreamRecord<>("test1", 1L));
-               testHarness.processElement(new StreamRecord<>("test2", 1L));
-               checkFs(outDir, 2, 0 ,0, 0);
-
-               testHarness.setProcessingTime(101L);    // put some in pending
-               checkFs(outDir, 0, 2, 0, 0);
-
-               testHarness.snapshot(0, 0);                             // put 
them in pending for 0
-               checkFs(outDir, 0, 2, 0, 0);
-
-               testHarness.processElement(new StreamRecord<>("test3", 1L));
-               testHarness.processElement(new StreamRecord<>("test4", 1L));
-
-               testHarness.setProcessingTime(202L);    // put some in pending
-
-               testHarness.snapshot(1, 0);                             // put 
them in pending for 1
-               checkFs(outDir, 0, 4, 0, 0);
-
-               testHarness.notifyOfCompletedCheckpoint(0);     // put the 
pending for 0 to the "committed" state
-               checkFs(outDir, 0, 2, 2, 0);
-
-               testHarness.notifyOfCompletedCheckpoint(1); // put the pending 
for 1 to the "committed" state
-               checkFs(outDir, 0, 0, 4, 0);
-       }
-
-       @Test
-       public void testBucketStateTransitions() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createRescalingTestSink(outDir, 1, 0, 100);
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.processElement(new StreamRecord<>("test1", 1L));
-               testHarness.processElement(new StreamRecord<>("test2", 1L));
-               checkFs(outDir, 2, 0 ,0, 0);
-
-               // this is to check the inactivity threshold
-               testHarness.setProcessingTime(101L);
-               checkFs(outDir, 0, 2, 0, 0);
-
-               testHarness.processElement(new StreamRecord<>("test3", 1L));
-               checkFs(outDir, 1, 2, 0, 0);
-
-               testHarness.snapshot(0, 0);
-               checkFs(outDir, 1, 2, 0, 0);
-
-               testHarness.notifyOfCompletedCheckpoint(0);
-               checkFs(outDir, 1, 0, 2, 0);
-
-               OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
-
-               testHarness.close();
-               checkFs(outDir, 0, 1, 2, 0);
-
-               testHarness = createRescalingTestSink(outDir, 1, 0, 100);
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.open();
-               checkFs(outDir, 0, 0, 3, 1);
-
-               snapshot = testHarness.snapshot(2, 0);
-
-               testHarness.processElement(new StreamRecord<>("test4", 10));
-               checkFs(outDir, 1, 0, 3, 1);
-
-               testHarness = createRescalingTestSink(outDir, 1, 0, 100);
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.open();
-
-               // the in-progress file remains as we do not clean up now
-               checkFs(outDir, 1, 0, 3, 1);
-
-               testHarness.close();
-
-               // at close it is not moved to final because it is not part
-               // of the current task's state, it was just a not cleaned up 
leftover.
-               checkFs(outDir, 1, 0, 3, 1);
-       }
-
-       @Test
-       public void testSameParallelismWithShufflingStates() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 2, 0, 100);
-               testHarness1.setup();
-               testHarness1.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 2, 1, 100);
-               testHarness2.setup();
-               testHarness2.open();
-
-               testHarness1.processElement(new StreamRecord<>("test1", 0L));
-               checkFs(outDir, 1, 0, 0, 0);
-
-               testHarness2.processElement(new StreamRecord<>("test2", 0L));
-               checkFs(outDir, 2, 0, 0, 0);
-
-               // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
-                       testHarness2.snapshot(0, 0),
-                       testHarness1.snapshot(0, 0)
-               );
-
-               checkFs(outDir, 2, 0, 0, 0);
-
-               // this will not be included in any checkpoint so it can be 
cleaned up (although we do not)
-               testHarness2.processElement(new StreamRecord<>("test3", 0L));
-               checkFs(outDir, 3, 0, 0, 0);
-
-               testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
-               testHarness1.setup();
-               testHarness1.initializeState(mergedSnapshot);
-               testHarness1.open();
-
-               // the one in-progress will be the one assigned to the next 
instance,
-               // the other is the test3 which is just not cleaned up
-               checkFs(outDir, 2, 0, 1, 1);
-
-               testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
-               testHarness2.setup();
-               testHarness2.initializeState(mergedSnapshot);
-               testHarness2.open();
-
-               checkFs(outDir, 1, 0, 2, 2);
-
-               testHarness1.close();
-               testHarness2.close();
-
-               // the 1 in-progress can be discarded.
-               checkFs(outDir, 1, 0, 2, 2);
-       }
-
-       @Test
-       public void testScalingDown() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 3, 0, 100);
-               testHarness1.setup();
-               testHarness1.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 3, 1, 100);
-               testHarness2.setup();
-               testHarness2.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2, 100);
-               testHarness3.setup();
-               testHarness3.open();
-
-               testHarness1.processElement(new StreamRecord<>("test1", 0L));
-               checkFs(outDir, 1, 0, 0, 0);
-
-               testHarness2.processElement(new StreamRecord<>("test2", 0L));
-               checkFs(outDir, 2, 0, 0, 0);
-
-               testHarness3.processElement(new StreamRecord<>("test3", 0L));
-               testHarness3.processElement(new StreamRecord<>("test4", 0L));
-               checkFs(outDir, 4, 0, 0, 0);
-
-               // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
-                       testHarness3.snapshot(0, 0),
-                       testHarness1.snapshot(0, 0),
-                       testHarness2.snapshot(0, 0)
-               );
-
-               testHarness1 = createRescalingTestSink(outDir, 2, 0, 100);
-               testHarness1.setup();
-               testHarness1.initializeState(mergedSnapshot);
-               testHarness1.open();
-
-               checkFs(outDir, 1, 0, 3, 3);
-
-               testHarness2 = createRescalingTestSink(outDir, 2, 1, 100);
-               testHarness2.setup();
-               testHarness2.initializeState(mergedSnapshot);
-               testHarness2.open();
-
-               checkFs(outDir, 0, 0, 4, 4);
-       }
-
-       @Test
-       public void testScalingUp() throws Exception {
-               final File outDir = tempFolder.newFolder();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 2, 0, 100);
-               testHarness1.setup();
-               testHarness1.open();
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 2, 0, 100);
-               testHarness2.setup();
-               testHarness2.open();
-
-               testHarness1.processElement(new StreamRecord<>("test1", 1L));
-               testHarness1.processElement(new StreamRecord<>("test2", 1L));
-
-               checkFs(outDir, 2, 0, 0, 0);
-
-               testHarness2.processElement(new StreamRecord<>("test3", 1L));
-               testHarness2.processElement(new StreamRecord<>("test4", 1L));
-               testHarness2.processElement(new StreamRecord<>("test5", 1L));
-
-               checkFs(outDir, 5, 0, 0, 0);
-
-               // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
-                       testHarness2.snapshot(0, 0),
-                       testHarness1.snapshot(0, 0)
-               );
-
-               testHarness1 = createRescalingTestSink(outDir, 3, 0, 100);
-               testHarness1.setup();
-               testHarness1.initializeState(mergedSnapshot);
-               testHarness1.open();
-
-               checkFs(outDir, 2, 0, 3, 3);
-
-               testHarness2 = createRescalingTestSink(outDir, 3, 1, 100);
-               testHarness2.setup();
-               testHarness2.initializeState(mergedSnapshot);
-               testHarness2.open();
-
-               checkFs(outDir, 0, 0, 5, 5);
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2, 100);
-               testHarness3.setup();
-               testHarness3.initializeState(mergedSnapshot);
-               testHarness3.open();
-
-               checkFs(outDir, 0, 0, 5, 5);
-
-               testHarness1.processElement(new StreamRecord<>("test6", 0));
-               testHarness2.processElement(new StreamRecord<>("test6", 0));
-               testHarness3.processElement(new StreamRecord<>("test6", 0));
-
-               checkFs(outDir, 3, 0, 5, 5);
-
-               testHarness1.snapshot(1, 0);
-               testHarness2.snapshot(1, 0);
-               testHarness3.snapshot(1, 0);
-
-               testHarness1.close();
-               testHarness2.close();
-               testHarness3.close();
-
-               checkFs(outDir, 0, 3, 5, 5);
-       }
-
-       private void checkFs(File outDir, int inprogress, int pending, int 
completed, int valid) throws IOException {
-               int inProg = 0;
-               int pend = 0;
-               int compl = 0;
-               int val = 0;
-
-               for (File file: FileUtils.listFiles(outDir, null, true)) {
-                       if (file.getAbsolutePath().endsWith("crc")) {
-                               continue;
-                       }
-                       String path = file.getPath();
-                       if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-                               inProg++;
-                       } else if (path.endsWith(PENDING_SUFFIX)) {
-                               pend++;
-                       } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-                               val++;
-                       } else if (path.contains(PART_PREFIX)) {
-                               compl++;
-                       }
-               }
-
-               Assert.assertEquals(inprogress, inProg);
-               Assert.assertEquals(pending, pend);
-               Assert.assertEquals(completed, compl);
-               Assert.assertEquals(valid, val);
-       }
-
-       /**
-        * This tests {@link StringWriter} with
-        * non-bucketing output.
-        */
-       @Test
-       public void testNonRollingStringWriter() throws Exception {
-               final String outPath = hdfsURI + "/string-non-rolling-out";
-
-               final int numElements = 20;
-
-               BucketingSink<String> sink = new BucketingSink<String>(outPath)
-                       .setBucketer(new BasePathBucketer<String>())
-                       .setPartPrefix(PART_PREFIX)
-                       .setPendingPrefix("")
-                       .setPendingSuffix("");
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElements; i++) {
-                       testHarness.processElement(new StreamRecord<>("message 
#" + Integer.toString(i)));
-               }
-
-               testHarness.close();
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + 
PART_PREFIX + "-0-0"));
-
-               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-               for (int i = 0; i < numElements; i++) {
-                       String line = br.readLine();
-                       Assert.assertEquals("message #" + i, line);
-               }
-
-               inStream.close();
-       }
-
-       /**
-        * This tests {@link SequenceFileWriter}
-        * with non-rolling output and without compression.
-        */
-       @Test
-       public void testNonRollingSequenceFileWithoutCompressionWriter() throws 
Exception {
-               final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
-
-               final int numElements = 20;
-
-               BucketingSink<Tuple2<IntWritable, Text>> sink = new 
BucketingSink<Tuple2<IntWritable, Text>>(outPath)
-                       .setWriter(new SequenceFileWriter<IntWritable, Text>())
-                       .setBucketer(new BasePathBucketer<Tuple2<IntWritable, 
Text>>())
-                       .setPartPrefix(PART_PREFIX)
-                       .setPendingPrefix("")
-                       .setPendingSuffix("");
-
-               sink.setInputType(TypeInformation.of(new 
TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig());
-
-               OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, 
Object> testHarness =
-                       createTestSink(sink, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElements; i++) {
-                       testHarness.processElement(new StreamRecord<>(Tuple2.of(
-                               new IntWritable(i),
-                               new Text("message #" + Integer.toString(i))
-                       )));
-               }
-
-               testHarness.close();
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + 
PART_PREFIX + "-0-0"));
-
-               SequenceFile.Reader reader = new SequenceFile.Reader(inStream, 
1000, 0, 100000, new Configuration());
-
-               IntWritable intWritable = new IntWritable();
-               Text txt = new Text();
-
-               for (int i = 0; i < numElements; i++) {
-                       reader.next(intWritable, txt);
-                       Assert.assertEquals(i, intWritable.get());
-                       Assert.assertEquals("message #" + i, txt.toString());
-               }
-
-               reader.close();
-               inStream.close();
-       }
-
-       /**
-        * This tests {@link AvroKeyValueSinkWriter}
-        * with non-rolling output and with compression.
-        */
-       @Test
-       public void testNonRollingAvroKeyValueWithCompressionWriter() throws 
Exception {
-               final String outPath = hdfsURI + 
"/avro-kv-no-comp-non-rolling-out";
-
-               final int numElements = 20;
-
-               Map<String, String> properties = new HashMap<>();
-               Schema keySchema = Schema.create(Schema.Type.INT);
-               Schema valueSchema = Schema.create(Schema.Type.STRING);
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
String.valueOf(true));
-               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
-
-               BucketingSink<Tuple2<Integer, String>> sink = new 
BucketingSink<Tuple2<Integer, String>>(outPath)
-                       .setWriter(new AvroKeyValueSinkWriter<Integer, 
String>(properties))
-                       .setBucketer(new BasePathBucketer<Tuple2<Integer, 
String>>())
-                       .setPartPrefix(PART_PREFIX)
-                       .setPendingPrefix("")
-                       .setPendingSuffix("");
-
-               OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Object> testHarness =
-                       createTestSink(sink, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElements; i++) {
-                       testHarness.processElement(new StreamRecord<>(Tuple2.of(
-                               i, "message #" + Integer.toString(i)
-                       )));
-               }
-
-               testHarness.close();
-
-               GenericData.setStringType(valueSchema, 
GenericData.StringType.String);
-               Schema elementSchema = 
AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + 
PART_PREFIX + "-0-0"));
-
-               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<>(elementSchema);
-               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(inStream, elementReader);
-               for (int i = 0; i < numElements; i++) {
-                       AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> 
wrappedEntry =
-                               new 
AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
-                       int key = wrappedEntry.getKey();
-                       Assert.assertEquals(i, key);
-                       String value = wrappedEntry.getValue();
-                       Assert.assertEquals("message #" + i, value);
-               }
-
-               dataFileStream.close();
-               inStream.close();
-       }
-
-       /**
-        * This uses {@link DateTimeBucketer} to
-        * produce rolling files. We use {@link 
OneInputStreamOperatorTestHarness} to manually
-        * advance processing time.
-        */
-       @Test
-       public void testDateTimeRollingStringWriter() throws Exception {
-               final int numElements = 20;
-
-               final String outPath = hdfsURI + "/rolling-out";
-
-               BucketingSink<String> sink = new BucketingSink<String>(outPath)
-                       .setBucketer(new DateTimeBucketer<String>("ss"))
-                       .setPartPrefix(PART_PREFIX)
-                       .setPendingPrefix("")
-                       .setPendingSuffix("");
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElements; i++) {
-                       // Every 5 elements, increase the clock time. We should 
end up with 5 elements per bucket.
-                       if (i % 5 == 0) {
-                               testHarness.setProcessingTime(i * 1000L);
-                       }
-                       testHarness.processElement(new StreamRecord<>("message 
#" + Integer.toString(i)));
-               }
-
-               testHarness.close();
-
-               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(outPath), true);
-
-               // We should have 4 rolling files across 4 time intervals
-               int numFiles = 0;
-               while (files.hasNext()) {
-                       LocatedFileStatus file = files.next();
-                       numFiles++;
-                       if 
(file.getPath().toString().contains("rolling-out/00")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 0; i < 5; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else if 
(file.getPath().toString().contains("rolling-out/05")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 5; i < 10; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else if 
(file.getPath().toString().contains("rolling-out/10")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 10; i < 15; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else if 
(file.getPath().toString().contains("rolling-out/15")) {
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-
-                               BufferedReader br = new BufferedReader(new 
InputStreamReader(inStream));
-
-                               for (int i = 15; i < 20; i++) {
-                                       String line = br.readLine();
-                                       Assert.assertEquals("message #" + i, 
line);
-                               }
-
-                               inStream.close();
-                       } else {
-                               Assert.fail("File " + file + " does not match 
any expected roll pattern.");
-                       }
-               }
-
-               Assert.assertEquals(4, numFiles);
-       }
-
-       /**
-        * This uses a custom bucketing function which determines the bucket 
from the input.
-        */
-       @Test
-       public void testCustomBucketing() throws Exception {
-               File dataDir = tempFolder.newFolder();
-
-               final int numIds = 4;
-               final int numElements = 20;
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElements; i++) {
-                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % numIds)));
-               }
-
-               testHarness.close();
-
-               // we should have 4 buckets, with 1 file each
-               int numFiles = 0;
-               for (File file: FileUtils.listFiles(dataDir, null, true)) {
-                       if (file.getName().startsWith(PART_PREFIX)) {
-                               numFiles++;
-                       }
-               }
-
-               Assert.assertEquals(4, numFiles);
-       }
-
-       /**
-        * This uses a custom bucketing function which determines the bucket 
from the input.
-        * We use a simulated clock to reduce the number of buckets being 
written to over time.
-        * This causes buckets to become 'inactive' and their file parts 
'closed' by the sink.
-        */
-       @Test
-       public void testCustomBucketingInactiveBucketCleanup() throws Exception 
{
-               File dataDir = tempFolder.newFolder();
-
-               final int step1NumIds = 4;
-               final int step2NumIds = 2;
-               final int numElementsPerStep = 20;
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElementsPerStep; i++) {
-                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step1NumIds)));
-               }
-
-               testHarness.setProcessingTime(2*60*1000L);
-
-               for (int i = 0; i < numElementsPerStep; i++) {
-                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step2NumIds)));
-               }
-
-               testHarness.setProcessingTime(6*60*1000L);
-
-               for (int i = 0; i < numElementsPerStep; i++) {
-                       testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step2NumIds)));
-               }
-
-               // we should have 4 buckets, with 1 file each
-               // 2 of these buckets should have been finalised due to 
becoming inactive
-               int numFiles = 0;
-               int numInProgress = 0;
-               for (File file: FileUtils.listFiles(dataDir, null, true)) {
-                       if (file.getAbsolutePath().endsWith("crc")) {
-                               continue;
-                       }
-                       if (file.getPath().endsWith(IN_PROGRESS_SUFFIX)) {
-                               numInProgress++;
-                       }
-                       numFiles++;
-               }
-
-               testHarness.close();
-
-               Assert.assertEquals(4, numFiles);
-               Assert.assertEquals(2, numInProgress);
-       }
-
-       /**
-        * This tests user defined hdfs configuration
-        * @throws Exception
-        */
-       @Test
-       public void testUserDefinedConfiguration() throws Exception {
-               final String outPath = hdfsURI + 
"/string-non-rolling-with-config";
-               final int numElements = 20;
-
-               Map<String, String> properties = new HashMap<>();
-               Schema keySchema = Schema.create(Schema.Type.INT);
-               Schema valueSchema = Schema.create(Schema.Type.STRING);
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
-               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
String.valueOf(true));
-               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
-
-               Configuration conf = new Configuration();
-               conf.set("io.file.buffer.size", "40960");
-
-               BucketingSink<Tuple2<Integer,String>> sink = new 
BucketingSink<Tuple2<Integer, String>>(outPath)
-                       .setFSConfig(conf)
-                       .setWriter(new StreamWriterWithConfigCheck<Integer, 
String>(properties, "io.file.buffer.size", "40960"))
-                       .setBucketer(new 
BasePathBucketer<Tuple2<Integer,String>>())
-                       .setPartPrefix(PART_PREFIX)
-                       .setPendingPrefix("")
-                       .setPendingSuffix("");
-
-               OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Object> testHarness =
-                       createTestSink(sink, 1, 0);
-
-               testHarness.setProcessingTime(0L);
-
-               testHarness.setup();
-               testHarness.open();
-
-               for (int i = 0; i < numElements; i++) {
-                       testHarness.processElement(new StreamRecord<>(Tuple2.of(
-                               i, "message #" + Integer.toString(i)
-                       )));
-               }
-
-               testHarness.close();
-
-               GenericData.setStringType(valueSchema, 
GenericData.StringType.String);
-               Schema elementSchema = 
AvroKeyValueSinkWriter.AvroKeyValue.getSchema(keySchema, valueSchema);
-
-               FSDataInputStream inStream = dfs.open(new Path(outPath + "/" + 
PART_PREFIX + "-0-0"));
-
-               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<>(elementSchema);
-               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<>(inStream, elementReader);
-               for (int i = 0; i < numElements; i++) {
-                       AvroKeyValueSinkWriter.AvroKeyValue<Integer, String> 
wrappedEntry =
-                               new 
AvroKeyValueSinkWriter.AvroKeyValue<>(dataFileStream.next());
-                       int key = wrappedEntry.getKey();
-                       Assert.assertEquals(i, key);
-                       String value = wrappedEntry.getValue();
-                       Assert.assertEquals("message #" + i, value);
-               }
-
-               dataFileStream.close();
-               inStream.close();
-       }
-
-       private static class StreamWriterWithConfigCheck<K, V> extends 
AvroKeyValueSinkWriter<K, V> {
-               private Map<String, String> properties;
-               private String key;
-               private String expect;
-               public StreamWriterWithConfigCheck(Map<String, String> 
properties, String key, String expect) {
-                       super(properties);
-                       this.properties = properties;
-                       this.key = key;
-                       this.expect = expect;
-               }
-
-               @Override
-               public void open(FileSystem fs, Path path) throws IOException {
-                       super.open(fs, path);
-                       Assert.assertEquals(expect, fs.getConf().get(key));
-               }
-
-               @Override
-               public Writer<Tuple2<K, V>> duplicate() {
-                       return new StreamWriterWithConfigCheck<>(properties, 
key, expect);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
deleted file mode 100644
index 5c22851..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-log4j.logger.org.apache.directory=OFF, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

Reply via email to