[FLINK-6830] [tests] Add StatefulJobSavepointFrom13MigrationITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a4d016e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a4d016e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a4d016e Branch: refs/heads/release-1.3 Commit: 7a4d016eff4eccaaa2955e2d41ffba7b3bef17c4 Parents: f74caf7 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Fri Jun 2 17:18:51 2017 +0200 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Wed Jun 7 18:52:16 2017 +0200 ---------------------------------------------------------------------- .../utils/SavepointMigrationTestBase.java | 8 ++- ...atefulJobSavepointFrom12MigrationITCase.java | 19 +++--- ...atefulJobSavepointFrom13MigrationITCase.java | 58 +++++++++++++++++++ .../_metadata | Bin 0 -> 36467 bytes .../_metadata | Bin 0 -> 36395 bytes 5 files changed, 77 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index c5672a2..e4004c7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -169,7 +169,13 @@ public class SavepointMigrationTestBase extends TestBaseUtils { final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath(); LOG.info("Saved savepoint: " + jobmanagerSavepointPath); - FileUtils.moveFile(new File(new 
URI(jobmanagerSavepointPath).getPath()), new File(savepointPath)); + File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath()); + // savepoints were changed to be directories in Flink 1.3 + if (jobManagerSavepoint.isDirectory()) { + FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath)); + } else { + FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath)); + } } @SafeVarargs http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java index 5f03195..4a1d181 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java @@ -76,7 +76,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet env.setStateBackend(new MemoryStateBackend()); env.enableCheckpointing(500); env.setParallelism(4); @@ -103,7 +102,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio executeAndSavepoint( env, - "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint", + "src/test/resources/" + getSavepointPath(), new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); } @@ -144,7 +143,7 @@ public class StatefulJobSavepointFrom12MigrationITCase 
extends SavepointMigratio executeAndSavepoint( env, - "src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint", + "src/test/resources/" + getRocksDBSavepointPath(), new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); } @@ -155,7 +154,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart()); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet env.setStateBackend(new MemoryStateBackend()); env.enableCheckpointing(500); env.setParallelism(4); @@ -182,7 +180,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio restoreAndExecute( env, - getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"), + getResourceFilename(getSavepointPath()), new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), @@ -201,7 +199,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart()); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // we only test memory state backend yet env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend())); env.enableCheckpointing(500); env.setParallelism(4); @@ -228,7 +225,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio restoreAndExecute( env, - getResourceFilename("stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"), + getResourceFilename(getRocksDBSavepointPath()), 
new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1), new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS), @@ -241,6 +238,14 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS)); } + protected String getSavepointPath() { + return "stateful-udf-migration-itcase-flink1.2-savepoint"; + } + + protected String getRocksDBSavepointPath() { + return "stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"; + } + private static class LegacyCheckpointedSource implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> { http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java new file mode 100644 index 0000000..a2d3201 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.junit.Ignore; +import org.junit.Test; + +/** + * This verifies that we can restore a complete job from a Flink 1.3 savepoint. + * + * <p>The test pipeline contains both "Checkpointed" state and keyed user state. + */ +public class StatefulJobSavepointFrom13MigrationITCase extends StatefulJobSavepointFrom12MigrationITCase { + + /** + * This has to be manually executed to create the savepoint on Flink 1.3. + */ + @Test + @Ignore + public void testCreateSavepointOnFlink13() throws Exception { + testCreateSavepointOnFlink12(); + } + + /** + * This has to be manually executed to create the RocksDB savepoint on Flink 1.3. 
+ */ + @Test + @Ignore + public void testCreateSavepointOnFlink13WithRocksDB() throws Exception { + testCreateSavepointOnFlink12WithRocksDB(); + } + + @Override + protected String getSavepointPath() { + return "stateful-udf-migration-itcase-flink1.3-savepoint"; + } + + @Override + protected String getRocksDBSavepointPath() { + return "stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata new file mode 100644 index 0000000..8f22bcb Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata new file mode 100644 index 0000000..8ca91ec Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata differ