XComp commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1142950463


##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration 
tests. A migration test
+ * should implement {@link MigrationTest} interface and its name should match 
{@code
+ * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class 
name is the same with
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+    private static final String DEFAULT_PATH_PREFIXES = 
"src/test/java,src/test/scala";
+
+    private static final Option OPTION_HELP =
+            Option.builder()
+                    .longOpt("help")
+                    .required(false)
+                    .hasArg(false)
+                    .desc("print this help information.")
+                    .build();
+
+    private static final Option OPTION_DIR =
+            Option.builder()
+                    .longOpt("dir")
+                    .required()
+                    .hasArg()
+                    .desc("The root directory for scanning. Required.")
+                    .build();
+
+    private static final Option OPTION_VERSION =
+            Option.builder()
+                    .longOpt("version")
+                    .required()
+                    .hasArg()
+                    .desc("The version to generate. Required.")
+                    .build();
+
+    private static final Option OPTION_PREFIXES =
+            Option.builder()
+                    .longOpt("prefixes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The prefix paths to scan under the root 
directory, separated by \",\". Default to \""
+                                    + DEFAULT_PATH_PREFIXES
+                                    + "\"")
+                    .build();
+
+    private static final Option OPTION_CLASSES =
+            Option.builder()
+                    .longOpt("classes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The specified qualified class name to generate 
test data, "
+                                    + "separated by \",\". This option has a 
higher priority than the prefix option.")
+                    .build();
+
+    private static final Pattern VERSION_PATTERN = 
Pattern.compile("v?([0-9]+)[._]([0-9]+)");
+
+    private static final String CLASS_NAME_GROUP = "className";
+    private static final Pattern CLASS_NAME_PATTERN =
+            Pattern.compile("(?<" + CLASS_NAME_GROUP + 
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+    public static void main(String[] args) {
+        for (String s : args) {
+            if (s.equals("--" + OPTION_HELP.getLongOpt())) {
+                HelpFormatter helpFormatter = new HelpFormatter();
+                helpFormatter.setOptionComparator(null);
+                helpFormatter.printHelp(
+                        "java " + 
MigrationTestsSnapshotGenerator.class.getName(), createOptions());
+                return;
+            }
+        }
+
+        try {
+            Options options = createOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+
+            File rootDirectory = new 
File(commandLine.getOptionValue(OPTION_DIR));
+            if (!rootDirectory.exists() || !rootDirectory.isDirectory()) {
+                throw new FileNotFoundException(
+                        rootDirectory + " does not exist or is not a 
directory.");
+            }
+
+            String versionName = commandLine.getOptionValue(OPTION_VERSION);
+            Matcher versionMatcher = VERSION_PATTERN.matcher(versionName);
+            if (!versionMatcher.matches()) {
+                throw new IllegalArgumentException(
+                        "Version "
+                                + versionName
+                                + "could not be parsed, "
+                                + "please specify the version with format like 
1.17, 1_17, v1_17, v1.17");
+            }
+            FlinkVersion version =
+                    FlinkVersion.valueOf(
+                            Integer.parseInt(versionMatcher.group(1)),
+                            Integer.parseInt(versionMatcher.group(2)));
+
+            LOG.info(
+                    "Start test data generating for module {} and version {}",
+                    rootDirectory,
+                    version);
+            List<Class<?>> migrationTests;
+            if (commandLine.hasOption(OPTION_CLASSES)) {
+                migrationTests =
+                        loadSpecifiedMigrationTests(
+                                
commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*"));
+            } else {
+                String[] prefixes =
+                        commandLine
+                                .getOptionValue(OPTION_PREFIXES, 
DEFAULT_PATH_PREFIXES)
+                                .split(",\\s*");
+                migrationTests = 
findMigrationTests(rootDirectory.getAbsolutePath(), prefixes);
+            }
+
+            for (Class<?> migrationTestClass : migrationTests) {
+                LOG.info("Start test data generating for {}", 
migrationTestClass.getName());
+                SnapshotGeneratorUtils.executeGenerate(migrationTestClass, 
version);
+                LOG.info("Finish test data generating for {}", 
migrationTestClass.getName());
+            }
+        } catch (Throwable e) {

Review Comment:
   What's the value of translating the `Throwable` into a `RuntimeException`, which is then thrown anyway? I guess we could get rid of the try/catch block entirely here. WDYT?



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration 
tests. A migration test
+ * should implement {@link MigrationTest} interface and its name should match 
{@code
+ * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class 
name is the same with
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+    private static final String DEFAULT_PATH_PREFIXES = 
"src/test/java,src/test/scala";
+
+    private static final Option OPTION_HELP =
+            Option.builder()
+                    .longOpt("help")
+                    .required(false)
+                    .hasArg(false)
+                    .desc("print this help information.")
+                    .build();
+
+    private static final Option OPTION_DIR =
+            Option.builder()
+                    .longOpt("dir")
+                    .required()
+                    .hasArg()
+                    .desc("The root directory for scanning. Required.")
+                    .build();
+
+    private static final Option OPTION_VERSION =
+            Option.builder()
+                    .longOpt("version")
+                    .required()
+                    .hasArg()
+                    .desc("The version to generate. Required.")
+                    .build();
+
+    private static final Option OPTION_PREFIXES =
+            Option.builder()
+                    .longOpt("prefixes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The prefix paths to scan under the root 
directory, separated by \",\". Default to \""
+                                    + DEFAULT_PATH_PREFIXES
+                                    + "\"")
+                    .build();
+
+    private static final Option OPTION_CLASSES =
+            Option.builder()
+                    .longOpt("classes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The specified qualified class name to generate 
test data, "
+                                    + "separated by \",\". This option has a 
higher priority than the prefix option.")
+                    .build();
+
+    private static final Pattern VERSION_PATTERN = 
Pattern.compile("v?([0-9]+)[._]([0-9]+)");
+
+    private static final String CLASS_NAME_GROUP = "className";
+    private static final Pattern CLASS_NAME_PATTERN =
+            Pattern.compile("(?<" + CLASS_NAME_GROUP + 
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+    public static void main(String[] args) {
+        for (String s : args) {
+            if (s.equals("--" + OPTION_HELP.getLongOpt())) {
+                HelpFormatter helpFormatter = new HelpFormatter();
+                helpFormatter.setOptionComparator(null);
+                helpFormatter.printHelp(
+                        "java " + 
MigrationTestsSnapshotGenerator.class.getName(), createOptions());
+                return;
+            }
+        }

Review Comment:
   That for loop can be removed. Instead, you could just use `commandLine.hasOption(OPTION_HELP)`, which relies on the proper parameter parsing.



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration 
tests. A migration test
+ * should implement {@link MigrationTest} interface and its name should match 
{@code
+ * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class 
name is the same with
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+    private static final String DEFAULT_PATH_PREFIXES = 
"src/test/java,src/test/scala";
+
+    private static final Option OPTION_HELP =
+            Option.builder()
+                    .longOpt("help")
+                    .required(false)
+                    .hasArg(false)
+                    .desc("print this help information.")
+                    .build();
+
+    private static final Option OPTION_DIR =
+            Option.builder()
+                    .longOpt("dir")
+                    .required()
+                    .hasArg()
+                    .desc("The root directory for scanning. Required.")
+                    .build();
+
+    private static final Option OPTION_VERSION =
+            Option.builder()
+                    .longOpt("version")
+                    .required()
+                    .hasArg()
+                    .desc("The version to generate. Required.")
+                    .build();
+
+    private static final Option OPTION_PREFIXES =
+            Option.builder()
+                    .longOpt("prefixes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The prefix paths to scan under the root 
directory, separated by \",\". Default to \""
+                                    + DEFAULT_PATH_PREFIXES
+                                    + "\"")
+                    .build();
+
+    private static final Option OPTION_CLASSES =
+            Option.builder()
+                    .longOpt("classes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The specified qualified class name to generate 
test data, "
+                                    + "separated by \",\". This option has a 
higher priority than the prefix option.")
+                    .build();
+
+    private static final Pattern VERSION_PATTERN = 
Pattern.compile("v?([0-9]+)[._]([0-9]+)");
+
+    private static final String CLASS_NAME_GROUP = "className";
+    private static final Pattern CLASS_NAME_PATTERN =
+            Pattern.compile("(?<" + CLASS_NAME_GROUP + 
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+    public static void main(String[] args) {
+        for (String s : args) {
+            if (s.equals("--" + OPTION_HELP.getLongOpt())) {
+                HelpFormatter helpFormatter = new HelpFormatter();
+                helpFormatter.setOptionComparator(null);
+                helpFormatter.printHelp(
+                        "java " + 
MigrationTestsSnapshotGenerator.class.getName(), createOptions());
+                return;
+            }
+        }
+
+        try {
+            Options options = createOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+
+            File rootDirectory = new 
File(commandLine.getOptionValue(OPTION_DIR));
+            if (!rootDirectory.exists() || !rootDirectory.isDirectory()) {
+                throw new FileNotFoundException(
+                        rootDirectory + " does not exist or is not a 
directory.");
+            }
+
+            String versionName = commandLine.getOptionValue(OPTION_VERSION);
+            Matcher versionMatcher = VERSION_PATTERN.matcher(versionName);
+            if (!versionMatcher.matches()) {
+                throw new IllegalArgumentException(
+                        "Version "
+                                + versionName
+                                + "could not be parsed, "
+                                + "please specify the version with format like 
1.17, 1_17, v1_17, v1.17");
+            }
+            FlinkVersion version =
+                    FlinkVersion.valueOf(
+                            Integer.parseInt(versionMatcher.group(1)),
+                            Integer.parseInt(versionMatcher.group(2)));
+
+            LOG.info(
+                    "Start test data generating for module {} and version {}",
+                    rootDirectory,
+                    version);
+            List<Class<?>> migrationTests;
+            if (commandLine.hasOption(OPTION_CLASSES)) {
+                migrationTests =
+                        loadSpecifiedMigrationTests(
+                                
commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*"));
+            } else {
+                String[] prefixes =
+                        commandLine
+                                .getOptionValue(OPTION_PREFIXES, 
DEFAULT_PATH_PREFIXES)
+                                .split(",\\s*");
+                migrationTests = 
findMigrationTests(rootDirectory.getAbsolutePath(), prefixes);
+            }
+
+            for (Class<?> migrationTestClass : migrationTests) {
+                LOG.info("Start test data generating for {}", 
migrationTestClass.getName());
+                SnapshotGeneratorUtils.executeGenerate(migrationTestClass, 
version);
+                LOG.info("Finish test data generating for {}", 
migrationTestClass.getName());
+            }
+        } catch (Throwable e) {
+            LOG.error("Failed to generate snapshots for the state migration 
tests", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Options createOptions() {
+        Options options = new Options();
+
+        // The root directory for the scanning
+        options.addOption(OPTION_HELP);
+        options.addOption(OPTION_DIR);
+        options.addOption(OPTION_VERSION);
+        options.addOption(OPTION_PREFIXES);
+        options.addOption(OPTION_CLASSES);
+        return options;
+    }
+
+    private static List<Class<?>> loadSpecifiedMigrationTests(String[] 
specifiedClassNames)
+            throws Exception {
+        List<Class<?>> migrationTestClasses = new ArrayList<>();
+        for (String name : specifiedClassNames) {
+            Class<?> clazz = Class.forName(name);
+            if (!MigrationTest.class.isAssignableFrom(clazz)) {
+                throw new IllegalArgumentException("Class " + name + " is not 
migration test.");
+            }
+            migrationTestClasses.add(clazz);
+        }
+        return migrationTestClasses;
+    }
+
+    private static List<Class<?>> findMigrationTests(String rootDirectory, 
String[] prefixes)
+            throws Exception {
+        List<Class<?>> migrationTestClasses = new ArrayList<>();
+        for (String prefix : prefixes) {
+            recursivelyFindMigrationTests(rootDirectory, prefix, "", 
migrationTestClasses);
+        }
+
+        return migrationTestClasses;
+    }
+
+    private static void recursivelyFindMigrationTests(
+            String rootDirectory, String prefix, String packageName, 
List<Class<?>> result)
+            throws Exception {
+        File codeDirectory = new File(rootDirectory + File.separator + prefix);
+        if (!codeDirectory.exists()) {
+            return;
+        }
+
+        // Search all the directories
+        try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(codeDirectory.toPath())) {
+            for (Path entry : stream) {
+                File file = entry.toFile();
+                if (file.isDirectory()) {
+                    recursivelyFindMigrationTests(
+                            rootDirectory,
+                            prefix + File.separator + file.getName(),
+                            packageName.isEmpty()
+                                    ? file.getName()
+                                    : packageName + "." + file.getName(),
+                            result);
+                } else {
+                    Matcher m = CLASS_NAME_PATTERN.matcher(file.getName());
+                    if (m.matches()) {
+                        String className = packageName + "." + 
m.group(CLASS_NAME_GROUP);

Review Comment:
   Is this correct? We would generate a `.ABCTest` class name if `packageName` is empty. :thinking: 



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/SnapshotGeneratorUtils.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RunRules;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.Parameterized;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** A utility class to execute generateSnapshots method for migration tests. */
+class SnapshotGeneratorUtils {
+
+    static void executeGenerate(Class<?> migrationTestClass, FlinkVersion 
flinkVersion)
+            throws Throwable {
+        for (Method method : migrationTestClass.getMethods()) {
+            if 
(method.isAnnotationPresent(MigrationTest.SnapshotsGenerator.class)) {
+                executeGenerateMethods(migrationTestClass, method, 
flinkVersion);
+            } else if (method.isAnnotationPresent(
+                    MigrationTest.ParameterizedSnapshotsGenerator.class)) {
+                String parametersMethodName =
+                        
method.getAnnotation(MigrationTest.ParameterizedSnapshotsGenerator.class)
+                                .value();
+                Method parametersMethod =
+                        migrationTestClass.getMethod(parametersMethodName, 
FlinkVersion.class);
+                executeParameterizedGenerateMethods(
+                        migrationTestClass, method, parametersMethod, 
flinkVersion);
+            }
+        }
+    }
+
+    private static void executeGenerateMethods(
+            Class<?> migrationTestClass, Method method, FlinkVersion version) 
throws Throwable {
+        method.setAccessible(true);
+        List<TestRule> classRules = getRuleFields(migrationTestClass, 
ClassRule.class, null);
+
+        executeWithRules(
+                classRules,
+                new Statement() {
+                    @Override
+                    public void evaluate() throws Throwable {
+                        Object migrationTest = 
createMigrationTest(migrationTestClass);
+                        List<TestRule> rules =
+                                getRuleFields(migrationTestClass, Rule.class, 
migrationTest);
+                        executeWithRules(
+                                rules,
+                                new Statement() {
+                                    @Override
+                                    public void evaluate() throws Throwable {
+                                        method.invoke(migrationTest, version);
+                                    }
+                                });
+                    }
+                });
+    }
+
+    private static void executeParameterizedGenerateMethods(
+            Class<?> migrationTestClass,
+            Method method,
+            Method parametersMethod,
+            FlinkVersion version)
+            throws Throwable {
+        method.setAccessible(true);
+        parametersMethod.setAccessible(true);
+        List<TestRule> classRules = getRuleFields(migrationTestClass, 
ClassRule.class, null);
+
+        executeWithRules(
+                classRules,
+                new Statement() {
+                    @Override
+                    public void evaluate() throws Throwable {
+                        Object migrationTest = 
createMigrationTest(migrationTestClass);
+                        List<TestRule> rules =
+                                getRuleFields(migrationTestClass, Rule.class, 
migrationTest);
+                        Collection<?> arguments =
+                                (Collection<?>) 
parametersMethod.invoke(migrationTest, version);
+                        for (Object argument : arguments) {
+                            executeWithRules(
+                                    rules,
+                                    new Statement() {
+                                        @Override
+                                        public void evaluate() throws 
Throwable {
+                                            method.invoke(migrationTest, 
argument);
+                                        }
+                                    });
+                        }
+                    }
+                });
+    }
+
+    private static void executeWithRules(List<TestRule> rules, Statement 
statement)
+            throws Throwable {
+        new RunRules(statement, rules, Description.EMPTY).evaluate();
+    }
+
+    private static List<TestRule> getRuleFields(
+            Class<?> migrationTestClass, Class<? extends Annotation> tagClass, 
Object migrationTest)
+            throws IllegalAccessException {
+        List<TestRule> rules = new ArrayList<>();
+
+        Field[] fields = migrationTestClass.getFields();
+        for (Field field : fields) {
+            if (ExternalResource.class.isAssignableFrom(field.getType())
+                    && field.isAnnotationPresent(tagClass)) {
+                field.setAccessible(true);
+                rules.add((ExternalResource) field.get(migrationTest));
+            }
+        }
+
+        return rules;
+    }
+
+    private static Object createMigrationTest(Class<?> migrationTestClass) 
throws Exception {
+        Constructor<?>[] constructors = 
migrationTestClass.getDeclaredConstructors();
+        for (Constructor<?> constructor : constructors) {
+            if (constructor.getParameterCount() == 0) {
+                constructor.setAccessible(true);
+                return constructor.newInstance();
+            }
+        }
+
+        // This class does not have a constructor without argument.
+        Constructor<?> constructor = constructors[0];
+        constructor.setAccessible(true);
+
+        // Check if we could find method labeled with @Parameterized.Parameters
+        for (Method method : migrationTestClass.getMethods()) {
+            if (Modifier.isStatic(method.getModifiers())
+                    && 
method.isAnnotationPresent(Parameterized.Parameters.class)) {
+                Object argumentLists = method.invoke(null);
+                if (argumentLists instanceof Collection) {
+                    return constructor.newInstance(
+                            ((Collection<?>) argumentLists).iterator().next());
+                } else if (argumentLists.getClass().isArray()) {
+                    return constructor.newInstance(Array.get(argumentLists, 
0));
+                } else {
+                    throw new RuntimeException(
+                            "Failed to create parameterized class object due 
to argument lists type not supported: "
+                                    + argumentLists.getClass());
+                }
+            }
+        }
+
+        throw new RuntimeException("Could not create the object for " + 
migrationTestClass);

Review Comment:
   ```suggestion
           throw new RuntimeException("Could not create the object for " + 
migrationTestClass + ": No default constructor or @Parameterized.Parameters 
method found.");
   ```



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-migration-test-data</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes" 
value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes 
'${generate.prefixes}'" else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java 
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" 
dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show the logs during test data generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the 
snapshots generator methods are labeled
+   with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`.

Review Comment:
   ```suggestion
      with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`.
   ```



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-migration-test-data</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes" 
value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes 
'${generate.prefixes}'" else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java 
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" 
dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show the logs during test data generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the 
snapshots generator methods are labeled
+   with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generate the snapshots for all the tests, execute
+
+```shell
+mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.17 -nsu 
-Dfast -DskipTests
+```
+
+The version should be replaced with the target one.

Review Comment:
   ```suggestion
   The version (in the command above `1.17`) should be replaced with the target 
one.
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java:
##########
@@ -52,9 +51,6 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
                 + "-snapshot";
     }
 
-    @Ignore
     @Override
-    public void writeSnapshot() throws Exception {
-        throw new UnsupportedOperationException();
-    }
+    public void writeSnapshot(FlinkVersion targetVersion) throws Exception {}

Review Comment:
   Why do we need to override this method here? Is it because this class is 
inhering `MigrationTest` and therefore becomes eligible for test data 
generation as well? Overwriting this method would, therefore, disable the data 
generation because we're relying on the test data generation in 
`FlinkKafkaProducerMigrationTest`?! We might want to add a comment to the 
method's body here mentioning the reasoning.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java:
##########
@@ -20,15 +20,14 @@
 
 import org.apache.flink.FlinkVersion;
 
-import org.junit.Ignore;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
 
 /**
  * Migration test from FlinkKafkaProducer011 operator. This test depends on 
the resource generated
- * by {@link FlinkKafkaProducer011MigrationTest#writeSnapshot()}.
+ * by {@link 
FlinkKafkaProducerMigrationOperatorTest#writeSnapshot(FlinkVersion)}.

Review Comment:
   This sounds wrong? :thinking: 



##########
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java:
##########
@@ -309,17 +288,34 @@ public void testMonitoringSourceRestore() throws 
Exception {
 
         testHarness.setup();
 
-        testHarness.initializeState(
-                OperatorSnapshotUtil.getResourceFilename(
-                        "monitoring-function-migration-test-"
-                                + expectedModTime
-                                + "-flink"
-                                + testMigrateVersion
-                                + "-snapshot"));
+        // Get the exact filename
+        Tuple2<String, Long> fileNameAndModTime = 
getResourceFilename(testMigrateVersion);
+
+        testHarness.initializeState(fileNameAndModTime.f0);
 
         testHarness.open();
 
-        Assert.assertEquals((long) expectedModTime, 
monitoringFunction.getGlobalModificationTime());
+        Assert.assertEquals(
+                fileNameAndModTime.f1.longValue(), 
monitoringFunction.getGlobalModificationTime());
+    }
+
+    private Tuple2<String, Long> getResourceFilename(FlinkVersion version) 
throws IOException {
+        String resourceDirectory = 
OperatorSnapshotUtil.getResourceFilename("");
+        Pattern fileNamePattern =
+                Pattern.compile(
+                        "monitoring-function-migration-test-(\\d+)-flink"
+                                + version.toString()
+                                + "-snapshot");
+
+        for (java.nio.file.Path file : 
Files.newDirectoryStream(Paths.get(resourceDirectory))) {
+            String fileName = file.getFileName().toString();
+            Matcher matcher = fileNamePattern.matcher(fileName);
+            if (matcher.matches()) {
+                return new Tuple2<>(file.toString(), 
Long.parseLong(matcher.group(1)));
+            }
+        }
+
+        throw new RuntimeException("The snapshot for " + version + " not 
found");

Review Comment:
   ```suggestion
           throw new IllegalArgumentException("The snapshot for " + version + " 
not found");
   ```
   `IllegalArgumentException` seems to be a better fit here. WDYT?



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration 
tests. A migration test
+ * should implement {@link MigrationTest} interface and its name should match 
{@code
+ * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class 
name is the same with
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+    private static final String DEFAULT_PATH_PREFIXES = 
"src/test/java,src/test/scala";
+
+    private static final Option OPTION_HELP =
+            Option.builder()
+                    .longOpt("help")
+                    .required(false)
+                    .hasArg(false)
+                    .desc("print this help information.")
+                    .build();
+
+    private static final Option OPTION_DIR =
+            Option.builder()
+                    .longOpt("dir")
+                    .required()
+                    .hasArg()
+                    .desc("The root directory for scanning. Required.")
+                    .build();
+
+    private static final Option OPTION_VERSION =
+            Option.builder()
+                    .longOpt("version")
+                    .required()
+                    .hasArg()
+                    .desc("The version to generate. Required.")
+                    .build();
+
+    private static final Option OPTION_PREFIXES =
+            Option.builder()
+                    .longOpt("prefixes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The prefix paths to scan under the root 
directory, separated by \",\". Default to \""
+                                    + DEFAULT_PATH_PREFIXES
+                                    + "\"")
+                    .build();
+
+    private static final Option OPTION_CLASSES =
+            Option.builder()
+                    .longOpt("classes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The specified qualified class name to generate 
test data, "
+                                    + "separated by \",\". This option has a 
higher priority than the prefix option.")
+                    .build();
+
+    private static final Pattern VERSION_PATTERN = 
Pattern.compile("v?([0-9]+)[._]([0-9]+)");
+
+    private static final String CLASS_NAME_GROUP = "className";
+    private static final Pattern CLASS_NAME_PATTERN =
+            Pattern.compile("(?<" + CLASS_NAME_GROUP + 
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+    public static void main(String[] args) {
+        for (String s : args) {
+            if (s.equals("--" + OPTION_HELP.getLongOpt())) {
+                HelpFormatter helpFormatter = new HelpFormatter();
+                helpFormatter.setOptionComparator(null);
+                helpFormatter.printHelp(
+                        "java " + 
MigrationTestsSnapshotGenerator.class.getName(), createOptions());
+                return;
+            }
+        }
+
+        try {
+            Options options = createOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+
+            File rootDirectory = new 
File(commandLine.getOptionValue(OPTION_DIR));
+            if (!rootDirectory.exists() || !rootDirectory.isDirectory()) {
+                throw new FileNotFoundException(
+                        rootDirectory + " does not exist or is not a 
directory.");
+            }
+
+            String versionName = commandLine.getOptionValue(OPTION_VERSION);
+            Matcher versionMatcher = VERSION_PATTERN.matcher(versionName);
+            if (!versionMatcher.matches()) {
+                throw new IllegalArgumentException(
+                        "Version "
+                                + versionName
+                                + "could not be parsed, "
+                                + "please specify the version with format like 
1.17, 1_17, v1_17, v1.17");
+            }
+            FlinkVersion version =
+                    FlinkVersion.valueOf(
+                            Integer.parseInt(versionMatcher.group(1)),
+                            Integer.parseInt(versionMatcher.group(2)));
+
+            LOG.info(
+                    "Start test data generating for module {} and version {}",
+                    rootDirectory,
+                    version);
+            List<Class<?>> migrationTests;
+            if (commandLine.hasOption(OPTION_CLASSES)) {
+                migrationTests =
+                        loadSpecifiedMigrationTests(
+                                
commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*"));
+            } else {
+                String[] prefixes =
+                        commandLine
+                                .getOptionValue(OPTION_PREFIXES, 
DEFAULT_PATH_PREFIXES)
+                                .split(",\\s*");
+                migrationTests = 
findMigrationTests(rootDirectory.getAbsolutePath(), prefixes);
+            }
+
+            for (Class<?> migrationTestClass : migrationTests) {
+                LOG.info("Start test data generating for {}", 
migrationTestClass.getName());
+                SnapshotGeneratorUtils.executeGenerate(migrationTestClass, 
version);
+                LOG.info("Finish test data generating for {}", 
migrationTestClass.getName());
+            }
+        } catch (Throwable e) {
+            LOG.error("Failed to generate snapshots for the state migration 
tests", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Options createOptions() {
+        Options options = new Options();
+
+        // The root directory for the scanning
+        options.addOption(OPTION_HELP);
+        options.addOption(OPTION_DIR);
+        options.addOption(OPTION_VERSION);
+        options.addOption(OPTION_PREFIXES);
+        options.addOption(OPTION_CLASSES);
+        return options;
+    }
+
+    private static List<Class<?>> loadSpecifiedMigrationTests(String[] 
specifiedClassNames)
+            throws Exception {
+        List<Class<?>> migrationTestClasses = new ArrayList<>();
+        for (String name : specifiedClassNames) {
+            Class<?> clazz = Class.forName(name);
+            if (!MigrationTest.class.isAssignableFrom(clazz)) {
+                throw new IllegalArgumentException("Class " + name + " is not 
migration test.");
+            }
+            migrationTestClasses.add(clazz);
+        }
+        return migrationTestClasses;
+    }
+
+    private static List<Class<?>> findMigrationTests(String rootDirectory, 
String[] prefixes)
+            throws Exception {
+        List<Class<?>> migrationTestClasses = new ArrayList<>();
+        for (String prefix : prefixes) {
+            recursivelyFindMigrationTests(rootDirectory, prefix, "", 
migrationTestClasses);
+        }
+
+        return migrationTestClasses;
+    }
+
+    private static void recursivelyFindMigrationTests(
+            String rootDirectory, String prefix, String packageName, 
List<Class<?>> result)
+            throws Exception {
+        File codeDirectory = new File(rootDirectory + File.separator + prefix);
+        if (!codeDirectory.exists()) {
+            return;
+        }
+
+        // Search all the directories
+        try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(codeDirectory.toPath())) {
+            for (Path entry : stream) {
+                File file = entry.toFile();
+                if (file.isDirectory()) {
+                    recursivelyFindMigrationTests(
+                            rootDirectory,
+                            prefix + File.separator + file.getName(),
+                            packageName.isEmpty()
+                                    ? file.getName()
+                                    : packageName + "." + file.getName(),
+                            result);
+                } else {
+                    Matcher m = CLASS_NAME_PATTERN.matcher(file.getName());
+                    if (m.matches()) {
+                        String className = packageName + "." + 
m.group(CLASS_NAME_GROUP);
+
+                        // TODO: For now we require the class name to match 
the file name
+                        // for both java and scala.

Review Comment:
   ```suggestion
                           // For now we require the class name to match the 
file name for both java
                           // and scala.
   ```
   What's the purpose of the TODO here? Shall we create a ticket instead? Or 
would it be ok to just remove it?



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration 
tests. A migration test
+ * should implement {@link MigrationTest} interface and its name should match 
{@code
+ * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class 
name is the same with
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+    private static final String DEFAULT_PATH_PREFIXES = 
"src/test/java,src/test/scala";
+
+    private static final Option OPTION_HELP =
+            Option.builder()
+                    .longOpt("help")
+                    .required(false)
+                    .hasArg(false)
+                    .desc("print this help information.")
+                    .build();
+
+    private static final Option OPTION_DIR =
+            Option.builder()
+                    .longOpt("dir")
+                    .required()
+                    .hasArg()
+                    .desc("The root directory for scanning. Required.")
+                    .build();
+
+    private static final Option OPTION_VERSION =
+            Option.builder()
+                    .longOpt("version")
+                    .required()
+                    .hasArg()
+                    .desc("The version to generate. Required.")
+                    .build();
+
+    private static final Option OPTION_PREFIXES =
+            Option.builder()
+                    .longOpt("prefixes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The prefix paths to scan under the root 
directory, separated by \",\". Default to \""
+                                    + DEFAULT_PATH_PREFIXES
+                                    + "\"")
+                    .build();
+
+    private static final Option OPTION_CLASSES =
+            Option.builder()
+                    .longOpt("classes")
+                    .required(false)
+                    .hasArg()
+                    .desc(
+                            "The specified qualified class name to generate 
test data, "
+                                    + "separated by \",\". This option has a 
higher priority than the prefix option.")
+                    .build();
+
+    private static final Pattern VERSION_PATTERN = 
Pattern.compile("v?([0-9]+)[._]([0-9]+)");
+
+    private static final String CLASS_NAME_GROUP = "className";
+    private static final Pattern CLASS_NAME_PATTERN =
+            Pattern.compile("(?<" + CLASS_NAME_GROUP + 
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+    public static void main(String[] args) {
+        for (String s : args) {
+            if (s.equals("--" + OPTION_HELP.getLongOpt())) {
+                HelpFormatter helpFormatter = new HelpFormatter();
+                helpFormatter.setOptionComparator(null);
+                helpFormatter.printHelp(
+                        "java " + 
MigrationTestsSnapshotGenerator.class.getName(), createOptions());
+                return;
+            }
+        }
+
+        try {
+            Options options = createOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+
+            File rootDirectory = new 
File(commandLine.getOptionValue(OPTION_DIR));
+            if (!rootDirectory.exists() || !rootDirectory.isDirectory()) {
+                throw new FileNotFoundException(
+                        rootDirectory + " does not exist or is not a 
directory.");
+            }
+
+            String versionName = commandLine.getOptionValue(OPTION_VERSION);
+            Matcher versionMatcher = VERSION_PATTERN.matcher(versionName);
+            if (!versionMatcher.matches()) {
+                throw new IllegalArgumentException(
+                        "Version "
+                                + versionName
+                                + "could not be parsed, "
+                                + "please specify the version with format like 
1.17, 1_17, v1_17, v1.17");
+            }
+            FlinkVersion version =
+                    FlinkVersion.valueOf(
+                            Integer.parseInt(versionMatcher.group(1)),
+                            Integer.parseInt(versionMatcher.group(2)));
+
+            LOG.info(
+                    "Start test data generating for module {} and version {}",
+                    rootDirectory,
+                    version);
+            List<Class<?>> migrationTests;
+            if (commandLine.hasOption(OPTION_CLASSES)) {
+                migrationTests =
+                        loadSpecifiedMigrationTests(
+                                
commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*"));
+            } else {
+                String[] prefixes =
+                        commandLine
+                                .getOptionValue(OPTION_PREFIXES, 
DEFAULT_PATH_PREFIXES)
+                                .split(",\\s*");
+                migrationTests = 
findMigrationTests(rootDirectory.getAbsolutePath(), prefixes);
+            }
+
+            for (Class<?> migrationTestClass : migrationTests) {
+                LOG.info("Start test data generating for {}", 
migrationTestClass.getName());
+                SnapshotGeneratorUtils.executeGenerate(migrationTestClass, 
version);
+                LOG.info("Finish test data generating for {}", 
migrationTestClass.getName());
+            }
+        } catch (Throwable e) {
+            LOG.error("Failed to generate snapshots for the state migration 
tests", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Options createOptions() {
+        Options options = new Options();
+
+        // The root directory for the scanning
+        options.addOption(OPTION_HELP);
+        options.addOption(OPTION_DIR);
+        options.addOption(OPTION_VERSION);
+        options.addOption(OPTION_PREFIXES);
+        options.addOption(OPTION_CLASSES);
+        return options;
+    }
+
+    private static List<Class<?>> loadSpecifiedMigrationTests(String[] 
specifiedClassNames)
+            throws Exception {
+        List<Class<?>> migrationTestClasses = new ArrayList<>();
+        for (String name : specifiedClassNames) {
+            Class<?> clazz = Class.forName(name);
+            if (!MigrationTest.class.isAssignableFrom(clazz)) {
+                throw new IllegalArgumentException("Class " + name + " is not 
migration test.");
+            }
+            migrationTestClasses.add(clazz);
+        }
+        return migrationTestClasses;
+    }
+
+    private static List<Class<?>> findMigrationTests(String rootDirectory, 
String[] prefixes)
+            throws Exception {
+        List<Class<?>> migrationTestClasses = new ArrayList<>();
+        for (String prefix : prefixes) {
+            recursivelyFindMigrationTests(rootDirectory, prefix, "", 
migrationTestClasses);
+        }
+
+        return migrationTestClasses;
+    }
+
+    private static void recursivelyFindMigrationTests(
+            String rootDirectory, String prefix, String packageName, 
List<Class<?>> result)
+            throws Exception {
+        File codeDirectory = new File(rootDirectory + File.separator + prefix);
+        if (!codeDirectory.exists()) {
+            return;

Review Comment:
   ```suggestion
               LOG.debug("{} doesn't exist and will be skipped.", 
codeDirectory);
               return;
   ```
   nit: as a proposal



##########
flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala:
##########
@@ -38,78 +38,95 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
 import 
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode,
 SnapshotSpec, SnapshotType}
+import org.apache.flink.test.util.MigrationTest
+import org.apache.flink.test.util.MigrationTest.ParameterizedSnapshotsGenerator
 import org.apache.flink.util.Collector
 
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import javax.annotation.Nullable
+
 import java.util
-import java.util.stream.Collectors
+import java.util.Collections
+import java.util.function.BiFunction
 
 import scala.util.{Failure, Try}
 
 object StatefulJobSavepointMigrationITCase {
 
-  // TODO increase this to newer version to create and test snapshot migration 
for newer versions
-  val currentVersion = FlinkVersion.v1_16
-
-  // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-  // TODO Note: You should generate the snapshot based on the release branch 
instead of the
-  // master.
-  val executionMode = ExecutionMode.VERIFY_SNAPSHOT
-
   @Parameterized.Parameters(name = "Test snapshot: {0}")
-  def parameters: util.Collection[SnapshotSpec] = {
+  def createSpecsForTestRuns: util.Collection[SnapshotSpec] =
+    internalParameters(null)
+
+  def createSpecsForTestDataGeneration(version: FlinkVersion): 
util.Collection[SnapshotSpec] =
+    internalParameters(version)
+
+  private def internalParameters(
+      @Nullable targetGeneratingVersion: FlinkVersion): 
util.Collection[SnapshotSpec] = {
+    val getFlinkVersions =
+      new BiFunction[FlinkVersion, FlinkVersion, 
util.Collection[FlinkVersion]] {
+        override def apply(
+            minInclVersion: FlinkVersion,
+            maxInclVersion: FlinkVersion): util.Collection[FlinkVersion] = if (
+          targetGeneratingVersion != null
+        )
+          Collections.singleton(targetGeneratingVersion)
+        else
+          FlinkVersion.rangeOf(minInclVersion, maxInclVersion)
+      }
+
     // Note: It is not safe to restore savepoints created in a Scala 
applications with Flink
     // version 1.7 or below. The reason is that up to version 1.7 the 
underlying Scala serializer
     // used names of anonymous classes that depend on the relative 
position/order in code, e.g.,
     // if two anonymous classes, instantiated inside the same class and from 
the same base class,
     // change order in the code their names are switched.
     // As a consequence, changes in code may result in restore failures.
     // This was fixed in version 1.8, see: 
https://issues.apache.org/jira/browse/FLINK-10493
-    var parameters: util.List[SnapshotSpec] = new 
util.LinkedList[SnapshotSpec]()
+    val parameters: util.List[SnapshotSpec] = new 
util.LinkedList[SnapshotSpec]()
     parameters.addAll(
       SnapshotSpec.withVersions(
         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
         SnapshotType.SAVEPOINT_CANONICAL,
-        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+        getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_13)

Review Comment:
   Here as well: The old code filters for the most-recently published version. 
The new code doesn't do that anymore.



##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java:
##########
@@ -104,16 +123,32 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                             .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
                             .build());
 
-    private final boolean allowNonRestoredState;
     private final ScheduledExecutor scheduledExecutor =
             new 
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
 
-    protected AbstractOperatorRestoreTestBase() {
-        this(true);
+    protected AbstractOperatorRestoreTestBase(FlinkVersion flinkVersion) {
+        this.flinkVersion = flinkVersion;
     }
 
-    protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) {
-        this.allowNonRestoredState = allowNonRestoredState;
+    protected void internalGenerateSnapshots(FlinkVersion targetVersion) 
throws Exception {

Review Comment:
   I'm a bit puzzled here: Where has the data generation been done in the old code? :thinking: 



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-migration-test-data</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes" 
value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes 
'${generate.prefixes}'" else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java 
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" 
dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show the log during generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the 
snapshots generator methods are labeled
+   with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generating the snapshots for all the tests, execute

Review Comment:
   ```suggestion
   To generate the snapshots for all the tests, execute
   ```



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/SnapshotGeneratorUtils.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RunRules;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.Parameterized;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** A utility class to execute generateSnapshots method for migration tests. */
+class SnapshotGeneratorUtils {
+
+    static void executeGenerate(Class<?> migrationTestClass, FlinkVersion 
flinkVersion)
+            throws Throwable {
+        for (Method method : migrationTestClass.getMethods()) {
+            if 
(method.isAnnotationPresent(MigrationTest.SnapshotsGenerator.class)) {
+                executeGenerateMethods(migrationTestClass, method, 
flinkVersion);
+            } else if (method.isAnnotationPresent(
+                    MigrationTest.ParameterizedSnapshotsGenerator.class)) {
+                String parametersMethodName =
+                        
method.getAnnotation(MigrationTest.ParameterizedSnapshotsGenerator.class)
+                                .value();
+                Method parametersMethod =
+                        migrationTestClass.getMethod(parametersMethodName, 
FlinkVersion.class);
+                executeParameterizedGenerateMethods(
+                        migrationTestClass, method, parametersMethod, 
flinkVersion);
+            }
+        }
+    }
+
+    private static void executeGenerateMethods(
+            Class<?> migrationTestClass, Method method, FlinkVersion version) 
throws Throwable {
+        method.setAccessible(true);
+        List<TestRule> classRules = getRuleFields(migrationTestClass, 
ClassRule.class, null);
+
+        executeWithRules(
+                classRules,
+                new Statement() {
+                    @Override
+                    public void evaluate() throws Throwable {
+                        Object migrationTest = 
createMigrationTest(migrationTestClass);
+                        List<TestRule> rules =
+                                getRuleFields(migrationTestClass, Rule.class, 
migrationTest);
+                        executeWithRules(
+                                rules,
+                                new Statement() {
+                                    @Override
+                                    public void evaluate() throws Throwable {
+                                        method.invoke(migrationTest, version);
+                                    }
+                                });
+                    }
+                });
+    }
+
+    private static void executeParameterizedGenerateMethods(
+            Class<?> migrationTestClass,
+            Method method,
+            Method parametersMethod,
+            FlinkVersion version)
+            throws Throwable {
+        method.setAccessible(true);
+        parametersMethod.setAccessible(true);
+        List<TestRule> classRules = getRuleFields(migrationTestClass, 
ClassRule.class, null);
+
+        executeWithRules(
+                classRules,
+                new Statement() {
+                    @Override
+                    public void evaluate() throws Throwable {
+                        Object migrationTest = 
createMigrationTest(migrationTestClass);
+                        List<TestRule> rules =
+                                getRuleFields(migrationTestClass, Rule.class, 
migrationTest);
+                        Collection<?> arguments =
+                                (Collection<?>) 
parametersMethod.invoke(migrationTest, version);
+                        for (Object argument : arguments) {
+                            executeWithRules(
+                                    rules,
+                                    new Statement() {
+                                        @Override
+                                        public void evaluate() throws 
Throwable {
+                                            method.invoke(migrationTest, 
argument);
+                                        }
+                                    });
+                        }
+                    }
+                });
+    }
+
+    private static void executeWithRules(List<TestRule> rules, Statement 
statement)
+            throws Throwable {
+        new RunRules(statement, rules, Description.EMPTY).evaluate();
+    }
+
+    private static List<TestRule> getRuleFields(
+            Class<?> migrationTestClass, Class<? extends Annotation> tagClass, 
Object migrationTest)
+            throws IllegalAccessException {
+        List<TestRule> rules = new ArrayList<>();
+
+        Field[] fields = migrationTestClass.getFields();
+        for (Field field : fields) {
+            if (ExternalResource.class.isAssignableFrom(field.getType())
+                    && field.isAnnotationPresent(tagClass)) {
+                field.setAccessible(true);
+                rules.add((ExternalResource) field.get(migrationTest));
+            }
+        }
+
+        return rules;
+    }
+
+    private static Object createMigrationTest(Class<?> migrationTestClass) 
throws Exception {
+        Constructor<?>[] constructors = 
migrationTestClass.getDeclaredConstructors();
+        for (Constructor<?> constructor : constructors) {
+            if (constructor.getParameterCount() == 0) {
+                constructor.setAccessible(true);
+                return constructor.newInstance();
+            }
+        }
+
+        // This class does not have a constructor without argument.
+        Constructor<?> constructor = constructors[0];

Review Comment:
   What if there are multiple constructors with arguments? :thinking: 



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/util/MigrationTest.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.migration.PublishedVersionUtils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** Interface for state migration tests. */
+public interface MigrationTest {
+
+    static FlinkVersion getMostRecentlyPublishedVersion() {
+        return PublishedVersionUtils.getMostRecentlyPublishedVersion();
+    }
+
+    /**
+     * Marks a method as snapshots generator. The method should be like
+     *
+     * <pre>
+     * {@literal @}SnapshotsGenerator
+     * void function(FlinkVersion version) {}
+     * </pre>
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.METHOD)
+    @interface SnapshotsGenerator {}
+
+    /**
+     * Marks a method as parameterized snapshots generator. The value should 
be the method that
+     * returns Collection of arguments.
+     *
+     * <p>The method generating parameters should be like
+     *
+     * <pre>
+     * Collection<?> parameters(FlinkVersion version) {}
+     * </pre>
+     *
+     * <p>The generator method should be like
+     *
+     * <pre>
+     * {@literal @}ParameterizedSnapshotsGenerator

Review Comment:
   ```suggestion
        * {@literal @}ParameterizedSnapshotsGenerator("parameters")
   ```
   That said, I'd say that using a different method name ("generateMethodParameters" instead of "parameters") might make the docs more explicit.



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-migration-test-data</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes" 
value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes 
'${generate.prefixes}'" else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java 
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" 
dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show the log during generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the 
snapshots generator methods are labeled
+   with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generating the snapshots for all the tests, execute

Review Comment:
   ```suggestion
   To generate the snapshots for all the tests, execute from within the target version's release branch:
   ```



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-migration-test-data</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <target>
+                                <condition property="optional.classes" 
value="--classes '${generate.classes}'"
+                                           else="">
+                                    <isset property="generate.classes"/>
+                                </condition>
+                                <condition property="optional.prefixes"
+                                           value="--prefixes 
'${generate.prefixes}'" else="">
+                                    <isset property="generate.prefixes"/>
+                                </condition>
+                                <java 
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+                                      fork="true" failonerror="true" 
dir="${project.basedir}">
+                                    <classpath refid="maven.test.classpath"/>
+                                    <arg value="--dir"/>
+                                    <arg line="${project.basedir}"/>
+                                    <arg value="--version"/>
+                                    <arg value="${generate.version}"/>
+                                    <arg line="${optional.classes}"/>
+                                    <arg line="${optional.prefixes}"/>
+                                </java>
+                            </target>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</profile>
+```
+
+To show the log during generation, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same as the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the 
snapshots generator methods are labeled
+   with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generating the snapshots for all the tests, execute
+
+```shell
+mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.17 -nsu 
-Dfast -DskipTests
+```
+
+The version should be replaced with the target one.
+
+By default, it will search for the migration tests under `src/test/java` and 
`src/test/scala`. It is also supported
+to change the default search paths or only generating for the specific classes:

Review Comment:
   ```suggestion
   to change the default search paths or only generating for specific classes:
   ```



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java:
##########
@@ -38,87 +38,89 @@ class PojoSerializerUpgradeTest extends 
TypeSerializerUpgradeTestBase<Object, Ob
         testVersions.add(FlinkVersion.v1_7);
         testVersions.add(FlinkVersion.v1_8);
         testVersions.add(FlinkVersion.v1_9);
-        testVersions.addAll(MIGRATION_VERSIONS);
-        for (FlinkVersion flinkVersion : testVersions) {
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "pojo-serializer-identical-schema",
-                            flinkVersion,
-                            
PojoSerializerUpgradeTestSpecifications.IdenticalPojoSchemaSetup.class,
-                            
PojoSerializerUpgradeTestSpecifications.IdenticalPojoSchemaVerifier
-                                    .class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "pojo-serializer-with-modified-schema",
-                            flinkVersion,
-                            
PojoSerializerUpgradeTestSpecifications.ModifiedPojoSchemaSetup.class,
-                            
PojoSerializerUpgradeTestSpecifications.ModifiedPojoSchemaVerifier
-                                    .class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "pojo-serializer-with-different-field-types",
-                            flinkVersion,
-                            PojoSerializerUpgradeTestSpecifications
-                                    .DifferentFieldTypePojoSchemaSetup.class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.DifferentFieldTypePojoSchemaVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            
"pojo-serializer-with-modified-schema-in-registered-subclass",
-                            flinkVersion,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.ModifiedRegisteredPojoSubclassSchemaSetup.class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.ModifiedRegisteredPojoSubclassSchemaVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            
"pojo-serializer-with-different-field-types-in-registered-subclass",
-                            flinkVersion,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.DifferentFieldTypePojoSubclassSchemaSetup.class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.DifferentFieldTypePojoSubclassSchemaVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "pojo-serializer-with-non-registered-subclass",
-                            flinkVersion,
-                            
PojoSerializerUpgradeTestSpecifications.NonRegisteredPojoSubclassSetup
-                                    .class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    .NonRegisteredPojoSubclassVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            
"pojo-serializer-with-different-subclass-registration-order",
-                            flinkVersion,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.DifferentPojoSubclassRegistrationOrderSetup.class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.DifferentPojoSubclassRegistrationOrderVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "pojo-serializer-with-missing-registered-subclass",
-                            flinkVersion,
-                            PojoSerializerUpgradeTestSpecifications
-                                    .MissingRegisteredPojoSubclassSetup.class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.MissingRegisteredPojoSubclassVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "pojo-serializer-with-new-registered-subclass",
-                            flinkVersion,
-                            
PojoSerializerUpgradeTestSpecifications.NewRegisteredPojoSubclassSetup
-                                    .class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    .NewRegisteredPojoSubclassVerifier.class));
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            
"pojo-serializer-with-new-and-missing-registered-subclasses",
-                            flinkVersion,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.NewAndMissingRegisteredPojoSubclassesSetup.class,
-                            PojoSerializerUpgradeTestSpecifications
-                                    
.NewAndMissingRegisteredPojoSubclassesVerifier.class));
-        }
+        testVersions.addAll(super.getMigrationVersions());
+        return testVersions;
+    }
+
+    public Collection<TestSpecification<?, ?>> 
createTestSpecifications(FlinkVersion flinkVersion)
+            throws Exception {
+        ArrayList<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();

Review Comment:
   ```suggestion
           Collection<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
   ```
   nit: We should use interfaces rather than classes as the type in a variable declaration.



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency need to be added to the module's Maven config in case 
a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>fink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>

Review Comment:
   What's the purpose of this property condition? In the documentation below, 
we explicitly enable the profile through the `-P` parameter. Do we want to 
support other means of enabling the profile or is this `<activation/>` added 
accidentally?



##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java:
##########
@@ -465,23 +471,28 @@ private static <T> void assertSerializerIsValid(
     /** Paths to use during snapshot generation, which should only use the 
CURRENT_VERSION. */
     private Path getGenerateSerializerSnapshotFilePath(
             TestSpecification<PreviousElementT, UpgradedElementT> 
testSpecification) {
-        return Paths.get(getGenerateResourceDirectory(testSpecification) + 
"/serializer-snapshot");
+        return Paths.get(
+                getGenerateResourceDirectory(testSpecification, 
testSpecification.flinkVersion)
+                        + "/serializer-snapshot");
     }
 
     /** Paths to use during snapshot generation, which should only use the 
CURRENT_VERSION. */
     private Path getGenerateDataFilePath(
             TestSpecification<PreviousElementT, UpgradedElementT> 
testSpecification) {
-        return Paths.get(getGenerateResourceDirectory(testSpecification) + 
"/test-data");
+        return Paths.get(
+                getGenerateResourceDirectory(testSpecification, 
testSpecification.flinkVersion)
+                        + "/test-data");
     }
 
     /** Paths to use during snapshot generation, which should only use the 
CURRENT_VERSION. */
     private String getGenerateResourceDirectory(
-            TestSpecification<PreviousElementT, UpgradedElementT> 
testSpecification) {
+            TestSpecification<PreviousElementT, UpgradedElementT> 
testSpecification,
+            FlinkVersion currentVersion) {

Review Comment:
   Isn't the `currentVersion` redundant here because it's provided by 
`testSpecification` as well? 



##########
flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java:
##########
@@ -42,22 +42,26 @@
 @VisibleForTesting
 public class RowSerializerUpgradeTest extends 
TypeSerializerUpgradeTestBase<Row, Row> {
 
-    public Collection<TestSpecification<?, ?>> createTestSpecifications() 
throws Exception {
-        ArrayList<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
+    @Override
+    public Collection<FlinkVersion> getMigrationVersions() {
         // for RowSerializer we also test against 1.10 and newer because we 
have snapshots
         // for this which go beyond what we have for the usual subclasses of
         // TypeSerializerUpgradeTestBase
         List<FlinkVersion> testVersions = new ArrayList<>();
         testVersions.add(FlinkVersion.v1_10);
-        testVersions.addAll(MIGRATION_VERSIONS);
-        for (FlinkVersion flinkVersion : testVersions) {
-            testSpecifications.add(
-                    new TestSpecification<>(
-                            "row-serializer",
-                            flinkVersion,
-                            RowSerializerSetup.class,
-                            RowSerializerVerifier.class));
-        }
+        testVersions.addAll(super.getMigrationVersions());
+        return testVersions;
+    }
+
+    public Collection<TestSpecification<?, ?>> 
createTestSpecifications(FlinkVersion flinkVersion)
+            throws Exception {
+        ArrayList<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();

Review Comment:
   ```suggestion
           Collection<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
   ```
   
   Alternatively, you could also use `Collections.singleton`:
   ```
   return Collections.singleton(
                   new TestSpecification<>(
                           "row-serializer",
                           flinkVersion,
                           RowSerializerSetup.class,
                           RowSerializerVerifier.class));
   ```



##########
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java:
##########
@@ -309,17 +288,34 @@ public void testMonitoringSourceRestore() throws 
Exception {
 
         testHarness.setup();
 
-        testHarness.initializeState(
-                OperatorSnapshotUtil.getResourceFilename(
-                        "monitoring-function-migration-test-"
-                                + expectedModTime
-                                + "-flink"
-                                + testMigrateVersion
-                                + "-snapshot"));
+        // Get the exact filename
+        Tuple2<String, Long> fileNameAndModTime = 
getResourceFilename(testMigrateVersion);
+
+        testHarness.initializeState(fileNameAndModTime.f0);
 
         testHarness.open();
 
-        Assert.assertEquals((long) expectedModTime, 
monitoringFunction.getGlobalModificationTime());
+        Assert.assertEquals(
+                fileNameAndModTime.f1.longValue(), 
monitoringFunction.getGlobalModificationTime());
+    }
+
+    private Tuple2<String, Long> getResourceFilename(FlinkVersion version) 
throws IOException {
+        String resourceDirectory = 
OperatorSnapshotUtil.getResourceFilename("");
+        Pattern fileNamePattern =
+                Pattern.compile(
+                        "monitoring-function-migration-test-(\\d+)-flink"

Review Comment:
   I'm kind of fascinated by how complex this class was implemented. Why did we 
store the mod time in the class in the first place? Well done getting rid of 
the class fields, at least :+1: 



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +64,80 @@
  * previous Flink versions, as well as for different state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends 
SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends 
SnapshotMigrationTestBase
+        implements MigrationTest {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO increase this to newer version to create and test snapshot 
migration for newer versions
-    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+        return internalParameters(null);
+    }
+
+    public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+            FlinkVersion targetVersion) {
+        return internalParameters(targetVersion);
+    }
 
-    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-    // TODO Note: You should generate the snapshot based on the release branch 
instead of the
-    // master.
-    private static final ExecutionMode executionMode = 
ExecutionMode.VERIFY_SNAPSHOT;
+    private static Collection<SnapshotSpec> internalParameters(
+            @Nullable FlinkVersion targetGeneratingVersion) {
+        BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> 
getFlinkVersions =
+                (minInclVersion, maxInclVersion) -> {
+                    if (targetGeneratingVersion != null) {
+                        return Collections.singleton(targetGeneratingVersion);
+                    } else {
+                        return FlinkVersion.rangeOf(minInclVersion, 
maxInclVersion);
+                    }
+                };
 
-    @Parameterized.Parameters(name = "Test snapshot: {0}")
-    public static Collection<SnapshotSpec> parameters() {
         Collection<SnapshotSpec> parameters = new LinkedList<>();
         parameters.addAll(
                 SnapshotSpec.withVersions(
                         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
                         SnapshotType.SAVEPOINT_CANONICAL,
-                        FlinkVersion.rangeOf(FlinkVersion.v1_8, 
FlinkVersion.v1_14)));
+                        getFlinkVersions.apply(FlinkVersion.v1_8, 
FlinkVersion.v1_14)));

Review Comment:
   I noticed that we're doing something wrong in this specific case: The old 
code generated the `SnapshotSpec` instances for each version and then filtered 
out the ones that have the "current version" (i.e. 
`getMostRecentlyPublishedVersion`). But for the first entries (i.e. 
`StateBackendLoader.MEMORY_STATE_BACKEND_NAME`/`SnapshotType.SAVEPOINT_CANONICAL`),
 we didn't add such a `SnapshotSpec` for the most-recently published version 
and, therefore, didn't create a `SnapshotSpec` for that version. With the new 
code, we do create it. WDYT?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java:
##########
@@ -36,81 +36,103 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
 import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
+import org.apache.flink.test.util.MigrationTest;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.function.BiFunction;
 
 /**
  * Migration ITCases for a stateful job with broadcast state. The tests are 
parameterized to
  * (potentially) cover migrating for multiple previous Flink versions, as well 
as for different
  * state backends.
  */
 @RunWith(Parameterized.class)
-public class StatefulJobWBroadcastStateMigrationITCase extends 
SnapshotMigrationTestBase {
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SnapshotMigrationTestBase
+        implements MigrationTest {
 
     private static final int NUM_SOURCE_ELEMENTS = 4;
 
-    // TODO increase this to newer version to create and test snapshot 
migration for newer versions
-    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+        return internalParameters(null);
+    }
+
+    public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+            FlinkVersion targetVersion) {
+        return internalParameters(targetVersion);
+    }
 
-    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-    // TODO Note: You should generate the snapshot based on the release branch 
instead of the
-    // master.
-    private static final ExecutionMode executionMode = 
ExecutionMode.VERIFY_SNAPSHOT;
+    private static Collection<SnapshotSpec> internalParameters(
+            @Nullable FlinkVersion targetGeneratingVersion) {
+        BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> 
getFlinkVersions =
+                (minInclVersion, maxInclVersion) -> {
+                    if (targetGeneratingVersion != null) {
+                        return Collections.singleton(targetGeneratingVersion);
+                    } else {
+                        return FlinkVersion.rangeOf(minInclVersion, 
maxInclVersion);
+                    }
+                };
 
-    @Parameterized.Parameters(name = "Test snapshot: {0}")
-    public static Collection<SnapshotSpec> parameters() {
         Collection<SnapshotSpec> parameters = new LinkedList<>();
         parameters.addAll(
                 SnapshotSpec.withVersions(
                         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
                         SnapshotType.SAVEPOINT_CANONICAL,
-                        FlinkVersion.rangeOf(FlinkVersion.v1_8, 
FlinkVersion.v1_14)));
+                        getFlinkVersions.apply(FlinkVersion.v1_8, 
FlinkVersion.v1_14)));

Review Comment:
   Here we have a behavioral change as well: The old code didn't generate the 
specs for the current version but the new code does.



##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java:
##########
@@ -104,16 +123,32 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                             .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
                             .build());
 
-    private final boolean allowNonRestoredState;

Review Comment:
   I checked why we don't need that anymore. Apparently, this feature was added 
with FLINK-7595 and removed for Flink 1.4 with FLINK-8472. I'm gonna leave it 
like that. I guess it's fine to remove it :+1: 



##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,103 @@
+# Add State Migration Tests
+
+This module collects tools that help to generate test data for the state 
migration tests.
+
+The following dependency needs to be added to the module's Maven config in case 
a
+migration test is meant to be added to that module:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-migration-test-utils</artifactId>
+    <version>${project.version}</version>
+    <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+    <id>generate-migration-test-data</id>
+    <activation>
+        <property>
+            <name>generate-migration-test-data</name>
+        </property>
+    </activation>

Review Comment:
   ...as a consequence, we would have to update all the pom.xml files 
accordingly.



##########
flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala:
##########
@@ -75,43 +91,44 @@ object StatefulJobWBroadcastStateMigrationITCase {
       SnapshotSpec.withVersions(
         StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
         SnapshotType.SAVEPOINT_CANONICAL,
-        FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+        getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_13)

Review Comment:
   here as well: We're filtering in the old code but don't do it in the new 
code. :thinking: 



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/PublishedVersionUtils.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Objects;
+
+/** Utilities for updating and reading the most-recently published Flink version. */
+public class PublishedVersionUtils {
+
+    private static final String MOST_RECENTLY_PUBLISHED_VERSION_FILE =
+            "most_recently_published_version";
+
+    public static FlinkVersion getMostRecentlyPublishedVersion() {

Review Comment:
   I'm wondering whether we should add some functionality for ignoring comments 
(e.g. lines starting with an `#`). That way we could add some comment into the 
file to give some context for the value. Just as an idea: You don't have to do 
it if you think that it's too much...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to