[FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders

* Add support for bulk encoders.
* Expose more options in the rolling policy.
* Allow the bucketer to return any object as the bucket id.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b56c75ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b56c75ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b56c75ca

Branch: refs/heads/master
Commit: b56c75ca375049b1d2c80d2d0945ae1ae04eb39e
Parents: d309e61
Author: kkloudas <[email protected]>
Authored: Tue Jul 17 11:52:02 2018 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Jul 20 16:12:29 2018 +0200

----------------------------------------------------------------------
 .../api/common/serialization/BulkWriter.java    |  95 ++++
 .../core/fs/SafetyNetWrapperFileSystem.java     |   5 +
 .../api/functions/sink/filesystem/Bucket.java   |  48 +-
 .../sink/filesystem/BucketFactory.java          |  29 +-
 .../functions/sink/filesystem/BucketState.java  |  12 +-
 .../sink/filesystem/BucketStateSerializer.java  |  39 +-
 .../api/functions/sink/filesystem/Buckets.java  | 339 ++++++++++++++
 .../sink/filesystem/BulkPartWriter.java         | 110 +++++
 .../sink/filesystem/DefaultBucketFactory.java   |  35 +-
 .../sink/filesystem/DefaultRollingPolicy.java   | 142 ------
 .../sink/filesystem/PartFileHandler.java        | 122 -----
 .../functions/sink/filesystem/PartFileInfo.java |   4 +-
 .../sink/filesystem/PartFileWriter.java         | 141 ++++++
 .../sink/filesystem/RollingPolicy.java          |  20 +-
 .../sink/filesystem/RowWisePartWriter.java      |  96 ++++
 .../sink/filesystem/StreamingFileSink.java      | 457 ++++++++-----------
 .../filesystem/bucketers/BasePathBucketer.java  |  11 +-
 .../sink/filesystem/bucketers/Bucketer.java     |  22 +-
 .../filesystem/bucketers/DateTimeBucketer.java  |  12 +-
 .../SimpleVersionedStringSerializer.java        |  77 ++++
 .../rolling/policies/DefaultRollingPolicy.java  | 157 +++++++
 .../policies/OnCheckpointRollingPolicy.java     |  45 ++
 .../filesystem/BucketStateSerializerTest.java   |  47 +-
 .../functions/sink/filesystem/BucketsTest.java  | 119 +++++
 .../sink/filesystem/BulkWriterTest.java         | 144 ++++++
 .../filesystem/LocalStreamingFileSinkTest.java  | 267 ++++-------
 .../sink/filesystem/RollingPolicyTest.java      | 225 +++++++++
 .../functions/sink/filesystem/TestUtils.java    | 164 +++++++
 28 files changed, 2177 insertions(+), 807 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
new file mode 100644
index 0000000..44f5fbe
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An encoder that encodes data in a bulk fashion, encoding many records 
together at a time.
+ *
+ * <p>Examples for bulk encoding are most compressed formats, including 
formats like
+ * Parquet and ORC which encode batches of records into blocks of column 
vectors.
+ *
+ * <p>The bulk encoder may be stateful and is bound to a single stream during 
its
+ * lifetime.
+ *
+ * @param <T> The type of the elements encoded through this encoder.
+ */
+@PublicEvolving
+public interface BulkWriter<T> {
+
+       /**
+        * Adds an element to the encoder. The encoder may temporarily buffer 
the element,
+        * or immediately write it to the stream.
+        *
+        * <p>It may be that adding this element fills up an internal buffer 
and causes the
+        * encoding and flushing of a batch of internally buffered elements.
+        *
+        * @param element The element to add.
+        * @throws IOException Thrown, if the element cannot be added to the 
encoder,
+        *                     or if the output stream throws an exception.
+        */
+       void addElement(T element) throws IOException;
+
+       /**
+        * Flushes all intermediate buffered data to the output stream.
+        * It is expected that flushing often may reduce the efficiency of the 
encoding.
+        *
+        * @throws IOException Thrown if the encoder cannot be flushed, or if 
the output
+        *                     stream throws an exception.
+        */
+       void flush() throws IOException;
+
+       /**
+        * Finishes the writing. This must flush all internal buffers, finish 
encoding, and write
+        * footers.
+        *
+        * <p>The writer is not expected to handle any more records via {@link 
#addElement(Object)} after
+        * this method is called.
+        *
+        * <p><b>Important:</b> This method MUST NOT close the stream that the 
writer writes to.
+        * Closing the stream is expected to happen through the invoker of this 
method afterwards.
+        *
+        * @throws IOException Thrown if the finalization fails.
+        */
+       void finish() throws IOException;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A factory that creates a {@link BulkWriter}.
+        * @param <T> The type of record to write.
+        */
+       @FunctionalInterface
+       interface Factory<T> extends Serializable {
+
+               /**
+                * Creates a writer that writes to the given stream.
+                *
+                * @param out The output stream to write the encoded data to.
+                * @throws IOException Thrown if the writer cannot be opened, 
or if the output
+                *                     stream throws an exception.
+                */
+               BulkWriter<T> create(FSDataOutputStream out) throws IOException;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 92b3a74c..04e6315 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -65,6 +65,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem 
implements WrappingPr
        }
 
        @Override
+       public RecoverableWriter createRecoverableWriter() throws IOException {
+               return unsafeFileSystem.createRecoverableWriter();
+       }
+
+       @Override
        public BlockLocation[] getFileBlockLocations(FileStatus file, long 
start, long len) throws IOException {
                return unsafeFileSystem.getFileBlockLocations(file, start, len);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index d9a6d75..3e2d22c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
@@ -30,6 +29,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * A bucket is the directory organization of the output of the {@link 
StreamingFileSink}.
@@ -39,17 +39,17 @@ import java.util.Map;
  * queried to see in which bucket this element should be written to.
  */
 @PublicEvolving
-public class Bucket<IN> {
+public class Bucket<IN, BucketID> {
 
        private static final String PART_PREFIX = "part";
 
-       private final String bucketId;
+       private final BucketID bucketId;
 
        private final Path bucketPath;
 
        private final int subtaskIndex;
 
-       private final Encoder<IN> encoder;
+       private final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory;
 
        private final RecoverableWriter fsWriter;
 
@@ -57,7 +57,7 @@ public class Bucket<IN> {
 
        private long partCounter;
 
-       private PartFileHandler<IN> currentPart;
+       private PartFileWriter<IN, BucketID> currentPart;
 
        private List<RecoverableWriter.CommitRecoverable> pending;
 
@@ -68,10 +68,10 @@ public class Bucket<IN> {
                        RecoverableWriter fsWriter,
                        int subtaskIndex,
                        long initialPartCounter,
-                       Encoder<IN> writer,
-                       BucketState bucketstate) throws IOException {
+                       PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory,
+                       BucketState<BucketID> bucketState) throws IOException {
 
-               this(fsWriter, subtaskIndex, bucketstate.getBucketId(), 
bucketstate.getBucketPath(), initialPartCounter, writer);
+               this(fsWriter, subtaskIndex, bucketState.getBucketId(), 
bucketState.getBucketPath(), initialPartCounter, partFileFactory);
 
                // the constructor must have already initialized the filesystem 
writer
                Preconditions.checkState(fsWriter != null);
@@ -79,15 +79,15 @@ public class Bucket<IN> {
                // we try to resume the previous in-progress file, if the 
filesystem
                // supports such operation. If not, we just commit the file and 
start fresh.
 
-               final RecoverableWriter.ResumeRecoverable resumable = 
bucketstate.getCurrentInProgress();
+               final RecoverableWriter.ResumeRecoverable resumable = 
bucketState.getInProgress();
                if (resumable != null) {
-                       currentPart = PartFileHandler.resumeFrom(
-                                       bucketId, fsWriter, resumable, 
bucketstate.getCreationTime());
+                       currentPart = partFileFactory.resumeFrom(
+                                       bucketId, fsWriter, resumable, 
bucketState.getCreationTime());
                }
 
                // we commit pending files for previous checkpoints to the last 
successful one
                // (from which we are recovering from)
-               for (List<RecoverableWriter.CommitRecoverable> commitables: 
bucketstate.getPendingPerCheckpoint().values()) {
+               for (List<RecoverableWriter.CommitRecoverable> commitables: 
bucketState.getPendingPerCheckpoint().values()) {
                        for (RecoverableWriter.CommitRecoverable commitable: 
commitables) {
                                
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
                        }
@@ -100,26 +100,26 @@ public class Bucket<IN> {
        public Bucket(
                        RecoverableWriter fsWriter,
                        int subtaskIndex,
-                       String bucketId,
+                       BucketID bucketId,
                        Path bucketPath,
                        long initialPartCounter,
-                       Encoder<IN> writer) {
+                       PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory) {
 
                this.fsWriter = Preconditions.checkNotNull(fsWriter);
                this.subtaskIndex = subtaskIndex;
                this.bucketId = Preconditions.checkNotNull(bucketId);
                this.bucketPath = Preconditions.checkNotNull(bucketPath);
                this.partCounter = initialPartCounter;
-               this.encoder = Preconditions.checkNotNull(writer);
+               this.partFileFactory = 
Preconditions.checkNotNull(partFileFactory);
 
                this.pending = new ArrayList<>();
        }
 
-       public PartFileInfo getInProgressPartInfo() {
+       public PartFileInfo<BucketID> getInProgressPartInfo() {
                return currentPart;
        }
 
-       public String getBucketId() {
+       public BucketID getBucketId() {
                return bucketId;
        }
 
@@ -137,18 +137,18 @@ public class Bucket<IN> {
 
        void write(IN element, long currentTime) throws IOException {
                Preconditions.checkState(currentPart != null, "bucket has been 
closed");
-               currentPart.write(element, encoder, currentTime);
+               currentPart.write(element, currentTime);
        }
 
        void rollPartFile(final long currentTime) throws IOException {
                closePartFile();
-               currentPart = PartFileHandler.openNew(bucketId, fsWriter, 
getNewPartPath(), currentTime);
+               currentPart = partFileFactory.openNew(bucketId, fsWriter, 
getNewPartPath(), currentTime);
                partCounter++;
        }
 
-       void merge(final Bucket<IN> bucket) throws IOException {
+       void merge(final Bucket<IN, BucketID> bucket) throws IOException {
                Preconditions.checkNotNull(bucket);
-               
Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
+               Preconditions.checkState(Objects.equals(bucket.getBucketPath(), 
bucketPath));
 
                // there should be no pending files in the "to-merge" states.
                Preconditions.checkState(bucket.pending.isEmpty());
@@ -176,7 +176,7 @@ public class Bucket<IN> {
                }
        }
 
-       public void commitUpToCheckpoint(long checkpointId) throws IOException {
+       public void onCheckpointAcknowledgment(long checkpointId) throws 
IOException {
                Preconditions.checkNotNull(fsWriter);
 
                Iterator<Map.Entry<Long, 
List<RecoverableWriter.CommitRecoverable>>> it =
@@ -193,7 +193,7 @@ public class Bucket<IN> {
                }
        }
 
-       public BucketState snapshot(long checkpointId) throws IOException {
+       public BucketState<BucketID> onCheckpoint(long checkpointId) throws 
IOException {
                RecoverableWriter.ResumeRecoverable resumable = null;
                long creationTime = Long.MAX_VALUE;
 
@@ -206,7 +206,7 @@ public class Bucket<IN> {
                        pendingPerCheckpoint.put(checkpointId, pending);
                        pending = new ArrayList<>();
                }
-               return new BucketState(bucketId, bucketPath, creationTime, 
resumable, pendingPerCheckpoint);
+               return new BucketState<>(bucketId, bucketPath, creationTime, 
resumable, pendingPerCheckpoint);
        }
 
        private Path getNewPartPath() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 88f3c1a..0c6b587 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 
@@ -30,20 +29,20 @@ import java.io.Serializable;
  * A factory able to create {@link Bucket buckets} for the {@link 
StreamingFileSink}.
  */
 @Internal
-public interface BucketFactory<IN> extends Serializable {
+interface BucketFactory<IN, BucketID> extends Serializable {
 
-       Bucket<IN> getNewBucket(
-                       RecoverableWriter fsWriter,
-                       int subtaskIndex,
-                       String bucketId,
-                       Path bucketPath,
-                       long initialPartCounter,
-                       Encoder<IN> writer) throws IOException;
+       Bucket<IN, BucketID> getNewBucket(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final BucketID bucketId,
+                       final Path bucketPath,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory) throws IOException;
 
-       Bucket<IN> restoreBucket(
-                       RecoverableWriter fsWriter,
-                       int subtaskIndex,
-                       long initialPartCounter,
-                       Encoder<IN> writer,
-                       BucketState bucketstate) throws IOException;
+       Bucket<IN, BucketID> restoreBucket(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final BucketState<BucketID> bucketState) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 5ebc46c..bb49e3a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,9 +32,9 @@ import java.util.Map;
  * The state of the {@link Bucket} that is to be checkpointed.
  */
 @Internal
-public class BucketState {
+public class BucketState<BucketID> {
 
-       private final String bucketId;
+       private final BucketID bucketId;
 
        /**
         * The base path for the bucket, i.e. the directory where all the part 
files are stored.
@@ -59,10 +59,10 @@ public class BucketState {
        private final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingPerCheckpoint;
 
        public BucketState(
-                       final String bucketId,
+                       final BucketID bucketId,
                        final Path bucketPath,
                        final long creationTime,
-                       final @Nullable RecoverableWriter.ResumeRecoverable 
inProgress,
+                       @Nullable final RecoverableWriter.ResumeRecoverable 
inProgress,
                        final Map<Long, 
List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
        ) {
                this.bucketId = Preconditions.checkNotNull(bucketId);
@@ -72,7 +72,7 @@ public class BucketState {
                this.pendingPerCheckpoint = 
Preconditions.checkNotNull(pendingPerCheckpoint);
        }
 
-       public String getBucketId() {
+       public BucketID getBucketId() {
                return bucketId;
        }
 
@@ -85,7 +85,7 @@ public class BucketState {
        }
 
        @Nullable
-       public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
+       public RecoverableWriter.ResumeRecoverable getInProgress() {
                return inProgress;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index a167ec9..cf9b805 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -42,7 +41,7 @@ import java.util.Map.Entry;
  * A {@code SimpleVersionedSerializer} used to serialize the {@link 
BucketState BucketState}.
  */
 @Internal
-class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
+class BucketStateSerializer<BucketID> implements 
SimpleVersionedSerializer<BucketState<BucketID>> {
 
        private static final int MAGIC_NUMBER = 0x1e764b79;
 
@@ -50,12 +49,16 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
 
        private final 
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
commitableSerializer;
 
-       public BucketStateSerializer(
-                       final 
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> 
resumableSerializer,
-                       final 
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
commitableSerializer) {
+       private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
 
+       BucketStateSerializer(
+                       final 
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> 
resumableSerializer,
+                       final 
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
commitableSerializer,
+                       final SimpleVersionedSerializer<BucketID> 
bucketIdSerializer
+       ) {
                this.resumableSerializer = 
Preconditions.checkNotNull(resumableSerializer);
                this.commitableSerializer = 
Preconditions.checkNotNull(commitableSerializer);
+               this.bucketIdSerializer = 
Preconditions.checkNotNull(bucketIdSerializer);
        }
 
        @Override
@@ -64,7 +67,7 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
        }
 
        @Override
-       public byte[] serialize(BucketState state) throws IOException {
+       public byte[] serialize(BucketState<BucketID> state) throws IOException 
{
                DataOutputSerializer out = new DataOutputSerializer(256);
                out.writeInt(MAGIC_NUMBER);
                serializeV1(state, out);
@@ -72,7 +75,7 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
        }
 
        @Override
-       public BucketState deserialize(int version, byte[] serialized) throws 
IOException {
+       public BucketState<BucketID> deserialize(int version, byte[] 
serialized) throws IOException {
                switch (version) {
                        case 1:
                                DataInputDeserializer in = new 
DataInputDeserializer(serialized);
@@ -84,13 +87,13 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
        }
 
        @VisibleForTesting
-       void serializeV1(BucketState state, DataOutputView out) throws 
IOException {
-               out.writeUTF(state.getBucketId());
+       void serializeV1(BucketState<BucketID> state, DataOutputView out) 
throws IOException {
+               
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, 
state.getBucketId(), out);
                out.writeUTF(state.getBucketPath().toString());
                out.writeLong(state.getCreationTime());
 
                // put the current open part file
-               final RecoverableWriter.ResumeRecoverable currentPart = 
state.getCurrentInProgress();
+               final RecoverableWriter.ResumeRecoverable currentPart = 
state.getInProgress();
                if (currentPart != null) {
                        out.writeBoolean(true);
                        
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, 
currentPart, out);
@@ -100,19 +103,19 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
                }
 
                // put the map of pending files per checkpoint
-               final Map<Long, List<CommitRecoverable>> pendingCommitters = 
state.getPendingPerCheckpoint();
+               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingCommitters = state.getPendingPerCheckpoint();
 
                // manually keep the version here to safe some bytes
                out.writeInt(commitableSerializer.getVersion());
 
                out.writeInt(pendingCommitters.size());
-               for (Entry<Long, List<CommitRecoverable>> 
resumablesForCheckpoint : pendingCommitters.entrySet()) {
-                       List<CommitRecoverable> resumables = 
resumablesForCheckpoint.getValue();
+               for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> 
resumablesForCheckpoint : pendingCommitters.entrySet()) {
+                       List<RecoverableWriter.CommitRecoverable> resumables = 
resumablesForCheckpoint.getValue();
 
                        out.writeLong(resumablesForCheckpoint.getKey());
                        out.writeInt(resumables.size());
 
-                       for (CommitRecoverable resumable : resumables) {
+                       for (RecoverableWriter.CommitRecoverable resumable : 
resumables) {
                                byte[] serialized = 
commitableSerializer.serialize(resumable);
                                out.writeInt(serialized.length);
                                out.write(serialized);
@@ -121,8 +124,8 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
        }
 
        @VisibleForTesting
-       BucketState deserializeV1(DataInputView in) throws IOException {
-               final String bucketId = in.readUTF();
+       BucketState<BucketID> deserializeV1(DataInputView in) throws 
IOException {
+               final BucketID bucketId = 
SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
                final String bucketPathStr = in.readUTF();
                final long creationTime = in.readLong();
 
@@ -140,7 +143,7 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
                        final long checkpointId = in.readLong();
                        final int noOfResumables = in.readInt();
 
-                       final ArrayList<RecoverableWriter.CommitRecoverable> 
resumables = new ArrayList<>(noOfResumables);
+                       final List<RecoverableWriter.CommitRecoverable> 
resumables = new ArrayList<>(noOfResumables);
                        for (int j = 0; j < noOfResumables; j++) {
                                final byte[] bytes = new byte[in.readInt()];
                                in.readFully(bytes);
@@ -149,7 +152,7 @@ class BucketStateSerializer implements 
SimpleVersionedSerializer<BucketState> {
                        resumablesPerCheckpoint.put(checkpointId, resumables);
                }
 
-               return new BucketState(
+               return new BucketState<>(
                                bucketId,
                                new Path(bucketPathStr),
                                creationTime,

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
new file mode 100644
index 0000000..6afba17
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * The manager of the different active buckets in the {@link 
StreamingFileSink}.
+ *
+ * <p>This class is responsible for all bucket-related operations and the 
actual
+ * {@link StreamingFileSink} is just plugging in the functionality offered by
+ * this class to the lifecycle of the operator.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the 
{@link Bucketer}.
+ */
+public class Buckets<IN, BucketID> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(Buckets.class);
+
+       // ------------------------ configuration fields 
--------------------------
+
+       private final Path basePath;
+
+       private final BucketFactory<IN, BucketID> bucketFactory;
+
+       private final Bucketer<IN, BucketID> bucketer;
+
+       private final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory;
+
+       private final RollingPolicy<BucketID> rollingPolicy;
+
+       // --------------------------- runtime fields 
-----------------------------
+
+       private final int subtaskIndex;
+
+       private final Buckets.BucketerContext bucketerContext;
+
+       private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
+
+       private long initMaxPartCounter;
+
+       private long maxPartCounterUsed;
+
+       private final RecoverableWriter fileSystemWriter;
+
+       // --------------------------- State Related Fields 
-----------------------------
+
+       private final BucketStateSerializer<BucketID> bucketStateSerializer;
+
+       /**
+        * A package-private constructor creating a new empty bucket manager.
+        *
+        * @param basePath The base path for our buckets.
+        * @param bucketer The {@link Bucketer} provided by the user.
+        * @param bucketFactory The {@link BucketFactory} to be used to create 
buckets.
+        * @param partFileWriterFactory The {@link 
PartFileWriter.PartFileFactory} to be used when writing data.
+        * @param rollingPolicy The {@link RollingPolicy} as specified by the 
user.
+        */
+       Buckets(
+                       final Path basePath,
+                       final Bucketer<IN, BucketID> bucketer,
+                       final BucketFactory<IN, BucketID> bucketFactory,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final RollingPolicy<BucketID> rollingPolicy,
+                       final int subtaskIndex) throws IOException {
+
+               this.basePath = Preconditions.checkNotNull(basePath);
+               this.bucketer = Preconditions.checkNotNull(bucketer);
+               this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+               this.partFileWriterFactory = 
Preconditions.checkNotNull(partFileWriterFactory);
+               this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+               this.subtaskIndex = subtaskIndex;
+
+               this.activeBuckets = new HashMap<>();
+               this.bucketerContext = new Buckets.BucketerContext();
+
+               this.fileSystemWriter = 
FileSystem.get(basePath.toUri()).createRecoverableWriter();
+               this.bucketStateSerializer = new BucketStateSerializer<>(
+                               
fileSystemWriter.getResumeRecoverableSerializer(),
+                               
fileSystemWriter.getCommitRecoverableSerializer(),
+                               bucketer.getSerializer()
+               );
+
+               this.initMaxPartCounter = 0L;
+               this.maxPartCounterUsed = 0L;
+       }
+
+       /**
+        * Initializes the state after recovery from a failure.
+        * @param bucketStates the state holding recovered state about active 
buckets.
+        * @param partCounterState the state holding the max previously used 
part counters.
+        * @throws Exception if reading or restoring the state fails.
+        */
+       void initializeState(final ListState<byte[]> bucketStates, final 
ListState<Long> partCounterState) throws Exception {
+
+               // When resuming after a failure:
+               // 1) we get the max part counter used before in order to make 
sure that we do not overwrite valid data
+               // 2) we commit any pending files for previous checkpoints 
(previous to the last successful one)
+               // 3) we resume writing to the previous in-progress file of 
each bucket, and
+               // 4) if we receive multiple states for the same bucket, we 
merge them.
+
+               // get the max counter
+               long maxCounter = 0L;
+               for (long partCounter: partCounterState.get()) {
+                       maxCounter = Math.max(partCounter, maxCounter);
+               }
+               initMaxPartCounter = maxCounter;
+
+               // get the restored buckets
+               for (byte[] recoveredState : bucketStates.get()) {
+                       final BucketState<BucketID> bucketState = 
SimpleVersionedSerialization.readVersionAndDeSerialize(
+                                       bucketStateSerializer, recoveredState);
+
+                       final BucketID bucketId = bucketState.getBucketId();
+
+                       LOG.info("Recovered bucket for {}", bucketId);
+
+                       final Bucket<IN, BucketID> restoredBucket = 
bucketFactory.restoreBucket(
+                                       fileSystemWriter,
+                                       subtaskIndex,
+                                       initMaxPartCounter,
+                                       partFileWriterFactory,
+                                       bucketState
+                       );
+
+                       final Bucket<IN, BucketID> existingBucket = 
activeBuckets.get(bucketId);
+                       if (existingBucket == null) {
+                               activeBuckets.put(bucketId, restoredBucket);
+                       } else {
+                               existingBucket.merge(restoredBucket);
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("{} idx {} restored state for bucket 
{}", getClass().getSimpleName(),
+                                               subtaskIndex, 
assembleBucketPath(bucketId));
+                       }
+               }
+       }
+
+       void publishUpToCheckpoint(long checkpointId) throws IOException {
+               final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> 
activeBucketIt =
+                               activeBuckets.entrySet().iterator();
+
+               while (activeBucketIt.hasNext()) {
+                       Bucket<IN, BucketID> bucket = 
activeBucketIt.next().getValue();
+                       bucket.onCheckpointAcknowledgment(checkpointId);
+
+                       if (!bucket.isActive()) {
+                               // We've dealt with all the pending files and 
the writer for this bucket is not currently open.
+                               // Therefore this bucket is currently inactive 
and we can remove it from our state.
+                               activeBucketIt.remove();
+                       }
+               }
+       }
+
+       void snapshotState(
+                       final long checkpointId,
+                       final long checkpointTimestamp,
+                       final ListState<byte[]> bucketStates,
+                       final ListState<Long> partCounterState) throws 
Exception {
+
+               Preconditions.checkState(
+                               fileSystemWriter != null && 
bucketStateSerializer != null,
+                               "sink has not been initialized"
+               );
+
+               for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
+                       final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
+
+                       if (info != null &&
+                                       
(rollingPolicy.shouldRollOnCheckpoint(info) ||
+                                       rollingPolicy.shouldRollOnEvent(info) ||
+                                       
rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp))
+                       ) {
+                               // we also check here so that we do not have to 
always
+                               // wait for the "next" element to arrive.
+                               bucket.closePartFile();
+                       }
+
+                       final BucketState<BucketID> bucketState = 
bucket.onCheckpoint(checkpointId);
+                       
bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer,
 bucketState));
+               }
+
+               partCounterState.add(maxPartCounterUsed);
+       }
+
+       /**
+        * Called on every incoming element to write it to its final location.
+        * @param value the element itself.
+        * @param context the {@link SinkFunction.Context context} available to 
the sink function.
+        * @throws Exception if bucketing or writing the element fails.
+        */
+       void onElement(IN value, SinkFunction.Context context) throws Exception 
{
+               final long currentProcessingTime = 
context.currentProcessingTime();
+
+               // setting the values in the bucketer context
+               bucketerContext.update(
+                               context.timestamp(),
+                               context.currentWatermark(),
+                               currentProcessingTime);
+
+               final BucketID bucketId = bucketer.getBucketId(value, 
bucketerContext);
+
+               Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+               if (bucket == null) {
+                       final Path bucketPath = assembleBucketPath(bucketId);
+                       bucket = bucketFactory.getNewBucket(
+                                       fileSystemWriter,
+                                       subtaskIndex,
+                                       bucketId,
+                                       bucketPath,
+                                       initMaxPartCounter,
+                                       partFileWriterFactory);
+                       activeBuckets.put(bucketId, bucket);
+               }
+
+               final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
+               if (info == null || rollingPolicy.shouldRollOnEvent(info)) {
+                       bucket.rollPartFile(currentProcessingTime);
+               }
+               bucket.write(value, currentProcessingTime);
+
+               // we update the counter here because buckets that become 
inactive
+               // get removed in publishUpToCheckpoint(), so at the time we 
snapshot they
+               // may not be there to take them into account during 
checkpointing.
+               updateMaxPartCounter(bucket.getPartCounter());
+       }
+
+       void onProcessingTime(long timestamp) throws Exception {
+               for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
+                       final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
+                       if (info != null && 
rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
+                               bucket.closePartFile();
+                       }
+               }
+       }
+
+       void close() {
+               if (activeBuckets != null) {
+                       activeBuckets.values().forEach(Bucket::dispose);
+               }
+       }
+
+       /**
+        * Assembles the final bucket {@link Path} that will be used for the 
provided bucket in the
+        * underlying filesystem.
+        * @param bucketId the id of the bucket as returned by the {@link 
Bucketer}.
+        * @return The resulting path.
+        */
+       private Path assembleBucketPath(BucketID bucketId) {
+               return new Path(basePath, bucketId.toString());
+       }
+
+       /**
+        * Updates the state keeping track of the maximum used part
+        * counter across all local active buckets.
+        * @param candidate the part counter that will potentially replace the 
current {@link #maxPartCounterUsed}.
+        */
+       private void updateMaxPartCounter(long candidate) {
+               maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+       }
+
+       /**
+        * The {@link Bucketer.Context} exposed to the
+        * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+        * whenever a new incoming element arrives.
+        */
+       private static final class BucketerContext implements Bucketer.Context {
+
+               @Nullable
+               private Long elementTimestamp;
+
+               private long currentWatermark;
+
+               private long currentProcessingTime;
+
+               private BucketerContext() {
+                       this.elementTimestamp = null;
+                       this.currentWatermark = Long.MIN_VALUE;
+                       this.currentProcessingTime = Long.MIN_VALUE;
+               }
+
+               void update(@Nullable Long element, long watermark, long 
processingTime) {
+                       this.elementTimestamp = element;
+                       this.currentWatermark = watermark;
+                       this.currentProcessingTime = processingTime;
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return currentProcessingTime;
+               }
+
+               @Override
+               public long currentWatermark() {
+                       return currentWatermark;
+               }
+
+               @Override
+               @Nullable
+               public Long timestamp() {
+                       return elementTimestamp;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
new file mode 100644
index 0000000..558b1bf
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link PartFileWriter} for bulk-encoding formats that use a {@link 
BulkWriter}.
+ * This also implements the {@link PartFileInfo}.
+ */
+@Internal
+final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+
+       private final BulkWriter<IN> writer;
+
+       private BulkPartWriter(
+                       final BucketID bucketId,
+                       final RecoverableFsDataOutputStream currentPartStream,
+                       final BulkWriter<IN> writer,
+                       final long creationTime) {
+               super(bucketId, currentPartStream, creationTime);
+               this.writer = Preconditions.checkNotNull(writer);
+       }
+
+       @Override
+       void write(IN element, long currentTime) throws IOException {
+               writer.addElement(element);
+               markWrite(currentTime);
+       }
+
+       @Override
+       RecoverableWriter.ResumeRecoverable persist() {
+               throw new UnsupportedOperationException("Bulk Part Writers do 
not support \"pause and resume\" operations.");
+       }
+
+       @Override
+       RecoverableWriter.CommitRecoverable closeForCommit() throws IOException 
{
+               writer.flush();
+               writer.finish();
+               return super.closeForCommit();
+       }
+
+       /**
+        * A factory that creates {@link BulkPartWriter BulkPartWriters}.
+        * @param <IN> The type of input elements.
+        * @param <BucketID> The type of ids for the buckets, as returned by 
the {@link Bucketer}.
+        */
+       static class Factory<IN, BucketID> implements 
PartFileWriter.PartFileFactory<IN, BucketID> {
+
+               private final BulkWriter.Factory<IN> writerFactory;
+
+               Factory(BulkWriter.Factory<IN> writerFactory) {
+                       this.writerFactory = writerFactory;
+               }
+
+               @Override
+               public PartFileWriter<IN, BucketID> resumeFrom(
+                               final BucketID bucketId,
+                               final RecoverableWriter fileSystemWriter,
+                               final RecoverableWriter.ResumeRecoverable 
resumable,
+                               final long creationTime) throws IOException {
+
+                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(resumable);
+
+                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.recover(resumable);
+                       final BulkWriter<IN> writer = 
writerFactory.create(stream);
+                       return new BulkPartWriter<>(bucketId, stream, writer, 
creationTime);
+               }
+
+               @Override
+               public PartFileWriter<IN, BucketID> openNew(
+                               final BucketID bucketId,
+                               final RecoverableWriter fileSystemWriter,
+                               final Path path,
+                               final long creationTime) throws IOException {
+
+                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(path);
+
+                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.open(path);
+                       final BulkWriter<IN> writer = 
writerFactory.create(stream);
+                       return new BulkPartWriter<>(bucketId, stream, writer, 
creationTime);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
index 795ba74..532138f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 
@@ -29,18 +28,18 @@ import java.io.IOException;
  * A factory returning {@link Bucket buckets}.
  */
 @Internal
-public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
+class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN, 
BucketID> {
 
-       private static final long serialVersionUID = 3372881359208513357L;
+       private static final long serialVersionUID = 1L;
 
        @Override
-       public Bucket<IN> getNewBucket(
-                       RecoverableWriter fsWriter,
-                       int subtaskIndex,
-                       String bucketId,
-                       Path bucketPath,
-                       long initialPartCounter,
-                       Encoder<IN> writer) throws IOException {
+       public Bucket<IN, BucketID> getNewBucket(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final BucketID bucketId,
+                       final Path bucketPath,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory) {
 
                return new Bucket<>(
                                fsWriter,
@@ -48,22 +47,22 @@ public class DefaultBucketFactory<IN> implements 
BucketFactory<IN> {
                                bucketId,
                                bucketPath,
                                initialPartCounter,
-                               writer);
+                               partFileWriterFactory);
        }
 
        @Override
-       public Bucket<IN> restoreBucket(
-                       RecoverableWriter fsWriter,
-                       int subtaskIndex,
-                       long initialPartCounter,
-                       Encoder<IN> writer,
-                       BucketState bucketState) throws IOException {
+       public Bucket<IN, BucketID> restoreBucket(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final BucketState<BucketID> bucketState) throws 
IOException {
 
                return new Bucket<>(
                                fsWriter,
                                subtaskIndex,
                                initialPartCounter,
-                               writer,
+                               partFileWriterFactory,
                                bucketState);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
deleted file mode 100644
index 026ac70..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * The default implementation of the {@link RollingPolicy}.
- *
- * <p>This policy rolls a part file if:
- * <ol>
- *     <li>there is no open part file,</li>
- *        <li>the current file has reached the maximum bucket size (by default 
128MB),</li>
- *        <li>the current file is older than the roll over interval (by 
default 60 sec), or</li>
- *        <li>the current file has not been written to for more than the 
allowed inactivityTime (by default 60 sec).</li>
- * </ol>
- */
-@PublicEvolving
-public final class DefaultRollingPolicy implements RollingPolicy {
-
-       private static final long serialVersionUID = 1318929857047767030L;
-
-       private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
-
-       private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
-
-       private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
-
-       private final long partSize;
-
-       private final long rolloverInterval;
-
-       private final long inactivityInterval;
-
-       /**
-        * Private constructor to avoid direct instantiation.
-        */
-       private DefaultRollingPolicy(long partSize, long rolloverInterval, long 
inactivityInterval) {
-               Preconditions.checkArgument(partSize > 0L);
-               Preconditions.checkArgument(rolloverInterval > 0L);
-               Preconditions.checkArgument(inactivityInterval > 0L);
-
-               this.partSize = partSize;
-               this.rolloverInterval = rolloverInterval;
-               this.inactivityInterval = inactivityInterval;
-       }
-
-       @Override
-       public boolean shouldRoll(final PartFileInfo state, final long 
currentTime) throws IOException {
-               if (state == null) {
-                       // this means that there is no currently open part file.
-                       return true;
-               }
-
-               if (state.getSize() > partSize) {
-                       return true;
-               }
-
-               if (currentTime - state.getCreationTime() > rolloverInterval) {
-                       return true;
-               }
-
-               return currentTime - state.getLastUpdateTime() > 
inactivityInterval;
-       }
-
-       /**
-        * Initiates the instantiation of a {@link DefaultRollingPolicy}.
-        * To finalize it and have the actual policy, call {@code .create()}.
-        */
-       public static PolicyBuilder create() {
-               return new PolicyBuilder();
-       }
-
-       /**
-        * A helper class that holds the configuration properties for the 
{@link DefaultRollingPolicy}.
-        */
-       @PublicEvolving
-       public static class PolicyBuilder {
-
-               private long partSize = DEFAULT_MAX_PART_SIZE;
-
-               private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
-
-               private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
-
-               /**
-                * Sets the part size above which a part file will have to roll.
-                * @param size the allowed part size.
-                */
-               public PolicyBuilder withMaxPartSize(long size) {
-                       Preconditions.checkState(size > 0L);
-                       this.partSize = size;
-                       return this;
-               }
-
-               /**
-                * Sets the interval of allowed inactivity after which a part 
file will have to roll.
-                * @param interval the allowed inactivity interval.
-                */
-               public PolicyBuilder withInactivityInterval(long interval) {
-                       Preconditions.checkState(interval > 0L);
-                       this.inactivityInterval = interval;
-                       return this;
-               }
-
-               /**
-                * Sets the max time a part file can stay open before having to 
roll.
-                * @param interval the desired rollover interval.
-                */
-               public PolicyBuilder withRolloverInterval(long interval) {
-                       Preconditions.checkState(interval > 0L);
-                       this.rolloverInterval = interval;
-                       return this;
-               }
-
-               /**
-                * Creates the actual policy.
-                */
-               public DefaultRollingPolicy build() {
-                       return new DefaultRollingPolicy(partSize, 
rolloverInterval, inactivityInterval);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
deleted file mode 100644
index 10fd12b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A handler for the currently open part file in a specific {@link Bucket}.
- * This also implements the {@link PartFileInfo}.
- */
-@Internal
-class PartFileHandler<IN> implements PartFileInfo {
-
-       private final String bucketId;
-
-       private final long creationTime;
-
-       private final RecoverableFsDataOutputStream currentPartStream;
-
-       private long lastUpdateTime;
-
-       private PartFileHandler(
-                       final String bucketId,
-                       final RecoverableFsDataOutputStream currentPartStream,
-                       final long creationTime) {
-
-               Preconditions.checkArgument(creationTime >= 0L);
-               this.bucketId = Preconditions.checkNotNull(bucketId);
-               this.currentPartStream = 
Preconditions.checkNotNull(currentPartStream);
-               this.creationTime = creationTime;
-               this.lastUpdateTime = creationTime;
-       }
-
-       public static <IN> PartFileHandler<IN> resumeFrom(
-                       final String bucketId,
-                       final RecoverableWriter fileSystemWriter,
-                       final RecoverableWriter.ResumeRecoverable resumable,
-                       final long creationTime) throws IOException {
-               Preconditions.checkNotNull(bucketId);
-               Preconditions.checkNotNull(fileSystemWriter);
-               Preconditions.checkNotNull(resumable);
-
-               final RecoverableFsDataOutputStream stream = 
fileSystemWriter.recover(resumable);
-               return new PartFileHandler<>(bucketId, stream, creationTime);
-       }
-
-       public static <IN> PartFileHandler<IN> openNew(
-                       final String bucketId,
-                       final RecoverableWriter fileSystemWriter,
-                       final Path path,
-                       final long creationTime) throws IOException {
-               Preconditions.checkNotNull(bucketId);
-               Preconditions.checkNotNull(fileSystemWriter);
-               Preconditions.checkNotNull(path);
-
-               final RecoverableFsDataOutputStream stream = 
fileSystemWriter.open(path);
-               return new PartFileHandler<>(bucketId, stream, creationTime);
-       }
-
-       void write(IN element, Encoder<IN> encoder, long currentTime) throws 
IOException {
-               encoder.encode(element, currentPartStream);
-               this.lastUpdateTime = currentTime;
-       }
-
-       RecoverableWriter.ResumeRecoverable persist() throws IOException {
-               return currentPartStream.persist();
-       }
-
-       RecoverableWriter.CommitRecoverable closeForCommit() throws IOException 
{
-               return currentPartStream.closeForCommit().getRecoverable();
-       }
-
-       void dispose() {
-               // we can suppress exceptions here, because we do not rely on 
close() to
-               // flush or persist any data
-               IOUtils.closeQuietly(currentPartStream);
-       }
-
-       @Override
-       public String getBucketId() {
-               return bucketId;
-       }
-
-       @Override
-       public long getCreationTime() {
-               return creationTime;
-       }
-
-       @Override
-       public long getSize() throws IOException {
-               return currentPartStream.getPos();
-       }
-
-       @Override
-       public long getLastUpdateTime() {
-               return lastUpdateTime;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
index 9c3d047..5e72ea0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -29,13 +29,13 @@ import java.io.IOException;
  * should roll the part file or not.
  */
 @PublicEvolving
-public interface PartFileInfo {
+public interface PartFileInfo<BucketID> {
 
        /**
         * @return The bucket identifier of the current buffer, as returned by 
the
         * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
         */
-       String getBucketId();
+       BucketID getBucketId();
 
        /**
         * @return The creation time (in ms) of the currently open part file.

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
new file mode 100644
index 0000000..662454b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * An abstract writer for the currently open part file in a specific {@link 
Bucket}.
+ *
+ * <p>Currently, there are two subclasses of this class:
+ * <ol>
+ *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
+ *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
+ * </ol>
+ *
+ * <p>This also implements the {@link PartFileInfo}.
+ */
+@Internal
+abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+
+       private final BucketID bucketId;
+
+       private final long creationTime;
+
+       protected final RecoverableFsDataOutputStream currentPartStream;
+
+       private long lastUpdateTime;
+
+       protected PartFileWriter(
+                       final BucketID bucketId,
+                       final RecoverableFsDataOutputStream currentPartStream,
+                       final long creationTime) {
+
+               Preconditions.checkArgument(creationTime >= 0L);
+               this.bucketId = Preconditions.checkNotNull(bucketId);
+               this.currentPartStream = 
Preconditions.checkNotNull(currentPartStream);
+               this.creationTime = creationTime;
+               this.lastUpdateTime = creationTime;
+       }
+
+       abstract void write(IN element, long currentTime) throws IOException;
+
+       RecoverableWriter.ResumeRecoverable persist() throws IOException {
+               return currentPartStream.persist();
+       }
+
+       RecoverableWriter.CommitRecoverable closeForCommit() throws IOException 
{
+               return currentPartStream.closeForCommit().getRecoverable();
+       }
+
+       void dispose() {
+               // we can suppress exceptions here, because we do not rely on 
close() to
+               // flush or persist any data
+               IOUtils.closeQuietly(currentPartStream);
+       }
+
+       void markWrite(long now) {
+               this.lastUpdateTime = now;
+       }
+
+       @Override
+       public BucketID getBucketId() {
+               return bucketId;
+       }
+
+       @Override
+       public long getCreationTime() {
+               return creationTime;
+       }
+
+       @Override
+       public long getSize() throws IOException {
+               return currentPartStream.getPos();
+       }
+
+       @Override
+       public long getLastUpdateTime() {
+               return lastUpdateTime;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * An interface for factories that create the different {@link 
PartFileWriter writers}.
+        */
+       interface PartFileFactory<IN, BucketID> {
+
+               /**
+                * Used upon recovery from a failure to recover a {@link 
PartFileWriter writer}.
+                * @param bucketId the id of the bucket this writer is writing 
to.
+                * @param fileSystemWriter the filesystem-specific writer to 
use when writing to the filesystem.
+                * @param resumable the state of the stream we are resurrecting.
+                * @param creationTime the creation time of the stream.
+                * @return the recovered {@link PartFileWriter writer}.
+                * @throws IOException if recovering the stream fails.
+                */
+               PartFileWriter<IN, BucketID> resumeFrom(
+                       final BucketID bucketId,
+                       final RecoverableWriter fileSystemWriter,
+                       final RecoverableWriter.ResumeRecoverable resumable,
+                       final long creationTime) throws IOException;
+
+               /**
+                * Used to create a new {@link PartFileWriter writer}.
+                * @param bucketId the id of the bucket this writer is writing 
to.
+                * @param fileSystemWriter the filesystem-specific writer to 
use when writing to the filesystem.
+                * @param path the part this writer will write to.
+                * @param creationTime the creation time of the stream.
+                * @return the new {@link PartFileWriter writer}.
+                * @throws IOException if opening the part file fails.
+                */
+               PartFileWriter<IN, BucketID> openNew(
+                       final BucketID bucketId,
+                       final RecoverableWriter fileSystemWriter,
+                       final Path path,
+                       final long creationTime) throws IOException;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
index 936377e..24c38aa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -28,13 +28,27 @@ import java.io.Serializable;
  * rolls its currently open part file and opens a new one.
  */
 @PublicEvolving
-public interface RollingPolicy extends Serializable {
+public interface RollingPolicy<BucketID> extends Serializable {
 
        /**
-        * Determines if the in-progress part file for a bucket should roll.
+        * Determines if the in-progress part file for a bucket should roll on 
every checkpoint.
+        * @param partFileState the state of the currently open part file of 
the bucket.
+        * @return {@code True} if the part file should roll, {@code false} 
otherwise.
+        */
+       boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> 
partFileState) throws IOException;
+
+       /**
+        * Determines if the in-progress part file for a bucket should roll 
based on its current state, e.g. its size.
+        * @param partFileState the state of the currently open part file of 
the bucket.
+        * @return {@code True} if the part file should roll, {@code false} 
otherwise.
+        */
+       boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) 
throws IOException;
+
+       /**
+        * Determines if the in-progress part file for a bucket should roll 
based on a time condition.
         * @param partFileState the state of the currently open part file of 
the bucket.
         * @param currentTime the current processing time.
         * @return {@code True} if the part file should roll, {@link false} 
otherwise.
         */
-       boolean shouldRoll(final PartFileInfo partFileState, final long 
currentTime) throws IOException;
+       boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> 
partFileState, final long currentTime) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
new file mode 100644
index 0000000..0b00b43
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
+ * This also implements the {@link PartFileInfo}.
+ */
+@Internal
+final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, 
BucketID> {
+
+       private final Encoder<IN> encoder;
+
+       private RowWisePartWriter(
+                       final BucketID bucketId,
+                       final RecoverableFsDataOutputStream currentPartStream,
+                       final Encoder<IN> encoder,
+                       final long creationTime) {
+               super(bucketId, currentPartStream, creationTime);
+               this.encoder = Preconditions.checkNotNull(encoder);
+       }
+
+       @Override
+       void write(IN element, long currentTime) throws IOException {
+               encoder.encode(element, currentPartStream);
+               markWrite(currentTime);
+       }
+
+       /**
+        * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
+        * @param <IN> The type of input elements.
+        * @param <BucketID> The type of ids for the buckets, as returned by 
the {@link Bucketer}.
+        */
+       static class Factory<IN, BucketID> implements 
PartFileWriter.PartFileFactory<IN, BucketID> {
+
+               private final Encoder<IN> encoder;
+
+               Factory(Encoder<IN> encoder) {
+                       this.encoder = encoder;
+               }
+
+               @Override
+               public PartFileWriter<IN, BucketID> resumeFrom(
+                               final BucketID bucketId,
+                               final RecoverableWriter fileSystemWriter,
+                               final RecoverableWriter.ResumeRecoverable 
resumable,
+                               final long creationTime) throws IOException {
+
+                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(resumable);
+
+                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.recover(resumable);
+                       return new RowWisePartWriter<>(bucketId, stream, 
encoder, creationTime);
+               }
+
+               @Override
+               public PartFileWriter<IN, BucketID> openNew(
+                               final BucketID bucketId,
+                               final RecoverableWriter fileSystemWriter,
+                               final Path path,
+                               final long creationTime) throws IOException {
+
+                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(path);
+
+                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.open(path);
+                       return new RowWisePartWriter<>(bucketId, stream, 
encoder, creationTime);
+               }
+       }
+}

Reply via email to