[FLINK-5318] Make the RollingSink backwards compatible.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/078e2489 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/078e2489 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/078e2489 Branch: refs/heads/release-1.2 Commit: 078e2489180b7544f2af48afef8147401cd9ebd6 Parents: 2215f82 Author: kl0u <kklou...@gmail.com> Authored: Fri Jan 6 15:38:28 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Jan 13 11:38:44 2017 +0100 ---------------------------------------------------------------------- .../streaming/connectors/fs/RollingSink.java | 32 +++- .../connectors/fs/bucketing/BucketingSink.java | 8 +- .../fs/bucketing/RollingSinkMigrationTest.java | 183 +++++++++++++++++++ ...olling-sink-migration-test-flink1.1-snapshot | Bin 0 -> 1471 bytes 4 files changed, 216 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index fc4a35e..98eb2d4 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; import org.apache.flink.util.Preconditions; @@ -128,7 +129,8 @@ import java.util.UUID; */ @Deprecated public class RollingSink<T> extends RichSinkFunction<T> - implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener { + implements InputTypeConfigurable, CheckpointedFunction, + CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> { private static final long serialVersionUID = 1L; @@ -336,7 +338,12 @@ public class RollingSink<T> extends RichSinkFunction<T> Preconditions.checkArgument(this.restoredBucketStates == null, "The " + getClass().getSimpleName() + " has already been initialized."); - initFileSystem(); + try { + initFileSystem(); + } catch (IOException e) { + LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e); + throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e); + } if (this.refTruncate == null) { this.refTruncate = reflectTruncate(fs); @@ -703,7 +710,7 @@ public class RollingSink<T> extends RichSinkFunction<T> } else { LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength); Path validLengthFilePath = getValidLengthPathFor(partPath); - if (!fs.exists(validLengthFilePath)) { + if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) { FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength)); lengthFileOut.close(); @@ -753,6 +760,25 @@ public class RollingSink<T> extends RichSinkFunction<T> } // -------------------------------------------------------------------------------------------- + // Backwards compatibility with Flink 1.1 + // -------------------------------------------------------------------------------------------- + + @Override + public void restoreState(BucketState state) throws Exception { + LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}", + getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state); + + try { + initFileSystem(); + } catch (IOException e) { + LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e); + throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e); + } + + handleRestoredBucketState(state); + } + + // -------------------------------------------------------------------------------------------- // Setters for User configuration values // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index cf2c373..e8bff21 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -58,7 +58,7 @@ import java.util.UUID; import java.util.Iterator; /** - * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within + * Sink that emits its input elements to {@link FileSystem} files within * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics. * * <p> @@ -124,9 +124,9 @@ import java.util.Iterator; * </li> * <li> * The part files are written using an instance of {@link Writer}. By default, a - * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result - * of {@code toString()} for every element, separated by newlines. You can configure the writer using the - * {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} + * {@link StringWriter} is used, which writes the result of {@code toString()} for + * every element, separated by newlines. You can configure the writer using the + * {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter} * can be used to write Hadoop {@code SequenceFiles}. * </li> * </ol> http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java new file mode 100644 index 0000000..0c5e16b --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/RollingSinkMigrationTest.java @@ -0,0 +1,183 @@ +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.commons.io.FileUtils; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.fs.RollingSink; +import org.apache.flink.streaming.connectors.fs.StringWriter; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; + +@Deprecated +public class RollingSinkMigrationTest { + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + private static final String PART_PREFIX = "part"; + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + @Test + public void testMigration() throws Exception { + + /* + * Code ran to get the snapshot: + * + * final File outDir = tempFolder.newFolder(); + + RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath()) + .setWriter(new StringWriter<String>()) + .setBatchSize(5) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + .setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + .setValidLengthSuffix(VALID_LENGTH_SUFFIX); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink)); + + testHarness1.setup(); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + testHarness1.processElement(new StreamRecord<>("test2", 0L)); + + checkFs(outDir, 1, 1, 0, 0); + + testHarness1.processElement(new StreamRecord<>("test3", 0L)); + testHarness1.processElement(new StreamRecord<>("test4", 0L)); + testHarness1.processElement(new StreamRecord<>("test5", 0L)); + + checkFs(outDir, 1, 4, 0, 0); + + StreamTaskState taskState = testHarness1.snapshot(0, 0); + testHarness1.snaphotToFile(taskState, "src/test/resources/rolling-sink-migration-test-flink1.1-snapshot"); + testHarness1.close(); + * */ + + final File outDir = tempFolder.newFolder(); + + RollingSink<String> sink = new ValidatingRollingSink<String>(outDir.getAbsolutePath()) + .setWriter(new StringWriter<String>()) + .setBatchSize(5) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + .setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + .setValidLengthSuffix(VALID_LENGTH_SUFFIX); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(sink), 10, 1, 0); + testHarness1.setup(); + testHarness1.initializeStateFromLegacyCheckpoint(getResourceFilename("rolling-sink-migration-test-flink1.1-snapshot")); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + testHarness1.processElement(new StreamRecord<>("test2", 0L)); + + checkFs(outDir, 1, 1, 0, 0); + + testHarness1.close(); + } + + private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { + int inProg = 0; + int pend = 0; + int compl = 0; + int val = 0; + + for (File file: FileUtils.listFiles(outDir, null, true)) { + if (file.getAbsolutePath().endsWith("crc")) { + continue; + } + String path = file.getPath(); + if (path.endsWith(IN_PROGRESS_SUFFIX)) { + inProg++; + } else if (path.endsWith(PENDING_SUFFIX)) { + pend++; + } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { + val++; + } else if (path.contains(PART_PREFIX)) { + compl++; + } + } + + Assert.assertEquals(inprogress, inProg); + Assert.assertEquals(pending, pend); + Assert.assertEquals(completed, compl); + Assert.assertEquals(valid, val); + } + + private static String getResourceFilename(String filename) { + ClassLoader cl = RollingSinkMigrationTest.class.getClassLoader(); + URL resource = cl.getResource(filename); + return resource.getFile(); + } + + static class ValidatingRollingSink<T> extends RollingSink<T> { + + private static final long serialVersionUID = -4263974081712009141L; + + ValidatingRollingSink(String basePath) { + super(basePath); + } + + @Override + public void restoreState(BucketState state) throws Exception { + + /** + * this validates that we read the state that was checkpointed by the previous version. We expect it to be: + * In-progress=/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4 + * validLength=6 + * pendingForNextCheckpoint=[] + * pendingForPrevCheckpoints={0=[ /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-0, + * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-1, + * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-2, + * /var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-3]} + * */ + + String current = state.currentFile; + long validLength = state.currentFileValidLength; + + Assert.assertEquals("/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-4", current); + Assert.assertEquals(6, validLength); + + List<String> pendingFiles = state.pendingFiles; + Assert.assertTrue(pendingFiles.isEmpty()); + + final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint; + Assert.assertEquals(1, pendingFilesPerCheckpoint.size()); + + for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) { + long checkpoint = entry.getKey(); + List<String> files = entry.getValue(); + + Assert.assertEquals(0L, checkpoint); + Assert.assertEquals(4, files.size()); + + for (int i = 0; i < 4; i++) { + Assert.assertEquals( + "/var/folders/z5/fxvg1j6s6mn94nsf8b1yc8s80000gn/T/junit2927527303216950257/junit5645682027227039270/2017-01-09--18/part-0-" + i, + files.get(i)); + } + } + super.restoreState(state); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/078e2489/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot new file mode 100644 index 0000000..2ebd70a Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/rolling-sink-migration-test-flink1.1-snapshot differ