gaoyunhaii commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1181138638
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +64,80 @@
* previous Flink versions, as well as for different state backends.
*/
@RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase
+ implements MigrationTest {
private static final int NUM_SOURCE_ELEMENTS = 4;
- // TODO increase this to newer version to create and test snapshot migration for newer versions
- private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+ return internalParameters(null);
+ }
+
+ public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+ FlinkVersion targetVersion) {
+ return internalParameters(targetVersion);
+ }
- // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
- // TODO Note: You should generate the snapshot based on the release branch instead of the
- // master.
- private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+ private static Collection<SnapshotSpec> internalParameters(
+ @Nullable FlinkVersion targetGeneratingVersion) {
+ BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> getFlinkVersions =
+ (minInclVersion, maxInclVersion) -> {
+ if (targetGeneratingVersion != null) {
+ return Collections.singleton(targetGeneratingVersion);
+ } else {
+ return FlinkVersion.rangeOf(minInclVersion, maxInclVersion);
+ }
+ };
- @Parameterized.Parameters(name = "Test snapshot: {0}")
- public static Collection<SnapshotSpec> parameters() {
Collection<SnapshotSpec> parameters = new LinkedList<>();
parameters.addAll(
SnapshotSpec.withVersions(
StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
SnapshotType.SAVEPOINT_CANONICAL,
- FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14)));
+ getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_14)));
Review Comment:
That's indeed the case; I changed the implementation of `getFlinkVersions`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]