asfgit closed pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index a350096e38b..6187e6853dd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,13 +37,14 @@
 /**
  * A bucket is the directory organization of the output of the {@link 
StreamingFileSink}.
  *
- * <p>For each incoming  element in the {@code BucketingSink}, the 
user-specified
- * {@link Bucketer Bucketer} is
- * queried to see in which bucket this element should be written to.
+ * <p>For each incoming element in the {@code StreamingFileSink}, the 
user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element 
should be written to.
  */
-@PublicEvolving
+@Internal
 public class Bucket<IN, BucketID> {
 
+       private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
        private static final String PART_PREFIX = "part";
 
        private final BucketID bucketId;
@@ -53,57 +57,27 @@
 
        private final RecoverableWriter fsWriter;
 
-       private final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingPerCheckpoint = new HashMap<>();
-
-       private long partCounter;
-
-       private PartFileWriter<IN, BucketID> currentPart;
+       private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-       private List<RecoverableWriter.CommitRecoverable> pending;
-
-       /**
-        * Constructor to restore a bucket from checkpointed state.
-        */
-       public Bucket(
-                       RecoverableWriter fsWriter,
-                       int subtaskIndex,
-                       long initialPartCounter,
-                       PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory,
-                       BucketState<BucketID> bucketState) throws IOException {
+       private final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingPartsPerCheckpoint = new HashMap<>();
 
-               this(fsWriter, subtaskIndex, bucketState.getBucketId(), 
bucketState.getBucketPath(), initialPartCounter, partFileFactory);
-
-               // the constructor must have already initialized the filesystem 
writer
-               Preconditions.checkState(fsWriter != null);
-
-               // we try to resume the previous in-progress file, if the 
filesystem
-               // supports such operation. If not, we just commit the file and 
start fresh.
+       private long partCounter;
 
-               final RecoverableWriter.ResumeRecoverable resumable = 
bucketState.getInProgress();
-               if (resumable != null) {
-                       currentPart = partFileFactory.resumeFrom(
-                                       bucketId, fsWriter, resumable, 
bucketState.getCreationTime());
-               }
+       private PartFileWriter<IN, BucketID> inProgressPart;
 
-               // we commit pending files for previous checkpoints to the last 
successful one
-               // (from which we are recovering from)
-               for (List<RecoverableWriter.CommitRecoverable> commitables: 
bucketState.getPendingPerCheckpoint().values()) {
-                       for (RecoverableWriter.CommitRecoverable commitable: 
commitables) {
-                               
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
-                       }
-               }
-       }
+       private List<RecoverableWriter.CommitRecoverable> 
pendingPartsForCurrentCheckpoint;
 
        /**
         * Constructor to create a new empty bucket.
         */
-       public Bucket(
-                       RecoverableWriter fsWriter,
-                       int subtaskIndex,
-                       BucketID bucketId,
-                       Path bucketPath,
-                       long initialPartCounter,
-                       PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory) {
+       private Bucket(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final BucketID bucketId,
+                       final Path bucketPath,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy) {
 
                this.fsWriter = Preconditions.checkNotNull(fsWriter);
                this.subtaskIndex = subtaskIndex;
@@ -111,117 +85,241 @@ public Bucket(
                this.bucketPath = Preconditions.checkNotNull(bucketPath);
                this.partCounter = initialPartCounter;
                this.partFileFactory = 
Preconditions.checkNotNull(partFileFactory);
+               this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
 
-               this.pending = new ArrayList<>();
+               this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
        }
 
        /**
-        * Gets the information available for the currently
-        * open part file, i.e. the one we are currently writing to.
-        *
-        * <p>This will be null if there is no currently open part file. This
-        * is the case when we have a new, just created bucket or a bucket
-        * that has not received any data after the closing of its previously
-        * open in-progress file due to the specified rolling policy.
-        *
-        * @return The information about the currently in-progress part file
-        * or {@code null} if there is no open part file.
+        * Constructor to restore a bucket from checkpointed state.
         */
-       public PartFileInfo<BucketID> getInProgressPartInfo() {
-               return currentPart;
+       private Bucket(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
+                       final BucketState<BucketID> bucketState) throws 
IOException {
+
+               this(
+                               fsWriter,
+                               subtaskIndex,
+                               bucketState.getBucketId(),
+                               bucketState.getBucketPath(),
+                               initialPartCounter,
+                               partFileFactory,
+                               rollingPolicy);
+
+               restoreInProgressFile(bucketState);
+               commitRecoveredPendingFiles(bucketState);
        }
 
-       public BucketID getBucketId() {
+       private void restoreInProgressFile(final BucketState<BucketID> state) 
throws IOException {
+
+               // we try to resume the previous in-progress file
+               if (state.hasInProgressResumableFile()) {
+                       final RecoverableWriter.ResumeRecoverable resumable = 
state.getInProgressResumableFile();
+                       inProgressPart = partFileFactory.resumeFrom(
+                                       bucketId, fsWriter, resumable, 
state.getInProgressFileCreationTime());
+               }
+       }
+
+       private void commitRecoveredPendingFiles(final BucketState<BucketID> 
state) throws IOException {
+
+               // we commit pending files for checkpoints that precess the 
last successful one, from which we are recovering
+               for (List<RecoverableWriter.CommitRecoverable> committables: 
state.getCommittableFilesPerCheckpoint().values()) {
+                       for (RecoverableWriter.CommitRecoverable committable: 
committables) {
+                               
fsWriter.recoverForCommit(committable).commitAfterRecovery();
+                       }
+               }
+       }
+
+       BucketID getBucketId() {
                return bucketId;
        }
 
-       public Path getBucketPath() {
+       Path getBucketPath() {
                return bucketPath;
        }
 
-       public long getPartCounter() {
+       long getPartCounter() {
                return partCounter;
        }
 
-       public boolean isActive() {
-               return currentPart != null || !pending.isEmpty() || 
!pendingPerCheckpoint.isEmpty();
+       boolean isActive() {
+               return inProgressPart != null || 
!pendingPartsForCurrentCheckpoint.isEmpty() || 
!pendingPartsPerCheckpoint.isEmpty();
+       }
+
+       void merge(final Bucket<IN, BucketID> bucket) throws IOException {
+               Preconditions.checkNotNull(bucket);
+               Preconditions.checkState(Objects.equals(bucket.bucketPath, 
bucketPath));
+
+               // There should be no pending files in the "to-merge" states.
+               // The reason is that:
+               // 1) the pendingPartsForCurrentCheckpoint is emptied whenever 
we take a snapshot (see prepareBucketForCheckpointing()).
+               //    So a snapshot, including the one we are recovering from, 
will never contain such files.
+               // 2) the files in pendingPartsPerCheckpoint are committed upon 
recovery (see commitRecoveredPendingFiles()).
+
+               
Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
+               
Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+
+               RecoverableWriter.CommitRecoverable committable = 
bucket.closePartFile();
+               if (committable != null) {
+                       pendingPartsForCurrentCheckpoint.add(committable);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Subtask {} merging buckets for bucket 
id={}", subtaskIndex, bucketId);
+               }
        }
 
        void write(IN element, long currentTime) throws IOException {
-               Preconditions.checkState(currentPart != null, "bucket has been 
closed");
-               currentPart.write(element, currentTime);
+               if (inProgressPart == null || 
rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Subtask {} closing in-progress part 
file for bucket id={} due to element {}.",
+                                               subtaskIndex, bucketId, 
element);
+                       }
+
+                       rollPartFile(currentTime);
+               }
+               inProgressPart.write(element, currentTime);
        }
 
-       void rollPartFile(final long currentTime) throws IOException {
+       private void rollPartFile(final long currentTime) throws IOException {
                closePartFile();
-               currentPart = partFileFactory.openNew(bucketId, fsWriter, 
getNewPartPath(), currentTime);
+
+               final Path partFilePath = assembleNewPartPath();
+               inProgressPart = partFileFactory.openNew(bucketId, fsWriter, 
partFilePath, currentTime);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Subtask {} opening new part file \"{}\" for 
bucket id={}.",
+                                       subtaskIndex, partFilePath.getName(), 
bucketId);
+               }
+
                partCounter++;
        }
 
-       void merge(final Bucket<IN, BucketID> bucket) throws IOException {
-               Preconditions.checkNotNull(bucket);
-               Preconditions.checkState(Objects.equals(bucket.getBucketPath(), 
bucketPath));
+       private Path assembleNewPartPath() {
+               return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + 
'-' + partCounter);
+       }
 
-               // there should be no pending files in the "to-merge" states.
-               Preconditions.checkState(bucket.pending.isEmpty());
-               Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty());
+       private RecoverableWriter.CommitRecoverable closePartFile() throws 
IOException {
+               RecoverableWriter.CommitRecoverable committable = null;
+               if (inProgressPart != null) {
+                       committable = inProgressPart.closeForCommit();
+                       pendingPartsForCurrentCheckpoint.add(committable);
+                       inProgressPart = null;
+               }
+               return committable;
+       }
 
-               RecoverableWriter.CommitRecoverable commitable = 
bucket.closePartFile();
-               if (commitable != null) {
-                       pending.add(commitable);
+       void disposePartFile() {
+               if (inProgressPart != null) {
+                       inProgressPart.dispose();
                }
        }
 
-       RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
-               RecoverableWriter.CommitRecoverable commitable = null;
-               if (currentPart != null) {
-                       commitable = currentPart.closeForCommit();
-                       pending.add(commitable);
-                       currentPart = null;
+       BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws 
IOException {
+               prepareBucketForCheckpointing(checkpointId);
+
+               RecoverableWriter.ResumeRecoverable inProgressResumable = null;
+               long inProgressFileCreationTime = Long.MAX_VALUE;
+
+               if (inProgressPart != null) {
+                       inProgressResumable = inProgressPart.persist();
+                       inProgressFileCreationTime = 
inProgressPart.getCreationTime();
                }
-               return commitable;
+
+               return new BucketState<>(bucketId, bucketPath, 
inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
        }
 
-       public void dispose() {
-               if (currentPart != null) {
-                       currentPart.dispose();
+       private void prepareBucketForCheckpointing(long checkpointId) throws 
IOException {
+               if (inProgressPart != null && 
rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Subtask {} closing in-progress part 
file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
+                       }
+                       closePartFile();
+               }
+
+               if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
+                       pendingPartsPerCheckpoint.put(checkpointId, 
pendingPartsForCurrentCheckpoint);
+                       pendingPartsForCurrentCheckpoint = new ArrayList<>();
                }
        }
 
-       public void onCheckpointAcknowledgment(long checkpointId) throws 
IOException {
+       void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws 
IOException {
                Preconditions.checkNotNull(fsWriter);
 
                Iterator<Map.Entry<Long, 
List<RecoverableWriter.CommitRecoverable>>> it =
-                               pendingPerCheckpoint.entrySet().iterator();
+                               pendingPartsPerCheckpoint.entrySet().iterator();
 
                while (it.hasNext()) {
                        Map.Entry<Long, 
List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+
                        if (entry.getKey() <= checkpointId) {
-                               for (RecoverableWriter.CommitRecoverable 
commitable : entry.getValue()) {
-                                       
fsWriter.recoverForCommit(commitable).commit();
+                               for (RecoverableWriter.CommitRecoverable 
committable : entry.getValue()) {
+                                       
fsWriter.recoverForCommit(committable).commit();
                                }
                                it.remove();
                        }
                }
        }
 
-       public BucketState<BucketID> onCheckpoint(long checkpointId) throws 
IOException {
-               RecoverableWriter.ResumeRecoverable resumable = null;
-               long creationTime = Long.MAX_VALUE;
-
-               if (currentPart != null) {
-                       resumable = currentPart.persist();
-                       creationTime = currentPart.getCreationTime();
+       void onProcessingTime(long timestamp) throws IOException {
+               if (inProgressPart != null && 
rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Subtask {} closing in-progress part 
file for bucket id={} due to processing time rolling policy " +
+                                               "(in-progress file created @ 
{}, last updated @ {} and current time is {}).",
+                                               subtaskIndex, bucketId, 
inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), 
timestamp);
+                       }
+                       closePartFile();
                }
+       }
 
-               if (!pending.isEmpty()) {
-                       pendingPerCheckpoint.put(checkpointId, pending);
-                       pending = new ArrayList<>();
-               }
-               return new BucketState<>(bucketId, bucketPath, creationTime, 
resumable, pendingPerCheckpoint);
+       // --------------------------- Static Factory Methods 
-----------------------------
+
+       /**
+        * Creates a new empty {@code Bucket}.
+        * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
+        * @param subtaskIndex the index of the subtask creating the bucket.
+        * @param bucketId the identifier of the bucket, as returned by the 
{@link BucketAssigner}.
+        * @param bucketPath the path to where the part files for the bucket 
will be written to.
+        * @param initialPartCounter the initial counter for the part files of 
the bucket.
+        * @param partFileFactory the {@link PartFileWriter.PartFileFactory} 
the factory creating part file writers.
+        * @param <IN> the type of input elements to the sink.
+        * @param <BucketID> the type of the identifier of the bucket, as 
returned by the {@link BucketAssigner}
+        * @return The new Bucket.
+        */
+       static <IN, BucketID> Bucket<IN, BucketID> getNew(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final BucketID bucketId,
+                       final Path bucketPath,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy) {
+               return new Bucket<>(fsWriter, subtaskIndex, bucketId, 
bucketPath, initialPartCounter, partFileFactory, rollingPolicy);
        }
 
-       private Path getNewPartPath() {
-               return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + 
'-' + partCounter);
+       /**
+        * Restores a {@code Bucket} from the state included in the provided 
{@link BucketState}.
+        * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
+        * @param subtaskIndex the index of the subtask creating the bucket.
+        * @param initialPartCounter the initial counter for the part files of 
the bucket.
+        * @param partFileFactory the {@link PartFileWriter.PartFileFactory} 
the factory creating part file writers.
+        * @param bucketState the initial state of the restored bucket.
+        * @param <IN> the type of input elements to the sink.
+        * @param <BucketID> the type of the identifier of the bucket, as 
returned by the {@link BucketAssigner}
+        * @return The restored Bucket.
+        */
+       static <IN, BucketID> Bucket<IN, BucketID> restore(
+                       final RecoverableWriter fsWriter,
+                       final int subtaskIndex,
+                       final long initialPartCounter,
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
+                       final BucketState<BucketID> bucketState) throws 
IOException {
+               return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, 
partFileFactory, rollingPolicy, bucketState);
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
similarity index 69%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
index a7052cb219b..a9b0200b110 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucketer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.java
@@ -28,49 +28,45 @@
 import java.io.Serializable;
 
 /**
- * A bucketer is used with a {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
- * to determine the {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming 
element
+ * A BucketAssigner is used with a {@link StreamingFileSink} to determine the 
{@link Bucket} each incoming element
  * should be put into.
  *
  * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, 
and it is responsible for managing
- * a set of active buckets. Whenever a new element arrives it will ask the 
{@code Bucketer} for the bucket the
- * element should fall in. The {@code Bucketer} can, for example, determine 
buckets based on system time.
+ * a set of active buckets. Whenever a new element arrives it will ask the 
{@code BucketAssigner} for the bucket the
+ * element should fall in. The {@code BucketAssigner} can, for example, 
determine buckets based on system time.
  *
  * @param <IN> The type of input elements.
- * @param <BucketID> The type of the object returned by the {@link 
#getBucketId(Object, Bucketer.Context)}. This has to have
+ * @param <BucketID> The type of the object returned by the {@link 
#getBucketId(Object, BucketAssigner.Context)}. This has to have
  *                  a correct {@link #hashCode()} and {@link #equals(Object)} 
method. In addition, the {@link Path}
  *                  to the created bucket will be the result of the {@link 
#toString()} of this method, appended to
- *                  the {@code basePath} specified in the
- *                  {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 
StreamingFileSink}
+ *                  the {@code basePath} specified in the {@link 
StreamingFileSink StreamingFileSink}.
  */
 @PublicEvolving
-public interface Bucketer<IN, BucketID> extends Serializable {
+public interface BucketAssigner<IN, BucketID> extends Serializable {
 
        /**
         * Returns the identifier of the bucket the provided element should be 
put into.
         * @param element The current element being processed.
-        * @param context The {@link SinkFunction.Context context} used by the
-        *                {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 
sink}.
+        * @param context The {@link SinkFunction.Context context} used by the 
{@link StreamingFileSink sink}.
         *
         * @return A string representing the identifier of the bucket the 
element should be put into.
-        * This actual path to the bucket will result from the concatenation of 
the returned string
-        * and the {@code base path} provided during the initialization of the
-        * {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink 
sink}.
+        * The actual path to the bucket will result from the concatenation of 
the returned string
+        * and the {@code base path} provided during the initialization of the 
{@link StreamingFileSink sink}.
         */
-       BucketID getBucketId(IN element, Bucketer.Context context);
+       BucketID getBucketId(IN element, BucketAssigner.Context context);
 
        /**
         * @return A {@link SimpleVersionedSerializer} capable of 
serializing/deserializing the elements
         * of type {@code BucketID}. That is the type of the objects returned 
by the
-        * {@link #getBucketId(Object, Bucketer.Context)}.
+        * {@link #getBucketId(Object, BucketAssigner.Context)}.
         */
        SimpleVersionedSerializer<BucketID> getSerializer();
 
        /**
-        * Context that the {@link Bucketer} can use for getting additional 
data about
+        * Context that the {@link BucketAssigner} can use for getting 
additional data about
         * an input record.
         *
-        * <p>The context is only valid for the duration of a {@link 
Bucketer#getBucketId(Object, Bucketer.Context)} call.
+        * <p>The context is only valid for the duration of a {@link 
BucketAssigner#getBucketId(Object, BucketAssigner.Context)} call.
         * Do not store the context and use afterwards!
         */
        @PublicEvolving
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 0c6b587b421..2306fafff6a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -37,12 +37,14 @@
                        final BucketID bucketId,
                        final Path bucketPath,
                        final long initialPartCounter,
-                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory) throws IOException;
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy) throws 
IOException;
 
        Bucket<IN, BucketID> restoreBucket(
                        final RecoverableWriter fsWriter,
                        final int subtaskIndex,
                        final long initialPartCounter,
                        final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
                        final BucketState<BucketID> bucketState) throws 
IOException;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index bb49e3a5d25..18293815068 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,64 +32,90 @@
  * The state of the {@link Bucket} that is to be checkpointed.
  */
 @Internal
-public class BucketState<BucketID> {
+class BucketState<BucketID> {
 
        private final BucketID bucketId;
 
-       /**
-        * The base path for the bucket, i.e. the directory where all the part 
files are stored.
-        */
+       /** The directory where all the part files of the bucket are stored. */
        private final Path bucketPath;
 
        /**
-        * The creation time of the currently open part file, or {@code 
Long.MAX_VALUE} if there is no open part file.
+        * The creation time of the currently open part file,
+        * or {@code Long.MAX_VALUE} if there is no open part file.
         */
-       private final long creationTime;
+       private final long inProgressFileCreationTime;
 
        /**
-        * A {@link RecoverableWriter.ResumeRecoverable} for the currently open 
part file, or null
-        * if there is no currently open part file.
+        * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
+        * part file, or null if there is no currently open part file.
         */
        @Nullable
-       private final RecoverableWriter.ResumeRecoverable inProgress;
+       private final RecoverableWriter.ResumeRecoverable 
inProgressResumableFile;
 
        /**
-        * The {@link RecoverableWriter.CommitRecoverable files} pending to be 
committed, organized by checkpoint id.
+        * The {@link RecoverableWriter.CommitRecoverable files} pending to be
+        * committed, organized by checkpoint id.
         */
-       private final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingPerCheckpoint;
+       private final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
committableFilesPerCheckpoint;
 
-       public BucketState(
+       BucketState(
                        final BucketID bucketId,
                        final Path bucketPath,
-                       final long creationTime,
-                       @Nullable final RecoverableWriter.ResumeRecoverable 
inProgress,
-                       final Map<Long, 
List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
+                       final long inProgressFileCreationTime,
+                       @Nullable final RecoverableWriter.ResumeRecoverable 
inProgressResumableFile,
+                       final Map<Long, 
List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
        ) {
                this.bucketId = Preconditions.checkNotNull(bucketId);
                this.bucketPath = Preconditions.checkNotNull(bucketPath);
-               this.creationTime = creationTime;
-               this.inProgress = inProgress;
-               this.pendingPerCheckpoint = 
Preconditions.checkNotNull(pendingPerCheckpoint);
+               this.inProgressFileCreationTime = inProgressFileCreationTime;
+               this.inProgressResumableFile = inProgressResumableFile;
+               this.committableFilesPerCheckpoint = 
Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
        }
 
-       public BucketID getBucketId() {
+       BucketID getBucketId() {
                return bucketId;
        }
 
-       public Path getBucketPath() {
+       Path getBucketPath() {
                return bucketPath;
        }
 
-       public long getCreationTime() {
-               return creationTime;
+       long getInProgressFileCreationTime() {
+               return inProgressFileCreationTime;
+       }
+
+       boolean hasInProgressResumableFile() {
+               return inProgressResumableFile != null;
        }
 
        @Nullable
-       public RecoverableWriter.ResumeRecoverable getInProgress() {
-               return inProgress;
+       RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
+               return inProgressResumableFile;
+       }
+
+       Map<Long, List<RecoverableWriter.CommitRecoverable>> 
getCommittableFilesPerCheckpoint() {
+               return committableFilesPerCheckpoint;
        }
 
-       public Map<Long, List<RecoverableWriter.CommitRecoverable>> 
getPendingPerCheckpoint() {
-               return pendingPerCheckpoint;
+       @Override
+       public String toString() {
+               final StringBuilder strBuilder = new StringBuilder();
+
+               strBuilder
+                               .append("BucketState for 
bucketId=").append(bucketId)
+                               .append(" and bucketPath=").append(bucketPath);
+
+               if (hasInProgressResumableFile()) {
+                       strBuilder.append(", has open part file created @ 
").append(inProgressFileCreationTime);
+               }
+
+               if (!committableFilesPerCheckpoint.isEmpty()) {
+                       strBuilder.append(", has pending files for checkpoints: 
{");
+                       for (long checkpointId: 
committableFilesPerCheckpoint.keySet()) {
+                               strBuilder.append(checkpointId).append(' ');
+                       }
+                       strBuilder.append('}');
+               }
+               return strBuilder.toString();
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index cf9b8057bd1..04de2462d67 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -90,20 +90,20 @@ public int getVersion() {
        void serializeV1(BucketState<BucketID> state, DataOutputView out) 
throws IOException {
                
SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, 
state.getBucketId(), out);
                out.writeUTF(state.getBucketPath().toString());
-               out.writeLong(state.getCreationTime());
+               out.writeLong(state.getInProgressFileCreationTime());
 
                // put the current open part file
-               final RecoverableWriter.ResumeRecoverable currentPart = 
state.getInProgress();
-               if (currentPart != null) {
+               if (state.hasInProgressResumableFile()) {
+                       final RecoverableWriter.ResumeRecoverable resumable = 
state.getInProgressResumableFile();
                        out.writeBoolean(true);
-                       
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, 
currentPart, out);
+                       
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, 
resumable, out);
                }
                else {
                        out.writeBoolean(false);
                }
 
                // put the map of pending files per checkpoint
-               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingCommitters = state.getPendingPerCheckpoint();
+               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
pendingCommitters = state.getCommittableFilesPerCheckpoint();
 
                // manually keep the version here to safe some bytes
                out.writeInt(commitableSerializer.getVersion());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index e62c425fc2f..2aca841f16d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -44,8 +45,9 @@
  * this class to the lifecycle of the operator.
  *
  * @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the 
{@link Bucketer}.
+ * @param <BucketID> The type of ids for the buckets, as returned by the 
{@link BucketAssigner}.
  */
+@Internal
 public class Buckets<IN, BucketID> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(Buckets.class);
@@ -56,7 +58,7 @@
 
        private final BucketFactory<IN, BucketID> bucketFactory;
 
-       private final Bucketer<IN, BucketID> bucketer;
+       private final BucketAssigner<IN, BucketID> bucketAssigner;
 
        private final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory;
 
@@ -70,33 +72,33 @@
 
        private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
 
-       private long maxPartCounterUsed;
+       private long maxPartCounter;
 
-       private final RecoverableWriter fileSystemWriter;
+       private final RecoverableWriter fsWriter;
 
        // --------------------------- State Related Fields 
-----------------------------
 
        private final BucketStateSerializer<BucketID> bucketStateSerializer;
 
        /**
-        * A private constructor creating a new empty bucket manager.
+        * A constructor creating a new empty bucket manager.
         *
         * @param basePath The base path for our buckets.
-        * @param bucketer The {@link Bucketer} provided by the user.
+        * @param bucketAssigner The {@link BucketAssigner} provided by the 
user.
         * @param bucketFactory The {@link BucketFactory} to be used to create 
buckets.
         * @param partFileWriterFactory The {@link 
PartFileWriter.PartFileFactory} to be used when writing data.
         * @param rollingPolicy The {@link RollingPolicy} as specified by the 
user.
         */
        Buckets(
                        final Path basePath,
-                       final Bucketer<IN, BucketID> bucketer,
+                       final BucketAssigner<IN, BucketID> bucketAssigner,
                        final BucketFactory<IN, BucketID> bucketFactory,
                        final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
                        final RollingPolicy<IN, BucketID> rollingPolicy,
                        final int subtaskIndex) throws IOException {
 
                this.basePath = Preconditions.checkNotNull(basePath);
-               this.bucketer = Preconditions.checkNotNull(bucketer);
+               this.bucketAssigner = 
Preconditions.checkNotNull(bucketAssigner);
                this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
                this.partFileWriterFactory = 
Preconditions.checkNotNull(partFileWriterFactory);
                this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
@@ -105,75 +107,102 @@
                this.activeBuckets = new HashMap<>();
                this.bucketerContext = new Buckets.BucketerContext();
 
-               this.fileSystemWriter = 
FileSystem.get(basePath.toUri()).createRecoverableWriter();
+               try {
+                       this.fsWriter = 
FileSystem.get(basePath.toUri()).createRecoverableWriter();
+               } catch (IOException e) {
+                       LOG.error("Unable to create filesystem for path: {}", 
basePath);
+                       throw e;
+               }
+
                this.bucketStateSerializer = new BucketStateSerializer<>(
-                               
fileSystemWriter.getResumeRecoverableSerializer(),
-                               
fileSystemWriter.getCommitRecoverableSerializer(),
-                               bucketer.getSerializer()
+                               fsWriter.getResumeRecoverableSerializer(),
+                               fsWriter.getCommitRecoverableSerializer(),
+                               bucketAssigner.getSerializer()
                );
 
-               this.maxPartCounterUsed = 0L;
+               this.maxPartCounter = 0L;
        }
 
        /**
         * Initializes the state after recovery from a failure.
+        *
+        * <p>During this process:
+        * <ol>
+        *     <li>we set the initial value for part counter to the maximum 
value used before across all tasks and buckets.
+        *     This guarantees that we do not overwrite valid data,</li>
+        *     <li>we commit any pending files for previous checkpoints 
(previous to the last successful one from which we restore),</li>
+        *     <li>we resume writing to the previous in-progress file of each 
bucket, and</li>
+        *     <li>if we receive multiple states for the same bucket, we merge 
them.</li>
+        * </ol>
         * @param bucketStates the state holding recovered state about active 
buckets.
         * @param partCounterState the state holding the max previously used 
part counters.
-        * @throws Exception
+        * @throws Exception if anything goes wrong during retrieving the state 
or restoring/committing of any
+        * in-progress/pending part files
         */
        void initializeState(final ListState<byte[]> bucketStates, final 
ListState<Long> partCounterState) throws Exception {
 
-               // When resuming after a failure:
-               // 1) we get the max part counter used before in order to make 
sure that we do not overwrite valid data
-               // 2) we commit any pending files for previous checkpoints 
(previous to the last successful one)
-               // 3) we resume writing to the previous in-progress file of 
each bucket, and
-               // 4) if we receive multiple states for the same bucket, we 
merge them.
+               initializePartCounter(partCounterState);
+
+               LOG.info("Subtask {} initializing its state (max part 
counter={}).", subtaskIndex, maxPartCounter);
+
+               initializeActiveBuckets(bucketStates);
+       }
 
-               // get the max counter
+       private void initializePartCounter(final ListState<Long> 
partCounterState) throws Exception {
                long maxCounter = 0L;
                for (long partCounter: partCounterState.get()) {
                        maxCounter = Math.max(partCounter, maxCounter);
                }
-               maxPartCounterUsed = maxCounter;
+               maxPartCounter = maxCounter;
+       }
 
-               // get the restored buckets
-               for (byte[] recoveredState : bucketStates.get()) {
-                       final BucketState<BucketID> bucketState = 
SimpleVersionedSerialization.readVersionAndDeSerialize(
-                                       bucketStateSerializer, recoveredState);
+       private void initializeActiveBuckets(final ListState<byte[]> 
bucketStates) throws Exception {
+               for (byte[] serializedRecoveredState : bucketStates.get()) {
+                       final BucketState<BucketID> recoveredState =
+                                       
SimpleVersionedSerialization.readVersionAndDeSerialize(
+                                                       bucketStateSerializer, 
serializedRecoveredState);
+                       handleRestoredBucketState(recoveredState);
+               }
+       }
 
-                       final BucketID bucketId = bucketState.getBucketId();
+       private void handleRestoredBucketState(final BucketState<BucketID> 
recoveredState) throws Exception {
+               final BucketID bucketId = recoveredState.getBucketId();
 
-                       LOG.info("Recovered bucket for {}", bucketId);
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Subtask {} restoring: {}", subtaskIndex, 
recoveredState);
+               }
 
-                       final Bucket<IN, BucketID> restoredBucket = 
bucketFactory.restoreBucket(
-                                       fileSystemWriter,
-                                       subtaskIndex,
-                                       maxPartCounterUsed,
-                                       partFileWriterFactory,
-                                       bucketState
-                       );
-
-                       final Bucket<IN, BucketID> existingBucket = 
activeBuckets.get(bucketId);
-                       if (existingBucket == null) {
-                               activeBuckets.put(bucketId, restoredBucket);
-                       } else {
-                               existingBucket.merge(restoredBucket);
-                       }
+               final Bucket<IN, BucketID> restoredBucket = bucketFactory
+                               .restoreBucket(
+                                               fsWriter,
+                                               subtaskIndex,
+                                               maxPartCounter,
+                                               partFileWriterFactory,
+                                               rollingPolicy,
+                                               recoveredState
+                               );
+
+               updateActiveBucketId(bucketId, restoredBucket);
+       }
 
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("{} idx {} restored state for bucket 
{}", getClass().getSimpleName(),
-                                               subtaskIndex, 
assembleBucketPath(bucketId));
-                       }
+       private void updateActiveBucketId(final BucketID bucketId, final 
Bucket<IN, BucketID> restoredBucket) throws IOException {
+               final Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+               if (bucket != null) {
+                       bucket.merge(restoredBucket);
+               } else {
+                       activeBuckets.put(bucketId, restoredBucket);
                }
        }
 
-       void publishUpToCheckpoint(long checkpointId) throws IOException {
+       void commitUpToCheckpoint(final long checkpointId) throws IOException {
                final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> 
activeBucketIt =
                                activeBuckets.entrySet().iterator();
 
+               LOG.info("Subtask {} received completion notification for 
checkpoint with id={}.", subtaskIndex, checkpointId);
+
                while (activeBucketIt.hasNext()) {
-                       Bucket<IN, BucketID> bucket = 
activeBucketIt.next().getValue();
-                       bucket.onCheckpointAcknowledgment(checkpointId);
+                       final Bucket<IN, BucketID> bucket = 
activeBucketIt.next().getValue();
+                       bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
 
                        if (!bucket.isActive()) {
                                // We've dealt with all the pending files and 
the writer for this bucket is not currently open.
@@ -185,35 +214,39 @@ void publishUpToCheckpoint(long checkpointId) throws 
IOException {
 
        void snapshotState(
                        final long checkpointId,
-                       final ListState<byte[]> bucketStates,
-                       final ListState<Long> partCounterState) throws 
Exception {
+                       final ListState<byte[]> bucketStatesContainer,
+                       final ListState<Long> partCounterStateContainer) throws 
Exception {
 
                Preconditions.checkState(
-                               fileSystemWriter != null && 
bucketStateSerializer != null,
-                               "sink has not been initialized"
-               );
+                               fsWriter != null && bucketStateSerializer != 
null,
+                               "sink has not been initialized");
+
+               LOG.info("Subtask {} checkpointing for checkpoint with id={} 
(max part counter={}).",
+                               subtaskIndex, checkpointId, maxPartCounter);
+
+               snapshotActiveBuckets(checkpointId, bucketStatesContainer);
+               partCounterStateContainer.add(maxPartCounter);
+       }
+
+       private void snapshotActiveBuckets(
+                       final long checkpointId,
+                       final ListState<byte[]> bucketStatesContainer) throws 
Exception {
 
                for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
-                       final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
+                       final BucketState<BucketID> bucketState = 
bucket.onReceptionOfCheckpoint(checkpointId);
 
-                       if (info != null && 
rollingPolicy.shouldRollOnCheckpoint(info)) {
-                               bucket.closePartFile();
-                       }
+                       final byte[] serializedBucketState = 
SimpleVersionedSerialization
+                                       
.writeVersionAndSerialize(bucketStateSerializer, bucketState);
 
-                       final BucketState<BucketID> bucketState = 
bucket.onCheckpoint(checkpointId);
-                       
bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer,
 bucketState));
-               }
+                       bucketStatesContainer.add(serializedBucketState);
 
-               partCounterState.add(maxPartCounterUsed);
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Subtask {} checkpointing: {}", 
subtaskIndex, bucketState);
+                       }
+               }
        }
 
-       /**
-        * Called on every incoming element to write it to its final location.
-        * @param value the element itself.
-        * @param context the {@link SinkFunction.Context context} available to 
the sink function.
-        * @throws Exception
-        */
-       void onElement(IN value, SinkFunction.Context context) throws Exception 
{
+       void onElement(final IN value, final SinkFunction.Context context) 
throws Exception {
                final long currentProcessingTime = 
context.currentProcessingTime();
 
                // setting the values in the bucketer context
@@ -222,79 +255,56 @@ void onElement(IN value, SinkFunction.Context context) 
throws Exception {
                                context.currentWatermark(),
                                currentProcessingTime);
 
-               final BucketID bucketId = bucketer.getBucketId(value, 
bucketerContext);
+               final BucketID bucketId = bucketAssigner.getBucketId(value, 
bucketerContext);
+               final Bucket<IN, BucketID> bucket = 
getOrCreateBucketForBucketId(bucketId);
+               bucket.write(value, currentProcessingTime);
+
+               // we update the global max counter here because as buckets 
become inactive and
+               // get removed from the list of active buckets, at the time 
when we want to create
+               // another part file for the bucket, if we start from 0 we may 
overwrite previous parts.
+
+               this.maxPartCounter = Math.max(maxPartCounter, 
bucket.getPartCounter());
+       }
 
+       private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final 
BucketID bucketId) throws IOException {
                Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
                if (bucket == null) {
                        final Path bucketPath = assembleBucketPath(bucketId);
                        bucket = bucketFactory.getNewBucket(
-                                       fileSystemWriter,
+                                       fsWriter,
                                        subtaskIndex,
                                        bucketId,
                                        bucketPath,
-                                       maxPartCounterUsed,
-                                       partFileWriterFactory);
+                                       maxPartCounter,
+                                       partFileWriterFactory,
+                                       rollingPolicy);
                        activeBuckets.put(bucketId, bucket);
                }
-
-               final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
-               if (info == null || rollingPolicy.shouldRollOnEvent(info, 
value)) {
-
-                       // info will be null if there is no currently open part 
file. This
-                       // is the case when we have a new, just created bucket 
or a bucket
-                       // that has not received any data after the closing of 
its previously
-                       // open in-progress file due to the specified rolling 
policy.
-
-                       bucket.rollPartFile(currentProcessingTime);
-               }
-               bucket.write(value, currentProcessingTime);
-
-               // we update the counter here because as buckets become 
inactive and
-               // get removed in the initializeState(), at the time we 
snapshot they
-               // may not be there to take them into account during 
checkpointing.
-               updateMaxPartCounter(bucket.getPartCounter());
+               return bucket;
        }
 
        void onProcessingTime(long timestamp) throws Exception {
                for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
-                       final PartFileInfo<BucketID> info = 
bucket.getInProgressPartInfo();
-                       if (info != null && 
rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
-                               bucket.closePartFile();
-                       }
+                       bucket.onProcessingTime(timestamp);
                }
        }
 
        void close() {
                if (activeBuckets != null) {
-                       activeBuckets.values().forEach(Bucket::dispose);
+                       activeBuckets.values().forEach(Bucket::disposePartFile);
                }
        }
 
-       /**
-        * Assembles the final bucket {@link Path} that will be used for the 
provided bucket in the
-        * underlying filesystem.
-        * @param bucketId the id of the bucket as returned by the {@link 
Bucketer}.
-        * @return The resulting path.
-        */
        private Path assembleBucketPath(BucketID bucketId) {
                return new Path(basePath, bucketId.toString());
        }
 
        /**
-        * Updates the state keeping track of the maximum used part
-        * counter across all local active buckets.
-        * @param candidate the part counter that will potentially replace the 
current {@link #maxPartCounterUsed}.
-        */
-       private void updateMaxPartCounter(long candidate) {
-               maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
-       }
-
-       /**
-        * The {@link Bucketer.Context} exposed to the
-        * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+        * The {@link BucketAssigner.Context} exposed to the
+        * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}
         * whenever a new incoming element arrives.
         */
-       private static final class BucketerContext implements Bucketer.Context {
+       private static final class BucketerContext implements 
BucketAssigner.Context {
 
                @Nullable
                private Long elementTimestamp;
@@ -309,8 +319,8 @@ private BucketerContext() {
                        this.currentProcessingTime = Long.MIN_VALUE;
                }
 
-               void update(@Nullable Long element, long watermark, long 
processingTime) {
-                       this.elementTimestamp = element;
+               void update(@Nullable Long elementTimestamp, long watermark, 
long processingTime) {
+                       this.elementTimestamp = elementTimestamp;
                        this.currentWatermark = watermark;
                        this.currentProcessingTime = processingTime;
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 7b8c8fe5c04..005ae4e737f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -66,7 +66,7 @@ void write(IN element, long currentTime) throws IOException {
        /**
         * A factory that creates {@link BulkPartWriter BulkPartWriters}.
         * @param <IN> The type of input elements.
-        * @param <BucketID> The type of ids for the buckets, as returned by 
the {@link Bucketer}.
+        * @param <BucketID> The type of ids for the buckets, as returned by 
the {@link BucketAssigner}.
         */
        static class Factory<IN, BucketID> implements 
PartFileWriter.PartFileFactory<IN, BucketID> {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
similarity index 85%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index 532138f1ba2..1d2edbcaeb5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -28,7 +28,7 @@
  * A factory returning {@link Bucket buckets}.
  */
 @Internal
-class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN, 
BucketID> {
+class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, 
BucketID> {
 
        private static final long serialVersionUID = 1L;
 
@@ -39,15 +39,17 @@
                        final BucketID bucketId,
                        final Path bucketPath,
                        final long initialPartCounter,
-                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory) {
+                       final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy) {
 
-               return new Bucket<>(
+               return Bucket.getNew(
                                fsWriter,
                                subtaskIndex,
                                bucketId,
                                bucketPath,
                                initialPartCounter,
-                               partFileWriterFactory);
+                               partFileWriterFactory,
+                               rollingPolicy);
        }
 
        @Override
@@ -56,13 +58,15 @@
                        final int subtaskIndex,
                        final long initialPartCounter,
                        final PartFileWriter.PartFileFactory<IN, BucketID> 
partFileWriterFactory,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
                        final BucketState<BucketID> bucketState) throws 
IOException {
 
-               return new Bucket<>(
+               return Bucket.restore(
                                fsWriter,
                                subtaskIndex,
                                initialPartCounter,
                                partFileWriterFactory,
+                               rollingPolicy,
                                bucketState);
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
index dbd62a27d8c..7750acd7e58 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -32,7 +32,7 @@
 
        /**
         * @return The bucket identifier of the current buffer, as returned by 
the
-        * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
+        * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}.
         */
        BucketID getBucketId();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 269b12c12f5..2478b79a527 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -54,7 +54,7 @@ void write(IN element, long currentTime) throws IOException {
        /**
         * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
         * @param <IN> The type of input elements.
-        * @param <BucketID> The type of ids for the buckets, as returned by 
the {@link Bucketer}.
+        * @param <BucketID> The type of ids for the buckets, as returned by 
the {@link BucketAssigner}.
         */
        static class Factory<IN, BucketID> implements 
PartFileWriter.PartFileFactory<IN, BucketID> {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 7635e372478..6c7e135caf8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -36,7 +36,7 @@
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -58,11 +58,11 @@
  * These part files contain the actual output data.
  *
  *
- * <p>The sink uses a {@link Bucketer} to determine in which bucket directory 
each element should
- * be written to inside the base directory. The {@code Bucketer} can, for 
example, use time or
- * a property of the element to determine the bucket directory. The default 
{@code Bucketer} is a
- * {@link DateTimeBucketer} which will create one new bucket every hour. You 
can specify
- * a custom {@code Bucketer} using the {@code setBucketer(Bucketer)} method, 
after calling
+ * <p>The sink uses a {@link BucketAssigner} to determine in which bucket 
directory each element should
+ * be written to inside the base directory. The {@code BucketAssigner} can, 
for example, use time or
+ * a property of the element to determine the bucket directory. The default 
{@code BucketAssigner} is a
+ * {@link DateTimeBucketAssigner} which will create one new bucket every hour. 
You can specify
+ * a custom {@code BucketAssigner} using the {@code 
setBucketAssigner(bucketAssigner)} method, after calling
  * {@link StreamingFileSink#forRowFormat(Path, Encoder)} or
  * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
  *
@@ -151,7 +151,7 @@ private StreamingFileSink(
         */
        public static <IN> StreamingFileSink.RowFormatBuilder<IN, String> 
forRowFormat(
                        final Path basePath, final Encoder<IN> encoder) {
-               return new StreamingFileSink.RowFormatBuilder<>(basePath, 
encoder, new DateTimeBucketer<>());
+               return new StreamingFileSink.RowFormatBuilder<>(basePath, 
encoder, new DateTimeBucketAssigner<>());
        }
 
        /**
@@ -164,7 +164,7 @@ private StreamingFileSink(
         */
        public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> 
forBulkFormat(
                        final Path basePath, final BulkWriter.Factory<IN> 
writerFactory) {
-               return new StreamingFileSink.BulkFormatBuilder<>(basePath, 
writerFactory, new DateTimeBucketer<>());
+               return new StreamingFileSink.BulkFormatBuilder<>(basePath, 
writerFactory, new DateTimeBucketAssigner<>());
        }
 
        /**
@@ -191,50 +191,45 @@ private StreamingFileSink(
 
                private final Encoder<IN> encoder;
 
-               private final Bucketer<IN, BucketID> bucketer;
+               private final BucketAssigner<IN, BucketID> bucketAssigner;
 
                private final RollingPolicy<IN, BucketID> rollingPolicy;
 
                private final BucketFactory<IN, BucketID> bucketFactory;
 
-               RowFormatBuilder(Path basePath, Encoder<IN> encoder, 
Bucketer<IN, BucketID> bucketer) {
-                       this(basePath, encoder, bucketer, 
DefaultRollingPolicy.create().build(), 60L * 1000L, new 
DefaultBucketFactory<>());
+               RowFormatBuilder(Path basePath, Encoder<IN> encoder, 
BucketAssigner<IN, BucketID> bucketAssigner) {
+                       this(basePath, encoder, bucketAssigner, 
DefaultRollingPolicy.create().build(), 60L * 1000L, new 
DefaultBucketFactoryImpl<>());
                }
 
                private RowFormatBuilder(
                                Path basePath,
                                Encoder<IN> encoder,
-                               Bucketer<IN, BucketID> bucketer,
-                               RollingPolicy<IN, BucketID> rollingPolicy,
+                               BucketAssigner<IN, BucketID> assigner,
+                               RollingPolicy<IN, BucketID> policy,
                                long bucketCheckInterval,
                                BucketFactory<IN, BucketID> bucketFactory) {
                        this.basePath = Preconditions.checkNotNull(basePath);
                        this.encoder = Preconditions.checkNotNull(encoder);
-                       this.bucketer = Preconditions.checkNotNull(bucketer);
-                       this.rollingPolicy = 
Preconditions.checkNotNull(rollingPolicy);
+                       this.bucketAssigner = 
Preconditions.checkNotNull(assigner);
+                       this.rollingPolicy = Preconditions.checkNotNull(policy);
                        this.bucketCheckInterval = bucketCheckInterval;
                        this.bucketFactory = 
Preconditions.checkNotNull(bucketFactory);
                }
 
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketCheckInterval(final long interval) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
bucketer, rollingPolicy, interval, bucketFactory);
+                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, interval, bucketFactory);
                }
 
-               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketer(final Bucketer<IN, BucketID> bucketer) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(bucketer), rollingPolicy, bucketCheckInterval, 
bucketFactory);
+               public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval, 
bucketFactory);
                }
 
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
bucketer, Preconditions.checkNotNull(policy), bucketCheckInterval, 
bucketFactory);
+                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval, 
bucketFactory);
                }
 
-               public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> 
withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, 
ID> policy) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(bucketer), Preconditions.checkNotNull(policy), 
bucketCheckInterval, new DefaultBucketFactory<>());
-               }
-
-               @VisibleForTesting
-               StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
-                       return new RowFormatBuilder<>(basePath, encoder, 
bucketer, rollingPolicy, bucketCheckInterval, 
Preconditions.checkNotNull(factory));
+               public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> 
withBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final 
RollingPolicy<IN, ID> policy) {
+                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), 
bucketCheckInterval, new DefaultBucketFactoryImpl<>());
                }
 
                /** Creates the actual sink. */
@@ -246,12 +241,17 @@ private RowFormatBuilder(
                Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws 
IOException {
                        return new Buckets<>(
                                        basePath,
-                                       bucketer,
+                                       bucketAssigner,
                                        bucketFactory,
                                        new 
RowWisePartWriter.Factory<>(encoder),
                                        rollingPolicy,
                                        subtaskIndex);
                }
+
+               @VisibleForTesting
+               StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+                       return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, bucketCheckInterval, 
Preconditions.checkNotNull(factory));
+               }
        }
 
        /**
@@ -268,38 +268,38 @@ private RowFormatBuilder(
 
                private final BulkWriter.Factory<IN> writerFactory;
 
-               private final Bucketer<IN, BucketID> bucketer;
+               private final BucketAssigner<IN, BucketID> bucketAssigner;
 
                private final BucketFactory<IN, BucketID> bucketFactory;
 
-               BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> 
writerFactory, Bucketer<IN, BucketID> bucketer) {
-                       this(basePath, writerFactory, bucketer, 60L * 1000L, 
new DefaultBucketFactory<>());
+               BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> 
writerFactory, BucketAssigner<IN, BucketID> assigner) {
+                       this(basePath, writerFactory, assigner, 60L * 1000L, 
new DefaultBucketFactoryImpl<>());
                }
 
                private BulkFormatBuilder(
                                Path basePath,
                                BulkWriter.Factory<IN> writerFactory,
-                               Bucketer<IN, BucketID> bucketer,
+                               BucketAssigner<IN, BucketID> assigner,
                                long bucketCheckInterval,
                                BucketFactory<IN, BucketID> bucketFactory) {
                        this.basePath = Preconditions.checkNotNull(basePath);
                        this.writerFactory = writerFactory;
-                       this.bucketer = Preconditions.checkNotNull(bucketer);
+                       this.bucketAssigner = 
Preconditions.checkNotNull(assigner);
                        this.bucketCheckInterval = bucketCheckInterval;
                        this.bucketFactory = 
Preconditions.checkNotNull(bucketFactory);
                }
 
                public StreamingFileSink.BulkFormatBuilder<IN, BucketID> 
withBucketCheckInterval(long interval) {
-                       return new BulkFormatBuilder<>(basePath, writerFactory, 
bucketer, interval, bucketFactory);
+                       return new BulkFormatBuilder<>(basePath, writerFactory, 
bucketAssigner, interval, bucketFactory);
                }
 
-               public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> 
withBucketer(Bucketer<IN, ID> bucketer) {
-                       return new BulkFormatBuilder<>(basePath, writerFactory, 
Preconditions.checkNotNull(bucketer), bucketCheckInterval, new 
DefaultBucketFactory<>());
+               public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> 
withBucketAssigner(BucketAssigner<IN, ID> assigner) {
+                       return new BulkFormatBuilder<>(basePath, writerFactory, 
Preconditions.checkNotNull(assigner), bucketCheckInterval, new 
DefaultBucketFactoryImpl<>());
                }
 
                @VisibleForTesting
                StreamingFileSink.BulkFormatBuilder<IN, BucketID> 
withBucketFactory(final BucketFactory<IN, BucketID> factory) {
-                       return new BulkFormatBuilder<>(basePath, writerFactory, 
bucketer, bucketCheckInterval, Preconditions.checkNotNull(factory));
+                       return new BulkFormatBuilder<>(basePath, writerFactory, 
bucketAssigner, bucketCheckInterval, Preconditions.checkNotNull(factory));
                }
 
                /** Creates the actual sink. */
@@ -311,10 +311,10 @@ private BulkFormatBuilder(
                Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws 
IOException {
                        return new Buckets<>(
                                        basePath,
-                                       bucketer,
+                                       bucketAssigner,
                                        bucketFactory,
                                        new 
BulkPartWriter.Factory<>(writerFactory),
-                                       new OnCheckpointRollingPolicy<>(),
+                                       OnCheckpointRollingPolicy.build(),
                                        subtaskIndex);
                }
        }
@@ -337,7 +337,7 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               buckets.publishUpToCheckpoint(checkpointId);
+               buckets.commitUpToCheckpoint(checkpointId);
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
similarity index 84%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
index c35ba8031c2..3633004cc5d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.java
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 
 /**
- * A {@link Bucketer} that does not perform any
+ * A {@link BucketAssigner} that does not perform any
  * bucketing of files. All files are written to the base path.
  */
 @PublicEvolving
-public class BasePathBucketer<T> implements Bucketer<T, String> {
+public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {
 
        private static final long serialVersionUID = -6033643155550226022L;
 
        @Override
-       public String getBucketId(T element, Bucketer.Context context) {
+       public String getBucketId(T element, BucketAssigner.Context context) {
                return "";
        }
 
@@ -44,6 +44,6 @@ public String getBucketId(T element, Bucketer.Context 
context) {
 
        @Override
        public String toString() {
-               return "BasePathBucketer";
+               return "BasePathBucketAssigner";
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
similarity index 87%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
index d226d200f2f..5d30f39fa2e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
 /**
- * A {@link Bucketer} that assigns to buckets based on current system time.
+ * A {@link BucketAssigner} that assigns to buckets based on current system 
time.
  *
  *
  * <p>The {@code DateTimeBucketer} will create directories of the following 
form:
@@ -52,7 +52,7 @@
  *
  */
 @PublicEvolving
-public class DateTimeBucketer<IN> implements Bucketer<IN, String> {
+public class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
 
        private static final long serialVersionUID = 1L;
 
@@ -65,7 +65,7 @@
        /**
         * Creates a new {@code DateTimeBucketer} with format string {@code 
"yyyy-MM-dd--HH"}.
         */
-       public DateTimeBucketer() {
+       public DateTimeBucketAssigner() {
                this(DEFAULT_FORMAT_STRING);
        }
 
@@ -75,12 +75,12 @@ public DateTimeBucketer() {
         * @param formatString The format string that will be given to {@code 
SimpleDateFormat} to determine
         *                     the bucket path.
         */
-       public DateTimeBucketer(String formatString) {
+       public DateTimeBucketAssigner(String formatString) {
                this.formatString = formatString;
        }
 
        @Override
-       public String getBucketId(IN element, Bucketer.Context context) {
+       public String getBucketId(IN element, BucketAssigner.Context context) {
                if (dateFormatter == null) {
                        dateFormatter = new SimpleDateFormat(formatString);
                }
@@ -94,8 +94,6 @@ public String getBucketId(IN element, Bucketer.Context 
context) {
 
        @Override
        public String toString() {
-               return "DateTimeBucketer{" +
-                               "formatString='" + formatString + '\'' +
-                               '}';
+               return "DateTimeBucketAssigner{formatString='" + formatString + 
'\'' + '}';
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
similarity index 96%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
index d025af97650..4726aff967c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/SimpleVersionedStringSerializer.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+package 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@
 /**
  * A {@link SimpleVersionedSerializer} implementation for Strings.
  */
+@PublicEvolving
 public final class SimpleVersionedStringSerializer implements 
SimpleVersionedSerializer<String> {
 
        private static final Charset CHARSET = StandardCharsets.UTF_8;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index b4b1cebf88c..7c75f1c5e25 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -87,7 +87,10 @@ public boolean shouldRollOnProcessingTime(final 
PartFileInfo<BucketID> partFileS
         * To finalize it and have the actual policy, call {@code .create()}.
         */
        public static DefaultRollingPolicy.PolicyBuilder create() {
-               return new DefaultRollingPolicy.PolicyBuilder();
+               return new DefaultRollingPolicy.PolicyBuilder(
+                               DEFAULT_MAX_PART_SIZE,
+                               DEFAULT_ROLLOVER_INTERVAL,
+                               DEFAULT_INACTIVITY_INTERVAL);
        }
 
        /**
@@ -96,42 +99,46 @@ public boolean shouldRollOnProcessingTime(final 
PartFileInfo<BucketID> partFileS
        @PublicEvolving
        public static final class PolicyBuilder {
 
-               private long partSize = DEFAULT_MAX_PART_SIZE;
+               private final long partSize;
 
-               private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+               private final long rolloverInterval;
 
-               private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+               private final long inactivityInterval;
 
-               private PolicyBuilder() {}
+               private PolicyBuilder(
+                               final long partSize,
+                               final long rolloverInterval,
+                               final long inactivityInterval) {
+                       this.partSize = partSize;
+                       this.rolloverInterval = rolloverInterval;
+                       this.inactivityInterval = inactivityInterval;
+               }
 
                /**
                 * Sets the part size above which a part file will have to roll.
                 * @param size the allowed part size.
                 */
-               public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long 
size) {
+               public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(final 
long size) {
                        Preconditions.checkState(size > 0L);
-                       this.partSize = size;
-                       return this;
+                       return new PolicyBuilder(size, rolloverInterval, 
inactivityInterval);
                }
 
                /**
                 * Sets the interval of allowed inactivity after which a part 
file will have to roll.
                 * @param interval the allowed inactivity interval.
                 */
-               public DefaultRollingPolicy.PolicyBuilder 
withInactivityInterval(long interval) {
+               public DefaultRollingPolicy.PolicyBuilder 
withInactivityInterval(final long interval) {
                        Preconditions.checkState(interval > 0L);
-                       this.inactivityInterval = interval;
-                       return this;
+                       return new PolicyBuilder(partSize, rolloverInterval, 
interval);
                }
 
                /**
                 * Sets the max time a part file can stay open before having to 
roll.
                 * @param interval the desired rollover interval.
                 */
-               public DefaultRollingPolicy.PolicyBuilder 
withRolloverInterval(long interval) {
+               public DefaultRollingPolicy.PolicyBuilder 
withRolloverInterval(final long interval) {
                        Preconditions.checkState(interval > 0L);
-                       this.rolloverInterval = interval;
-                       return this;
+                       return new PolicyBuilder(partSize, interval, 
inactivityInterval);
                }
 
                /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
index 9ad8172e9de..53fce082c3b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
@@ -18,16 +18,20 @@
 
 package 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 
 /**
  * A {@link RollingPolicy} which rolls on every checkpoint.
  */
+@PublicEvolving
 public class OnCheckpointRollingPolicy<IN, BucketID> implements 
RollingPolicy<IN, BucketID> {
 
        private static final long serialVersionUID = 1L;
 
+       private OnCheckpointRollingPolicy() {}
+
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> 
partFileState) {
                return true;
@@ -42,4 +46,8 @@ public boolean shouldRollOnEvent(PartFileInfo<BucketID> 
partFileState, IN elemen
        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> 
partFileState, long currentTime) {
                return false;
        }
+
+       public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> 
build() {
+               return new OnCheckpointRollingPolicy<>();
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 3d5be6340ff..55360a4a353 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -25,7 +25,7 @@
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -75,8 +75,8 @@ public void testSerializationEmpty() throws IOException {
                final BucketState<String> recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
                Assert.assertEquals(testBucket, recoveredState.getBucketPath());
-               Assert.assertNull(recoveredState.getInProgress());
-               
Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
+               Assert.assertNull(recoveredState.getInProgressResumableFile());
+               
Assert.assertTrue(recoveredState.getCommittableFilesPerCheckpoint().isEmpty());
        }
 
        @Test
@@ -166,7 +166,7 @@ public void testSerializationFull() throws IOException {
 
                Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
 
-               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
                Assert.assertEquals(5L, recoveredRecoverables.size());
 
                // recover and commit
@@ -238,9 +238,9 @@ public void testSerializationNullInProgress() throws 
IOException {
                final BucketState<String> recoveredState =  
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
 
                Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
-               Assert.assertNull(recoveredState.getInProgress());
+               Assert.assertNull(recoveredState.getInProgressResumableFile());
 
-               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
+               final Map<Long, List<RecoverableWriter.CommitRecoverable>> 
recoveredRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
                Assert.assertEquals(5L, recoveredRecoverables.size());
 
                // recover and commit
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 5e8eb6d9ba7..25622d14466 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -22,7 +22,7 @@
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -58,7 +58,7 @@ private void testCorrectPassingOfContext(Long timestamp, long 
watermark, long pr
 
                final Buckets<String, String> buckets = StreamingFileSink
                                .<String>forRowFormat(new Path(outDir.toURI()), 
new SimpleStringEncoder<>())
-                               .withBucketer(new 
VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
+                               .withBucketAssigner(new 
VarifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
                                .createBuckets(2);
 
                buckets.onElement("TEST", new SinkFunction.Context() {
@@ -79,7 +79,7 @@ public Long timestamp() {
                });
        }
 
-       private static class VarifyingBucketer implements Bucketer<String, 
String> {
+       private static class VarifyingBucketer implements 
BucketAssigner<String, String> {
 
                private static final long serialVersionUID = 
7729086510972377578L;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
index 7b6b82cd4eb..20890c07a93 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -59,7 +59,7 @@ public void testCustomBulkWriter() throws Exception {
                                                                10L,
                                                                new 
TestUtils.TupleToStringBucketer(),
                                                                new 
TestBulkWriterFactory(),
-                                                               new 
DefaultBucketFactory<>())
+                                                               new 
DefaultBucketFactoryImpl<>())
                ) {
 
                        testHarness.setup();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index a0c438e1847..ab487d168bc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -491,8 +491,8 @@ public void testMaxCounterUponRecovery() throws Exception {
 
                OperatorSubtaskState mergedSnapshot;
 
-               final TestBucketFactory first = new TestBucketFactory();
-               final TestBucketFactory second = new TestBucketFactory();
+               final TestBucketFactoryImpl first = new TestBucketFactoryImpl();
+               final TestBucketFactoryImpl second = new 
TestBucketFactoryImpl();
 
                final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = DefaultRollingPolicy
                                .create()
@@ -526,8 +526,8 @@ public void testMaxCounterUponRecovery() throws Exception {
                        );
                }
 
-               final TestBucketFactory firstRecovered = new 
TestBucketFactory();
-               final TestBucketFactory secondRecovered = new 
TestBucketFactory();
+               final TestBucketFactoryImpl firstRecovered = new 
TestBucketFactoryImpl();
+               final TestBucketFactoryImpl secondRecovered = new 
TestBucketFactoryImpl();
 
                try (
                                
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 
= TestUtils.createCustomRescalingTestSink(
@@ -559,7 +559,7 @@ public void testMaxCounterUponRecovery() throws Exception {
 
        //////////////////////                  Helper Methods                  
//////////////////////
 
-       static class TestBucketFactory extends 
DefaultBucketFactory<Tuple2<String, Integer>, String> {
+       static class TestBucketFactoryImpl extends 
DefaultBucketFactoryImpl<Tuple2<String, Integer>, String> {
 
                private static final long serialVersionUID = 
2794824980604027930L;
 
@@ -572,7 +572,8 @@ public void testMaxCounterUponRecovery() throws Exception {
                                final String bucketId,
                                final Path bucketPath,
                                final long initialPartCounter,
-                               final 
PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> 
partFileWriterFactory) {
+                               final 
PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> 
partFileWriterFactory,
+                               final RollingPolicy<Tuple2<String, Integer>, 
String> rollingPolicy) {
 
                        this.initialCounter = initialPartCounter;
 
@@ -582,7 +583,8 @@ public void testMaxCounterUponRecovery() throws Exception {
                                        bucketId,
                                        bucketPath,
                                        initialPartCounter,
-                                       partFileWriterFactory);
+                                       partFileWriterFactory,
+                                       rollingPolicy);
                }
 
                @Override
@@ -591,6 +593,7 @@ public void testMaxCounterUponRecovery() throws Exception {
                                final int subtaskIndex,
                                final long initialPartCounter,
                                final 
PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> 
partFileWriterFactory,
+                               final RollingPolicy<Tuple2<String, Integer>, 
String> rollingPolicy,
                                final BucketState<String> bucketState) throws 
IOException {
 
                        this.initialCounter = initialPartCounter;
@@ -600,6 +603,7 @@ public void testMaxCounterUponRecovery() throws Exception {
                                        subtaskIndex,
                                        initialPartCounter,
                                        partFileWriterFactory,
+                                       rollingPolicy,
                                        bucketState);
                }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index f16a9085d9d..851b6825d9a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -67,7 +67,7 @@ public void testDefaultRollingPolicy() throws Exception {
                                                new 
TestUtils.TupleToStringBucketer(),
                                                new SimpleStringEncoder<>(),
                                                rollingPolicy,
-                                               new DefaultBucketFactory<>())
+                                               new 
DefaultBucketFactoryImpl<>())
                ) {
                        testHarness.setup();
                        testHarness.open();
@@ -111,7 +111,7 @@ public void testDefaultRollingPolicy() throws Exception {
        public void testRollOnCheckpointPolicy() throws Exception {
                final File outDir = TEMP_FOLDER.newFolder();
 
-               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = new OnCheckpointRollingPolicy<>();
+               final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy = OnCheckpointRollingPolicy.build();
 
                try (
                                
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness 
= TestUtils.createCustomRescalingTestSink(
@@ -122,7 +122,7 @@ public void testRollOnCheckpointPolicy() throws Exception {
                                                new 
TestUtils.TupleToStringBucketer(),
                                                new SimpleStringEncoder<>(),
                                                rollingPolicy,
-                                               new DefaultBucketFactory<>())
+                                               new 
DefaultBucketFactoryImpl<>())
                ) {
                        testHarness.setup();
                        testHarness.open();
@@ -246,7 +246,7 @@ public boolean 
shouldRollOnProcessingTime(PartFileInfo<String> partFileState, lo
                                                new 
TestUtils.TupleToStringBucketer(),
                                                new SimpleStringEncoder<>(),
                                                rollingPolicy,
-                                               new DefaultBucketFactory<>())
+                                               new 
DefaultBucketFactoryImpl<>())
                ) {
                        testHarness.setup();
                        testHarness.open();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 8d9392b3c37..bfbc12043ee 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -23,7 +23,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -58,7 +58,7 @@
                                                
.withInactivityInterval(inactivityInterval)
                                                .build();
 
-               final Bucketer<Tuple2<String, Integer>, String> bucketer = new 
TupleToStringBucketer();
+               final BucketAssigner<Tuple2<String, Integer>, String> bucketer 
= new TupleToStringBucketer();
 
                final Encoder<Tuple2<String, Integer>> encoder = (element, 
stream) -> {
                        stream.write((element.f0 + '@' + 
element.f1).getBytes(StandardCharsets.UTF_8));
@@ -73,7 +73,7 @@
                                bucketer,
                                encoder,
                                rollingPolicy,
-                               new DefaultBucketFactory<>());
+                               new DefaultBucketFactoryImpl<>());
        }
 
        static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Object> createCustomRescalingTestSink(
@@ -81,14 +81,14 @@
                        final int totalParallelism,
                        final int taskIdx,
                        final long bucketCheckInterval,
-                       final Bucketer<Tuple2<String, Integer>, String> 
bucketer,
+                       final BucketAssigner<Tuple2<String, Integer>, String> 
bucketer,
                        final Encoder<Tuple2<String, Integer>> writer,
                        final RollingPolicy<Tuple2<String, Integer>, String> 
rollingPolicy,
                        final BucketFactory<Tuple2<String, Integer>, String> 
bucketFactory) throws Exception {
 
                StreamingFileSink<Tuple2<String, Integer>> sink = 
StreamingFileSink
                                .forRowFormat(new Path(outDir.toURI()), writer)
-                               .withBucketer(bucketer)
+                               .withBucketAssigner(bucketer)
                                .withRollingPolicy(rollingPolicy)
                                .withBucketCheckInterval(bucketCheckInterval)
                                .withBucketFactory(bucketFactory)
@@ -102,13 +102,13 @@
                        final int totalParallelism,
                        final int taskIdx,
                        final long bucketCheckInterval,
-                       final Bucketer<Tuple2<String, Integer>, String> 
bucketer,
+                       final BucketAssigner<Tuple2<String, Integer>, String> 
bucketer,
                        final BulkWriter.Factory<Tuple2<String, Integer>> 
writer,
                        final BucketFactory<Tuple2<String, Integer>, String> 
bucketFactory) throws Exception {
 
                StreamingFileSink<Tuple2<String, Integer>> sink = 
StreamingFileSink
                                .forBulkFormat(new Path(outDir.toURI()), writer)
-                               .withBucketer(bucketer)
+                               .withBucketAssigner(bucketer)
                                .withBucketCheckInterval(bucketCheckInterval)
                                .withBucketFactory(bucketFactory)
                                .build();
@@ -146,7 +146,7 @@ static void checkLocalFs(File outDir, int 
expectedInProgress, int expectedComple
                return contents;
        }
 
-       static class TupleToStringBucketer implements Bucketer<Tuple2<String, 
Integer>, String> {
+       static class TupleToStringBucketer implements 
BucketAssigner<Tuple2<String, Integer>, String> {
 
                private static final long serialVersionUID = 1L;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to