[FLINK-5318] Make the RollingSink backwards compatible.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/078e2489
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/078e2489
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/078e2489

Branch: refs/heads/release-1.2
Commit: 078e2489180b7544f2af48afef8147401cd9ebd6
Parents: 2215f82
Author: kl0u <kklou...@gmail.com>
Authored: Fri Jan 6 15:38:28 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Jan 13 11:38:44 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java    |  32 +++-
 .../connectors/fs/bucketing/BucketingSink.java  |   8 +-
 .../fs/bucketing/RollingSinkMigrationTest.java  | 183 +++++++++++++++++++
 ...olling-sink-migration-test-flink1.1-snapshot | Bin 0 -> 1471 bytes
 4 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index fc4a35e..98eb2d4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
 import org.apache.flink.util.Preconditions;
@@ -128,7 +129,8 @@ import java.util.UUID;
  */
 @Deprecated
 public class RollingSink<T> extends RichSinkFunction<T>
-               implements InputTypeConfigurable, CheckpointedFunction, 
CheckpointListener {
+               implements InputTypeConfigurable, CheckpointedFunction,
+                                       CheckpointListener, 
CheckpointedRestoring<RollingSink.BucketState> {
 
        private static final long serialVersionUID = 1L;
 
@@ -336,7 +338,12 @@ public class RollingSink<T> extends RichSinkFunction<T>
                Preconditions.checkArgument(this.restoredBucketStates == null,
                        "The " + getClass().getSimpleName() + " has already 
been initialized.");
 
-               initFileSystem();
+               try {
+                       initFileSystem();
+               } catch (IOException e) {
+                       LOG.error("Error while creating FileSystem when 
initializing the state of the RollingSink.", e);
+                       throw new RuntimeException("Error while creating 
FileSystem when initializing the state of the RollingSink.", e);
+               }
 
                if (this.refTruncate == null) {
                        this.refTruncate = reflectTruncate(fs);
@@ -703,7 +710,7 @@ public class RollingSink<T> extends RichSinkFunction<T>
                                } else {
                                        LOG.debug("Writing valid-length file 
for {} to specify valid length {}", partPath, 
bucketState.currentFileValidLength);
                                        Path validLengthFilePath = 
getValidLengthPathFor(partPath);
-                                       if (!fs.exists(validLengthFilePath)) {
+                                       if (!fs.exists(validLengthFilePath) && 
fs.exists(partPath)) {
                                                FSDataOutputStream 
lengthFileOut = fs.create(validLengthFilePath);
                                                
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
                                                lengthFileOut.close();
@@ -753,6 +760,25 @@ public class RollingSink<T> extends RichSinkFunction<T>
        }
 
        // 
--------------------------------------------------------------------------------------------
+       //  Backwards compatibility with Flink 1.1
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void restoreState(BucketState state) throws Exception {
+               LOG.info("{} (taskIdx={}) restored bucket state from an older 
Flink version: {}",
+                       getClass().getSimpleName(), 
getRuntimeContext().getIndexOfThisSubtask(), state);
+
+               try {
+                       initFileSystem();
+               } catch (IOException e) {
+                       LOG.error("Error while creating FileSystem when 
restoring the state of the RollingSink.", e);
+                       throw new RuntimeException("Error while creating 
FileSystem when restoring the state of the RollingSink.", e);
+               }
+
+               handleRestoredBucketState(state);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
        //  Setters for User configuration values
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index cf2c373..e8bff21 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -58,7 +58,7 @@ import java.util.UUID;
 import java.util.Iterator;
 
 /**
- * Sink that emits its input elements to {@link 
org.apache.hadoop.fs.FileSystem} files within
+ * Sink that emits its input elements to {@link FileSystem} files within
  * buckets. This is integrated with the checkpointing mechanism to provide 
exactly once semantics.
  *
  * <p>
@@ -124,9 +124,9 @@ import java.util.Iterator;
  *     </li>
  *     <li>
  *         The part files are written using an instance of {@link Writer}. By 
default, a
- *         {@link org.apache.flink.streaming.connectors.fs.StringWriter} is 
used, which writes the result
- *         of {@code toString()} for every element, separated by newlines. You 
can configure the writer using  the
- *         {@link #setWriter(Writer)}. For example, {@link 
org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
+ *         {@link StringWriter} is used, which writes the result of {@code 
toString()} for
+ *         every element, separated by newlines. You can configure the writer 
using the
+ *         {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
  *         can be used to write Hadoop {@code SequenceFiles}.
  *     </li>
  * </ol>

http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
new file mode 100644
index 0000000..0c5e16b
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java
@@ -0,0 +1,183 @@
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.RollingSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+@Deprecated
+public class RollingSinkMigrationTest {
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private static final String PART_PREFIX = "part";
+       private static final String PENDING_SUFFIX = ".pending";
+       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+       private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+       @Test
+       public void testMigration() throws Exception {
+
+               /*
+               * Code ran to get the snapshot:
+               *
+               * final File outDir = tempFolder.newFolder();
+
+               RollingSink<String> sink = new 
RollingSink<String>(outDir.getAbsolutePath())
+                       .setWriter(new StringWriter<String>())
+                       .setBatchSize(5)
+                       .setPartPrefix(PART_PREFIX)
+                       .setInProgressPrefix("")
+                       .setPendingPrefix("")
+                       .setValidLengthPrefix("")
+                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+                       .setPendingSuffix(PENDING_SUFFIX)
+                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness1 =
+                       new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink));
+
+               testHarness1.setup();
+               testHarness1.open();
+
+               testHarness1.processElement(new StreamRecord<>("test1", 0L));
+               testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+               checkFs(outDir, 1, 1, 0, 0);
+
+               testHarness1.processElement(new StreamRecord<>("test3", 0L));
+               testHarness1.processElement(new StreamRecord<>("test4", 0L));
+               testHarness1.processElement(new StreamRecord<>("test5", 0L));
+
+               checkFs(outDir, 1, 4, 0, 0);
+
+               StreamTaskState taskState = testHarness1.snapshot(0, 0);
+               testHarness1.snaphotToFile(taskState, 
"src/test/resources/rolling-sink-migration-test-flink1.1-snapshot");
+               testHarness1.close();
+               * */
+
+               final File outDir = tempFolder.newFolder();
+
+               RollingSink<String> sink = new 
ValidatingRollingSink<String>(outDir.getAbsolutePath())
+                       .setWriter(new StringWriter<String>())
+                       .setBatchSize(5)
+                       .setPartPrefix(PART_PREFIX)
+                       .setInProgressPrefix("")
+                       .setPendingPrefix("")
+                       .setValidLengthPrefix("")
+                       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+                       .setPendingSuffix(PENDING_SUFFIX)
+                       .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness1 
= new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(sink), 10, 1, 0);
+               testHarness1.setup();
+               
testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot"));
+               testHarness1.open();
+
+               testHarness1.processElement(new StreamRecord<>("test1", 0L));
+               testHarness1.processElement(new StreamRecord<>("test2", 0L));
+
+               checkFs(outDir, 1, 1, 0, 0);
+
+               testHarness1.close();
+       }
+
+       private void checkFs(File outDir, int inprogress, int pending, int 
completed, int valid) throws IOException {
+               int inProg = 0;
+               int pend = 0;
+               int compl = 0;
+               int val = 0;
+
+               for (File file: FileUtils.listFiles(outDir, null, true)) {
+                       if (file.getAbsolutePath().endsWith("crc")) {
+                               continue;
+                       }
+                       String path = file.getPath();
+                       if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+                               inProg++;
+                       } else if (path.endsWith(PENDING_SUFFIX)) {
+                               pend++;
+                       } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+                               val++;
+                       } else if (path.contains(PART_PREFIX)) {
+                               compl++;
+                       }
+               }
+
+               Assert.assertEquals(inprogress, inProg);
+               Assert.assertEquals(pending, pend);
+               Assert.assertEquals(completed, compl);
+               Assert.assertEquals(valid, val);
+       }
+
+       private static String getResourceFilename(String filename) {
+               ClassLoader cl = 
RollingSinkMigrationTest.class.getClassLoader();
+               URL resource = cl.getResource(filename);
+               return resource.getFile();
+       }
+
+       static class ValidatingRollingSink<T> extends RollingSink<T> {
+
+               private static final long serialVersionUID = 
-4263974081712009141L;
+
+               ValidatingRollingSink(String basePath) {
+                       super(basePath);
+               }
+
+               @Override
+               public void restoreState(BucketState state) throws Exception {
+
+                       /**
+                        * this validates that we read the state that was 
checkpointed by the previous version. We expect it to be:
+                        * 
In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4
+                        *                                      validLength=6
+                        * pendingForNextCheckpoint=[]
+                        * pendingForPrevCheckpoints={0=[       
/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0,
+                        *                                                      
                
/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1,
+                        *                                                      
                
/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2,
+                        *                                                      
                
/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]}
+                        * */
+
+                       String current = state.currentFile;
+                       long validLength = state.currentFileValidLength;
+
+                       
Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4",
 current);
+                       Assert.assertEquals(6, validLength);
+
+                       List<String> pendingFiles = state.pendingFiles;
+                       Assert.assertTrue(pendingFiles.isEmpty());
+
+                       final Map<Long, List<String>> pendingFilesPerCheckpoint 
= state.pendingFilesPerCheckpoint;
+                       Assert.assertEquals(1, 
pendingFilesPerCheckpoint.size());
+
+                       for (Map.Entry<Long, List<String>> entry: 
pendingFilesPerCheckpoint.entrySet()) {
+                               long checkpoint = entry.getKey();
+                               List<String> files = entry.getValue();
+
+                               Assert.assertEquals(0L, checkpoint);
+                               Assert.assertEquals(4, files.size());
+
+                               for (int i = 0; i < 4; i++) {
+                                       Assert.assertEquals(
+                                               
"/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-"
 + i,
+                                               files.get(i));
+                               }
+                       }
+                       super.restoreState(state);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot
new file mode 100644
index 0000000..2ebd70a
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot differ

Reply via email to