XComp commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1098832279


##########
flink-annotations/src/main/java/org/apache/flink/FlinkVersion.java:
##########
@@ -86,8 +90,12 @@ public static Optional<FlinkVersion> byCode(String code) {
         return Optional.ofNullable(CODE_MAP.get(code));
     }
 
+    public static FlinkVersion getMostRecentlyPublishedVersion() {
+        return values()[values().length - 2];
+    }
+
     /** Returns the current version. */
-    public static FlinkVersion current() {
+    public static FlinkVersion getCurrentMasterVersion() {

Review Comment:
   I guess that's a method we cannot change because it's a public interface. Updating the JavaDoc would be sufficient. That's also what's causing the CI failure.
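
   For illustration, a minimal sketch of the point (a hypothetical two-constant enum standing in for Flink's real `FlinkVersion`; all names here are made up): the stable public method keeps its name and only gains JavaDoc, while the new accessor reads the second-to-last constant.

   ```java
   // Hypothetical miniature of FlinkVersion: the last enum constant is the
   // in-development (master) version, the second-to-last is the most recently
   // published one. Only the JavaDoc of the stable public method changes.
   enum FlinkVersionSketch {
       v1_16,
       v1_17;

       /** Returns the most recently published version, i.e. the second-to-last constant. */
       static FlinkVersionSketch getMostRecentlyPublishedVersion() {
           return values()[values().length - 2];
       }

       /** Returns the current master version; the name stays stable for API compatibility. */
       static FlinkVersionSketch current() {
           return values()[values().length - 1];
       }
   }
   ```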



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;

Review Comment:
   ```suggestion
   ```
   `mvn spotless:apply` might help here



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +61,85 @@
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase
+        implements MigrationTest {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO increase this to newer version to create and test snapshot migration for newer versions
-    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+        return internalParameters(null);
+    }
 
-    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-    // TODO Note: You should generate the snapshot based on the release branch instead of the
-    // master.
-    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+    public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+            FlinkVersion targetVersion) {
+        return internalParameters(targetVersion);
+    }
 
-    @Parameterized.Parameters(name = "Test snapshot: {0}")
-    public static Collection<SnapshotSpec> parameters() {
+    private static Collection<SnapshotSpec> internalParameters(
+            /* Nullable */ FlinkVersion targetGeneratingVersion) {

Review Comment:
   ```suggestion
               @Nullable FlinkVersion targetGeneratingVersion) {
   ```
   there's `javax.annotation.Nullable` that can be used instead. Using the annotation enables useful IDE features like warnings.
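
   To illustrate the point, a tiny runnable sketch (all names made up; the real annotation comes from the jsr305 dependency, which is why it appears here only as a comment):

   ```java
   import java.util.Collection;
   import java.util.Collections;

   // Illustrative only: with javax.annotation.Nullable (from the jsr305 artifact)
   // on the parameter, the IDE can warn about missing null checks; the plain
   // /* Nullable */ comment gives no such tooling support.
   class NullableParamSketch {
       static Collection<String> internalParameters(/* @Nullable */ String targetGeneratingVersion) {
           // Callers may pass null to request the full version range.
           return targetGeneratingVersion == null
                   ? Collections.<String>emptyList()
                   : Collections.singletonList(targetGeneratingVersion);
       }
   }
   ```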



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -63,19 +63,11 @@
 * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
 * done using previous Flink versions' {@link FlinkKafkaConsumerBase}.
 *
- * <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
- * Flink release-* branch.
+ * <p>For regenerating the binary snapshot files run {@link #writeSnapshot(FlinkVersion)} on the
+ * corresponding Flink release-* branch.

Review Comment:
   Shouldn't we rather refer to the test data generation framework here?



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency

Review Comment:
   ```suggestion
   The following dependency needs to be added to the module's Maven config in case a migration test is meant to be added to that module:
   ```
   nit: Just a nitpicky proposal because the sentence initially mixed up singular and plural.



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+

Review Comment:
   ```suggestion
   ```
   nit: to keep the README as short as possible



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/SnapshotGeneratorExecutor.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RunRules;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.Parameterized;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** A utility class to execute generateSnapshots method for migration tests. */
+class SnapshotGeneratorExecutor {
+
+    void executeGenerate(Class<?> migrationTestClass, FlinkVersion flinkVersion) throws Throwable {
+        for (Method method : migrationTestClass.getMethods()) {
+            if (method.isAnnotationPresent(MigrationTest.SnapshotsGenerator.class)) {
+                executeGenerateMethods(migrationTestClass, method, flinkVersion);
+            } else if (method.isAnnotationPresent(
+                    MigrationTest.ParameterizedSnapshotsGenerator.class)) {
+                String parametersMethodName =
+                        method.getAnnotation(MigrationTest.ParameterizedSnapshotsGenerator.class)
+                                .value();
+                Method parametersMethod =
+                        migrationTestClass.getMethod(parametersMethodName, FlinkVersion.class);
+                executeParameterizedGenerateMethods(
+                        migrationTestClass, method, parametersMethod, flinkVersion);
+            }
+        }
+    }
+
+    private void executeGenerateMethods(
+            Class<?> migrationTestClass, Method method, FlinkVersion version) throws Throwable {
+        method.setAccessible(true);
+        List<TestRule> classRules = getRuleFields(migrationTestClass, ClassRule.class, null);
+
+        executeWithRules(
+                classRules,
+                new Statement() {
+                    @Override
+                    public void evaluate() throws Throwable {
+                        Object migrationTest = createMigrationTest(migrationTestClass);
+                        List<TestRule> rules =
+                                getRuleFields(migrationTestClass, Rule.class, migrationTest);
+                        executeWithRules(
+                                rules,
+                                new Statement() {
+                                    @Override
+                                    public void evaluate() throws Throwable {
+                                        method.invoke(migrationTest, version);
+                                    }
+                                });
+                    }
+                });
+    }
+
+    private void executeParameterizedGenerateMethods(
+            Class<?> migrationTestClass,
+            Method method,
+            Method parametersMethod,
+            FlinkVersion version)
+            throws Throwable {
+        method.setAccessible(true);
+        parametersMethod.setAccessible(true);
+        List<TestRule> classRules = getRuleFields(migrationTestClass, ClassRule.class, null);
+
+        executeWithRules(
+                classRules,
+                new Statement() {
+                    @Override
+                    public void evaluate() throws Throwable {
+                        Object migrationTest = createMigrationTest(migrationTestClass);
+                        List<TestRule> rules =
+                                getRuleFields(migrationTestClass, Rule.class, migrationTest);
+                        Collection<?> arguments =
+                                (Collection<?>) parametersMethod.invoke(migrationTest, version);
+                        for (Object argument : arguments) {
+                            executeWithRules(
+                                    rules,
+                                    new Statement() {
+                                        @Override
+                                        public void evaluate() throws Throwable {
+                                            method.invoke(migrationTest, version, argument);

Review Comment:
   I'm not sure whether we actually need the version as a parameter here. Almost all of the test data generation methods have the FlinkVersion as an "unused parameter". The only class where we utilize this parameter is `TypeSerializerUpgradeTestBase`. But there, we pass this version to `createTestSpecifications` as well, where it is used as an input parameter to each of the `TestSpecification` instances that are then used to call the actual test data generation method (in the marked line, it's the `argument` parameter). Therefore, even in `TypeSerializerUpgradeTestBase` the Flink version is present through the `TestSpecification` parameter. Tbh, I haven't checked all the `TypeSerializerUpgradeTestBase` subclasses, but it is true for the ~5 classes I checked. Could we get rid of this parameter?



##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java:
##########
@@ -64,4 +64,8 @@ public void createRestoredJob(StreamExecutionEnvironment env) {
         SingleOutputStreamOperator<Integer> third =
                 createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
     }
+
+    /** Will use the state generated by {@link ChainBreakTest}. */
+    @Override
+    public void generateSnapshots(FlinkVersion targetVersion) throws Exception {}

Review Comment:
   Having all these empty classes feels odd. I didn't go through the code in detail, but it looks like it needs to be implemented by `ChainBreakTest` and disabled in all the others so that the `AbstractOperatorRestoreTestBase.generateSnapshots` method is only called once? Could we extract the data generation out of `AbstractOperatorRestoreTestBase` instead? :thinking: 
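
   A minimal sketch of that alternative, with entirely made-up names: a standalone generator owns the snapshot generation, so subclasses of the restore test base would not need empty `generateSnapshots` overrides just to avoid repeated generation.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical extraction: one generator runs the shared snapshot generation
   // exactly once for the whole test family, instead of once per subclass.
   class OperatorRestoreSnapshotGeneratorSketch {
       static final AtomicInteger generationRuns = new AtomicInteger();

       /** Runs once for the whole test family; the body is only a placeholder. */
       static void generateSnapshots(String targetVersion) {
           generationRuns.incrementAndGet();
           // ... build the job graph and write the savepoint for targetVersion ...
       }
   }
   ```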



##########
flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala:
##########
@@ -75,43 +75,65 @@ object StatefulJobWBroadcastStateMigrationITCase {
       SnapshotSpec.withVersions(
         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
         SnapshotType.SAVEPOINT_CANONICAL,
-        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+        if (targetGeneratingVersion != null)

Review Comment:
   Defining a local BiFunction would shorten the code even more
   ```
       var getFlinkVersions =
         new BiFunction[FlinkVersion, FlinkVersion, util.Collection[FlinkVersion]] {
           override def apply(
               minInclVersion: FlinkVersion,
               maxInclVersion: FlinkVersion): util.Set[FlinkVersion] = if (
             targetGeneratingVersion != null
           )
             Collections.singleton(targetGeneratingVersion)
           else
             FlinkVersion.rangeOf(minInclVersion, maxInclVersion)
         }
   ```
   All the changes below could be replaced with a `getFlinkVersions` call, which reduces the redundant code.



##########
flink-core/pom.xml:
##########
@@ -18,148 +18,203 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-parent</artifactId>
-               <version>1.17-SNAPSHOT</version>
-       </parent>
-
-       <artifactId>flink-core</artifactId>
-       <name>Flink : Core</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-annotations</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-metrics-core</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-asm-9</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-jackson</artifactId>
-               </dependency>
-
-               <!-- standard utilities -->
-               <dependency>
-                       <groupId>org.apache.commons</groupId>
-                       <artifactId>commons-lang3</artifactId>
-                       <!-- managed version -->
-               </dependency>
-
-               <!-- standard utilities -->
-               <dependency>
-                       <groupId>org.apache.commons</groupId>
-                       <artifactId>commons-text</artifactId>
-                       <!-- managed version -->
-               </dependency>
-
-               <!-- for the fallback generic serializer -->
-               <dependency>
-                       <groupId>com.esotericsoftware.kryo</groupId>
-                       <artifactId>kryo</artifactId>
-                       <!-- managed version -->
-               </dependency>
-
-               <!-- The common collections are needed for some hash tables used in the collection execution -->
-               <dependency>
-                       <groupId>commons-collections</groupId>
-                       <artifactId>commons-collections</artifactId>
-                       <!-- managed version -->
-               </dependency>
-
-               <!-- Commons compression, for additional decompressors -->
-               <dependency>
-                       <groupId>org.apache.commons</groupId>
-                       <artifactId>commons-compress</artifactId>
-                       <!-- managed version -->
-               </dependency>
-               <dependency>
-                       <groupId>com.github.luben</groupId>
-                       <artifactId>zstd-jni</artifactId>
-                       <version>1.4.9-1</version>
-                       <scope>test</scope>
-               </dependency>
-
-
-               <!-- Ratelimiting dependencies -->
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-guava</artifactId>
-               </dependency>
-
-               <!-- ================== test dependencies ================== -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils-junit</artifactId>
-               </dependency>
-
-               <!-- Joda, jackson, and lombok are used to test that serialization and type extraction
-                       work with types from those libraries -->
-
-               <dependency>
-                       <groupId>joda-time</groupId>
-                       <artifactId>joda-time</artifactId>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.joda</groupId>
-                       <artifactId>joda-convert</artifactId>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.projectlombok</groupId>
-                       <artifactId>lombok</artifactId>
-                       <version>1.18.22</version>
-                       <scope>test</scope>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-checkstyle-plugin</artifactId>
-
-                               <configuration>
-                                       <suppressionsLocation combine.self="override">/tools/maven/suppressions-core.xml</suppressionsLocation>
-                               </configuration>
-                       </plugin>
-
-                       <!-- publish some test base classes -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-
-       </build>
-
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-parent</artifactId>
+        <version>1.17-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-core</artifactId>
+    <name>Flink : Core</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-annotations</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-asm-9</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+        </dependency>
+
+        <!-- standard utilities -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <!-- managed version -->
+        </dependency>
+
+        <!-- standard utilities -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-text</artifactId>
+            <!-- managed version -->
+        </dependency>
+
+        <!-- for the fallback generic serializer -->
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <!-- managed version -->
+        </dependency>
+
+        <!-- The common collections are needed for some hash tables used in the collection execution -->
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <!-- managed version -->
+        </dependency>
+
+        <!-- Commons compression, for additional decompressors -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <!-- managed version -->
+        </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+            <version>1.4.9-1</version>
+            <scope>test</scope>
+        </dependency>
+
+
+        <!-- Ratelimiting dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+        </dependency>
+
+        <!-- ================== test dependencies ================== -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+        </dependency>
+
+        <!-- Joda, jackson, and lombok are used to test that serialization and type extraction
+            work with types from those libraries -->
+
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.joda</groupId>
+            <artifactId>joda-convert</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.22</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>fink-migration-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+
+                <configuration>
+                    <suppressionsLocation combine.self="override">/tools/maven/suppressions-core.xml
+                    </suppressionsLocation>

Review Comment:
   Why this change?



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration tests. A migration test
+ * should implement the {@link MigrationTest} interface and its name should match {@code
+ * *(Test|ITCase)(.java|.scala)}. For Scala tests, we also require that the class name matches
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+    private static final String[] DEFAULT_PATH_PREFIXES =
+            new String[] {"src/test/java", "src/test/scala"};
+
+    private static final Pattern VERSION_PATTERN = Pattern.compile("" + "v?([0-9]+)[._]([0-9]+)");
+
+    private static final String CLASS_NAME_GROUP = "className";
+    private static final Pattern CLASS_NAME_PATTERN =
+            Pattern.compile("(?<" + CLASS_NAME_GROUP + ">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+    public static void main(String[] args) {
+        try {
+            if (args.length < 2) {
+                throw new IllegalArgumentException(
+                        "Usage: java MigrationTestsSnapshotGenerator [project root path] [version]");
+            }
+
+            File rootDirectory = new File(args[0]);
+            if (!rootDirectory.exists() || !rootDirectory.isDirectory()) {
+                throw new FileNotFoundException(
+                        rootDirectory + " does not exist or is not a directory.");
+            }
+
+            String versionName = args[1];
+            Matcher versionMatcher = VERSION_PATTERN.matcher(versionName);
+            if (!versionMatcher.matches()) {
+                throw new IllegalArgumentException(
+                        "Version "
+                                + versionName
+                                + " could not be parsed, "
+                                + "please specify the version with format like 1.17, 1_17, v1_17, v1.17");
+            }
+            String normalizedVersionName =
+                    "v" + versionMatcher.group(1) + "_" + versionMatcher.group(2);
+            FlinkVersion version = FlinkVersion.valueOf(normalizedVersionName);
+
+            LOG.info("Start generating for module {} and version {}", rootDirectory, version);
+            SnapshotGeneratorExecutor executor = new SnapshotGeneratorExecutor();
+            List<Class<?>> migrationTests = findMigrationTests(rootDirectory.getAbsolutePath());
+            for (Class<?> migrationTestClass : migrationTests) {
+                LOG.info("Start generating for {}", migrationTestClass.getName());
+                executor.executeGenerate(migrationTestClass, version);
+                LOG.info("Finish generating for {}", migrationTestClass.getName());
+            }
+            }
+
+            // Avoids leaked threads blocking the process from exiting.
+            System.exit(0);

Review Comment:
   Is the `${generated.classes}` property which you use in the pom files a Maven property? I struggled to find anything about it online. :thinking: 



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.

Review Comment:
   ```suggestion
   This module collects tools that help to generate test data for the state migration tests.
   ```
   I'm not sure whether we should mention here when the tool should be used. That's subject for the release documentation. We should avoid having redundant documentation in various places, because it makes it easy to end up with inconsistent documentation when not all locations are updated along with a change.



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>fink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-snapshots</id>

Review Comment:
   ```suggestion
       <id>generate-migration-test-data</id>
   ```
   I feel like `generate-snapshots` is quite generic (especially in the context of Maven). WDYT? Renaming it here would also imply renaming the other code locations.



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>fink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-snapshots</id>
+    <activation>
+        <property>
+            <name>generate-snapshots</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-snapshots</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes"
+                                           value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes '${generate.prefixes}'"
+                                           else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show logs during generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy the following:
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods is labeled
+   with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generate the snapshots for all the tests, execute
+
+```shell
+mvn clean package -Pgenerate-snapshots -Dgenerate.version=1.17 -nsu -DskipRat -Dcheckstyle.skip -Drat.ignoreErrors=true -DspotlessFiles=ineffective -Dfast -DskipTests

Review Comment:
   ```suggestion
   mvn clean package -Pgenerate-snapshots -Dgenerate.version=1.17 -nsu -Dfast -DskipTests
   ```
   `-Dfast` already skips spotless, rat, and checkstyle (see the `fast` profile definition in [pom.xml:1079ff](https://github.com/apache/flink/blob/a9151c42100ec09388d8052c7aa9f77f82efe469/pom.xml#L1079)).



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module provides the tooling that helps state migration tests
+generate snapshots of the current version before a release branch is cut.
+
+To add state migration tests to a module, add the following dependency:
+
+```xml
+
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-snapshots</id>
+    <activation>
+        <property>
+            <name>generate-snapshots</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-snapshots</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes"
+                                           value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes '${generate.prefixes}'"
+                                           else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show logs during generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy the following:
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods is labeled

Review Comment:
   ```suggestion
   3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator method is labeled
   ```



##########
flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala:
##########
@@ -38,78 +38,101 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
 import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType}
+import org.apache.flink.test.util.MigrationTest
+import org.apache.flink.test.util.MigrationTest.ParameterizedSnapshotsGenerator
 import org.apache.flink.util.Collector
 
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import java.util
-import java.util.stream.Collectors
+import java.util.Collections
 
 import scala.util.{Failure, Try}
 
 object StatefulJobSavepointMigrationITCase {
 
-  // TODO increase this to newer version to create and test snapshot migration for newer versions
-  val currentVersion = FlinkVersion.v1_16
+  @Parameterized.Parameters(name = "Test snapshot: {0}")
+  def createSpecsForTestRuns: util.Collection[SnapshotSpec] =
+    internalParameters(null)
 
-  // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-  // TODO Note: You should generate the snapshot based on the release branch instead of the
-  // master.
-  val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+  def createSpecsForTestDataGeneration(version: FlinkVersion): util.Collection[SnapshotSpec] =
+    internalParameters(version)
 
-  @Parameterized.Parameters(name = "Test snapshot: {0}")
-  def parameters: util.Collection[SnapshotSpec] = {
+  private def internalParameters(
+      targetGeneratingVersion: FlinkVersion): util.Collection[SnapshotSpec] = {
    // Note: It is not safe to restore savepoints created in a Scala applications with Flink
    // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
    // used names of anonymous classes that depend on the relative position/order in code, e.g.,
    // if two anonymous classes, instantiated inside the same class and from the same base class,
    // change order in the code their names are switched.
    // As a consequence, changes in code may result in restore failures.
    // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
-    var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
+    val parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
     parameters.addAll(
       SnapshotSpec.withVersions(
         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
         SnapshotType.SAVEPOINT_CANONICAL,
-        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+        if (targetGeneratingVersion != null)

Review Comment:
   A local callback would help here as well to reduce the code redundancy



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +61,85 @@
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase
+        implements MigrationTest {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO increase this to newer version to create and test snapshot migration for newer versions
-    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+        return internalParameters(null);
+    }
 
-    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-    // TODO Note: You should generate the snapshot based on the release branch instead of the
-    // master.
-    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+    public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+            FlinkVersion targetVersion) {
+        return internalParameters(targetVersion);
+    }
 
-    @Parameterized.Parameters(name = "Test snapshot: {0}")
-    public static Collection<SnapshotSpec> parameters() {
+    private static Collection<SnapshotSpec> internalParameters(
+            /* Nullable */ FlinkVersion targetGeneratingVersion) {
         Collection<SnapshotSpec> parameters = new LinkedList<>();
         parameters.addAll(
                 SnapshotSpec.withVersions(
                         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
                         SnapshotType.SAVEPOINT_CANONICAL,
-                        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14)));
+                        targetGeneratingVersion != null

Review Comment:
   This could be a local `BiFunction` callback; all of the ternary operator calls could then be replaced.
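   To make the suggestion concrete, here is a minimal, self-contained sketch of the local-callback idea. `Version`, `rangeOf`, and `specVersions` are simplified stand-ins for `FlinkVersion`, `FlinkVersion.rangeOf`, and the test's `internalParameters`, not the real Flink API:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.function.BiFunction;

   public class VersionRangeSketch {
       // Simplified stand-in for org.apache.flink.FlinkVersion.
       enum Version { v1_12, v1_13, v1_14, v1_15, v1_16 }

       // Simplified stand-in for FlinkVersion.rangeOf(from, to): inclusive range.
       static List<Version> rangeOf(Version from, Version to) {
           List<Version> out = new ArrayList<>();
           for (Version v : Version.values()) {
               if (v.compareTo(from) >= 0 && v.compareTo(to) <= 0) {
                   out.add(v);
               }
           }
           return out;
       }

       // The local callback: given a verification range, yield either the single
       // generation target (when set) or the full range to verify against. Every
       // `targetGeneratingVersion != null ? ... : ...` ternary collapses into one
       // call of this function.
       static List<Version> specVersions(Version targetGeneratingVersion) {
           BiFunction<Version, Version, List<Version>> versions =
                   (from, to) ->
                           targetGeneratingVersion != null
                                   ? Collections.singletonList(targetGeneratingVersion)
                                   : rangeOf(from, to);
           // One call site per state backend / snapshot type in the real test:
           return versions.apply(Version.v1_12, Version.v1_14);
       }

       public static void main(String[] args) {
           System.out.println(specVersions(null));          // verification: full range
           System.out.println(specVersions(Version.v1_16)); // generation: single version
       }
   }
   ```

   In the real test the `BiFunction` would be declared once at the top of `internalParameters` and reused at every `SnapshotSpec.withVersions(...)` call site.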



##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java:
##########
@@ -57,4 +57,10 @@ public void createRestoredJob(StreamExecutionEnvironment env) {
         SingleOutputStreamOperator<Integer> third =
                 createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
     }
+
+    /** Will use the state generate by {@link ChainBreakTest}. */
+    @Override
+    public void generateSnapshots(FlinkVersion targetVersion) throws Exception {
+        System.out.println("I replace the old one");

Review Comment:
   ```suggestion
   ```



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java:
##########
@@ -66,7 +58,22 @@
     * Creates a collection of {@link TestSpecification} which will be used as input for
      * parametrized tests.
      */
-    public abstract Collection<TestSpecification<?, ?>> createTestSpecifications() throws Exception;
+    public abstract Collection<TestSpecification<?, ?>> createTestSpecifications(

Review Comment:
   Are we sure that we're not changing the behavior here? It looks like the old implementation created specifications for each `MIGRATION_VERSIONS` entry, while with the new implementation we only do it for the version specified as a parameter in `MigrationTestsSnapshotGenerator`. :thinking: I might be wrong here; I just tried to reason about it based on `CompositeTypeSerializerUpgradeTest`.
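   The concern can be sketched as follows. This is a deliberately simplified illustration of the two readings, using `String` specs and a hypothetical `MIGRATION_VERSIONS` list rather than the real `TestSpecification`/`FlinkVersion` types:

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;

   public class SpecGenerationSketch {
       // Hypothetical stand-in for the version set the old base class iterated over.
       static final List<String> MIGRATION_VERSIONS = List.of("1.14", "1.15", "1.16");

       // Old reading: one specification per MIGRATION_VERSIONS entry.
       static Collection<String> oldCreateTestSpecifications() {
           List<String> specs = new ArrayList<>();
           for (String version : MIGRATION_VERSIONS) {
               specs.add("spec-for-" + version);
           }
           return specs;
       }

       // New reading: only the single version handed in
       // (e.g. by MigrationTestsSnapshotGenerator) is covered.
       static Collection<String> newCreateTestSpecifications(String version) {
           return List.of("spec-for-" + version);
       }

       public static void main(String[] args) {
           System.out.println(oldCreateTestSpecifications()); // one spec per version
           System.out.println(newCreateTestSpecifications("1.17")); // one spec total
       }
   }
   ```

   If the new signature is only ever called once per run with a single version, the multi-version coverage of the old loop would be lost unless the caller iterates the versions itself.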



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

