[FLINK-6830] [fileSink] Port BucketingSinkFrom12MigrationTest for Flink 1.3
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0544b448
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0544b448
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0544b448

Branch: refs/heads/release-1.3
Commit: 0544b44808d3680fa22695f23d69e341820afde9
Parents: 8dec81c
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Sat Jun 3 23:45:22 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Jun 7 19:06:15 2017 +0200

----------------------------------------------------------------------
 .../BucketingSinkFrom12MigrationTest.java       | 223 ----------------
 .../bucketing/BucketingSinkMigrationTest.java   | 262 +++++++++++++++++++
 ...keting-sink-migration-test-flink1.3-snapshot | Bin 0 -> 1862 bytes
 3 files changed, 262 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0544b448/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
deleted file mode 100644
index 350b7b4..0000000
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.hadoop.fs.Path;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for checking whether {@link BucketingSink} can restore from snapshots that were done
- * using the Flink 1.2 {@link BucketingSink}.
- *
- * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on
- * the Flink 1.2 branch.
- */
-
-public class BucketingSinkFrom12MigrationTest {
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static final String PART_PREFIX = "part";
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-	private static final String VALID_LENGTH_SUFFIX = ".valid";
-
-	/**
-	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
-	 */
-	@Ignore
-	@Test
-	public void writeSnapshot() throws Exception {
-
-		final File outDir = tempFolder.newFolder();
-
-		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
-			.setWriter(new StringWriter<String>())
-			.setBatchSize(5)
-			.setPartPrefix(PART_PREFIX)
-			.setInProgressPrefix("")
-			.setPendingPrefix("")
-			.setValidLengthPrefix("")
-			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
-			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>("test1", 0L));
-		testHarness.processElement(new StreamRecord<>("test2", 0L));
-
-		checkFs(outDir, 1, 1, 0, 0);
-
-		testHarness.processElement(new StreamRecord<>("test3", 0L));
-		testHarness.processElement(new StreamRecord<>("test4", 0L));
-		testHarness.processElement(new StreamRecord<>("test5", 0L));
-
-		checkFs(outDir, 1, 4, 0, 0);
-
-		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
-
-		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot");
-		testHarness.close();
-	}
-
-	@Test
-	public void testRestore() throws Exception {
-		final File outDir = tempFolder.newFolder();
-
-		ValidatingBucketingSink<String> sink = (ValidatingBucketingSink<String>) new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
-			.setWriter(new StringWriter<String>())
-			.setBatchSize(5)
-			.setPartPrefix(PART_PREFIX)
-			.setInProgressPrefix("")
-			.setPendingPrefix("")
-			.setValidLengthPrefix("")
-			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
-			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(sink), 10, 1, 0);
-		testHarness.setup();
-		testHarness.initializeState(
-			OperatorSnapshotUtil.readStateHandle(
-				OperatorSnapshotUtil.getResourceFilename("bucketing-sink-migration-test-flink1.2-snapshot")));
-		testHarness.open();
-
-		assertTrue(sink.initializeCalled);
-
-		testHarness.processElement(new StreamRecord<>("test1", 0L));
-		testHarness.processElement(new StreamRecord<>("test2", 0L));
-
-		checkFs(outDir, 1, 1, 0, 0);
-
-		testHarness.close();
-	}
-
-	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
-		int inProg = 0;
-		int pend = 0;
-		int compl = 0;
-		int val = 0;
-
-		for (File file: FileUtils.listFiles(outDir, null, true)) {
-			if (file.getAbsolutePath().endsWith("crc")) {
-				continue;
-			}
-			String path = file.getPath();
-			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
-				inProg++;
-			} else if (path.endsWith(PENDING_SUFFIX)) {
-				pend++;
-			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
-				val++;
-			} else if (path.contains(PART_PREFIX)) {
-				compl++;
-			}
-		}
-
-		Assert.assertEquals(inprogress, inProg);
-		Assert.assertEquals(pending, pend);
-		Assert.assertEquals(completed, compl);
-		Assert.assertEquals(valid, val);
-	}
-
-	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
-
-		private static final long serialVersionUID = -4263974081712009141L;
-
-		public boolean initializeCalled = false;
-
-		ValidatingBucketingSink(String basePath) {
-			super(basePath);
-		}
-
-		/**
-		 * The actual paths in this depend on the binary checkpoint so it you update this the paths
-		 * here have to be updated as well.
-		 */
-		@Override
-		public void initializeState(FunctionInitializationContext context) throws Exception {
-			OperatorStateStore stateStore = context.getOperatorStateStore();
-
-			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
-
-			if (context.isRestored()) {
-
-				for (State<T> states : restoredBucketStates.get()) {
-					for (String bucketPath : states.bucketStates.keySet()) {
-						BucketState state = states.getBucketState(new Path(bucketPath));
-						String current = state.currentFile;
-						long validLength = state.currentFileValidLength;
-
-						Assert.assertEquals("/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-4", current);
-						Assert.assertEquals(6, validLength);
-
-						List<String> pendingFiles = state.pendingFiles;
-						assertTrue(pendingFiles.isEmpty());
-
-						final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
-						Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
-
-						for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
-							long checkpoint = entry.getKey();
-							List<String> files = entry.getValue();
-
-							Assert.assertEquals(0L, checkpoint);
-							Assert.assertEquals(4, files.size());
-
-							for (int i = 0; i < 4; i++) {
-								Assert.assertEquals(
-									"/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-" + i,
-									files.get(i));
-							}
-						}
-					}
-				}
-			}
-
-			initializeCalled = true;
-			super.initializeState(context);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0544b448/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
new file mode 100644
index 0000000..d3383f3
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.streaming.util.migration.MigrationTestUtil;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests for checking whether {@link BucketingSink} can restore from snapshots that were done
+ * using previous Flink versions' {@link BucketingSink}.
+ *
+ * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on
+ * the corresponding Flink release-* branch.
+ */
+@RunWith(Parameterized.class)
+public class BucketingSinkMigrationTest {
+
+	/**
+	 * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3)
+	 * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints
+	 */
+	private final MigrationVersion flinkGenerateSavepointVersion = null;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static final String PART_PREFIX = "part";
+	private static final String PENDING_SUFFIX = ".pending";
+	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+	private static final String VALID_LENGTH_SUFFIX = ".valid";
+
+	@Parameterized.Parameters(name = "Migration Savepoint / Bucket Files Prefix: {0}")
+	public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+		return Arrays.asList(
+			Tuple2.of(MigrationVersion.v1_2, "/var/folders/v_/ry2wp5fx0y7c1rvr41xy9_700000gn/T/junit9160378385359106772/junit479663758539998903/1970-01-01--01/part-0-"),
+			Tuple2.of(MigrationVersion.v1_3, "/var/folders/tv/b_1d8fvx23dgk1_xs8db_95h0000gn/T/junit4273542175898623023/junit3801102997056424640/1970-01-01--01/part-0-"));
+	}
+
+	private final MigrationVersion testMigrateVersion;
+	private final String expectedBucketFilesPrefix;
+
+	public BucketingSinkMigrationTest(Tuple2<MigrationVersion, String> migrateVersionAndExpectedBucketFilesPrefix) {
+		this.testMigrateVersion = migrateVersionAndExpectedBucketFilesPrefix.f0;
+		this.expectedBucketFilesPrefix = migrateVersionAndExpectedBucketFilesPrefix.f1;
+	}
+
+	/**
+	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
+	 */
+	@Ignore
+	@Test
+	public void writeSnapshot() throws Exception {
+
+		final File outDir = tempFolder.newFolder();
+
+		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("test1", 0L));
+		testHarness.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness.processElement(new StreamRecord<>("test3", 0L));
+		testHarness.processElement(new StreamRecord<>("test4", 0L));
+		testHarness.processElement(new StreamRecord<>("test5", 0L));
+
+		checkFs(outDir, 1, 4, 0, 0);
+
+		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+
+		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/bucketing-sink-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot");
+		testHarness.close();
+	}
+
+	@Test
+	public void testRestore() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		ValidatingBucketingSink<String> sink = (ValidatingBucketingSink<String>)
+			new ValidatingBucketingSink<String>(outDir.getAbsolutePath(), expectedBucketFilesPrefix)
+			.setWriter(new StringWriter<String>())
+			.setBatchSize(5)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(sink), 10, 1, 0);
+		testHarness.setup();
+
+		MigrationTestUtil.restoreFromSnapshot(
+			testHarness,
+			OperatorSnapshotUtil.getResourceFilename(
+				"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"),
+			testMigrateVersion);
+
+		testHarness.open();
+
+		assertTrue(sink.initializeCalled);
+
+		testHarness.processElement(new StreamRecord<>("test1", 0L));
+		testHarness.processElement(new StreamRecord<>("test2", 0L));
+
+		checkFs(outDir, 1, 1, 0, 0);
+
+		testHarness.close();
+	}
+
+	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
+		int inProg = 0;
+		int pend = 0;
+		int compl = 0;
+		int val = 0;
+
+		for (File file: FileUtils.listFiles(outDir, null, true)) {
+			if (file.getAbsolutePath().endsWith("crc")) {
+				continue;
+			}
+			String path = file.getPath();
+			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
+				inProg++;
+			} else if (path.endsWith(PENDING_SUFFIX)) {
+				pend++;
+			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
+				val++;
+			} else if (path.contains(PART_PREFIX)) {
+				compl++;
+			}
+		}
+
+		Assert.assertEquals(inprogress, inProg);
+		Assert.assertEquals(pending, pend);
+		Assert.assertEquals(completed, compl);
+		Assert.assertEquals(valid, val);
+	}
+
+	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
+
+		private static final long serialVersionUID = -4263974081712009141L;
+
+		public boolean initializeCalled = false;
+
+		private final String expectedBucketFilesPrefix;
+
+		ValidatingBucketingSink(String basePath, String expectedBucketFilesPrefix) {
+			super(basePath);
+			this.expectedBucketFilesPrefix = expectedBucketFilesPrefix;
+		}
+
+		/**
+		 * The actual paths in this test depend on the binary checkpoint, so if you update the
+		 * checkpoint, the paths here have to be updated as well.
+		 */
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			OperatorStateStore stateStore = context.getOperatorStateStore();
+
+			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+
+			if (context.isRestored()) {
+
+				for (State<T> states : restoredBucketStates.get()) {
+					for (String bucketPath : states.bucketStates.keySet()) {
+						BucketState state = states.getBucketState(new Path(bucketPath));
+						String current = state.currentFile;
+						long validLength = state.currentFileValidLength;
+
+						Assert.assertEquals(expectedBucketFilesPrefix + "4", current);
+						Assert.assertEquals(6, validLength);
+
+						List<String> pendingFiles = state.pendingFiles;
+						assertTrue(pendingFiles.isEmpty());
+
+						final Map<Long, List<String>> pendingFilesPerCheckpoint = state.pendingFilesPerCheckpoint;
+						Assert.assertEquals(1, pendingFilesPerCheckpoint.size());
+
+						for (Map.Entry<Long, List<String>> entry: pendingFilesPerCheckpoint.entrySet()) {
+							long checkpoint = entry.getKey();
+							List<String> files = entry.getValue();
+
+							Assert.assertEquals(0L, checkpoint);
+							Assert.assertEquals(4, files.size());
+
+							for (int i = 0; i < 4; i++) {
+								Assert.assertEquals(
+									expectedBucketFilesPrefix + i,
+									files.get(i));
+							}
+						}
+					}
+				}
+			}
+
+			initializeCalled = true;
+			super.initializeState(context);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0544b448/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.3-snapshot b/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.3-snapshot
new file mode 100644
index 0000000..765e8bf
Binary files /dev/null and b/flink-connectors/flink-connector-filesystem/src/test/resources/bucketing-sink-migration-test-flink1.3-snapshot differ
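----------------------------------------------------------------------

The ported test follows Flink's version-parameterized migration-test pattern:
one JUnit run per past release, each restoring a checked-in binary snapshot
whose file name encodes the version that wrote it. The following sketch
distills that pattern. It assumes Flink 1.3's test utilities on the classpath
(the same OneInputStreamOperatorTestHarness, OperatorSnapshotUtil, and
MigrationTestUtil calls used in the diff above); the class name, the output
path, the bare BucketingSink construction, and the omitted assertions are
illustrative placeholders, not part of this commit.

package org.apache.flink.streaming.connectors.fs.bucketing;

import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.migration.MigrationTestUtil;
import org.apache.flink.streaming.util.migration.MigrationVersion;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

// Hypothetical skeleton of the pattern above; not the committed test.
@RunWith(Parameterized.class)
public class MigrationTestSketch {

	// One test execution per version; each version pairs with a snapshot
	// resource regenerated on the matching release-* branch.
	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
	public static Collection<MigrationVersion> parameters() {
		return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3);
	}

	private final MigrationVersion testMigrateVersion;

	public MigrationTestSketch(MigrationVersion testMigrateVersion) {
		this.testMigrateVersion = testMigrateVersion;
	}

	@Test
	public void testRestore() throws Exception {
		// Placeholder sink; the real test configures prefixes/suffixes and
		// wraps the sink in a subclass that validates the restored state.
		BucketingSink<String> sink = new BucketingSink<>("/tmp/bucketing-out");

		OneInputStreamOperatorTestHarness<String, Object> harness =
			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, 1, 0);
		harness.setup();

		// MigrationVersion renders as "1.2"/"1.3", so this resolves to the
		// checked-in resource, e.g. "bucketing-sink-migration-test-flink1.3-snapshot".
		MigrationTestUtil.restoreFromSnapshot(
			harness,
			OperatorSnapshotUtil.getResourceFilename(
				"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"),
			testMigrateVersion);

		harness.open();
		// ...process elements and assert on the restored bucket state here...
		harness.close();
	}
}

Supporting a later release then reduces to adding a row to parameters() and
regenerating the snapshot with writeSnapshot() on that release-* branch, as
the class Javadoc above describes.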