[FLINK-5096] Make the RollingSink rescalable.

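Replaces the RollingSink's Checkpointed implementation with
CheckpointedFunction and keeps its BucketState in an operator ListState,
so that the sink's parallelism can change when execution resumes from
a savepoint.

The sink no longer deletes leftover pending/in-progress part files when
it is opened or restored. disableCleanupOnOpen() is therefore deprecated
and kept only for backwards compatibility.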

This closes #2845.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2b93f69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2b93f69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2b93f69

Branch: refs/heads/master
Commit: e2b93f69cea0c8a8d05591285cb9c2f4dc64c619
Parents: 333da3a
Author: kl0u <[email protected]>
Authored: Thu Nov 10 16:57:20 2016 +0100
Committer: zentol <[email protected]>
Committed: Sun Nov 27 12:49:54 2016 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    | 305 +++++++++----------
 .../fs/RollingSinkFaultTolerance2ITCase.java    | 292 ------------------
 .../fs/RollingSinkFaultToleranceITCase.java     |  29 +-
 .../connectors/fs/RollingSinkITCase.java        | 245 ++++++++++++++-
 4 files changed, 404 insertions(+), 467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index b959bf8..fc4a35e 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,19 +19,22 @@ package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +45,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,7 +53,7 @@ import java.util.UUID;
 
 /**
  * Sink that emits its input elements to rolling {@link 
org.apache.hadoop.fs.FileSystem} files. This
- * is itegrated with the checkpointing mechanism to provide exactly once 
semantics.
+ * is integrated with the checkpointing mechanism to provide exactly once 
semantics.
  *
  * <p>
  * When creating the sink a {@code basePath} must be specified. The base 
directory contains
@@ -124,7 +127,9 @@ import java.util.UUID;
  * @deprecated use {@link BucketingSink} instead.
  */
 @Deprecated
-public class RollingSink<T> extends RichSinkFunction<T> implements 
InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, 
CheckpointListener {
+public class RollingSink<T> extends RichSinkFunction<T>
+               implements InputTypeConfigurable, CheckpointedFunction, 
CheckpointListener {
+
        private static final long serialVersionUID = 1L;
 
        private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
@@ -136,9 +141,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
        // These are initialized with some defaults but are meant to be 
changeable by the user
 
        /**
-        * The default maximum size of part files.
-        *
-        * 6 times the default block size
+        * The default maximum size of part files (currently {@code 384 MB}).
         */
        private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
 
@@ -189,7 +192,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
 
 
        /**
-        * The base {@code Path} that stored all rolling bucket directories.
+        * The base {@code Path} that stores all bucket directories.
         */
        private final String basePath;
 
@@ -215,15 +218,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
         */
        private long batchSize;
 
-       /**
-        * If this is true we remove any leftover in-progress/pending files 
when the sink is opened.
-        *
-        * <p>
-        * This should only be set to false if using the sink without 
checkpoints, to not remove
-        * the files already in the directory.
-        */
-       private boolean cleanupOnOpen = true;
-
        // These are the actually configured prefixes/suffixes
        private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
        private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
@@ -258,11 +252,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
        private transient Path currentBucketDirectory;
 
        /**
-        * Our subtask index, retrieved from the {@code RuntimeContext} in 
{@link #open}.
-        */
-       private transient int subtaskIndex;
-
-       /**
         * For counting the part files inside a bucket directory. Part files 
follow the patter
         * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part 
files we increase the counter.
         */
@@ -271,7 +260,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
        /**
         * Tracks if the writer is currently opened or closed.
         */
-       private transient boolean isWriterOpen = false;
+       private transient boolean isWriterOpen;
 
        /**
         * We use reflection to get the .truncate() method, this is only 
available starting with
@@ -285,10 +274,12 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
         */
        private transient BucketState bucketState;
 
+       private transient ListState<BucketState> restoredBucketStates;
+
        /**
         * User-defined FileSystem parameters.
      */
-       private Configuration fsConfig = null;
+       private Configuration fsConfig;
 
        /**
         * The FileSystem reference.
@@ -328,7 +319,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                this.fsConfig = new Configuration();
                for(Map.Entry<String, String> entry : config) {
                        fsConfig.setString(entry.getKey(), entry.getValue());
-               };
+               }
                return this;
        }
 
@@ -341,63 +332,57 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
        }
 
        @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               Preconditions.checkArgument(this.restoredBucketStates == null,
+                       "The " + getClass().getSimpleName() + " has already 
been initialized.");
+
+               initFileSystem();
+
+               if (this.refTruncate == null) {
+                       this.refTruncate = reflectTruncate(fs);
+               }
+
+               OperatorStateStore stateStore = context.getOperatorStateStore();
+               restoredBucketStates = 
stateStore.getSerializableListState("rolling-states");
+
+               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+               if (context.isRestored()) {
+                       LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIndex);
+
+                       for (BucketState bucketState : 
restoredBucketStates.get()) {
+                               handleRestoredBucketState(bucketState);
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("{} (taskIdx= {}) restored {}", 
getClass().getSimpleName(), subtaskIndex, bucketState);
+                       }
+               } else {
+                       LOG.info("No state to restore for the {} (taskIdx= 
{}).", getClass().getSimpleName(), subtaskIndex);
+               }
+       }
+
+       @Override
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
 
-               subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                partCounter = 0;
 
                this.writer = writerTemplate.duplicate();
 
-               if (bucketState == null) {
-                       bucketState = new BucketState();
-               }
-
-               initFileSystem();
-               refTruncate = reflectTruncate(fs);
-
-               // delete pending/in-progress files that might be left if we 
fail while
-               // no checkpoint has yet been done
-               try {
-                       if (fs.exists(new Path(basePath)) && cleanupOnOpen) {
-                               RemoteIterator<LocatedFileStatus> bucketFiles = 
fs.listFiles(new Path(basePath), true);
-
-                               while (bucketFiles.hasNext()) {
-                                       LocatedFileStatus file = 
bucketFiles.next();
-                                       if 
(file.getPath().toString().endsWith(pendingSuffix)) {
-                                               // only delete files that 
contain our subtask index
-                                               if 
(file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
-                                                       LOG.debug("(OPEN) 
Deleting leftover pending file {}", file.getPath().toString());
-                                                       
fs.delete(file.getPath(), true);
-                                               }
-                                       }
-                                       if 
(file.getPath().toString().endsWith(inProgressSuffix)) {
-                                               // only delete files that 
contain our subtask index
-                                               if 
(file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
-                                                       LOG.debug("(OPEN) 
Deleting leftover in-progress file {}", file.getPath().toString());
-                                                       
fs.delete(file.getPath(), true);
-                                               }
-                                       }
-                               }
-                       }
-               } catch (IOException e) {
-                       LOG.error("Error while deleting leftover 
pending/in-progress files: {}", e);
-                       throw new RuntimeException("Error while deleting 
leftover pending/in-progress files.", e);
-               }
+               bucketState = new BucketState();
        }
 
        /**
-        * create a file system with the user defined hdfs config
+        * Create a file system with the user-defined hdfs config
         * @throws IOException
         */
        private void initFileSystem() throws IOException {
-               if(fs != null) {
+               if (fs != null) {
                        return;
                }
                org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopFileSystem.getHadoopConfiguration();
-               if(fsConfig != null) {
-                       String disableCacheName
-                               = String.format("fs.%s.impl.disable.cache", new 
Object[]{new Path(basePath).toUri().getScheme()});
+               if (fsConfig != null) {
+                       String disableCacheName = 
String.format("fs.%s.impl.disable.cache", new 
Path(basePath).toUri().getScheme());
                        hadoopConf.setBoolean(disableCacheName, true);
                        for (String key : fsConfig.keySet()) {
                                hadoopConf.set(key, fsConfig.getString(key, 
null));
@@ -409,21 +394,14 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
 
        @Override
        public void close() throws Exception {
-//             boolean interrupted = Thread.interrupted();
                closeCurrentPartFile();
-
-//             if (interrupted) {
-//                     Thread.currentThread().interrupt();
-//             }
        }
 
        @Override
        public void invoke(T value) throws Exception {
-
                if (shouldRoll()) {
                        openNewPartFile();
                }
-
                writer.write(value);
        }
 
@@ -436,6 +414,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
         */
        private boolean shouldRoll() throws IOException {
                boolean shouldRoll = false;
+               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                if (!isWriterOpen) {
                        shouldRoll = true;
                        LOG.debug("RollingSink {} starting new initial bucket. 
", subtaskIndex);
@@ -482,12 +461,14 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                        }
                }
 
-
+               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                currentPartPath = new Path(currentBucketDirectory, partPrefix + 
"-" + subtaskIndex + "-" + partCounter);
 
                // This should work since there is only one parallel subtask 
that tries names with
                // our subtask id. Otherwise we would run into concurrency 
issues here.
-               while (fs.exists(currentPartPath) || fs.exists(new 
Path(currentPartPath.getParent(), pendingPrefix + 
currentPartPath.getName()).suffix(pendingSuffix))) {
+               while (fs.exists(currentPartPath) ||
+                               fs.exists(getPendingPathFor(currentPartPath)) ||
+                               
fs.exists(getInProgressPathFor(currentPartPath))) {
                        partCounter++;
                        currentPartPath = new Path(currentBucketDirectory, 
partPrefix + "-" + subtaskIndex + "-" + partCounter);
                }
@@ -497,11 +478,23 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
 
                LOG.debug("Next part path is {}", currentPartPath.toString());
 
-               Path inProgressPath = new Path(currentPartPath.getParent(), 
inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
+               Path inProgressPath = getInProgressPathFor(currentPartPath);
                writer.open(fs, inProgressPath);
                isWriterOpen = true;
        }
 
+       private Path getPendingPathFor(Path path) {
+               return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+       }
+
+       private Path getInProgressPathFor(Path path) {
+               return new Path(path.getParent(), inProgressPrefix + 
path.getName()).suffix(inProgressSuffix);
+       }
+
+       private Path getValidLengthPathFor(Path path) {
+               return new Path(path.getParent(), validLengthPrefix + 
path.getName()).suffix(validLengthSuffix);
+       }
+
        /**
         * Closes the current part file.
         *
@@ -516,25 +509,22 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                }
 
                if (currentPartPath != null) {
-                       Path inProgressPath = new 
Path(currentPartPath.getParent(), inProgressPrefix + 
currentPartPath.getName()).suffix(inProgressSuffix);
-                       Path pendingPath = new 
Path(currentPartPath.getParent(), pendingPrefix + 
currentPartPath.getName()).suffix(pendingSuffix);
+                       Path inProgressPath = 
getInProgressPathFor(currentPartPath);
+                       Path pendingPath = getPendingPathFor(currentPartPath);
                        fs.rename(inProgressPath, pendingPath);
-                       LOG.debug("Moving in-progress bucket {} to pending file 
{}",
-                                       inProgressPath,
-                                       pendingPath);
+                       LOG.debug("Moving in-progress bucket {} to pending file 
{}", inProgressPath, pendingPath);
                        
this.bucketState.pendingFiles.add(currentPartPath.toString());
                }
        }
 
        /**
         * Gets the truncate() call using reflection.
-        *
         * <p>
-        * Note: This code comes from Flume
+        * <b>NOTE: </b>This code comes from Flume
         */
        private Method reflectTruncate(FileSystem fs) {
                Method m = null;
-               if(fs != null) {
+               if (fs != null) {
                        Class<?> fsClass = fs.getClass();
                        try {
                                m = fsClass.getMethod("truncate", Path.class, 
long.class);
@@ -544,7 +534,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                                return null;
                        }
 
-
                        // verify that truncate actually works
                        FSDataOutputStream outputStream;
                        Path testPath = new Path(UUID.randomUUID().toString());
@@ -557,7 +546,6 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                                throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
                        }
 
-
                        try {
                                m.invoke(fs, testPath, 2);
                        } catch (IllegalAccessException | 
InvocationTargetException e) {
@@ -575,75 +563,75 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                return m;
        }
 
-
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                synchronized (bucketState.pendingFilesPerCheckpoint) {
-                       Set<Long> pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
-                       Set<Long> checkpointsToRemove = new HashSet<>();
-                       for (Long pastCheckpointId : pastCheckpointIds) {
+                       Iterator<Map.Entry<Long, List<String>>> 
pendingCheckpointsIt =
+                               
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
+
+                       while (pendingCheckpointsIt.hasNext()) {
+                               Map.Entry<Long, List<String>> entry = 
pendingCheckpointsIt.next();
+                               Long pastCheckpointId = entry.getKey();
+
                                if (pastCheckpointId <= checkpointId) {
                                        LOG.debug("Moving pending files to 
final location for checkpoint {}", pastCheckpointId);
                                        // All the pending files are buckets 
that have been completed but are waiting to be renamed
                                        // to their final name
-                                       for (String filename : 
bucketState.pendingFilesPerCheckpoint.get(
-                                                       pastCheckpointId)) {
+                                       for (String filename : 
entry.getValue()) {
                                                Path finalPath = new 
Path(filename);
-                                               Path pendingPath = new 
Path(finalPath.getParent(),
-                                                               pendingPrefix + 
finalPath.getName()).suffix(pendingSuffix);
+                                               Path pendingPath = 
getPendingPathFor(finalPath);
 
                                                fs.rename(pendingPath, 
finalPath);
-                                               LOG.debug(
-                                                               "Moving pending 
file {} to final location after complete checkpoint {}.",
-                                                               pendingPath,
-                                                               
pastCheckpointId);
+                                               LOG.debug("Moving pending file 
{} to final location after complete checkpoint {}.",
+                                                               pendingPath, 
pastCheckpointId);
                                        }
-                                       
checkpointsToRemove.add(pastCheckpointId);
+                                       pendingCheckpointsIt.remove();
                                }
                        }
-                       for (Long toRemove: checkpointsToRemove) {
-                               
bucketState.pendingFilesPerCheckpoint.remove(toRemove);
-                       }
                }
        }
 
        @Override
-       public BucketState snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               Preconditions.checkNotNull(restoredBucketStates,
+                       "The " + getClass().getSimpleName() + " has not been 
properly initialized.");
+
+               int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+               
                if (isWriterOpen) {
-                       long pos = writer.flush();
                        bucketState.currentFile = currentPartPath.toString();
-                       bucketState.currentFileValidLength = pos;
+                       bucketState.currentFileValidLength = writer.flush();
                }
+
                synchronized (bucketState.pendingFilesPerCheckpoint) {
-                       bucketState.pendingFilesPerCheckpoint.put(checkpointId, 
bucketState.pendingFiles);
+                       
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
bucketState.pendingFiles);
                }
                bucketState.pendingFiles = new ArrayList<>();
-               return bucketState;
+
+               restoredBucketStates.clear();
+               restoredBucketStates.add(bucketState);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("{} (taskIdx={}) checkpointed {}.", 
getClass().getSimpleName(), subtaskIdx, bucketState);
+               }
        }
 
-       @Override
-       public void restoreState(BucketState state) {
-               bucketState = state;
-               // we can clean all the pending files since they where renamed 
to final files
-               // after this checkpoint was successfull
+       private void handleRestoredBucketState(BucketState bucketState) {
+               // we can clean all the pending files since they were renamed to
+               // final files after this checkpoint was successful
+               // (we re-start from the last **successful** checkpoint)
                bucketState.pendingFiles.clear();
-               try {
-                       initFileSystem();
-               } catch (IOException e) {
-                       LOG.error("Error while creating FileSystem in 
checkpoint restore.", e);
-                       throw new RuntimeException("Error while creating 
FileSystem in checkpoint restore.", e);
-               }
+
                if (bucketState.currentFile != null) {
-                       // We were writing to a file when the last checkpoint 
occured. This file can either
+                       // We were writing to a file when the last checkpoint 
occurred. This file can either
                        // be still in-progress or became a pending file at 
some point after the checkpoint.
-                       // Either way, we have to truncate it back to a valid 
state (or write a .valid-length)
-                       // file that specifies up to which length it is valid 
and rename it to the final name
+                       // Either way, we have to truncate it back to a valid 
state (or write a .valid-length
+                       // file that specifies up to which length it is valid) 
and rename it to the final name
                        // before starting a new bucket file.
                        Path partPath = new Path(bucketState.currentFile);
                        try {
-                               Path partPendingPath = new 
Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix(
-                                               pendingSuffix);
-                               Path partInProgressPath = new 
Path(partPath.getParent(), inProgressPrefix + 
partPath.getName()).suffix(inProgressSuffix);
+                               Path partPendingPath = 
getPendingPathFor(partPath);
+                               Path partInProgressPath = 
getInProgressPathFor(partPath);
 
                                if (fs.exists(partPendingPath)) {
                                        LOG.debug("In-progress file {} has been 
moved to pending after checkpoint, moving to final location.", partPath);
@@ -660,7 +648,10 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                                                        "it was moved to final 
location by a previous snapshot restore", bucketState.currentFile);
                                }
 
-                               refTruncate = reflectTruncate(fs);
+                               if (this.refTruncate == null) {
+                                       this.refTruncate = reflectTruncate(fs);
+                               }
+
                                // truncate it or write a ".valid-length" file 
to specify up to which point it is valid
                                if (refTruncate != null) {
                                        LOG.debug("Truncating {} to valid 
length {}", partPath, bucketState.currentFileValidLength);
@@ -711,7 +702,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
 
                                } else {
                                        LOG.debug("Writing valid-length file 
for {} to specify valid length {}", partPath, 
bucketState.currentFileValidLength);
-                                       Path validLengthFilePath = new 
Path(partPath.getParent(), validLengthPrefix + 
partPath.getName()).suffix(validLengthSuffix);
+                                       Path validLengthFilePath = 
getValidLengthPathFor(partPath);
                                        if (!fs.exists(validLengthFilePath)) {
                                                FSDataOutputStream 
lengthFileOut = fs.create(validLengthFilePath);
                                                
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
@@ -722,17 +713,16 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                                // invalidate in the state object
                                bucketState.currentFile = null;
                                bucketState.currentFileValidLength = -1;
+                               isWriterOpen = false;
                        } catch (IOException e) {
                                LOG.error("Error while restoring RollingSink 
state.", e);
                                throw new RuntimeException("Error while 
restoring RollingSink state.", e);
                        } catch (InvocationTargetException | 
IllegalAccessException e) {
-                               LOG.error("Cound not invoke truncate.", e);
+                               LOG.error("Could not invoke truncate.", e);
                                throw new RuntimeException("Could not invoke 
truncate.", e);
                        }
                }
 
-               LOG.debug("Clearing pending/in-progress files.");
-
                // Move files that are confirmed by a checkpoint but did not 
get moved to final location
                // because the checkpoint notification did not happen before a 
failure
 
@@ -743,7 +733,7 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                        // to their final name
                        for (String filename : 
bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
                                Path finalPath = new Path(filename);
-                               Path pendingPath = new 
Path(finalPath.getParent(), pendingPrefix + 
finalPath.getName()).suffix(pendingSuffix);
+                               Path pendingPath = getPendingPathFor(finalPath);
 
                                try {
                                        if (fs.exists(pendingPath)) {
@@ -756,39 +746,10 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                                }
                        }
                }
-               bucketState.pendingFiles.clear();
+
                synchronized (bucketState.pendingFilesPerCheckpoint) {
                        bucketState.pendingFilesPerCheckpoint.clear();
                }
-
-               // we need to get this here since open() has not yet been called
-               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-               // delete pending files
-               try {
-
-                       RemoteIterator<LocatedFileStatus> bucketFiles = 
fs.listFiles(new Path(basePath), true);
-
-                       while (bucketFiles.hasNext()) {
-                               LocatedFileStatus file = bucketFiles.next();
-                               if 
(file.getPath().toString().endsWith(pendingSuffix)) {
-                                       // only delete files that contain our 
subtask index
-                                       if 
(file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
-                                               LOG.debug("(RESTORE) Deleting 
pending file {}", file.getPath().toString());
-                                               fs.delete(file.getPath(), true);
-                                       }
-                               }
-                               if 
(file.getPath().toString().endsWith(inProgressSuffix)) {
-                                       // only delete files that contain our 
subtask index
-                                       if 
(file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) {
-                                               LOG.debug("(RESTORE) Deleting 
in-progress file {}", file.getPath().toString());
-                                               fs.delete(file.getPath(), true);
-                                       }
-                               }
-                       }
-               } catch (IOException e) {
-                       LOG.error("Error while deleting old pending files: {}", 
e);
-                       throw new RuntimeException("Error while deleting old 
pending files.", e);
-               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -891,9 +852,12 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
         * <p>
         * This should only be disabled if using the sink without checkpoints, 
to not remove
         * the files already in the directory.
+        *
+        * @deprecated This option is deprecated and remains only for backwards 
compatibility.
+        * We do not clean up lingering files anymore.
         */
+       @Deprecated
        public RollingSink<T> disableCleanupOnOpen() {
-               this.cleanupOnOpen = false;
                return this;
        }
 
@@ -919,9 +883,9 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                private static final long serialVersionUID = 1L;
 
                /**
-                * The file that was in-progress when the last checkpoint 
occured.
+                * The file that was in-progress when the last checkpoint 
occurred.
                 */
-               String currentFile = null;
+               String currentFile;
 
                /**
                 * The valid length of the in-progress file at the time of the 
last checkpoint.
@@ -939,5 +903,14 @@ public class RollingSink<T> extends RichSinkFunction<T> 
implements InputTypeConf
                 * pending files of completed checkpoints to their final 
location.
                 */
                final Map<Long, List<String>> pendingFilesPerCheckpoint = new 
HashMap<>();
+
+               /**
+                * Returns a one-line summary of this state: the in-progress file, its valid
+                * length, the files pending for the next checkpoint and the files pending
+                * per already-taken checkpoint.
+                */
+               @Override
+               public String toString() {
+                       return
+                               "In-progress=" + currentFile +
+                               " validLength=" + currentFileValidLength +
+                               " pendingForNextCheckpoint=" + pendingFiles +
+                               " pendingForPrevCheckpoints=" + 
pendingFilesPerCheckpoint;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
deleted file mode 100644
index 2b93721..0000000
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
-* Tests for {@link RollingSink}.
-*
-* <p>
-* This test only verifies the exactly once behaviour of the sink. Another test 
tests the
-* rolling behaviour.
-*
-* <p>
-* This differs from RollingSinkFaultToleranceITCase in that the checkpoint 
interval is extremely
-* high. This provokes the case that the sink restarts without any checkpoint 
having been performed.
-* This tests the initial cleanup of pending/in-progress files.
-*
-* @deprecated should be removed with the {@link RollingSink}.
-*/
-@Deprecated
-public class RollingSinkFaultTolerance2ITCase extends 
StreamFaultToleranceTestBase {
-
-       final long NUM_STRINGS = 16_000;
-
-       @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       private static MiniDFSCluster hdfsCluster;
-       private static org.apache.hadoop.fs.FileSystem dfs;
-
-       private static String outPath;
-
-
-
-       @BeforeClass
-       public static void createHDFS() throws IOException {
-               Configuration conf = new Configuration();
-
-               File dataDir = tempFolder.newFolder();
-
-               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
dataDir.getAbsolutePath());
-               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(conf);
-               hdfsCluster = builder.build();
-
-               dfs = hdfsCluster.getFileSystem();
-
-               outPath = "hdfs://"
-                               + 
NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(),  
hdfsCluster.getNameNodePort())
-                               + "/string-non-rolling-out-no-checkpoint";
-       }
-
-       @AfterClass
-       public static void destroyHDFS() {
-               if (hdfsCluster != null) {
-                       hdfsCluster.shutdown();
-               }
-       }
-
-
-       @Override
-       public void testProgram(StreamExecutionEnvironment env) {
-               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-               int PARALLELISM = 12;
-
-               env.enableCheckpointing(Long.MAX_VALUE);
-               env.setParallelism(PARALLELISM);
-               env.disableOperatorChaining();
-
-               DataStream<String> stream = env.addSource(new 
StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
-               DataStream<String> mapped = stream
-                               .map(new 
OnceFailingIdentityMapper(NUM_STRINGS));
-
-               RollingSink<String> sink = new RollingSink<String>(outPath)
-                               .setBucketer(new NonRollingBucketer())
-                               .setBatchSize(5000)
-                               .setValidLengthPrefix("")
-                               .setPendingPrefix("");
-
-               mapped.addSink(sink);
-
-       }
-
-       @Override
-       public void postSubmit() throws Exception {
-               // We read the files and verify that we have read all the 
strings. If a valid-length
-               // file exists we only read the file to that point. (This test 
should work with
-               // FileSystems that support truncate() and with others as well.)
-
-               Pattern messageRegex = Pattern.compile("message (\\d*)");
-
-               // Keep a set of the message IDs that we read. The size must 
equal the read count and
-               // the NUM_STRINGS. If numRead is bigger than the size of the 
set we have seen some
-               // elements twice.
-               Set<Integer> readNumbers = Sets.newHashSet();
-               int numRead = 0;
-
-               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(
-                               outPath), true);
-
-               while (files.hasNext()) {
-                       LocatedFileStatus file = files.next();
-
-                       if 
(!file.getPath().toString().endsWith(".valid-length")) {
-                               int validLength = (int) file.getLen();
-                               if 
(dfs.exists(file.getPath().suffix(".valid-length"))) {
-                                       FSDataInputStream inStream = 
dfs.open(file.getPath().suffix(".valid-length"));
-                                       String validLengthString = 
inStream.readUTF();
-                                       validLength = 
Integer.parseInt(validLengthString);
-                                       System.out.println("VALID LENGTH: " + 
validLength);
-                               }
-                               FSDataInputStream inStream = 
dfs.open(file.getPath());
-                               byte[] buffer = new byte[validLength];
-                               inStream.readFully(0, buffer, 0, validLength);
-                               inStream.close();
-
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(buffer);
-
-                               InputStreamReader inStreamReader = new 
InputStreamReader(bais);
-                               BufferedReader br = new 
BufferedReader(inStreamReader);
-
-                               String line = br.readLine();
-                               while (line != null) {
-                                       Matcher matcher = 
messageRegex.matcher(line);
-                                       if (matcher.matches()) {
-                                               numRead++;
-                                               int messageId = 
Integer.parseInt(matcher.group(1));
-                                               readNumbers.add(messageId);
-                                       } else {
-                                               Assert.fail("Read line does not 
match expected pattern.");
-                                       }
-                                       line = br.readLine();
-                               }
-                               br.close();
-                               inStreamReader.close();
-                               bais.close();
-                       }
-               }
-
-               // Verify that we read all strings (at-least-once)
-               Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
-               // Verify that we don't have duplicates (boom!, exactly-once)
-               Assert.assertEquals(NUM_STRINGS, numRead);
-       }
-
-       private static class OnceFailingIdentityMapper extends 
RichMapFunction<String, String> {
-               private static final long serialVersionUID = 1L;
-
-               private static volatile boolean hasFailed = false;
-
-               private final long numElements;
-
-               private long failurePos;
-               private long count;
-
-
-               OnceFailingIdentityMapper(long numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void open(org.apache.flink.configuration.Configuration 
parameters) throws IOException {
-                       long failurePosMin = (long) (0.4 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
-                       long failurePosMax = (long) (0.7 * numElements / 
getRuntimeContext().getNumberOfParallelSubtasks());
-
-                       failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
-                       count = 0;
-               }
-
-               @Override
-               public String map(String value) throws Exception {
-                       count++;
-                       if (!hasFailed && count >= failurePos) {
-                               hasFailed = true;
-                               throw new Exception("Test Failure");
-                       }
-
-                       return value;
-               }
-       }
-
-       private static class StringGeneratingSourceFunction extends 
RichParallelSourceFunction<String>
-                       implements CheckpointedAsynchronously<Integer> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final long numElements;
-
-               private int index;
-
-               private volatile boolean isRunning = true;
-
-
-               StringGeneratingSourceFunction(long numElements) {
-                       this.numElements = numElements;
-               }
-
-               @Override
-               public void run(SourceContext<String> ctx) throws Exception {
-                       final Object lockingObject = ctx.getCheckpointLock();
-
-                       final int step = 
getRuntimeContext().getNumberOfParallelSubtasks();
-
-                       if (index == 0) {
-                               index = 
getRuntimeContext().getIndexOfThisSubtask();
-                       }
-
-                       while (isRunning && index < numElements) {
-
-                               Thread.sleep(1);
-                               synchronized (lockingObject) {
-                                       ctx.collect("message " + index);
-                                       index += step;
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               private static String randomString(StringBuilder bld, Random 
rnd) {
-                       final int len = rnd.nextInt(10) + 5;
-
-                       for (int i = 0; i < len; i++) {
-                               char next = (char) (rnd.nextInt(20000) + 33);
-                               bld.append(next);
-                       }
-
-                       return bld.toString();
-               }
-
-               @Override
-               public Integer snapshotState(long checkpointId, long 
checkpointTimestamp) {
-                       return index;
-               }
-
-               @Override
-               public void restoreState(Integer state) {
-                       index = state;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 9c39237..36c0d03 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -42,6 +42,7 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -71,7 +72,8 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
 
        private static String outPath;
 
-
+       private static final String PENDING_SUFFIX = ".pending";
+       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
 
        @BeforeClass
        public static void createHDFS() throws IOException {
@@ -97,14 +99,13 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
                }
        }
 
-
        @Override
        public void testProgram(StreamExecutionEnvironment env) {
                assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
                int PARALLELISM = 12;
 
-               env.enableCheckpointing(200);
+               env.enableCheckpointing(20);
                env.setParallelism(PARALLELISM);
                env.disableOperatorChaining();
 
@@ -117,7 +118,9 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
                                .setBucketer(new NonRollingBucketer())
                                .setBatchSize(10000)
                                .setValidLengthPrefix("")
-                               .setPendingPrefix("");
+                               .setPendingPrefix("")
+                               .setPendingSuffix(PENDING_SUFFIX)
+                               .setInProgressSuffix(IN_PROGRESS_SUFFIX);
 
                mapped.addSink(sink);
 
@@ -135,7 +138,9 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
                // the NUM_STRINGS. If numRead is bigger than the size of the 
set we have seen some
                // elements twice.
                Set<Integer> readNumbers = Sets.newHashSet();
-               int numRead = 0;
+
+               HashSet<String> uniqMessagesRead = new HashSet<>();
+               HashSet<String> messagesInCommittedFiles = new HashSet<>();
 
                RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new 
Path(
                                outPath), true);
@@ -165,7 +170,15 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
                                while (line != null) {
                                        Matcher matcher = 
messageRegex.matcher(line);
                                        if (matcher.matches()) {
-                                               numRead++;
+                                               uniqMessagesRead.add(line);
+
+                                               // check that in the committed 
files there are no duplicates
+                                               if 
(!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && 
!file.getPath().toString().endsWith(PENDING_SUFFIX)) {
+                                                       if 
(!messagesInCommittedFiles.add(line)) {
+                                                               
Assert.fail("Duplicate entry in committed bucket.");
+                                                       }
+                                               }
+
                                                int messageId = 
Integer.parseInt(matcher.group(1));
                                                readNumbers.add(messageId);
                                        } else {
@@ -183,7 +196,7 @@ public class RollingSinkFaultToleranceITCase extends 
StreamFaultToleranceTestBas
                Assert.assertEquals(NUM_STRINGS, readNumbers.size());
 
                // Verify that we don't have duplicates (boom!, exactly-once)
-               Assert.assertEquals(NUM_STRINGS, numRead);
+               Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
        }
 
        private static class OnceFailingIdentityMapper extends 
RichMapFunction<String, String> {

http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index c8440ef..80ae294 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericData.StringType;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -34,7 +35,12 @@ import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
@@ -638,6 +644,243 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
                Assert.assertEquals(8, numFiles);
        }
 
+       private static final String PART_PREFIX = "part";
+       private static final String PENDING_SUFFIX = ".pending";
+       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+       private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+       /**
+        * Walks a single sink subtask through the in-progress -> pending -> completed
+        * file transitions, including a restore from a checkpoint taken before close().
+        */
+       @Test
+       public void testBucketStateTransitions() throws Exception {
+               final File outDir = tempFolder.newFolder();
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createRescalingTestSink(outDir, 1, 0);
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.setProcessingTime(0L);
+
+               // the batch size is 5 bytes and every record fills a part file, so the sink
+               // rolls to a new part file before each subsequent record.
+
+               testHarness.processElement(new StreamRecord<>("test1", 1L));
+               testHarness.processElement(new StreamRecord<>("test2", 1L));
+               checkFs(outDir, 1, 1 ,0, 0);
+
+               testHarness.processElement(new StreamRecord<>("test3", 1L));
+               checkFs(outDir, 1, 2, 0, 0);
+
+               // taking a snapshot alone does not change any file
+               testHarness.snapshot(0, 0);
+               checkFs(outDir, 1, 2, 0, 0);
+
+               // completing checkpoint 0 moves its pending files to their final names
+               testHarness.notifyOfCompletedCheckpoint(0);
+               checkFs(outDir, 1, 0, 2, 0);
+
+               OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+
+               // closing turns the in-progress file into a pending one
+               testHarness.close();
+               checkFs(outDir, 0, 1, 2, 0);
+
+               // restoring checkpoint 1 commits the file that was in-progress at that
+               // checkpoint and writes a valid-length file for it
+               testHarness = createRescalingTestSink(outDir, 1, 0);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+               checkFs(outDir, 0, 0, 3, 1);
+
+               snapshot = testHarness.snapshot(2, 0);
+
+               testHarness.processElement(new StreamRecord<>("test4", 10));
+               checkFs(outDir, 1, 0, 3, 1);
+
+               // NOTE(review): the previous harness is not closed here, presumably to
+               // simulate a failure after "test4" was written to an in-progress file
+               testHarness = createRescalingTestSink(outDir, 1, 0);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               // the in-progress file remains as we do not clean up now
+               checkFs(outDir, 1, 0, 3, 1);
+
+               testHarness.close();
+
+               // at close it is not moved to final because it is not part
+               // of the current task's state; it is just a leftover that was never cleaned up.
+               checkFs(outDir, 1, 0, 3, 1);
+       }
+
+       /**
+        * Runs three sink subtasks, then restores their merged state into two subtasks
+        * and checks that every in-progress and pending file of the old subtasks ends up
+        * completed (with valid-length files for the former in-progress ones).
+        */
+       @Test
+       public void testScalingDown() throws Exception {
+               final File outDir = tempFolder.newFolder();
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 3, 0);
+               testHarness1.setup();
+               testHarness1.open();
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 3, 1);
+               testHarness2.setup();
+               testHarness2.open();
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2);
+               testHarness3.setup();
+               testHarness3.open();
+
+               testHarness1.processElement(new StreamRecord<>("test1", 0L));
+               checkFs(outDir, 1, 0, 0, 0);
+
+               testHarness2.processElement(new StreamRecord<>("test2", 0L));
+               testHarness2.processElement(new StreamRecord<>("test3", 0L));
+               testHarness2.processElement(new StreamRecord<>("test4", 0L));
+               testHarness2.processElement(new StreamRecord<>("test5", 0L));
+               testHarness2.processElement(new StreamRecord<>("test6", 0L));
+               checkFs(outDir, 2, 4, 0, 0);
+
+               testHarness3.processElement(new StreamRecord<>("test7", 0L));
+               testHarness3.processElement(new StreamRecord<>("test8", 0L));
+               checkFs(outDir, 3, 5, 0, 0);
+
+               // intentionally we snapshot them in a non-ascending order so that the states are shuffled
+               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+                       testHarness3.snapshot(0, 0),
+                       testHarness1.snapshot(0, 0),
+                       testHarness2.snapshot(0, 0)
+               );
+
+               // with the above state reshuffling, we expect testHarness4 to take the
+               // state of the previous testHarness3 and testHarness1, while testHarness5
+               // will take that of the previous testHarness2 (its 1 in-progress and
+               // 4 pending files are the ones still left after testHarness4 restores).
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness4 
= createRescalingTestSink(outDir, 2, 0);
+               testHarness4.setup();
+               testHarness4.initializeState(mergedSnapshot);
+               testHarness4.open();
+
+               // testHarness4 commits the pending part-2-0 and the in-progress files of the
+               // old subtasks 2 and 0, writing a valid-length file for each in-progress one.
+               // There is no valid-length file for part-2-0 because it was "pending", not
+               // "in-progress", so its full content is valid.
+               checkFs(outDir, 1, 4, 3, 2);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness5 
= createRescalingTestSink(outDir, 2, 1);
+               testHarness5.setup();
+               testHarness5.initializeState(mergedSnapshot);
+               testHarness5.open();
+
+               checkFs(outDir, 0, 0, 8, 3);
+       }
+
+       /**
+        * Runs two sink subtasks, then restores their merged state into three subtasks.
+        * The restored in-progress and pending files must be committed, and the extra
+        * subtask, which receives no state, must leave the file system untouched.
+        */
+       @Test
+       public void testScalingUp() throws Exception {
+               final File outDir = tempFolder.newFolder();
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= createRescalingTestSink(outDir, 2, 0);
+               testHarness1.setup();
+               testHarness1.open();
+
+               // each harness must simulate a distinct subtask of the parallelism-2 job,
+               // so the second one uses subtask index 1 (not 0 again)
+               OneInputStreamOperatorTestHarness<String, Object> testHarness2 
= createRescalingTestSink(outDir, 2, 1);
+               testHarness2.setup();
+               testHarness2.open();
+
+               testHarness1.processElement(new StreamRecord<>("test1", 0L));
+               testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+               checkFs(outDir, 1, 1, 0, 0);
+
+               testHarness2.processElement(new StreamRecord<>("test3", 0L));
+               testHarness2.processElement(new StreamRecord<>("test4", 0L));
+               testHarness2.processElement(new StreamRecord<>("test5", 0L));
+
+               checkFs(outDir, 2, 3, 0, 0);
+
+               // intentionally we snapshot them in the reverse order so that 
the states are shuffled
+               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+                       testHarness2.snapshot(0, 0),
+                       testHarness1.snapshot(0, 0)
+               );
+
+               testHarness1 = createRescalingTestSink(outDir, 3, 0);
+               testHarness1.setup();
+               testHarness1.initializeState(mergedSnapshot);
+               testHarness1.open();
+
+               // the new subtask 0 commits the 3 files of the old subtask 1
+               checkFs(outDir, 1, 1, 3, 1);
+
+               testHarness2 = createRescalingTestSink(outDir, 3, 1);
+               testHarness2.setup();
+               testHarness2.initializeState(mergedSnapshot);
+               testHarness2.open();
+
+               checkFs(outDir, 0, 0, 5, 2);
+
+               // the third subtask gets no state, so nothing changes on disk
+               OneInputStreamOperatorTestHarness<String, Object> testHarness3 
= createRescalingTestSink(outDir, 3, 2);
+               testHarness3.setup();
+               testHarness3.initializeState(mergedSnapshot);
+               testHarness3.open();
+
+               checkFs(outDir, 0, 0, 5, 2);
+
+               testHarness1.processElement(new StreamRecord<>("test6", 0));
+               testHarness2.processElement(new StreamRecord<>("test6", 0));
+               testHarness3.processElement(new StreamRecord<>("test6", 0));
+
+               // 3 for the different tasks
+               checkFs(outDir, 3, 0, 5, 2);
+
+               testHarness1.snapshot(1, 0);
+               testHarness2.snapshot(1, 0);
+               testHarness3.snapshot(1, 0);
+
+               testHarness1.close();
+               testHarness2.close();
+               testHarness3.close();
+
+               checkFs(outDir, 0, 3, 5, 2);
+       }
+
+       /**
+        * Creates a harness for subtask {@code taskIdx} (of {@code totalParallelism}) of a
+        * string RollingSink writing into {@code outDir}. The sink uses a 5-byte batch size,
+        * empty file prefixes and the test's suffixes, so that {@code checkFs} can classify
+        * the produced files by suffix alone.
+        */
+       private OneInputStreamOperatorTestHarness<String, Object> 
createRescalingTestSink(
+               File outDir, int totalParallelism, int taskIdx) throws 
Exception {
+
+               RollingSink<String> sink = new 
RollingSink<String>(outDir.getAbsolutePath())
+                       .setWriter(new StringWriter<String>())
+                       .setBatchSize(5)
+                       .setPartPrefix(PART_PREFIX)
+                       .setInProgressPrefix("")
+                       .setPendingPrefix("")
+                       .setValidLengthPrefix("")
+                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+                       .setPendingSuffix(PENDING_SUFFIX)
+                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+               return createTestSink(sink, totalParallelism, taskIdx);
+       }
+
+       /**
+        * Wraps {@code sink} in a {@link StreamSink} operator and returns a one-input
+        * test harness running it as subtask {@code taskIdx} of {@code totalParallelism}.
+        */
+       private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
+               RollingSink<T> sink, int totalParallelism, int taskIdx) throws 
Exception {
+               // NOTE(review): the literal 10 is presumably the max parallelism argument of
+               // the harness constructor; confirm against OneInputStreamOperatorTestHarness.
+               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), 10, totalParallelism, taskIdx);
+       }
+
+       /**
+        * Recursively counts the files under {@code outDir} by state and asserts the
+        * expected number of in-progress, pending, completed and valid-length files.
+        * Files are classified by suffix; any other file whose path contains
+        * {@code PART_PREFIX} counts as completed.
+        */
+       private void checkFs(File outDir, int inprogress, int pending, int 
completed, int valid) throws IOException {
+               int inProg = 0;
+               int pend = 0;
+               int compl = 0;
+               int val = 0;
+
+               for (File file: FileUtils.listFiles(outDir, null, true)) {
+                       // skip checksum side files (presumably written by the Hadoop local file system)
+                       if (file.getAbsolutePath().endsWith("crc")) {
+                               continue;
+                       }
+                       // NOTE(review): classification uses the full path, not just the file
+                       // name; the contains(PART_PREFIX) check also matches directory names.
+                       String path = file.getPath();
+                       if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+                               inProg++;
+                       } else if (path.endsWith(PENDING_SUFFIX)) {
+                               pend++;
+                       } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+                               val++;
+                       } else if (path.contains(PART_PREFIX)) {
+                               compl++;
+                       }
+               }
+
+               Assert.assertEquals(inprogress, inProg);
+               Assert.assertEquals(pending, pend);
+               Assert.assertEquals(completed, compl);
+               Assert.assertEquals(valid, val);
+       }
 
        private static class TestSourceFunction implements 
SourceFunction<Tuple2<Integer, String>> {
                private static final long serialVersionUID = 1L;

Reply via email to