This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9dd6043f4e474658cd6830a011d74f0147fbcc07 Author: Sebastian Mattheis <[email protected]> AuthorDate: Fri Feb 18 10:17:59 2022 +0100 [FLINK-26176] Fix scala savepoint migration tests - Remove test parameters and artifacts for tests of the rocksdb state backend, since the artifacts are not rocksdb savepoints but actually memory state savepoints - Remove test parameters and artifacts for tests of savepoints with Flink version 1.7 or below, as those are not safe to restore due to bug FLINK-10493 --- .../_metadata | Bin 41481 -> 0 bytes .../_metadata | Bin 41584 -> 0 bytes .../_metadata | Bin 41436 -> 0 bytes .../_metadata | Bin 41962 -> 0 bytes .../_metadata | Bin 42002 -> 0 bytes .../_metadata | Bin 213759 -> 0 bytes .../_metadata | Bin 213759 -> 0 bytes .../_metadata | Bin 204543 -> 0 bytes .../_metadata | Bin 204543 -> 0 bytes .../_metadata | Bin 205638 -> 0 bytes .../_metadata | Bin 205638 -> 0 bytes .../_metadata | Bin 229848 -> 0 bytes .../_metadata | Bin 229848 -> 0 bytes .../_metadata | Bin 39985 -> 0 bytes .../_metadata | Bin 40081 -> 0 bytes .../_metadata | Bin 53342 -> 0 bytes .../_metadata | Bin 53419 -> 0 bytes .../_metadata | Bin 53271 -> 0 bytes .../_metadata | Bin 53733 -> 0 bytes .../_metadata | Bin 53773 -> 0 bytes .../_metadata | Bin 220470 -> 0 bytes .../_metadata | Bin 220470 -> 0 bytes .../_metadata | Bin 253956 -> 0 bytes .../_metadata | Bin 253956 -> 0 bytes .../_metadata | Bin 287008 -> 0 bytes .../_metadata | Bin 287008 -> 0 bytes .../_metadata | Bin 51846 -> 0 bytes .../_metadata | Bin 51942 -> 0 bytes .../StatefulJobSavepointMigrationITCase.scala | 153 +++++++--------- ...StatefulJobWBroadcastStateMigrationITCase.scala | 198 ++++++++------------- 30 files changed, 144 insertions(+), 207 deletions(-) diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata deleted file mode 100644
index d6ade08..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata deleted file mode 100644 index 56d6d77..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata deleted file mode 100644 index dd2ef62..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata deleted file mode 100644 index 24636f5..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata deleted file mode 100644 index fbbf888..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata 
b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata deleted file mode 100644 index e0c6da6..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata deleted file mode 100644 index f719eac..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata deleted file mode 100644 index 69aa8d4..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-jobmanager-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata deleted file mode 100644 index 7dcef9c..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata deleted file mode 100644 index 55243f3..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ diff --git 
a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata deleted file mode 100644 index defe3b1..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata deleted file mode 100644 index b421e64..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata deleted file mode 100644 index 53e493c..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata deleted file mode 100644 index 3f41ed6..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata deleted file mode 100644 index 2c191ef..0000000 Binary files 
a/flink-tests/src/test/resources/stateful-scala-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata deleted file mode 100644 index e558ab9..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.10-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata deleted file mode 100644 index b4ffaa1..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.11-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata deleted file mode 100644 index e36c884..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.12-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata deleted file mode 100644 index 0615aa8..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.13-rocksdb-savepoint/_metadata and /dev/null differ diff --git 
a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata deleted file mode 100644 index 7436fca..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.14-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata deleted file mode 100644 index f845ee3..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-jobmanager-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata deleted file mode 100644 index 6b05ef9..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.5-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata deleted file mode 100644 index b64e810..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-jobmanager-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata 
b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata deleted file mode 100644 index 59bebc9..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.6-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata deleted file mode 100644 index 88cee1b..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-jobmanager-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata deleted file mode 100644 index e4b058a..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.7-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata deleted file mode 100644 index f2870a4..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.8-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata deleted file mode 100644 index 
5f20171..0000000 Binary files a/flink-tests/src/test/resources/stateful-scala-with-broadcast-udf-migration-itcase-flink1.9-rocksdb-savepoint/_metadata and /dev/null differ diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala index c68abf3..3ff91ee 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala @@ -35,56 +35,67 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase import org.apache.flink.util.Collector import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext, StateBackendLoader} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum - +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{Ignore, Test} +import org.junit.Test +import java.util.stream.Collectors import scala.util.{Failure, Try} object StatefulJobSavepointMigrationITCase { + // TODO increase this to newer version to create and test snapshot migration for newer versions + val currentVersion = FlinkVersion.v1_14 + + // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots + // TODO Note: You should generate the snapshot 
based on the release branch instead of the + // master. + val executionMode = ExecutionMode.VERIFY_SNAPSHOT + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") def parameters: util.Collection[(FlinkVersion, String)] = { - util.Arrays.asList( - (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + var parameters = util.Arrays.asList( + // (FlinkVersion.v1_3, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // (FlinkVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // Note: It is not safe to restore savepoints created in a Scala applications with Flink + // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer + // used names of anonymous classes that depend on the relative position/order in code, e.g., + // if two anonymous classes, instantiated inside the same class and from the same base class, + // change order in the code their names are switched. + // As a consequence, changes in code may result in restore failures. 
+ // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), - (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)) + (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), + ) + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + parameters = parameters.stream().filter(x => x._1 == currentVersion) + .collect(Collectors.toList()) + } + parameters } - // TODO to generate savepoints for a specific Flink version / backend type, - // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB, - // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME) - // TODO Note: You should generate the savepoint based on the release branch instead of the master. 
- val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14 - val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME - val NUM_ELEMENTS = 4 + + def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = { + s"stateful-scala-udf-migration-itcase" + + s"-flink${migrationVersionAndBackend._1}" + + s"-${migrationVersionAndBackend._2}-savepoint" + } } /** @@ -92,16 +103,15 @@ object StatefulJobSavepointMigrationITCase { */ @RunWith(classOf[Parameterized]) class StatefulJobSavepointMigrationITCase( - migrationVersionAndBackend: (FlinkVersion, String)) - extends SavepointMigrationTestBase with Serializable { + migrationVersionAndBackend: (FlinkVersion, String)) + extends SnapshotMigrationTestBase with Serializable { - @Ignore @Test - def testCreateSavepoint(): Unit = { + def testSavepoint(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match { + migrationVersionAndBackend._2 match { case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => env.setStateBackend(new EmbeddedRocksDBStateBackend()) case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => @@ -111,7 +121,7 @@ class StatefulJobSavepointMigrationITCase( case _ => throw new UnsupportedOperationException } - env.setStateBackend(new MemoryStateBackend) + env.enableChangelogStateBackend(false) env.enableCheckpointing(500) env.setParallelism(4) env.setMaxParallelism(4) @@ -127,60 +137,29 @@ class StatefulJobSavepointMigrationITCase( .flatMap(new StatefulFlatMapper) .addSink(new AccumulatorCountingSink) - executeAndSavepoint( - env, - s"src/test/resources/stateful-scala-udf-migration-itcase-flink" + - s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" + - s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint", - new Tuple2( - 
AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, - StatefulJobSavepointMigrationITCase.NUM_ELEMENTS + if (StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) { + executeAndSavepoint( + env, + s"src/test/resources/" + + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend), + new Tuple2( + AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, + StatefulJobSavepointMigrationITCase.NUM_ELEMENTS + ) ) - ) - } - - @Test - def testRestoreSavepoint(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - - migrationVersionAndBackend._2 match { - case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => - env.setStateBackend(new EmbeddedRocksDBStateBackend()) - case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => - env.setStateBackend(new MemoryStateBackend()) - case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME => - env.setStateBackend(new HashMapStateBackend()) - case _ => throw new UnsupportedOperationException - } - env.enableChangelogStateBackend(false); - - env.setStateBackend(new MemoryStateBackend) - env.enableCheckpointing(500) - env.setParallelism(4) - env.setMaxParallelism(4) - - env - .addSource( - new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource") - .keyBy( - new KeySelector[(Long, Long), Long] { - override def getKey(value: (Long, Long)): Long = value._1 - } + } else if ( + StatefulJobSavepointMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) { + restoreAndExecute( + env, + SnapshotMigrationTestBase.getResourceFilename( + StatefulJobSavepointMigrationITCase.getSnapshotPath(migrationVersionAndBackend)), + new Tuple2( + AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, + StatefulJobSavepointMigrationITCase.NUM_ELEMENTS) ) - .flatMap(new StatefulFlatMapper) - .addSink(new AccumulatorCountingSink) - - restoreAndExecute( - env, - SavepointMigrationTestBase.getResourceFilename( - 
s"stateful-scala" + - s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" + - s"-${migrationVersionAndBackend._2}-savepoint"), - new Tuple2( - AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, - StatefulJobSavepointMigrationITCase.NUM_ELEMENTS) - ) + } else { + throw new UnsupportedOperationException("Unsupported execution mode.") + } } @SerialVersionUID(1L) @@ -190,7 +169,7 @@ class StatefulJobSavepointMigrationITCase( @SerialVersionUID(1L) private class CheckpointedSource(val numElements: Int) - extends SourceFunction[(Long, Long)] with CheckpointedFunction { + extends SourceFunction[(Long, Long)] with CheckpointedFunction { private var isRunning = true private var state: ListState[CustomCaseClass] = _ @@ -201,8 +180,8 @@ class StatefulJobSavepointMigrationITCase( ctx.getCheckpointLock synchronized { var i = 0 while (i < numElements) { - ctx.collect(i, i) - i += 1 + ctx.collect(i, i) + i += 1 } } // don't emit a final watermark so that we don't trigger the registered event-time diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala index 78e646e..47dc3ae 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala @@ -41,132 +41,73 @@ import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase +import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase +import 
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.ExecutionMode import org.apache.flink.util.Collector - import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{Assert, Ignore, Test} +import org.junit.{Assert, Test} +import java.util.stream.Collectors import scala.util.{Failure, Try} object StatefulJobWBroadcastStateMigrationITCase { + // TODO increase this to newer version to create and test snapshot migration for newer versions + val currentVersion = FlinkVersion.v1_14 + + // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots + // TODO Note: You should generate the snapshot based on the release branch instead of the + // master. + val executionMode = ExecutionMode.VERIFY_SNAPSHOT + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") def parameters: util.Collection[(FlinkVersion, String)] = { - util.Arrays.asList( - (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_6, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), - (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_7, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), + var parameters = util.Arrays.asList( + // (FlinkVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // (FlinkVersion.v1_6, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // (FlinkVersion.v1_7, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + // Note: It is not safe to restore savepoints created in a Scala applications with Flink + // version 1.7 or below. 
The reason is that up to version 1.7 the underlying Scala serializer + // used names of anonymous classes that depend on the relative position/order in code, e.g., + // if two anonymous classes, instantiated inside the same class and from the same base class, + // change order in the code their names are switched. + // As a consequence, changes in code may result in restore failures. + // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 (FlinkVersion.v1_8, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_8, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_9, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_9, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_10, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_10, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_11, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_11, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_12, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_12, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_13, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), - (FlinkVersion.v1_13, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME), (FlinkVersion.v1_14, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), - (FlinkVersion.v1_14, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)) + (FlinkVersion.v1_15, StateBackendLoader.HASHMAP_STATE_BACKEND_NAME), + ) + if (executionMode == ExecutionMode.CREATE_SNAPSHOT) { + parameters = parameters.stream().filter(x => x._1 == currentVersion) + .collect(Collectors.toList()) + } + parameters } - // TODO to generate savepoints for a specific Flink version / backend type, - // TODO change these values accordingly, e.g. 
to generate for 1.3 with RocksDB, - // TODO set as (FlinkVersion.v1_3, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME) - // TODO Note: You should generate the savepoint based on the release branch instead of the master. - val GENERATE_SAVEPOINT_VER: FlinkVersion = FlinkVersion.v1_14 - val GENERATE_SAVEPOINT_BACKEND_TYPE: String = StateBackendLoader.HASHMAP_STATE_BACKEND_NAME + def getSnapshotPath(migrationVersionAndBackend: (FlinkVersion, String)): String = { + s"stateful-scala-with-broadcast-udf-migration-itcase" + + s"-flink${migrationVersionAndBackend._1}" + + s"-${migrationVersionAndBackend._2}-savepoint" + } val NUM_ELEMENTS = 4 } /** - * ITCase for migration Scala state types across different Flink versions. - */ + * ITCase for migration Scala state types across different Flink versions. + */ @RunWith(classOf[Parameterized]) -class StatefulJobWBroadcastStateMigrationITCase( - migrationVersionAndBackend: (FlinkVersion, String)) - extends SavepointMigrationTestBase with Serializable { +class StatefulJobWBroadcastStateMigrationITCase(migrationVersionAndBackend: (FlinkVersion, String)) + extends SnapshotMigrationTestBase with Serializable { @Test - @Ignore - def testCreateSavepointWithBroadcastState(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - - StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match { - case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME => - env.setStateBackend(new EmbeddedRocksDBStateBackend()) - case StateBackendLoader.MEMORY_STATE_BACKEND_NAME => - env.setStateBackend(new MemoryStateBackend()) - case StateBackendLoader.HASHMAP_STATE_BACKEND_NAME => - env.setStateBackend(new HashMapStateBackend()) - case _ => throw new UnsupportedOperationException - } - env.enableChangelogStateBackend(false) - - lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long]( - "broadcast-state-1", - 
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], - BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]) - - lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String]( - "broadcast-state-2", - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO) - - env.setStateBackend(new MemoryStateBackend) - env.enableCheckpointing(500) - env.setParallelism(4) - env.setMaxParallelism(4) - - val stream = env - .addSource( - new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource") - .keyBy( - new KeySelector[(Long, Long), Long] { - override def getKey(value: (Long, Long)): Long = value._1 - } - ) - .flatMap(new StatefulFlatMapper) - .keyBy( - new KeySelector[(Long, Long), Long] { - override def getKey(value: (Long, Long)): Long = value._1 - } - ) - - val broadcastStream = env - .addSource( - new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource") - .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc) - - stream - .connect(broadcastStream) - .process(new TestBroadcastProcessFunction) - .addSink(new AccumulatorCountingSink) - - executeAndSavepoint( - env, - s"src/test/resources/stateful-scala-with-broadcast" + - s"-udf-migration-itcase-flink" + - s"${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_VER}" + - s"-${StatefulJobWBroadcastStateMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint", - new Tuple2( - AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, - StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS - ) - ) - } - - @Test - def testRestoreSavepointWithBroadcast(): Unit = { + def testSavepointWithBroadcast(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) @@ -192,7 +133,6 @@ class StatefulJobWBroadcastStateMigrationITCase( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) - env.setStateBackend(new MemoryStateBackend) env.enableCheckpointing(500) 
env.setParallelism(4) env.setMaxParallelism(4) @@ -217,26 +157,44 @@ class StatefulJobWBroadcastStateMigrationITCase( new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource") .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc) - val expectedFirstState: Map[Long, Long] = - Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L) - val expectedSecondState: Map[String, String] = - Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3") - - stream - .connect(broadcastStream) - .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState)) - .addSink(new AccumulatorCountingSink) - - restoreAndExecute( - env, - SavepointMigrationTestBase.getResourceFilename( - s"stateful-scala-with-broadcast" + - s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" + - s"-${migrationVersionAndBackend._2}-savepoint"), - new Tuple2( - AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, - StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS) - ) + if (StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.CREATE_SNAPSHOT) { + stream + .connect(broadcastStream) + .process(new TestBroadcastProcessFunction) + .addSink(new AccumulatorCountingSink) + + executeAndSavepoint( + env, + s"src/test/resources/" + + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend), + new Tuple2( + AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, + StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS + ) + ) + } else if ( + StatefulJobWBroadcastStateMigrationITCase.executionMode == ExecutionMode.VERIFY_SNAPSHOT) { + val expectedFirstState: Map[Long, Long] = + Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L) + val expectedSecondState: Map[String, String] = + Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3") + + stream + .connect(broadcastStream) + .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState)) + .addSink(new AccumulatorCountingSink) + + restoreAndExecute( + env, + 
SnapshotMigrationTestBase.getResourceFilename( + StatefulJobWBroadcastStateMigrationITCase.getSnapshotPath(migrationVersionAndBackend)), + new Tuple2( + AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, + StatefulJobWBroadcastStateMigrationITCase.NUM_ELEMENTS) + ) + } else { + throw new UnsupportedOperationException("Unsupported execution mode.") + } } }
