[FLINK-6830] [tests] Add StatefulJobSavepointFrom13MigrationITCase

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a4d016e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a4d016e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a4d016e

Branch: refs/heads/release-1.3
Commit: 7a4d016eff4eccaaa2955e2d41ffba7b3bef17c4
Parents: f74caf7
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Fri Jun 2 17:18:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Jun 7 18:52:16 2017 +0200

----------------------------------------------------------------------
 .../utils/SavepointMigrationTestBase.java       |   8 ++-
 ...atefulJobSavepointFrom12MigrationITCase.java |  19 +++---
 ...atefulJobSavepointFrom13MigrationITCase.java |  58 +++++++++++++++++++
 .../_metadata                                   | Bin 0 -> 36467 bytes
 .../_metadata                                   | Bin 0 -> 36395 bytes
 5 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index c5672a2..e4004c7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -169,7 +169,13 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
                final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
                LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
 
-               FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+               File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+               // savepoints were changed to be directories in Flink 1.3
+               if (jobManagerSavepoint.isDirectory()) {
+                       FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+               } else {
+                       FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
+               }
        }
 
        @SafeVarargs

http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index 5f03195..4a1d181 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -76,7 +76,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // we only test memory state backend yet
                env.setStateBackend(new MemoryStateBackend());
                env.enableCheckpointing(500);
                env.setParallelism(4);
@@ -103,7 +102,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                executeAndSavepoint(
                                env,
-                               "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
+                               "src/test/resources/" + getSavepointPath(),
                                new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
        }
 
@@ -144,7 +143,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                executeAndSavepoint(
                                env,
-                               "src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint",
+                               "src/test/resources/" + getRocksDBSavepointPath(),
                                new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
        }
 
@@ -155,7 +154,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // we only test memory state backend yet
                env.setStateBackend(new MemoryStateBackend());
                env.enableCheckpointing(500);
                env.setParallelism(4);
@@ -182,7 +180,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                restoreAndExecute(
                                env,
-                               getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
+                               getResourceFilename(getSavepointPath()),
                                new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
                                new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
                                new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
@@ -201,7 +199,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRestartStrategy(RestartStrategies.noRestart());
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // we only test memory state backend yet
                env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
                env.enableCheckpointing(500);
                env.setParallelism(4);
@@ -228,7 +225,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
                restoreAndExecute(
                                env,
-                               getResourceFilename("stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"),
+                               getResourceFilename(getRocksDBSavepointPath()),
                                new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
                                new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
                                new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
@@ -241,6 +238,14 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
                                new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
        }
 
+       protected String getSavepointPath() {
+               return "stateful-udf-migration-itcase-flink1.2-savepoint";
+       }
+
+       protected String getRocksDBSavepointPath() {
+               return "stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint";
+       }
+
        private static class LegacyCheckpointedSource
                        implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
new file mode 100644
index 0000000..a2d3201
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.3 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulJobSavepointFrom13MigrationITCase extends StatefulJobSavepointFrom12MigrationITCase {
+
+       /**
+        * This has to be manually executed to create the savepoint on Flink 1.3.
+        */
+       @Test
+       @Ignore
+       public void testCreateSavepointOnFlink13() throws Exception {
+               testCreateSavepointOnFlink12();
+       }
+
+       /**
+        * This has to be manually executed to create the savepoint on Flink 1.3.
+        */
+       @Test
+       @Ignore
+       public void testCreateSavepointOnFlink13WithRocksDB() throws Exception {
+               testCreateSavepointOnFlink12WithRocksDB();
+       }
+
+       @Override
+       protected String getSavepointPath() {
+               return "stateful-udf-migration-itcase-flink1.3-savepoint";
+       }
+
+       @Override
+       protected String getRocksDBSavepointPath() {
+               return "stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..8f22bcb
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7a4d016e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
new file mode 100644
index 0000000..8ca91ec
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata differ

Reply via email to