[FLINK-5443] Migrate from Rolling to Bucketing sink.

To migrate from a RollingSink to a BucketingSink, a
user can now take a savepoint, change their code to
use a BucketingSink with the same properties as the
previous RollingSink, and resume the program from
that savepoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4c60a94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4c60a94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4c60a94

Branch: refs/heads/release-1.2
Commit: b4c60a942fe07e355dd49ed2aab3c0a7ae94285d
Parents: 078e248
Author: kl0u <kklou...@gmail.com>
Authored: Fri Jan 6 16:52:51 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Jan 13 11:38:44 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  10 +-
 .../connectors/fs/bucketing/BucketingSink.java  | 278 +++++++++++--------
 .../fs/bucketing/RollingSinkMigrationTest.java  |  17 ++
 .../RollingToBucketingMigrationTest.java        | 161 +++++++++++
 4 files changed, 348 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 98eb2d4..429d00a 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -905,30 +905,30 @@ public class RollingSink<T> extends RichSinkFunction<T>
         * This is used for keeping track of the current in-progress files and 
files that we mark
         * for moving from pending to final location after we get a 
checkpoint-complete notification.
         */
-       static final class BucketState implements Serializable {
+       public static final class BucketState implements Serializable {
                private static final long serialVersionUID = 1L;
 
                /**
                 * The file that was in-progress when the last checkpoint 
occurred.
                 */
-               String currentFile;
+               public String currentFile;
 
                /**
                 * The valid length of the in-progress file at the time of the 
last checkpoint.
                 */
-               long currentFileValidLength = -1;
+               public long currentFileValidLength = -1;
 
                /**
                 * Pending files that accumulated since the last checkpoint.
                 */
-               List<String> pendingFiles = new ArrayList<>();
+               public List<String> pendingFiles = new ArrayList<>();
 
                /**
                 * When doing a checkpoint we move the pending files since the 
last checkpoint to this map
                 * with the id of the checkpoint. When we get the 
checkpoint-complete notification we move
                 * pending files of completed checkpoints to their final 
location.
                 */
-               final Map<Long, List<String>> pendingFilesPerCheckpoint = new 
HashMap<>();
+               public final Map<Long, List<String>> pendingFilesPerCheckpoint 
= new HashMap<>();
 
                @Override
                public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index e8bff21..7dbcda7 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -29,9 +29,11 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
@@ -149,7 +151,8 @@ import java.util.Iterator;
  */
 public class BucketingSink<T>
                extends RichSinkFunction<T>
-               implements InputTypeConfigurable, CheckpointedFunction, 
CheckpointListener, ProcessingTimeCallback {
+               implements InputTypeConfigurable, CheckpointedFunction, 
CheckpointListener,
+                                       
CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {
 
        private static final long serialVersionUID = 1L;
 
@@ -344,7 +347,12 @@ public class BucketingSink<T>
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
                Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
 
-               initFileSystem();
+               try {
+                       initFileSystem();
+               } catch (IOException e) {
+                       LOG.error("Error while creating FileSystem when 
initializing the state of the BucketingSink.", e);
+                       throw new RuntimeException("Error while creating 
FileSystem when initializing the state of the BucketingSink.", e);
+               }
 
                if (this.refTruncate == null) {
                        this.refTruncate = reflectTruncate(fs);
@@ -706,139 +714,183 @@ public class BucketingSink<T>
                        // (we re-start from the last **successful** checkpoint)
                        bucketState.pendingFiles.clear();
 
-                       if (bucketState.currentFile != null) {
+                       handlePendingInProgressFile(bucketState.currentFile, 
bucketState.currentFileValidLength);
 
-                               // We were writing to a file when the last 
checkpoint occurred. This file can either
-                               // be still in-progress or became a pending 
file at some point after the checkpoint.
-                               // Either way, we have to truncate it back to a 
valid state (or write a .valid-length
-                               // file that specifies up to which length it is 
valid) and rename it to the final name
-                               // before starting a new bucket file.
+                       // Now that we've restored the bucket to a valid state, 
reset the current file info
+                       bucketState.currentFile = null;
+                       bucketState.currentFileValidLength = -1;
+                       bucketState.isWriterOpen = false;
 
-                               Path partPath = new 
Path(bucketState.currentFile);
-                               try {
-                                       Path partPendingPath = 
getPendingPathFor(partPath);
-                                       Path partInProgressPath = 
getInProgressPathFor(partPath);
-
-                                       if (fs.exists(partPendingPath)) {
-                                               LOG.debug("In-progress file {} 
has been moved to pending after checkpoint, moving to final location.", 
partPath);
-                                               // has been moved to pending in 
the mean time, rename to final location
-                                               fs.rename(partPendingPath, 
partPath);
-                                       } else if 
(fs.exists(partInProgressPath)) {
-                                               LOG.debug("In-progress file {} 
is still in-progress, moving to final location.", partPath);
-                                               // it was still in progress, 
rename to final path
-                                               fs.rename(partInProgressPath, 
partPath);
-                                       } else if (fs.exists(partPath)) {
-                                               LOG.debug("In-Progress file {} 
was already moved to final location {}.", bucketState.currentFile, partPath);
-                                       } else {
-                                               LOG.debug("In-Progress file {} 
was neither moved to pending nor is still in progress. Possibly, " +
-                                                       "it was moved to final 
location by a previous snapshot restore", bucketState.currentFile);
-                                       }
+                       
handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
 
-                                       // We use reflection to get the 
.truncate() method, this
-                                       // is only available starting with 
Hadoop 2.7
-                                       if (this.refTruncate == null) {
-                                               this.refTruncate = 
reflectTruncate(fs);
-                                       }
+                       synchronized (bucketState.pendingFilesPerCheckpoint) {
+                               bucketState.pendingFilesPerCheckpoint.clear();
+                       }
+               }
+       }
+
+       private void handleRestoredRollingSinkState(RollingSink.BucketState 
restoredState) {
+               restoredState.pendingFiles.clear();
 
-                                       // truncate it or write a 
".valid-length" file to specify up to which point it is valid
-                                       if (refTruncate != null) {
-                                               LOG.debug("Truncating {} to 
valid length {}", partPath, bucketState.currentFileValidLength);
-                                               // some-one else might still 
hold the lease from a previous try, we are
-                                               // recovering, after all ...
-                                               if (fs instanceof 
DistributedFileSystem) {
-                                                       DistributedFileSystem 
dfs = (DistributedFileSystem) fs;
-                                                       LOG.debug("Trying to 
recover file lease {}", partPath);
-                                                       
dfs.recoverLease(partPath);
-                                                       boolean isclosed = 
dfs.isFileClosed(partPath);
-                                                       StopWatch sw = new 
StopWatch();
-                                                       sw.start();
-                                                       while (!isclosed) {
-                                                               if 
(sw.getTime() > asyncTimeout) {
-                                                                       break;
-                                                               }
-                                                               try {
-                                                                       
Thread.sleep(500);
-                                                               } catch 
(InterruptedException e1) {
-                                                                       // 
ignore it
-                                                               }
-                                                               isclosed = 
dfs.isFileClosed(partPath);
+               handlePendingInProgressFile(restoredState.currentFile, 
restoredState.currentFileValidLength);
+
+               // Now that we've restored the bucket to a valid state, reset 
the current file info
+               restoredState.currentFile = null;
+               restoredState.currentFileValidLength = -1;
+
+               
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
+
+               synchronized (restoredState.pendingFilesPerCheckpoint) {
+                       restoredState.pendingFilesPerCheckpoint.clear();
+               }
+       }
+
+       private void handlePendingInProgressFile(String file, long validLength) 
{
+               if (file != null) {
+
+                       // We were writing to a file when the last checkpoint 
occurred. This file can either
+                       // be still in-progress or became a pending file at 
some point after the checkpoint.
+                       // Either way, we have to truncate it back to a valid 
state (or write a .valid-length
+                       // file that specifies up to which length it is valid) 
and rename it to the final name
+                       // before starting a new bucket file.
+
+                       Path partPath = new Path(file);
+                       try {
+                               Path partPendingPath = 
getPendingPathFor(partPath);
+                               Path partInProgressPath = 
getInProgressPathFor(partPath);
+
+                               if (fs.exists(partPendingPath)) {
+                                       LOG.debug("In-progress file {} has been 
moved to pending after checkpoint, moving to final location.", partPath);
+                                       // has been moved to pending in the 
mean time, rename to final location
+                                       fs.rename(partPendingPath, partPath);
+                               } else if (fs.exists(partInProgressPath)) {
+                                       LOG.debug("In-progress file {} is still 
in-progress, moving to final location.", partPath);
+                                       // it was still in progress, rename to 
final path
+                                       fs.rename(partInProgressPath, partPath);
+                               } else if (fs.exists(partPath)) {
+                                       LOG.debug("In-Progress file {} was 
already moved to final location {}.", file, partPath);
+                               } else {
+                                       LOG.debug("In-Progress file {} was 
neither moved to pending nor is still in progress. Possibly, " +
+                                               "it was moved to final location 
by a previous snapshot restore", file);
+                               }
+
+                               // We use reflection to get the .truncate() 
method, this
+                               // is only available starting with Hadoop 2.7
+                               if (this.refTruncate == null) {
+                                       this.refTruncate = reflectTruncate(fs);
+                               }
+
+                               // truncate it or write a ".valid-length" file 
to specify up to which point it is valid
+                               if (refTruncate != null) {
+                                       LOG.debug("Truncating {} to valid 
length {}", partPath, validLength);
+                                       // some-one else might still hold the 
lease from a previous try, we are
+                                       // recovering, after all ...
+                                       if (fs instanceof 
DistributedFileSystem) {
+                                               DistributedFileSystem dfs = 
(DistributedFileSystem) fs;
+                                               LOG.debug("Trying to recover 
file lease {}", partPath);
+                                               dfs.recoverLease(partPath);
+                                               boolean isclosed = 
dfs.isFileClosed(partPath);
+                                               StopWatch sw = new StopWatch();
+                                               sw.start();
+                                               while (!isclosed) {
+                                                       if (sw.getTime() > 
asyncTimeout) {
+                                                               break;
+                                                       }
+                                                       try {
+                                                               
Thread.sleep(500);
+                                                       } catch 
(InterruptedException e1) {
+                                                               // ignore it
                                                        }
+                                                       isclosed = 
dfs.isFileClosed(partPath);
                                                }
-                                               Boolean truncated = (Boolean) 
refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
-                                               if (!truncated) {
-                                                       LOG.debug("Truncate did 
not immediately complete for {}, waiting...", partPath);
-
-                                                       // we must wait for the 
asynchronous truncate operation to complete
-                                                       StopWatch sw = new 
StopWatch();
-                                                       sw.start();
-                                                       long newLen = 
fs.getFileStatus(partPath).getLen();
-                                                       while (newLen != 
bucketState.currentFileValidLength) {
-                                                               if 
(sw.getTime() > asyncTimeout) {
-                                                                       break;
-                                                               }
-                                                               try {
-                                                                       
Thread.sleep(500);
-                                                               } catch 
(InterruptedException e1) {
-                                                                       // 
ignore it
-                                                               }
-                                                               newLen = 
fs.getFileStatus(partPath).getLen();
+                                       }
+                                       Boolean truncated = (Boolean) 
refTruncate.invoke(fs, partPath, validLength);
+                                       if (!truncated) {
+                                               LOG.debug("Truncate did not 
immediately complete for {}, waiting...", partPath);
+
+                                               // we must wait for the 
asynchronous truncate operation to complete
+                                               StopWatch sw = new StopWatch();
+                                               sw.start();
+                                               long newLen = 
fs.getFileStatus(partPath).getLen();
+                                               while (newLen != validLength) {
+                                                       if (sw.getTime() > 
asyncTimeout) {
+                                                               break;
                                                        }
-                                                       if (newLen != 
bucketState.currentFileValidLength) {
-                                                               throw new 
RuntimeException("Truncate did not truncate to right length. Should be " + 
bucketState.currentFileValidLength + " is " + newLen + ".");
+                                                       try {
+                                                               
Thread.sleep(500);
+                                                       } catch 
(InterruptedException e1) {
+                                                               // ignore it
                                                        }
+                                                       newLen = 
fs.getFileStatus(partPath).getLen();
                                                }
-                                       } else {
-                                               LOG.debug("Writing valid-length 
file for {} to specify valid length {}", partPath, 
bucketState.currentFileValidLength);
-                                               Path validLengthFilePath = 
getValidLengthPathFor(partPath);
-                                               if 
(!fs.exists(validLengthFilePath)) {
-                                                       FSDataOutputStream 
lengthFileOut = fs.create(validLengthFilePath);
-                                                       
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-                                                       lengthFileOut.close();
+                                               if (newLen != validLength) {
+                                                       throw new 
RuntimeException("Truncate did not truncate to right length. Should be " + 
validLength + " is " + newLen + ".");
                                                }
                                        }
-
-                                       // Now that we've restored the bucket 
to a valid state, reset the current file info
-                                       bucketState.currentFile = null;
-                                       bucketState.currentFileValidLength = -1;
-                                       bucketState.isWriterOpen = false;
-                               } catch (IOException e) {
-                                       LOG.error("Error while restoring 
BucketingSink state.", e);
-                                       throw new RuntimeException("Error while 
restoring BucketingSink state.", e);
-                               } catch (InvocationTargetException | 
IllegalAccessException e) {
-                                       LOG.error("Could not invoke truncate.", 
e);
-                                       throw new RuntimeException("Could not 
invoke truncate.", e);
+                               } else {
+                                       LOG.debug("Writing valid-length file 
for {} to specify valid length {}", partPath, validLength);
+                                       Path validLengthFilePath = 
getValidLengthPathFor(partPath);
+                                       if (!fs.exists(validLengthFilePath) && 
fs.exists(partPath)) {
+                                               FSDataOutputStream 
lengthFileOut = fs.create(validLengthFilePath);
+                                               
lengthFileOut.writeUTF(Long.toString(validLength));
+                                               lengthFileOut.close();
+                                       }
                                }
+
+                       } catch (IOException e) {
+                               LOG.error("Error while restoring BucketingSink 
state.", e);
+                               throw new RuntimeException("Error while 
restoring BucketingSink state.", e);
+                       } catch (InvocationTargetException | 
IllegalAccessException e) {
+                               LOG.error("Could not invoke truncate.", e);
+                               throw new RuntimeException("Could not invoke 
truncate.", e);
                        }
+               }
+       }
 
-                       // Move files that are confirmed by a checkpoint but 
did not get moved to final location
-                       // because the checkpoint notification did not happen 
before a failure
+       private void handlePendingFilesForPreviousCheckpoints(Map<Long, 
List<String>> pendingFilesPerCheckpoint) {
+               // Move files that are confirmed by a checkpoint but did not 
get moved to final location
+               // because the checkpoint notification did not happen before a 
failure
 
-                       LOG.debug("Moving pending files to final location on 
restore.");
+               LOG.debug("Moving pending files to final location on restore.");
 
-                       Set<Long> pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
-                       for (Long pastCheckpointId : pastCheckpointIds) {
-                               // All the pending files are buckets that have 
been completed but are waiting to be renamed
-                               // to their final name
-                               for (String filename : 
bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
-                                       Path finalPath = new Path(filename);
-                                       Path pendingPath = 
getPendingPathFor(finalPath);
+               Set<Long> pastCheckpointIds = 
pendingFilesPerCheckpoint.keySet();
+               for (Long pastCheckpointId : pastCheckpointIds) {
+                       // All the pending files are buckets that have been 
completed but are waiting to be renamed
+                       // to their final name
+                       for (String filename : 
pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+                               Path finalPath = new Path(filename);
+                               Path pendingPath = getPendingPathFor(finalPath);
 
-                                       try {
-                                               if (fs.exists(pendingPath)) {
-                                                       LOG.debug("Restoring 
BucketingSink State: Moving pending file {} to final location after complete 
checkpoint {}.", pendingPath, pastCheckpointId);
-                                                       fs.rename(pendingPath, 
finalPath);
-                                               }
-                                       } catch (IOException e) {
-                                               LOG.error("Restoring 
BucketingSink State: Error while renaming pending file {} to final path {}: 
{}", pendingPath, finalPath, e);
-                                               throw new 
RuntimeException("Error while renaming pending file " + pendingPath + " to 
final path " + finalPath, e);
+                               try {
+                                       if (fs.exists(pendingPath)) {
+                                               LOG.debug("Restoring 
BucketingSink State: Moving pending file {} to final location after complete 
checkpoint {}.", pendingPath, pastCheckpointId);
+                                               fs.rename(pendingPath, 
finalPath);
                                        }
+                               } catch (IOException e) {
+                                       LOG.error("Restoring BucketingSink 
State: Error while renaming pending file {} to final path {}: {}", pendingPath, 
finalPath, e);
+                                       throw new RuntimeException("Error while 
renaming pending file " + pendingPath + " to final path " + finalPath, e);
                                }
                        }
+               }
+       }
 
-                       synchronized (bucketState.pendingFilesPerCheckpoint) {
-                               bucketState.pendingFilesPerCheckpoint.clear();
-                       }
+       // 
--------------------------------------------------------------------------------------------
+       //  Backwards compatibility with Flink 1.1
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void restoreState(RollingSink.BucketState state) throws 
Exception {
+               LOG.info("{} (taskIdx={}) restored bucket state from the 
RollingSink an older Flink version: {}",
+                       getClass().getSimpleName(), 
getRuntimeContext().getIndexOfThisSubtask(), state);
+
+               try {
+                       initFileSystem();
+               } catch (IOException e) {
+                       LOG.error("Error while creating FileSystem when 
restoring the state of the BucketingSink.", e);
+                       throw new RuntimeException("Error while creating 
FileSystem when restoring the state of the BucketingSink.", e);
                }
+
+               handleRestoredRollingSinkState(state);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
index 0c5e16b..3355fae 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.commons.io.FileUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/b4c60a94/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
new file mode 100644
index 0000000..257b157
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingToBucketingMigrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests that a {@link BucketingSink} can resume from a legacy snapshot whose state
+ * is handed over as a {@link RollingSink.BucketState}.
+ *
+ * <p>The snapshot is loaded from the classpath resource named by {@link #SNAPSHOT_RESOURCE}.
+ */
+public class RollingToBucketingMigrationTest {
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /** Classpath resource holding the legacy snapshot the sink is restored from. */
+       private static final String SNAPSHOT_RESOURCE = "rolling-sink-migration-test-flink1.1-snapshot";
+
+       private static final String PART_PREFIX = "part";
+       private static final String PENDING_SUFFIX = ".pending";
+       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+       private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+       /**
+        * Restores a {@link ValidatingBucketingSink} from the legacy snapshot, writes two
+        * elements and checks which kinds of files exist in the output directory afterwards.
+        *
+        * <p>The content of the restored state itself is verified inside
+        * {@link ValidatingBucketingSink#restoreState(RollingSink.BucketState)}.
+        */
+       @Test
+       public void testMigration() throws Exception {
+               final File outDir = tempFolder.newFolder();
+
+               BucketingSink<String> sink = new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
+                       .setWriter(new StringWriter<String>())
+                       .setBatchSize(5)
+                       .setPartPrefix(PART_PREFIX)
+                       .setInProgressPrefix("")
+                       .setPendingPrefix("")
+                       .setValidLengthPrefix("")
+                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+                       .setPendingSuffix(PENDING_SUFFIX)
+                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(sink), 10, 1, 0);
+               testHarness1.setup();
+               testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename(SNAPSHOT_RESOURCE));
+               testHarness1.open();
+
+               testHarness1.processElement(new StreamRecord<>("test1", 0L));
+               testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+               // Expected: one in-progress file, one pending file, no completed and no valid-length files.
+               checkFs(outDir, 1, 1, 0, 0);
+
+               testHarness1.close();
+       }
+
+       /**
+        * Resolves a classpath resource to the file part of its URL.
+        *
+        * @param filename name of the resource, looked up through this test class' class loader
+        * @return the result of {@link URL#getFile()} for the resolved resource
+        */
+       private static String getResourceFilename(String filename) {
+               ClassLoader cl = RollingToBucketingMigrationTest.class.getClassLoader();
+               URL resource = cl.getResource(filename);
+               // getResource() reports a missing resource by returning null; fail with the
+               // resource name instead of an anonymous NullPointerException on the next line.
+               Assert.assertNotNull("Missing test resource on the classpath: " + filename, resource);
+               return resource.getFile();
+       }
+
+       /**
+        * Counts the files below {@code outDir} by category and asserts the expected counts.
+        *
+        * <p>Files whose name ends with {@code crc} are ignored. Every other file is classified
+        * by its own name only, so that directory names on the way to the file cannot influence
+        * the result.
+        *
+        * @param outDir directory that is scanned recursively
+        * @param inprogress expected number of files ending with {@link #IN_PROGRESS_SUFFIX}
+        * @param pending expected number of files ending with {@link #PENDING_SUFFIX}
+        * @param completed expected number of remaining files whose name contains {@link #PART_PREFIX}
+        * @param valid expected number of files ending with {@link #VALID_LENGTH_SUFFIX}
+        * @throws IOException declared for callers; not thrown by the visible code itself
+        */
+       private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+               int inProg = 0;
+               int pend = 0;
+               int compl = 0;
+               int val = 0;
+
+               for (File file : FileUtils.listFiles(outDir, null, true)) {
+                       String name = file.getName();
+                       if (name.endsWith("crc")) {
+                               continue;
+                       }
+                       if (name.endsWith(IN_PROGRESS_SUFFIX)) {
+                               inProg++;
+                       } else if (name.endsWith(PENDING_SUFFIX)) {
+                               pend++;
+                       } else if (name.endsWith(VALID_LENGTH_SUFFIX)) {
+                               val++;
+                       } else if (name.contains(PART_PREFIX)) {
+                               compl++;
+                       }
+               }
+
+               Assert.assertEquals("Unexpected number of in-progress files", inprogress, inProg);
+               Assert.assertEquals("Unexpected number of pending files", pending, pend);
+               Assert.assertEquals("Unexpected number of completed files", completed, compl);
+               Assert.assertEquals("Unexpected number of valid-length files", valid, val);
+       }
+
+       /**
+        * A {@link BucketingSink} that asserts on the content of the legacy state passed to
+        * {@link #restoreState(RollingSink.BucketState)} before delegating to the parent class.
+        */
+       static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+               private static final long serialVersionUID = -4263974081712009141L;
+
+               /** Common prefix of every part file path stored in the snapshot; the part counter is appended. */
+               private static final String SNAPSHOT_PART_PATH_PREFIX =
+                       "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-";
+
+               /** Number of pending part files the snapshot holds for its single checkpoint. */
+               private static final int SNAPSHOT_PENDING_FILES = 4;
+
+               ValidatingBucketingSink(String basePath) {
+                       super(basePath);
+               }
+
+               /**
+                * Asserts that {@code state} holds the expected snapshot content and then
+                * delegates to the parent implementation.
+                *
+                * @param state legacy bucket state read from the snapshot
+                * @throws Exception if the parent implementation fails to restore the state
+                */
+               @Override
+               public void restoreState(RollingSink.BucketState state) throws Exception {
+
+                       /*
+                        * This validates that we read the state that was checkpointed by the
+                        * previous version. We expect it to be:
+                        *   currentFile               = SNAPSHOT_PART_PATH_PREFIX + 4
+                        *   currentFileValidLength    = 6
+                        *   pendingFiles              = []
+                        *   pendingFilesPerCheckpoint = {0=[SNAPSHOT_PART_PATH_PREFIX + 0,
+                        *                                   SNAPSHOT_PART_PATH_PREFIX + 1,
+                        *                                   SNAPSHOT_PART_PATH_PREFIX + 2,
+                        *                                   SNAPSHOT_PART_PATH_PREFIX + 3]}
+                        */
+
+                       String current = state.currentFile;
+                       long validLength = state.currentFileValidLength;
+
+                       Assert.assertEquals(SNAPSHOT_PART_PATH_PREFIX + 4, current);
+                       Assert.assertEquals(6, validLength);
+
+                       List<String> pendingFiles = state.pendingFiles;
+                       Assert.assertTrue(pendingFiles.isEmpty());
+
+                       final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+                       Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+                       for (Map.Entry<Long, List<String>> entry : pendingFilesPerCheckpoint.entrySet()) {
+                               long checkpoint = entry.getKey();
+                               List<String> files = entry.getValue();
+
+                               Assert.assertEquals(0L, checkpoint);
+                               Assert.assertEquals(SNAPSHOT_PENDING_FILES, files.size());
+
+                               for (int i = 0; i < SNAPSHOT_PENDING_FILES; i++) {
+                                       Assert.assertEquals(SNAPSHOT_PART_PATH_PREFIX + i, files.get(i));
+                               }
+                       }
+
+                       super.restoreState(state);
+               }
+       }
+}

Reply via email to