http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0406afc..c208079 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -20,8 +20,8 @@ package 
org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,29 +30,23 @@ import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.io.Serializable;
 
 /**
  * Sink that emits its input elements to {@link FileSystem} files within 
buckets. This is
@@ -69,7 +63,9 @@ import java.util.Map;
  * be written to inside the base directory. The {@code Bucketer} can, for 
example, use time or
  * a property of the element to determine the bucket directory. The default 
{@code Bucketer} is a
  * {@link DateTimeBucketer} which will create one new bucket every hour. You 
can specify
- * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ * a custom {@code Bucketer} using the {@code withBucketer(Bucketer)} method, 
after calling
+ * {@link StreamingFileSink#forRowFormat(Path, Encoder)} or
+ * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
  *
  *
  * <p>The filenames of the part files contain the part prefix, "part-", the 
parallel subtask index of the sink
@@ -94,19 +90,6 @@ import java.util.Map;
  * state are transferred into the {@code finished} state while any {@code 
in-progress} files are rolled back, so that
  * they do not contain data that arrived after the checkpoint from which we 
restore.
  *
- * <p><b>NOTE:</b>
- * <ol>
- *     <li>
- *         If checkpointing is not enabled the pending files will never be 
moved to the finished state.
- *     </li>
- *     <li>
- *         The part files are written using an instance of {@link Encoder}. By 
default, a
- *         {@link SimpleStringEncoder} is used, which writes the result of 
{@code toString()} for
- *         every element, separated by newlines. You can configure the writer 
using the
- *         {@link #setEncoder(Encoder)}.
- *     </li>
- * </ol>
- *
  * @param <IN> Type of the elements emitted by this sink
  */
 @PublicEvolving
@@ -116,8 +99,6 @@ public class StreamingFileSink<IN>
 
        private static final long serialVersionUID = 1L;
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamingFileSink.class);
-
        // -------------------------- state descriptors 
---------------------------
 
        private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
@@ -128,298 +109,264 @@ public class StreamingFileSink<IN>
 
        // ------------------------ configuration fields 
--------------------------
 
-       private final Path basePath;
-
-       private final BucketFactory<IN> bucketFactory;
-
-       private long bucketCheckInterval = 60L * 1000L;
-
-       private Bucketer<IN> bucketer;
+       private final long bucketCheckInterval;
 
-       private Encoder<IN> encoder;
-
-       private RollingPolicy rollingPolicy;
+       private final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder;
 
        // --------------------------- runtime fields 
-----------------------------
 
-       private transient BucketerContext bucketerContext;
-
-       private transient RecoverableWriter fileSystemWriter;
+       private transient Buckets<IN, ?> buckets;
 
        private transient ProcessingTimeService processingTimeService;
 
-       private transient Map<String, Bucket<IN>> activeBuckets;
+       // --------------------------- State Related Fields 
-----------------------------
 
-       //////////////////                      State Related Fields            
        /////////////////////
+       private transient ListState<byte[]> bucketStates;
 
-       private transient BucketStateSerializer bucketStateSerializer;
+       private transient ListState<Long> maxPartCountersState;
 
-       private transient ListState<byte[]> restoredBucketStates;
+       /**
+        * Creates a new {@code StreamingFileSink} that writes files to the 
given base directory.
+        */
+       private StreamingFileSink(
+                       final StreamingFileSink.BucketsBuilder<IN, ?> 
bucketsBuilder,
+                       final long bucketCheckInterval) {
+
+               Preconditions.checkArgument(bucketCheckInterval > 0L);
 
-       private transient ListState<Long> restoredMaxCounters;
+               this.bucketsBuilder = 
Preconditions.checkNotNull(bucketsBuilder);
+               this.bucketCheckInterval = bucketCheckInterval;
+       }
 
-       private transient long initMaxPartCounter;
+       // 
------------------------------------------------------------------------
 
-       private transient long maxPartCounterUsed;
+       // --------------------------- Sink Builders  
-----------------------------
 
        /**
-        * Creates a new {@code StreamingFileSink} that writes files to the 
given base directory.
-        *
-        * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a 
{@link SimpleStringEncoder} as a writer.
-        *
-        * @param basePath The directory to which to write the bucket files.
+        * Creates the builder for a {@code StreamingFileSink} with 
row-encoding format.
+        * @param basePath the base path where all the buckets are going to be 
created as sub-directories.
+        * @param encoder the {@link Encoder} to be used when writing elements 
in the buckets.
+        * @param <IN> the type of incoming elements
+        * @return The builder where the rest of the configuration 
parameters for the sink can be configured.
+        * In order to instantiate the sink, call {@link 
RowFormatBuilder#build()} after specifying the desired parameters.
         */
-       public StreamingFileSink(Path basePath) {
-               this(basePath, new DefaultBucketFactory<>());
+       public static <IN> StreamingFileSink.RowFormatBuilder<IN, String> 
forRowFormat(
+                       final Path basePath, final Encoder<IN> encoder) {
+               return new StreamingFileSink.RowFormatBuilder<>(basePath, 
encoder, new DateTimeBucketer<>());
        }
 
-       @VisibleForTesting
-       StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
-               this.basePath = Preconditions.checkNotNull(basePath);
-               this.bucketer = new DateTimeBucketer<>();
-               this.encoder = new SimpleStringEncoder<>();
-               this.rollingPolicy = DefaultRollingPolicy.create().build();
-               this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+       /**
+        * Creates the builder for a {@link StreamingFileSink} with 
bulk-encoding format.
+        * @param basePath the base path where all the buckets are going to be 
created as sub-directories.
+        * @param writerFactory the {@link BulkWriter.Factory} to be used when 
writing elements in the buckets.
+        * @param <IN> the type of incoming elements
+        * @return The builder where the rest of the configuration 
parameters for the sink can be configured.
+        * In order to instantiate the sink, call {@link 
BulkFormatBuilder#build()} after specifying the desired parameters.
+        */
+       public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> 
forBulkFormat(
+                       final Path basePath, final BulkWriter.Factory<IN> 
writerFactory) {
+               return new StreamingFileSink.BulkFormatBuilder<>(basePath, 
writerFactory, new DateTimeBucketer<>());
        }
 
-       public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) {
-               this.encoder = Preconditions.checkNotNull(encoder);
-               return this;
-       }
+       /**
+        * The base abstract class for the {@link RowFormatBuilder} and {@link 
BulkFormatBuilder}.
+        */
+       private abstract static class BucketsBuilder<IN, BucketID> implements 
Serializable {
 
-       public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
-               this.bucketer = Preconditions.checkNotNull(bucketer);
-               return this;
-       }
+               private static final long serialVersionUID = 1L;
 
-       public StreamingFileSink<IN> setBucketCheckInterval(long interval) {
-               this.bucketCheckInterval = interval;
-               return this;
+               abstract Buckets<IN, BucketID> createBuckets(final int 
subtaskIndex) throws IOException;
        }
 
-       public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) {
-               this.rollingPolicy = Preconditions.checkNotNull(policy);
-               return this;
-       }
+       /**
+        * A builder for configuring the sink for row-wise encoding formats.
+        */
+       @PublicEvolving
+       public static class RowFormatBuilder<IN, BucketID> extends 
StreamingFileSink.BucketsBuilder<IN, BucketID> {
 
-       @Override
-       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt =
-                               activeBuckets.entrySet().iterator();
-
-               while (activeBucketIt.hasNext()) {
-                       Bucket<IN> bucket = activeBucketIt.next().getValue();
-                       bucket.commitUpToCheckpoint(checkpointId);
-
-                       if (!bucket.isActive()) {
-                               // We've dealt with all the pending files and 
the writer for this bucket is not currently open.
-                               // Therefore this bucket is currently inactive 
and we can remove it from our state.
-                               activeBucketIt.remove();
-                       }
-               }
-       }
+               private static final long serialVersionUID = 1L;
 
-       @Override
-       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-               Preconditions.checkState(
-                               restoredBucketStates != null && 
fileSystemWriter != null && bucketStateSerializer != null,
-                               "sink has not been initialized");
+               private long bucketCheckInterval = 60L * 1000L;
+
+               private final Path basePath;
 
-               restoredBucketStates.clear();
-               for (Bucket<IN> bucket : activeBuckets.values()) {
+               private final Encoder<IN> encoder;
 
-                       final PartFileInfo info = 
bucket.getInProgressPartInfo();
-                       final long checkpointTimestamp = 
context.getCheckpointTimestamp();
+               private Bucketer<IN, BucketID> bucketer;
 
-                       if (info != null && rollingPolicy.shouldRoll(info, 
checkpointTimestamp)) {
-                               // we also check here so that we do not have to 
always
-                               // wait for the "next" element to arrive.
-                               bucket.closePartFile();
-                       }
+               private RollingPolicy<BucketID> rollingPolicy;
 
-                       final BucketState bucketState = 
bucket.snapshot(context.getCheckpointId());
-                       
restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer,
 bucketState));
+               private BucketFactory<IN, BucketID> bucketFactory = new 
DefaultBucketFactory<>();
+
+               RowFormatBuilder(Path basePath, Encoder<IN> encoder, 
Bucketer<IN, BucketID> bucketer) {
+                       this.basePath = Preconditions.checkNotNull(basePath);
+                       this.encoder = Preconditions.checkNotNull(encoder);
+                       this.bucketer = Preconditions.checkNotNull(bucketer);
+                       this.rollingPolicy = 
DefaultRollingPolicy.create().build();
                }
 
-               restoredMaxCounters.clear();
-               restoredMaxCounters.add(maxPartCounterUsed);
-       }
+               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketCheckInterval(final long interval) {
+                       this.bucketCheckInterval = interval;
+                       return this;
+               }
 
-       @Override
-       public void initializeState(FunctionInitializationContext context) 
throws Exception {
-               initFileSystemWriter();
+               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketer(final Bucketer<IN, BucketID> bucketer) {
+                       this.bucketer = Preconditions.checkNotNull(bucketer);
+                       return this;
+               }
 
-               this.activeBuckets = new HashMap<>();
+               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withRollingPolicy(final RollingPolicy<BucketID> policy) {
+                       this.rollingPolicy = Preconditions.checkNotNull(policy);
+                       return this;
+               }
 
-               // When resuming after a failure:
-               // 1) we get the max part counter used before in order to make 
sure that we do not overwrite valid data
-               // 2) we commit any pending files for previous checkpoints 
(previous to the last successful one)
-               // 3) we resume writing to the previous in-progress file of 
each bucket, and
-               // 4) if we receive multiple states for the same bucket, we 
merge them.
+               public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> 
withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> 
policy) {
+                       @SuppressWarnings("unchecked")
+                       StreamingFileSink.RowFormatBuilder<IN, ID> 
reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
+                       reInterpreted.bucketer = 
Preconditions.checkNotNull(bucketer);
+                       reInterpreted.rollingPolicy = 
Preconditions.checkNotNull(policy);
+                       return reInterpreted;
+               }
 
-               final OperatorStateStore stateStore = 
context.getOperatorStateStore();
+               @VisibleForTesting
+               StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+                       this.bucketFactory = 
Preconditions.checkNotNull(factory);
+                       return this;
+               }
 
-               restoredBucketStates = 
stateStore.getListState(BUCKET_STATE_DESC);
-               restoredMaxCounters = 
stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+               /** Creates the actual sink. */
+               public StreamingFileSink<IN> build() {
+                       return new StreamingFileSink<>(this, 
bucketCheckInterval);
+               }
 
-               if (context.isRestored()) {
-                       final int subtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-
-                       LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIndex);
-
-                       long maxCounter = 0L;
-                       for (long partCounter: restoredMaxCounters.get()) {
-                               maxCounter = Math.max(partCounter, maxCounter);
-                       }
-                       initMaxPartCounter = maxCounter;
-
-                       for (byte[] recoveredState : 
restoredBucketStates.get()) {
-                               final BucketState bucketState = 
SimpleVersionedSerialization.readVersionAndDeSerialize(
-                                               bucketStateSerializer, 
recoveredState);
-
-                               final String bucketId = 
bucketState.getBucketId();
-
-                               LOG.info("Recovered bucket for {}", bucketId);
-
-                               final Bucket<IN> restoredBucket = 
bucketFactory.restoreBucket(
-                                               fileSystemWriter,
-                                               subtaskIndex,
-                                               initMaxPartCounter,
-                                               encoder,
-                                               bucketState
-                               );
-
-                               final Bucket<IN> existingBucket = 
activeBuckets.get(bucketId);
-                               if (existingBucket == null) {
-                                       activeBuckets.put(bucketId, 
restoredBucket);
-                               } else {
-                                       existingBucket.merge(restoredBucket);
-                               }
-
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("{} idx {} restored state for 
bucket {}", getClass().getSimpleName(),
-                                                       subtaskIndex, 
assembleBucketPath(bucketId));
-                               }
-                       }
+               @Override
+               Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws 
IOException {
+                       return new Buckets<>(
+                                       basePath,
+                                       bucketer,
+                                       bucketFactory,
+                                       new 
RowWisePartWriter.Factory<>(encoder),
+                                       rollingPolicy,
+                                       subtaskIndex);
                }
        }
 
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
+       /**
+        * A builder for configuring the sink for bulk-encoding formats, e.g. 
Parquet/ORC.
+        */
+       @PublicEvolving
+       public static class BulkFormatBuilder<IN, BucketID> extends 
StreamingFileSink.BucketsBuilder<IN, BucketID> {
 
-               processingTimeService = ((StreamingRuntimeContext) 
getRuntimeContext()).getProcessingTimeService();
-               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
-               processingTimeService.registerTimer(currentProcessingTime + 
bucketCheckInterval, this);
-               this.bucketerContext = new BucketerContext();
-       }
+               private static final long serialVersionUID = 1L;
 
-       @Override
-       public void onProcessingTime(long timestamp) throws Exception {
-               final long currentTime = 
processingTimeService.getCurrentProcessingTime();
-               for (Bucket<IN> bucket : activeBuckets.values()) {
-                       final PartFileInfo info = 
bucket.getInProgressPartInfo();
-                       if (info != null && rollingPolicy.shouldRoll(info, 
currentTime)) {
-                               bucket.closePartFile();
-                       }
-               }
-               processingTimeService.registerTimer(timestamp + 
bucketCheckInterval, this);
-       }
+               private long bucketCheckInterval = 60L * 1000L;
 
-       @Override
-       public void invoke(IN value, Context context) throws Exception {
-               final long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
-               final int subtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+               private final Path basePath;
+
+               private final BulkWriter.Factory<IN> writerFactory;
+
+               private Bucketer<IN, BucketID> bucketer;
 
-               // setting the values in the bucketer context
-               bucketerContext.update(context.timestamp(), 
context.currentWatermark(), currentProcessingTime);
-
-               final String bucketId = bucketer.getBucketId(value, 
bucketerContext);
-
-               Bucket<IN> bucket = activeBuckets.get(bucketId);
-               if (bucket == null) {
-                       final Path bucketPath = assembleBucketPath(bucketId);
-                       bucket = bucketFactory.getNewBucket(
-                                       fileSystemWriter,
-                                       subtaskIndex,
-                                       bucketId,
-                                       bucketPath,
-                                       initMaxPartCounter,
-                                       encoder);
-                       activeBuckets.put(bucketId, bucket);
+               private BucketFactory<IN, BucketID> bucketFactory = new 
DefaultBucketFactory<>();
+
+               BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> 
writerFactory, Bucketer<IN, BucketID> bucketer) {
+                       this.basePath = Preconditions.checkNotNull(basePath);
+                       this.writerFactory = 
Preconditions.checkNotNull(writerFactory);
+                       this.bucketer = Preconditions.checkNotNull(bucketer);
                }
 
-               final PartFileInfo info = bucket.getInProgressPartInfo();
-               if (info == null || rollingPolicy.shouldRoll(info, 
currentProcessingTime)) {
-                       bucket.rollPartFile(currentProcessingTime);
+               public StreamingFileSink.BulkFormatBuilder<IN, BucketID> 
withBucketCheckInterval(long interval) {
+                       this.bucketCheckInterval = interval;
+                       return this;
                }
-               bucket.write(value, currentProcessingTime);
 
-               // we update the counter here because as buckets become 
inactive and
-               // get removed in the initializeState(), at the time we 
snapshot they
-               // may not be there to take them into account during 
checkpointing.
-               updateMaxPartCounter(bucket.getPartCounter());
-       }
+               public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> 
withBucketer(Bucketer<IN, ID> bucketer) {
+                       @SuppressWarnings("unchecked")
+                       StreamingFileSink.BulkFormatBuilder<IN, ID> 
reInterpreted = (StreamingFileSink.BulkFormatBuilder<IN, ID>) this;
+                       reInterpreted.bucketer = 
Preconditions.checkNotNull(bucketer);
+                       return reInterpreted;
+               }
 
-       @Override
-       public void close() throws Exception {
-               if (activeBuckets != null) {
-                       activeBuckets.values().forEach(Bucket::dispose);
+               @VisibleForTesting
+               StreamingFileSink.BulkFormatBuilder<IN, BucketID> 
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+                       this.bucketFactory = 
Preconditions.checkNotNull(factory);
+                       return this;
                }
-       }
 
-       private void initFileSystemWriter() throws IOException {
-               if (fileSystemWriter == null) {
-                       fileSystemWriter = 
FileSystem.get(basePath.toUri()).createRecoverableWriter();
-                       bucketStateSerializer = new BucketStateSerializer(
-                                       
fileSystemWriter.getResumeRecoverableSerializer(),
-                                       
fileSystemWriter.getCommitRecoverableSerializer()
-                       );
+               /** Creates the actual sink. */
+               public StreamingFileSink<IN> build() {
+                       return new StreamingFileSink<>(this, 
bucketCheckInterval);
                }
-       }
 
-       private void updateMaxPartCounter(long candidate) {
-               maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+               @Override
+               Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws 
IOException {
+                       return new Buckets<>(
+                                       basePath,
+                                       bucketer,
+                                       bucketFactory,
+                                       new 
BulkPartWriter.Factory<>(writerFactory),
+                                       new OnCheckpointRollingPolicy<>(),
+                                       subtaskIndex);
+               }
        }
 
-       private Path assembleBucketPath(String bucketId) {
-               return new Path(basePath, bucketId);
+       // --------------------------- Sink Methods 
-----------------------------
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               final int subtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+               this.buckets = bucketsBuilder.createBuckets(subtaskIndex);
+
+               final OperatorStateStore stateStore = 
context.getOperatorStateStore();
+               bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+               maxPartCountersState = 
stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+               if (context.isRestored()) {
+                       buckets.initializeState(bucketStates, 
maxPartCountersState);
+               }
        }
 
-       /**
-        * The {@link Bucketer.Context} exposed to the
-        * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
-        * whenever a new incoming element arrives.
-        */
-       private static class BucketerContext implements Bucketer.Context {
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               buckets.publishUpToCheckpoint(checkpointId);
+       }
 
-               @Nullable
-               private Long elementTimestamp;
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               Preconditions.checkState(bucketStates != null && 
maxPartCountersState != null, "sink has not been initialized");
 
-               private long currentWatermark;
+               bucketStates.clear();
+               maxPartCountersState.clear();
 
-               private long currentProcessingTime;
+               buckets.snapshotState(
+                               context.getCheckpointId(),
+                               context.getCheckpointTimestamp(),
+                               bucketStates,
+                               maxPartCountersState);
+       }
 
-               void update(@Nullable Long elementTimestamp, long 
currentWatermark, long currentProcessingTime) {
-                       this.elementTimestamp = elementTimestamp;
-                       this.currentWatermark = currentWatermark;
-                       this.currentProcessingTime = currentProcessingTime;
-               }
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.processingTimeService = ((StreamingRuntimeContext) 
getRuntimeContext()).getProcessingTimeService();
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
+               processingTimeService.registerTimer(currentProcessingTime + 
bucketCheckInterval, this);
+       }
 
-               @Override
-               public long currentProcessingTime() {
-                       return currentProcessingTime;
-               }
+       @Override
+       public void onProcessingTime(long timestamp) throws Exception {
+               final long currentTime = 
processingTimeService.getCurrentProcessingTime();
+               buckets.onProcessingTime(currentTime);
+               processingTimeService.registerTimer(currentTime + 
bucketCheckInterval, this);
+       }
 
-               @Override
-               public long currentWatermark() {
-                       return currentWatermark;
-               }
+       @Override
+       public void invoke(IN value, SinkFunction.Context context) throws 
Exception {
+               buckets.onElement(value, context);
+       }
 
-               @Override
-               @Nullable
-               public Long timestamp() {
-                       return elementTimestamp;
-               }
+       @Override
+       public void close() throws Exception {
+               buckets.close();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
index 5ffe152..d7b2013 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
@@ -19,22 +19,29 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 /**
  * A {@link Bucketer} that does not perform any
  * bucketing of files. All files are written to the base path.
  */
 @PublicEvolving
-public class BasePathBucketer<T> implements Bucketer<T> {
+public class BasePathBucketer<T> implements Bucketer<T, String> {
 
        private static final long serialVersionUID = -6033643155550226022L;
 
        @Override
-       public String getBucketId(T element, Context context) {
+       public String getBucketId(T element, Bucketer.Context context) {
                return "";
        }
 
        @Override
+       public SimpleVersionedSerializer<String> getSerializer() {
+               // in the future this could be optimized as it is the empty 
string.
+               return SimpleVersionedStringSerializer.INSTANCE;
+       }
+
+       @Override
        public String toString() {
                return "BasePathBucketer";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
index 5c30927..503e361 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
 import javax.annotation.Nullable;
@@ -33,9 +35,16 @@ import java.io.Serializable;
  * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, 
and it is responsible for managing
  * a set of active buckets. Whenever a new element arrives it will ask the 
{@code Bucketer} for the bucket the
  * element should fall in. The {@code Bucketer} can, for example, determine 
buckets based on system time.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of the object returned by the {@link 
#getBucketId(Object, Bucketer.Context)}. This has to have
+ *                  a correct {@link #hashCode()} and {@link #equals(Object)} 
method. In addition, the {@link Path}
+ *                  to the created bucket will be the result of the {@link 
#toString()} of the returned bucket id, appended to
+ *                  the {@code basePath} specified in the
+ *                  {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 
StreamingFileSink}
  */
 @PublicEvolving
-public interface Bucketer<T> extends Serializable {
+public interface Bucketer<IN, BucketID> extends Serializable {
 
        /**
         * Returns the identifier of the bucket the provided element should be 
put into.
@@ -48,13 +57,20 @@ public interface Bucketer<T> extends Serializable {
         * and the {@code base path} provided during the initialization of the
         * {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 
sink}.
         */
-       String getBucketId(T element, Context context);
+       BucketID getBucketId(IN element, Bucketer.Context context);
+
+       /**
+        * @return A {@link SimpleVersionedSerializer} capable of 
serializing/deserializing the elements
+        * of type {@code BucketID}. That is the type of the objects returned 
by the
+        * {@link #getBucketId(Object, Bucketer.Context)}.
+        */
+       SimpleVersionedSerializer<BucketID> getSerializer();
 
        /**
         * Context that the {@link Bucketer} can use for getting additional 
data about
         * an input record.
         *
-        * <p>The context is only valid for the duration of a {@link 
Bucketer#getBucketId(Object, Context)} call.
+        * <p>The context is only valid for the duration of a {@link 
Bucketer#getBucketId(Object, Bucketer.Context)} call.
         * Do not store the context and use afterwards!
         */
        @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
index 515468c..eed0b79 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -50,9 +51,9 @@ import java.util.Date;
  *
  */
 @PublicEvolving
-public class DateTimeBucketer<T> implements Bucketer<T> {
+public class DateTimeBucketer<IN> implements Bucketer<IN, String> {
 
-       private static final long serialVersionUID = 3284420879277893804L;
+       private static final long serialVersionUID = 1L;
 
        private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
 
@@ -78,7 +79,7 @@ public class DateTimeBucketer<T> implements Bucketer<T> {
        }
 
        @Override
-       public String getBucketId(T element, Context context) {
+       public String getBucketId(IN element, Bucketer.Context context) {
                if (dateFormatter == null) {
                        dateFormatter = new SimpleDateFormat(formatString);
                }
@@ -86,6 +87,11 @@ public class DateTimeBucketer<T> implements Bucketer<T> {
        }
 
        @Override
+       public SimpleVersionedSerializer<String> getSerializer() {
+               return SimpleVersionedStringSerializer.INSTANCE;
+       }
+
+       @Override
        public String toString() {
                return "DateTimeBucketer{" +
                                "formatString='" + formatString + '\'' +

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
new file mode 100644
index 0000000..d025af9
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A {@link SimpleVersionedSerializer} implementation for Strings.
+ *
+ * <p>The serialized form (version 1) is a 4-byte little-endian length prefix
+ * followed by that many bytes holding the UTF-8 encoding of the string.
+ */
+public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
+
+       /** The single charset used for both encoding and decoding. */
+       private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+       public static final SimpleVersionedStringSerializer INSTANCE = new SimpleVersionedStringSerializer();
+
+       @Override
+       public int getVersion() {
+               return 1;
+       }
+
+       /**
+        * Encodes the given string as a length-prefixed UTF-8 byte array.
+        *
+        * @param value the string to serialize.
+        * @return the length prefix (little-endian int) followed by the encoded bytes.
+        */
+       @Override
+       public byte[] serialize(String value) {
+               // use the shared constant so that serialize and deserialize can never diverge
+               final byte[] serialized = value.getBytes(CHARSET);
+               final byte[] targetBytes = new byte[Integer.BYTES + serialized.length];
+
+               final ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+               bb.putInt(serialized.length);
+               bb.put(serialized);
+               return targetBytes;
+       }
+
+       /**
+        * Decodes a string previously written by {@link #serialize(String)}.
+        *
+        * @param version the version the bytes were written with.
+        * @param serialized the serialized bytes.
+        * @return the decoded string.
+        * @throws IOException if the version is unknown or the bytes are truncated/corrupt.
+        */
+       @Override
+       public String deserialize(int version, byte[] serialized) throws IOException {
+               switch (version) {
+                       case 1:
+                               return deserializeV1(serialized);
+                       default:
+                               throw new IOException("Unrecognized version or corrupt state: " + version);
+               }
+       }
+
+       /**
+        * Reads the version-1 layout, validating the length prefix against the available bytes
+        * so that corrupt input is reported as an {@link IOException} instead of an unchecked
+        * buffer or array-size exception.
+        */
+       private static String deserializeV1(byte[] serialized) throws IOException {
+               final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+               if (bb.remaining() < Integer.BYTES) {
+                       throw new IOException("Corrupt state: missing length prefix, got " + bb.remaining() + " bytes.");
+               }
+
+               final int length = bb.getInt();
+               if (length < 0 || length > bb.remaining()) {
+                       throw new IOException(
+                                       "Corrupt state: invalid string length " + length + " with " + bb.remaining() + " bytes remaining.");
+               }
+
+               final byte[] targetStringBytes = new byte[length];
+               bb.get(targetStringBytes);
+               return new String(targetStringBytes, CHARSET);
+       }
+
+       /**
+        * Private constructor to prevent instantiation.
+        * Access the serializer through the {@link #INSTANCE}.
+        */
+       private SimpleVersionedStringSerializer() {}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
new file mode 100644
index 0000000..a9ff617
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The default implementation of the {@link RollingPolicy}.
+ *
+ * <p>With this policy a part file is rolled when any of the following holds:
+ * <ol>
+ *     <li>no part file is currently open,</li>
+ *     <li>on a new element, the open part file is larger than the maximum part size (by default 128MB),</li>
+ *     <li>on a processing-time check, the open part file was created longer ago than the
+ *         rollover interval (by default 60 sec), or</li>
+ *     <li>on a processing-time check, the open part file has not been updated for longer than the
+ *         inactivity interval (by default 60 sec).</li>
+ * </ol>
+ *
+ * <p>Checkpoints never cause a roll under this policy.
+ */
+@PublicEvolving
+public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Default inactivity interval: 60 seconds, in milliseconds. */
+       private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
+
+       /** Default rollover interval: 60 seconds, in milliseconds. */
+       private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
+
+       /** Default maximum part size: 128MB, in bytes. */
+       private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
+
+       private final long partSize;
+
+       private final long rolloverInterval;
+
+       private final long inactivityInterval;
+
+       /**
+        * Private constructor to avoid direct instantiation; use {@link #create()} instead.
+        * All three arguments must be strictly positive.
+        */
+       private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
+               Preconditions.checkArgument(partSize > 0L);
+               Preconditions.checkArgument(rolloverInterval > 0L);
+               Preconditions.checkArgument(inactivityInterval > 0L);
+
+               this.partSize = partSize;
+               this.rolloverInterval = rolloverInterval;
+               this.inactivityInterval = inactivityInterval;
+       }
+
+       /** Never rolls on checkpoint. */
+       @Override
+       public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
+               return false;
+       }
+
+       /** Rolls when there is no open part file or the open one exceeds the configured part size. */
+       @Override
+       public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
+               // a null state means that there is no currently open part file.
+               return partFileState == null || partFileState.getSize() > partSize;
+       }
+
+       /**
+        * Rolls when there is no open part file, when the open one is older than the rollover
+        * interval, or when it has not been updated for longer than the inactivity interval.
+        */
+       @Override
+       public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
+               // a null state means that there is no currently open part file.
+               if (partFileState == null) {
+                       return true;
+               }
+
+               return currentTime - partFileState.getCreationTime() > rolloverInterval
+                               || currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
+       }
+
+       /**
+        * Initiates the instantiation of a {@code DefaultRollingPolicy}.
+        * To finalize it and have the actual policy, call {@link PolicyBuilder#build()}.
+        */
+       public static DefaultRollingPolicy.PolicyBuilder create() {
+               return new PolicyBuilder();
+       }
+
+       /**
+        * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+        * Every property starts at its default value and can be overridden before calling {@link #build()}.
+        */
+       @PublicEvolving
+       public static final class PolicyBuilder {
+
+               private long partSize = DEFAULT_MAX_PART_SIZE;
+
+               private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+
+               private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+
+               private PolicyBuilder() {}
+
+               /**
+                * Sets the part size above which a part file will have to roll.
+                * @param size the allowed part size; must be positive.
+                * @return this builder.
+                */
+               public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) {
+                       Preconditions.checkState(size > 0L);
+                       partSize = size;
+                       return this;
+               }
+
+               /**
+                * Sets the interval of allowed inactivity after which a part file will have to roll.
+                * @param interval the allowed inactivity interval; must be positive.
+                * @return this builder.
+                */
+               public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) {
+                       Preconditions.checkState(interval > 0L);
+                       inactivityInterval = interval;
+                       return this;
+               }
+
+               /**
+                * Sets the max time a part file can stay open before having to roll.
+                * @param interval the desired rollover interval; must be positive.
+                * @return this builder.
+                */
+               public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) {
+                       Preconditions.checkState(interval > 0L);
+                       rolloverInterval = interval;
+                       return this;
+               }
+
+               /**
+                * Creates the actual policy from the values configured so far.
+                */
+               public <BucketID> DefaultRollingPolicy<BucketID> build() {
+                       return new DefaultRollingPolicy<BucketID>(partSize, rolloverInterval, inactivityInterval);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
new file mode 100644
index 0000000..4361941
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+/**
+ * A {@link RollingPolicy} which rolls on every checkpoint, and only then:
+ * neither incoming elements nor processing time trigger a roll.
+ */
+public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Always rolls when a checkpoint is taken. */
+       @Override
+       public boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) {
+               return true;
+       }
+
+       /** Never rolls because of an incoming element. */
+       @Override
+       public boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) {
+               return false;
+       }
+
+       /** Never rolls because of processing time. */
+       @Override
+       public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
+               return false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 353ac00..3d5be63 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -60,20 +61,21 @@ public class BucketStateSerializerTest {
 
                final Path testBucket = new Path(testFolder.getPath(), "test");
 
-               final BucketState bucketState = new BucketState(
+               final BucketState<String> bucketState = new BucketState<>(
                                "test", testBucket, Long.MAX_VALUE, null, new 
HashMap<>());
 
-               final SimpleVersionedSerializer<BucketState> serializer =
-                               new BucketStateSerializer(
+               final SimpleVersionedSerializer<BucketState<String>> serializer 
=
+                               new BucketStateSerializer<>(
                                                
writer.getResumeRecoverableSerializer(),
-                                               
writer.getCommitRecoverableSerializer()
+                                               
writer.getCommitRecoverableSerializer(),
+                                               
SimpleVersionedStringSerializer.INSTANCE
                                );
 
                byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
-               final BucketState recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+               final BucketState<String> recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
                Assert.assertEquals(testBucket, recoveredState.getBucketPath());
-               Assert.assertNull(recoveredState.getCurrentInProgress());
+               Assert.assertNull(recoveredState.getInProgress());
                
Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
        }
 
@@ -90,13 +92,14 @@ public class BucketStateSerializerTest {
 
                final RecoverableWriter.ResumeRecoverable current = 
stream.persist();
 
-               final BucketState bucketState = new BucketState(
+               final BucketState<String> bucketState = new BucketState<>(
                                "test", testBucket, Long.MAX_VALUE, current, 
new HashMap<>());
 
-               final SimpleVersionedSerializer<BucketState> serializer =
-                               new BucketStateSerializer(
+               final SimpleVersionedSerializer<BucketState<String>> serializer 
=
+                               new BucketStateSerializer<>(
                                                
writer.getResumeRecoverableSerializer(),
-                                               
writer.getCommitRecoverableSerializer()
+                                               
writer.getCommitRecoverableSerializer(),
+                                               
SimpleVersionedStringSerializer.INSTANCE
                                );
 
                final byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
@@ -104,7 +107,7 @@ public class BucketStateSerializerTest {
                // to simulate that everything is over for file.
                stream.close();
 
-               final BucketState recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+               final BucketState<String> recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
                Assert.assertEquals(testBucket, recoveredState.getBucketPath());
 
@@ -147,18 +150,19 @@ public class BucketStateSerializerTest {
 
                final RecoverableWriter.ResumeRecoverable current = 
stream.persist();
 
-               final BucketState bucketState = new BucketState(
+               final BucketState<String> bucketState = new BucketState<>(
                                "test-2", bucketPath, Long.MAX_VALUE, current, 
commitRecoverables);
-               final SimpleVersionedSerializer<BucketState> serializer =
-                               new BucketStateSerializer(
+               final SimpleVersionedSerializer<BucketState<String>> serializer 
=
+                               new BucketStateSerializer<>(
                                                
writer.getResumeRecoverableSerializer(),
-                                               
writer.getCommitRecoverableSerializer()
+                                               
writer.getCommitRecoverableSerializer(),
+                                               
SimpleVersionedStringSerializer.INSTANCE
                                );
                stream.close();
 
                byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
 
-               final BucketState recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+               final BucketState<String> recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
                Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
 
@@ -220,20 +224,21 @@ public class BucketStateSerializerTest {
 
                final RecoverableWriter.ResumeRecoverable current = null;
 
-               final BucketState bucketState = new BucketState(
+               final BucketState<String> bucketState = new BucketState<>(
                                "", bucketPath, Long.MAX_VALUE, current, 
commitRecoverables);
 
-               final SimpleVersionedSerializer<BucketState> serializer = new 
BucketStateSerializer(
+               final SimpleVersionedSerializer<BucketState<String>> serializer 
= new BucketStateSerializer<>(
                                writer.getResumeRecoverableSerializer(),
-                               writer.getCommitRecoverableSerializer()
+                               writer.getCommitRecoverableSerializer(),
+                               SimpleVersionedStringSerializer.INSTANCE
                );
 
                byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
 
-               final BucketState recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+               final BucketState<String> recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
                Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
-               Assert.assertNull(recoveredState.getCurrentInProgress());
+               Assert.assertNull(recoveredState.getInProgress());
 
                final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
                Assert.assertEquals(5L, recoveredRecoverables.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
new file mode 100644
index 0000000..042ba4e
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for {@link Buckets}.
+ */
+public class BucketsTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+       @Test
+       public void testContextPassingNormalExecution() throws Exception {
+               testCorrectPassingOfContext(1L, 2L, 3L);
+       }
+
+       @Test
+       public void testContextPassingNullTimestamp() throws Exception {
+               testCorrectPassingOfContext(null, 2L, 3L);
+       }
+
+       /**
+        * Pushes one element through {@link Buckets} with a sink context reporting the given
+        * values; the bucketer asserts that it observes exactly those values.
+        */
+       private void testCorrectPassingOfContext(Long timestamp, long watermark, long processingTime) throws Exception {
+               final File outDir = TEMP_FOLDER.newFolder();
+
+               final VerifyingBucketer bucketer = new VerifyingBucketer(timestamp, watermark, processingTime);
+
+               final Buckets<String, String> buckets = StreamingFileSink
+                               .<String>forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>())
+                               .withBucketer(bucketer)
+                               .createBuckets(2);
+
+               // a sink context that simply reports the values handed to this method
+               final SinkFunction.Context sinkContext = new SinkFunction.Context() {
+                       @Override
+                       public long currentProcessingTime() {
+                               return processingTime;
+                       }
+
+                       @Override
+                       public long currentWatermark() {
+                               return watermark;
+                       }
+
+                       @Override
+                       public Long timestamp() {
+                               return timestamp;
+                       }
+               };
+
+               buckets.onElement("TEST", sinkContext);
+       }
+
+       /**
+        * A {@link Bucketer} that checks the context it receives against the expected values
+        * and uses the element itself as the bucket id.
+        */
+       private static class VerifyingBucketer implements Bucketer<String, String> {
+
+               private static final long serialVersionUID = 7729086510972377578L;
+
+               private final Long expectedTimestamp;
+               private final long expectedWatermark;
+               private final long expectedProcessingTime;
+
+               VerifyingBucketer(
+                               final Long expectedTimestamp,
+                               final long expectedWatermark,
+                               final long expectedProcessingTime
+               ) {
+                       this.expectedTimestamp = expectedTimestamp;
+                       this.expectedWatermark = expectedWatermark;
+                       this.expectedProcessingTime = expectedProcessingTime;
+               }
+
+               @Override
+               public String getBucketId(String element, Context context) {
+                       final Long actualTimestamp = context.timestamp();
+                       final long actualWatermark = context.currentWatermark();
+                       final long actualProcessingTime = context.currentProcessingTime();
+
+                       Assert.assertEquals(expectedTimestamp, actualTimestamp);
+                       Assert.assertEquals(expectedProcessingTime, actualProcessingTime);
+                       Assert.assertEquals(expectedWatermark, actualWatermark);
+
+                       return element;
+               }
+
+               @Override
+               public SimpleVersionedSerializer<String> getSerializer() {
+                       return SimpleVersionedStringSerializer.INSTANCE;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
new file mode 100644
index 0000000..7b6b82c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Tests for the {@link StreamingFileSink} with {@link BulkWriter}.
+ */
+public class BulkWriterTest extends TestLogger {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+       /**
+        * Writes three elements into bucket "test1" through a custom {@link BulkWriter},
+        * snapshotting after the first and after the third element, then verifies the
+        * content of the two in-progress part files and, once checkpoint 2 is acknowledged,
+        * the file counts reported by {@code TestUtils.checkLocalFs}.
+        */
+       @Test
+       public void testCustomBulkWriter() throws Exception {
+               final File outDir = TEMP_FOLDER.newFolder();
+
+               // we set the max bucket size to small so that we can know when it rolls
+               try (
+                               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+                                               TestUtils.createTestSinkWithBulkEncoder(
+                                                               outDir,
+                                                               1,
+                                                               0,
+                                                               10L,
+                                                               new TestUtils.TupleToStringBucketer(),
+                                                               new TestBulkWriterFactory(),
+                                                               new DefaultBucketFactory<>())
+               ) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       // this creates a new bucket "test1" and part-0-0
+                       testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+                       TestUtils.checkLocalFs(outDir, 1, 0);
+
+                       // we take a checkpoint so we roll.
+                       testHarness.snapshot(1L, 1L);
+
+                       // these will close part-0-0 and open part-0-1
+                       testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+                       testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+
+                       // we take a checkpoint so we roll again.
+                       testHarness.snapshot(2L, 2L);
+
+                       TestUtils.checkLocalFs(outDir, 2, 0);
+
+                       Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
+                       int fileCounter = 0;
+                       for (Map.Entry<File, String> fileContents : contents.entrySet()) {
+                               if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+                                       fileCounter++;
+                                       Assert.assertEquals("test1@1\n", fileContents.getValue());
+                               } else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+                                       fileCounter++;
+                                       Assert.assertEquals("test1@2\ntest1@3\n", fileContents.getValue());
+                               }
+                       }
+                       Assert.assertEquals(2L, fileCounter);
+
+                       // we acknowledge the latest checkpoint, so everything should be published.
+                       testHarness.notifyOfCompletedCheckpoint(2L);
+                       TestUtils.checkLocalFs(outDir, 0, 2);
+               }
+       }
+
+       /**
+        * A {@link BulkWriter} used for the tests.
+        *
+        * <p>Each element is written to the wrapped stream as {@code f0@f1} followed by a
+        * newline, encoded as UTF-8.
+        */
+       private static class TestBulkWriter implements BulkWriter<Tuple2<String, Integer>> {
+
+               private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+               private final FSDataOutputStream stream;
+
+               TestBulkWriter(final FSDataOutputStream stream) {
+                       this.stream = Preconditions.checkNotNull(stream);
+               }
+
+               @Override
+               public void addElement(Tuple2<String, Integer> element) throws IOException {
+                       stream.write((element.f0 + '@' + element.f1 + '\n').getBytes(CHARSET));
+               }
+
+               @Override
+               public void flush() throws IOException {
+                       stream.flush();
+               }
+
+               @Override
+               public void finish() throws IOException {
+                       // finishing only flushes the stream; this writer never closes it
+                       flush();
+               }
+       }
+
+       /**
+        * A {@link BulkWriter.Factory} used for the tests; it wraps every given stream
+        * in a new {@link TestBulkWriter}.
+        */
+       private static class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream out) {
+                       return new TestBulkWriter(out);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index b6f73ac..6e942e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -18,20 +18,17 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
-import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -39,9 +36,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -56,12 +50,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
        public void testClosingWithoutInput() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
 
-               OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness =
-                               createRescalingTestSink(outDir, 1, 0, 100L, 
124L);
-               testHarness.setup();
-               testHarness.open();
-
-               testHarness.close();
+               try (
+                       OneInputStreamOperatorTestHarness<Tuple2<String, 
Integer>, Object> testHarness =
+                                       
TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 124L);
+               ) {
+                       testHarness.setup();
+                       testHarness.open();
+               }
        }
 
        @Test
@@ -70,7 +65,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                OperatorSubtaskState snapshot;
 
                // we set the max bucket size to small so that we can know when 
it rolls
-               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = createRescalingTestSink(
+               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = TestUtils.createRescalingTestSink(
                                outDir, 1, 0, 100L, 10L)) {
 
                        testHarness.setup();
@@ -78,7 +73,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
 
                        // this creates a new bucket "test1" and part-0-0
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
-                       checkLocalFs(outDir, 1, 0);
+                       TestUtils.checkLocalFs(outDir, 1, 0);
 
                        // we take a checkpoint so that we keep the in-progress 
file offset.
                        snapshot = testHarness.snapshot(1L, 1L);
@@ -87,9 +82,9 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 2), 2L));
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 3), 3L));
 
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
-                       Map<File, String> contents = 
getFileContentByPath(outDir);
+                       Map<File, String> contents = 
TestUtils.getFileContentByPath(outDir);
                        int fileCounter = 0;
                        for (Map.Entry<File, String> fileContents : 
contents.entrySet()) {
                                if 
(fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
@@ -103,7 +98,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                        Assert.assertEquals(2L, fileCounter);
                }
 
-               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = createRescalingTestSink(
+               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = TestUtils.createRescalingTestSink(
                                outDir, 1, 0, 100L, 10L)) {
 
                        testHarness.setup();
@@ -111,11 +106,11 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        testHarness.open();
 
                        // the in-progress is the not cleaned up one and the 
pending is truncated and finalized
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        // now we go back to the first checkpoint so it should 
truncate part-0-0 and restart part-0-1
                        int fileCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
                                        // truncated
                                        fileCounter++;
@@ -132,7 +127,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 4), 4L));
 
                        fileCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
                                        fileCounter++;
                                        
Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -145,16 +140,16 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        Assert.assertEquals(2L, fileCounter);
 
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 5), 5L));
-                       checkLocalFs(outDir, 3, 0); // the previous part-0-1 in 
progress is simply ignored (random extension)
+                       TestUtils.checkLocalFs(outDir, 3, 0); // the previous 
part-0-1 in progress is simply ignored (random extension)
 
                        testHarness.snapshot(2L, 2L);
 
                        // this will close the new part-0-1
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 6), 6L));
-                       checkLocalFs(outDir, 3, 0);
+                       TestUtils.checkLocalFs(outDir, 3, 0);
 
                        fileCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
                                        fileCounter++;
                                        
Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -169,10 +164,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
 
                        // this will publish part-0-0
                        testHarness.notifyOfCompletedCheckpoint(2L);
-                       checkLocalFs(outDir, 2, 1);
+                       TestUtils.checkLocalFs(outDir, 2, 1);
 
                        fileCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getName().equals("part-0-0")) {
                                        fileCounter++;
                                        
Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -192,7 +187,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                final File outDir = TEMP_FOLDER.newFolder();
 
                // we set the max bucket size to small so that we can know when 
it rolls
-               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = createRescalingTestSink(
+               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = TestUtils.createRescalingTestSink(
                                outDir, 1, 0, 100L, 10L)) {
 
                        testHarness.setup();
@@ -203,11 +198,11 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        // these 2 create a new bucket "test1", with a 
.part-0-0.inprogress and also fill it
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 2), 2L));
-                       checkLocalFs(outDir, 1, 0);
+                       TestUtils.checkLocalFs(outDir, 1, 0);
 
                        // this will open .part-0-1.inprogress
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 3), 3L));
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        // we take a checkpoint so that we keep the in-progress 
file offset.
                        testHarness.snapshot(1L, 1L);
@@ -218,13 +213,13 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        // and open and fill .part-0-2.inprogress
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 5), 5L));
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 6), 6L));
-                       checkLocalFs(outDir, 3, 0);                    // 
nothing committed yet
+                       TestUtils.checkLocalFs(outDir, 3, 0);                   
 // nothing committed yet
 
                        testHarness.snapshot(2L, 2L);
 
                        // open .part-0-3.inprogress
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 7), 7L));
-                       checkLocalFs(outDir, 4, 0);
+                       TestUtils.checkLocalFs(outDir, 4, 0);
 
                        // this will close the part file (time)
                        testHarness.setProcessingTime(101L);
@@ -232,10 +227,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        testHarness.snapshot(3L, 3L);
 
                        testHarness.notifyOfCompletedCheckpoint(1L);            
                                        // the pending for checkpoint 1 are 
committed
-                       checkLocalFs(outDir, 3, 1);
+                       TestUtils.checkLocalFs(outDir, 3, 1);
 
                        int fileCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getName().equals("part-0-0")) {
                                        fileCounter++;
                                        
Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
@@ -253,10 +248,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        Assert.assertEquals(4L, fileCounter);
 
                        testHarness.notifyOfCompletedCheckpoint(3L);            
                                        // all the pending for checkpoint 2 and 
3 are committed
-                       checkLocalFs(outDir, 0, 4);
+                       TestUtils.checkLocalFs(outDir, 0, 4);
 
                        fileCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getName().equals("part-0-0")) {
                                        fileCounter++;
                                        
Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
@@ -280,7 +275,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                final File outDir = TEMP_FOLDER.newFolder();
 
                // we set a big bucket size so that it does not close by size, 
but by timers.
-               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = createRescalingTestSink(
+               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness = TestUtils.createRescalingTestSink(
                                outDir, 1, 0, 100L, 124L)) {
 
                        testHarness.setup();
@@ -290,10 +285,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
 
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test2", 1), 1L));
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        int bucketCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getParentFile().getName().equals("test1")) {
                                        bucketCounter++;
                                } else if 
(fileContents.getKey().getParentFile().getName().equals("test2")) {
@@ -303,10 +298,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        Assert.assertEquals(2L, bucketCounter);                 
                // verifies that we have 2 buckets, "test1" and "test2"
 
                        testHarness.setProcessingTime(101L);                    
            // put them in pending
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        testHarness.snapshot(0L, 0L);                // put 
them in pending for 0
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        // create another 2 buckets with 1 inprogress file each
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test3", 1), 1L));
@@ -315,13 +310,13 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        testHarness.setProcessingTime(202L);                    
            // put them in pending
 
                        testHarness.snapshot(1L, 0L);                // put 
them in pending for 1
-                       checkLocalFs(outDir, 4, 0);
+                       TestUtils.checkLocalFs(outDir, 4, 0);
 
                        testHarness.notifyOfCompletedCheckpoint(0L);            
// put the pending for 0 to the "committed" state
-                       checkLocalFs(outDir, 2, 2);
+                       TestUtils.checkLocalFs(outDir, 2, 2);
 
                        bucketCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getParentFile().getName().equals("test1")) {
                                        bucketCounter++;
                                        Assert.assertEquals("part-0-0", 
fileContents.getKey().getName());
@@ -339,10 +334,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        Assert.assertEquals(4L, bucketCounter);
 
                        testHarness.notifyOfCompletedCheckpoint(1L);            
// put the pending for 1 to the "committed" state
-                       checkLocalFs(outDir, 0, 4);
+                       TestUtils.checkLocalFs(outDir, 0, 4);
 
                        bucketCounter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                if 
(fileContents.getKey().getParentFile().getName().equals("test1")) {
                                        bucketCounter++;
                                        Assert.assertEquals("test1@1\n", 
fileContents.getValue());
@@ -367,9 +362,10 @@ public class LocalStreamingFileSinkTest extends TestLogger 
{
        public void testClosingOnSnapshot() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
 
-               try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> testHarness =
-                                       createRescalingTestSink(outDir, 1, 0, 
100L, 2L)) {
-
+               try (
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+                                               
TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 2L)
+               ) {
                        testHarness.setup();
                        testHarness.open();
 
@@ -377,29 +373,29 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
 
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test2", 1), 1L));
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        // this is to check the inactivity threshold
                        testHarness.setProcessingTime(101L);
-                       checkLocalFs(outDir, 2, 0);
+                       TestUtils.checkLocalFs(outDir, 2, 0);
 
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test3", 1), 1L));
-                       checkLocalFs(outDir, 3, 0);
+                       TestUtils.checkLocalFs(outDir, 3, 0);
 
                        testHarness.snapshot(0L, 1L);
-                       checkLocalFs(outDir, 3, 0);
+                       TestUtils.checkLocalFs(outDir, 3, 0);
 
                        testHarness.notifyOfCompletedCheckpoint(0L);
-                       checkLocalFs(outDir, 0, 3);
+                       TestUtils.checkLocalFs(outDir, 0, 3);
 
                        testHarness.snapshot(1L, 0L);
 
                        testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test4", 10), 10L));
-                       checkLocalFs(outDir, 1, 3);
+                       TestUtils.checkLocalFs(outDir, 1, 3);
                }
 
                // at close it is not moved to final.
-               checkLocalFs(outDir, 1, 3);
+               TestUtils.checkLocalFs(outDir, 1, 3);
        }
 
        @Test
@@ -408,11 +404,11 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
 
                OperatorSubtaskState mergedSnapshot;
 
-               // we set small file size so that the part file rolls.
+               // we set small file size so that the part file rolls on every 
element.
                try (
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= createRescalingTestSink(
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= TestUtils.createRescalingTestSink(
                                                outDir, 2, 0, 100L, 10L);
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 
= createRescalingTestSink(
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 
= TestUtils.createRescalingTestSink(
                                                outDir, 2, 1, 100L, 10L)
                ) {
                        testHarness1.setup();
@@ -422,16 +418,16 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        testHarness2.open();
 
                        testHarness1.processElement(new 
StreamRecord<>(Tuple2.of("test1", 0), 0L));
-                       checkLocalFs(outDir, 1, 0);
+                       TestUtils.checkLocalFs(outDir, 1, 0);
 
                        testHarness2.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
                        testHarness2.processElement(new 
StreamRecord<>(Tuple2.of("test2", 1), 1L));
 
                        // all the files are in-progress
-                       checkLocalFs(outDir, 3, 0);
+                       TestUtils.checkLocalFs(outDir, 3, 0);
 
                        int counter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                final String parentFilename = 
fileContents.getKey().getParentFile().getName();
                                final String inProgressFilename = 
fileContents.getKey().getName();
 
@@ -456,7 +452,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                }
 
                try (
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness 
= createRescalingTestSink(
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness 
= TestUtils.createRescalingTestSink(
                                                outDir, 1, 0, 100L, 10L)
                ) {
                        testHarness.setup();
@@ -464,13 +460,13 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        testHarness.open();
 
                        // still everything in-progress but the in-progress for 
prev task 1 should be put in pending now
-                       checkLocalFs(outDir, 3, 0);
+                       TestUtils.checkLocalFs(outDir, 3, 0);
 
                        testHarness.snapshot(2L, 2L);
                        testHarness.notifyOfCompletedCheckpoint(2L);
 
                        int counter = 0;
-                       for (Map.Entry<File, String> fileContents : 
getFileContentByPath(outDir).entrySet()) {
+                       for (Map.Entry<File, String> fileContents : 
TestUtils.getFileContentByPath(outDir).entrySet()) {
                                final String parentFilename = 
fileContents.getKey().getParentFile().getName();
                                final String filename = 
fileContents.getKey().getName();
 
@@ -498,11 +494,18 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                final TestBucketFactory first = new TestBucketFactory();
                final TestBucketFactory second = new TestBucketFactory();
 
+               final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+                               .create()
+                               .withMaxPartSize(2L)
+                               .withRolloverInterval(100L)
+                               .withInactivityInterval(100L)
+                               .build();
+
                try (
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= createCustomRescalingTestSink(
-                                               outDir, 2, 0, 100L, 2L, first, 
new SimpleStringEncoder<>());
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 
= createCustomRescalingTestSink(
-                                               outDir, 2, 1, 100L, 2L, second, 
new SimpleStringEncoder<>())
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= TestUtils.createCustomRescalingTestSink(
+                                               outDir, 2, 0, 10L, new 
TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, 
first);
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 
= TestUtils.createCustomRescalingTestSink(
+                                               outDir, 2, 1, 10L, new 
TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, 
second)
                ) {
                        testHarness1.setup();
                        testHarness1.open();
@@ -514,7 +517,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                        testHarness1.processElement(new 
StreamRecord<>(Tuple2.of("test1", 0), 0L));
                        testHarness1.processElement(new 
StreamRecord<>(Tuple2.of("test1", 0), 0L));
                        testHarness1.processElement(new 
StreamRecord<>(Tuple2.of("test1", 0), 0L));
-                       checkLocalFs(outDir, 3, 0);
+                       TestUtils.checkLocalFs(outDir, 3, 0);
 
                        // intentionally we snapshot them in the reverse order 
so that the states are shuffled
                        mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
@@ -527,10 +530,10 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                final TestBucketFactory secondRecovered = new 
TestBucketFactory();
 
                try (
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= createCustomRescalingTestSink(
-                                               outDir, 2, 0, 100L, 2L, 
firstRecovered, new SimpleStringEncoder<>());
-                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 
= createCustomRescalingTestSink(
-                                               outDir, 2, 1, 100L, 2L, 
secondRecovered, new SimpleStringEncoder<>())
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= TestUtils.createCustomRescalingTestSink(
+                                               outDir, 2, 0, 10L, new 
TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, 
firstRecovered);
+                               
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 
= TestUtils.createCustomRescalingTestSink(
+                                               outDir, 2, 1, 10L, new 
TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, 
secondRecovered)
                ) {
                        testHarness1.setup();
                        testHarness1.initializeState(mergedSnapshot);
@@ -540,7 +543,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                        testHarness1.processElement(new 
StreamRecord<>(Tuple2.of("test4", 0), 0L));
 
                        Assert.assertEquals(3L, 
firstRecovered.getInitialCounter());
-                       checkLocalFs(outDir, 1, 3);
+                       TestUtils.checkLocalFs(outDir, 1, 3);
 
                        testHarness2.setup();
                        testHarness2.initializeState(mergedSnapshot);
@@ -550,78 +553,26 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                        testHarness2.processElement(new 
StreamRecord<>(Tuple2.of("test2", 0), 0L));
 
                        Assert.assertEquals(3L, 
secondRecovered.getInitialCounter());
-                       checkLocalFs(outDir, 2, 3);
+                       TestUtils.checkLocalFs(outDir, 2, 3);
                }
        }
 
        //////////////////////                  Helper Methods                  
//////////////////////
 
-       private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> createRescalingTestSink(
-                       File outDir,
-                       int totalParallelism,
-                       int taskIdx,
-                       long inactivityInterval,
-                       long partMaxSize) throws Exception {
-
-               return createCustomRescalingTestSink(
-                               outDir,
-                               totalParallelism,
-                               taskIdx,
-                               inactivityInterval,
-                               partMaxSize,
-                               new DefaultBucketFactory<>(),
-                               (Encoder<Tuple2<String, Integer>>) (element, 
stream) -> {
-                                       stream.write((element.f0 + '@' + 
element.f1).getBytes(StandardCharsets.UTF_8));
-                                       stream.write('\n');
-                               });
-       }
-
-       private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> createCustomRescalingTestSink(
-                       File outDir,
-                       int totalParallelism,
-                       int taskIdx,
-                       long inactivityInterval,
-                       long partMaxSize,
-                       BucketFactory<Tuple2<String, Integer>> factory,
-                       Encoder<Tuple2<String, Integer>> writer) throws 
Exception {
-
-               StreamingFileSink<Tuple2<String, Integer>> sink = new 
StreamingFileSink<>(new Path(outDir.toURI()), factory)
-                               .setBucketer(new Bucketer<Tuple2<String, 
Integer>>() {
-
-                                       private static final long 
serialVersionUID = -3086487303018372007L;
-
-                                       @Override
-                                       public String 
getBucketId(Tuple2<String, Integer> element, Context context) {
-                                               return element.f0;
-                                       }
-                               })
-                               .setEncoder(writer)
-                               .setRollingPolicy(
-                                               DefaultRollingPolicy
-                                                               .create()
-                                                               
.withMaxPartSize(partMaxSize)
-                                                               
.withRolloverInterval(inactivityInterval)
-                                                               
.withInactivityInterval(inactivityInterval)
-                                                               .build())
-                               .setBucketCheckInterval(10L);
-
-               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), 10, totalParallelism, taskIdx);
-       }
-
-       static class TestBucketFactory extends 
DefaultBucketFactory<Tuple2<String, Integer>> {
+       static class TestBucketFactory extends 
DefaultBucketFactory<Tuple2<String, Integer>, String> {
 
                private static final long serialVersionUID = 
2794824980604027930L;
 
                private long initialCounter = -1L;
 
                @Override
-               public Bucket<Tuple2<String, Integer>> getNewBucket(
-                               RecoverableWriter fsWriter,
-                               int subtaskIndex,
-                               String bucketId,
-                               Path bucketPath,
-                               long initialPartCounter,
-                               Encoder<Tuple2<String, Integer>> writer) throws 
IOException {
+               public Bucket<Tuple2<String, Integer>, String> getNewBucket(
+                               final RecoverableWriter fsWriter,
+                               final int subtaskIndex,
+                               final String bucketId,
+                               final Path bucketPath,
+                               final long initialPartCounter,
+                               final 
PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> 
partFileWriterFactory) {
 
                        this.initialCounter = initialPartCounter;
 
@@ -631,16 +582,16 @@ public class LocalStreamingFileSinkTest extends 
TestLogger {
                                        bucketId,
                                        bucketPath,
                                        initialPartCounter,
-                                       writer);
+                                       partFileWriterFactory);
                }
 
                @Override
-               public Bucket<Tuple2<String, Integer>> restoreBucket(
-                               RecoverableWriter fsWriter,
-                               int subtaskIndex,
-                               long initialPartCounter,
-                               Encoder<Tuple2<String, Integer>> writer,
-                               BucketState bucketState) throws IOException {
+               public Bucket<Tuple2<String, Integer>, String> restoreBucket(
+                               final RecoverableWriter fsWriter,
+                               final int subtaskIndex,
+                               final long initialPartCounter,
+                               final 
PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> 
partFileWriterFactory,
+                               final BucketState<String> bucketState) throws 
IOException {
 
                        this.initialCounter = initialPartCounter;
 
@@ -648,7 +599,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
                                        fsWriter,
                                        subtaskIndex,
                                        initialPartCounter,
-                                       writer,
+                                       partFileWriterFactory,
                                        bucketState);
                }
 
@@ -656,34 +607,4 @@ public class LocalStreamingFileSinkTest extends TestLogger 
{
                        return initialCounter;
                }
        }
-
-       private static void checkLocalFs(File outDir, int expectedInProgress, 
int expectedCompleted) {
-               int inProgress = 0;
-               int finished = 0;
-
-               for (File file: FileUtils.listFiles(outDir, null, true)) {
-                       if (file.getAbsolutePath().endsWith("crc")) {
-                               continue;
-                       }
-
-                       if 
(file.toPath().getFileName().toString().startsWith(".")) {
-                               inProgress++;
-                       } else {
-                               finished++;
-                       }
-               }
-
-               Assert.assertEquals(expectedInProgress, inProgress);
-               Assert.assertEquals(expectedCompleted, finished);
-       }
-
-       private static Map<File, String> getFileContentByPath(File directory) 
throws IOException {
-               Map<File, String> contents = new HashMap<>(4);
-
-               final Collection<File> filesInBucket = 
FileUtils.listFiles(directory, null, true);
-               for (File file : filesInBucket) {
-                       contents.put(file, FileUtils.readFileToString(file));
-               }
-               return contents;
-       }
 }

Reply via email to