XComp commented on code in PR #21736: URL: https://github.com/apache/flink/pull/21736#discussion_r1142950463
########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class aims to generate the snapshots for all the state migration tests. A migration test + * should implement {@link MigrationTest} interface and its name should match {@code + * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class name is the same with + * the containing filename. + */ +public class MigrationTestsSnapshotGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class); + + private static final String DEFAULT_PATH_PREFIXES = "src/test/java,src/test/scala"; + + private static final Option OPTION_HELP = + Option.builder() + .longOpt("help") + .required(false) + .hasArg(false) + .desc("print this help information.") + .build(); + + private static final Option OPTION_DIR = + Option.builder() + .longOpt("dir") + .required() + .hasArg() + .desc("The root directory for scanning. Required.") + .build(); + + private static final Option OPTION_VERSION = + Option.builder() + .longOpt("version") + .required() + .hasArg() + .desc("The version to generate. Required.") + .build(); + + private static final Option OPTION_PREFIXES = + Option.builder() + .longOpt("prefixes") + .required(false) + .hasArg() + .desc( + "The prefix paths to scan under the root directory, separated by \",\". Default to \"" + + DEFAULT_PATH_PREFIXES + + "\"") + .build(); + + private static final Option OPTION_CLASSES = + Option.builder() + .longOpt("classes") + .required(false) + .hasArg() + .desc( + "The specified qualified class name to generate test data, " + + "separated by \",\". 
This option has a higher priority than the prefix option.") + .build(); + + private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)[._]([0-9]+)"); + + private static final String CLASS_NAME_GROUP = "className"; + private static final Pattern CLASS_NAME_PATTERN = + Pattern.compile("(?<" + CLASS_NAME_GROUP + ">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)"); + + public static void main(String[] args) { + for (String s : args) { + if (s.equals("--" + OPTION_HELP.getLongOpt())) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setOptionComparator(null); + helpFormatter.printHelp( + "java " + MigrationTestsSnapshotGenerator.class.getName(), createOptions()); + return; + } + } + + try { + Options options = createOptions(); + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(options, args); + + File rootDirectory = new File(commandLine.getOptionValue(OPTION_DIR)); + if (!rootDirectory.exists() || !rootDirectory.isDirectory()) { + throw new FileNotFoundException( + rootDirectory + " does not exist or is not a directory."); + } + + String versionName = commandLine.getOptionValue(OPTION_VERSION); + Matcher versionMatcher = VERSION_PATTERN.matcher(versionName); + if (!versionMatcher.matches()) { + throw new IllegalArgumentException( + "Version " + + versionName + + "could not be parsed, " + + "please specify the version with format like 1.17, 1_17, v1_17, v1.17"); + } + FlinkVersion version = + FlinkVersion.valueOf( + Integer.parseInt(versionMatcher.group(1)), + Integer.parseInt(versionMatcher.group(2))); + + LOG.info( + "Start test data generating for module {} and version {}", + rootDirectory, + version); + List<Class<?>> migrationTests; + if (commandLine.hasOption(OPTION_CLASSES)) { + migrationTests = + loadSpecifiedMigrationTests( + commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*")); + } else { + String[] prefixes = + commandLine + .getOptionValue(OPTION_PREFIXES, DEFAULT_PATH_PREFIXES) + .split(",\\s*"); + migrationTests = findMigrationTests(rootDirectory.getAbsolutePath(), prefixes); + } + + for (Class<?> migrationTestClass : migrationTests) { + LOG.info("Start test data generating for {}", migrationTestClass.getName()); + SnapshotGeneratorUtils.executeGenerate(migrationTestClass, version); + LOG.info("Finish test data generating for {}", migrationTestClass.getName()); + } + } catch (Throwable e) { Review Comment: What's the value of translating the `Throwable` into a `RuntimeException` which is then thrown anyway? I guess, we could get rid of the try/catch block entirely, here. WDYT? ########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class aims to generate the snapshots for all the state migration tests. A migration test + * should implement {@link MigrationTest} interface and its name should match {@code + * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class name is the same with + * the containing filename. + */ +public class MigrationTestsSnapshotGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class); + + private static final String DEFAULT_PATH_PREFIXES = "src/test/java,src/test/scala"; + + private static final Option OPTION_HELP = + Option.builder() + .longOpt("help") + .required(false) + .hasArg(false) + .desc("print this help information.") + .build(); + + private static final Option OPTION_DIR = + Option.builder() + .longOpt("dir") + .required() + .hasArg() + .desc("The root directory for scanning. Required.") + .build(); + + private static final Option OPTION_VERSION = + Option.builder() + .longOpt("version") + .required() + .hasArg() + .desc("The version to generate. Required.") + .build(); + + private static final Option OPTION_PREFIXES = + Option.builder() + .longOpt("prefixes") + .required(false) + .hasArg() + .desc( + "The prefix paths to scan under the root directory, separated by \",\". Default to \"" + + DEFAULT_PATH_PREFIXES + + "\"") + .build(); + + private static final Option OPTION_CLASSES = + Option.builder() + .longOpt("classes") + .required(false) + .hasArg() + .desc( + "The specified qualified class name to generate test data, " + + "separated by \",\". This option has a higher priority than the prefix option.") + .build(); + + private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)[._]([0-9]+)"); + + private static final String CLASS_NAME_GROUP = "className"; + private static final Pattern CLASS_NAME_PATTERN = + Pattern.compile("(?<" + CLASS_NAME_GROUP + ">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)"); + + public static void main(String[] args) { + for (String s : args) { + if (s.equals("--" + OPTION_HELP.getLongOpt())) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setOptionComparator(null); + helpFormatter.printHelp( + "java " + MigrationTestsSnapshotGenerator.class.getName(), createOptions()); + return; + } + } Review Comment: That for loop can be removed. Instead, you could just use `commandLine.hasOption(OPTION_HELP)` utilizing the proper parameter parsing. 
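For illustration, a minimal sketch of that approach (a fragment of `main`; hypothetical, and note that `--dir`/`--version` are built with `.required()`, so commons-cli would reject a bare `--help` during parsing unless that constraint is relaxed, e.g. by first parsing against a help-only `Options`):

```java
// Sketch only: query the parsed CommandLine instead of scanning the raw args array.
Options options = createOptions();
CommandLine commandLine = new DefaultParser().parse(options, args);
if (commandLine.hasOption(OPTION_HELP.getLongOpt())) {
    HelpFormatter helpFormatter = new HelpFormatter();
    helpFormatter.setOptionComparator(null);
    helpFormatter.printHelp(
            "java " + MigrationTestsSnapshotGenerator.class.getName(), options);
    return;
}
```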
########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class aims to generate the snapshots for all the state migration tests. A migration test + * should implement {@link MigrationTest} interface and its name should match {@code + * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class name is the same with + * the containing filename. + */ +public class MigrationTestsSnapshotGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class); + + private static final String DEFAULT_PATH_PREFIXES = "src/test/java,src/test/scala"; + + private static final Option OPTION_HELP = + Option.builder() + .longOpt("help") + .required(false) + .hasArg(false) + .desc("print this help information.") + .build(); + + private static final Option OPTION_DIR = + Option.builder() + .longOpt("dir") + .required() + .hasArg() + .desc("The root directory for scanning. Required.") + .build(); + + private static final Option OPTION_VERSION = + Option.builder() + .longOpt("version") + .required() + .hasArg() + .desc("The version to generate. Required.") + .build(); + + private static final Option OPTION_PREFIXES = + Option.builder() + .longOpt("prefixes") + .required(false) + .hasArg() + .desc( + "The prefix paths to scan under the root directory, separated by \",\". Default to \"" + + DEFAULT_PATH_PREFIXES + + "\"") + .build(); + + private static final Option OPTION_CLASSES = + Option.builder() + .longOpt("classes") + .required(false) + .hasArg() + .desc( + "The specified qualified class name to generate test data, " + + "separated by \",\". 
This option has a higher priority than the prefix option.") + .build(); + + private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)[._]([0-9]+)"); + + private static final String CLASS_NAME_GROUP = "className"; + private static final Pattern CLASS_NAME_PATTERN = + Pattern.compile("(?<" + CLASS_NAME_GROUP + ">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)"); + + public static void main(String[] args) { + for (String s : args) { + if (s.equals("--" + OPTION_HELP.getLongOpt())) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setOptionComparator(null); + helpFormatter.printHelp( + "java " + MigrationTestsSnapshotGenerator.class.getName(), createOptions()); + return; + } + } + + try { + Options options = createOptions(); + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(options, args); + + File rootDirectory = new File(commandLine.getOptionValue(OPTION_DIR)); + if (!rootDirectory.exists() || !rootDirectory.isDirectory()) { + throw new FileNotFoundException( + rootDirectory + " does not exist or is not a directory."); + } + + String versionName = commandLine.getOptionValue(OPTION_VERSION); + Matcher versionMatcher = VERSION_PATTERN.matcher(versionName); + if (!versionMatcher.matches()) { + throw new IllegalArgumentException( + "Version " + + versionName + + "could not be parsed, " + + "please specify the version with format like 1.17, 1_17, v1_17, v1.17"); + } + FlinkVersion version = + FlinkVersion.valueOf( + Integer.parseInt(versionMatcher.group(1)), + Integer.parseInt(versionMatcher.group(2))); + + LOG.info( + "Start test data generating for module {} and version {}", + rootDirectory, + version); + List<Class<?>> migrationTests; + if (commandLine.hasOption(OPTION_CLASSES)) { + migrationTests = + loadSpecifiedMigrationTests( + commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*")); + } else { + String[] prefixes = + commandLine + .getOptionValue(OPTION_PREFIXES, DEFAULT_PATH_PREFIXES) + .split(",\\s*"); + migrationTests = findMigrationTests(rootDirectory.getAbsolutePath(), prefixes); + } + + for (Class<?> migrationTestClass : migrationTests) { + LOG.info("Start test data generating for {}", migrationTestClass.getName()); + SnapshotGeneratorUtils.executeGenerate(migrationTestClass, version); + LOG.info("Finish test data generating for {}", migrationTestClass.getName()); + } + } catch (Throwable e) { + LOG.error("Failed to generate snapshots for the state migration tests", e); + throw new RuntimeException(e); + } + } + + private static Options createOptions() { + Options options = new Options(); + + // The root directory for the scanning + options.addOption(OPTION_HELP); + options.addOption(OPTION_DIR); + options.addOption(OPTION_VERSION); + options.addOption(OPTION_PREFIXES); + options.addOption(OPTION_CLASSES); + return options; + } + + private static List<Class<?>> loadSpecifiedMigrationTests(String[] specifiedClassNames) + throws Exception { + List<Class<?>> migrationTestClasses = new ArrayList<>(); + for (String name : specifiedClassNames) { + Class<?> clazz = Class.forName(name); + if (!MigrationTest.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("Class " + name + " is not migration test."); + } + migrationTestClasses.add(clazz); + } + return migrationTestClasses; + } + + private static List<Class<?>> findMigrationTests(String rootDirectory, String[] prefixes) + throws Exception { + List<Class<?>> migrationTestClasses = new ArrayList<>(); + for (String prefix : prefixes) { + 
recursivelyFindMigrationTests(rootDirectory, prefix, "", migrationTestClasses); + } + + return migrationTestClasses; + } + + private static void recursivelyFindMigrationTests( + String rootDirectory, String prefix, String packageName, List<Class<?>> result) + throws Exception { + File codeDirectory = new File(rootDirectory + File.separator + prefix); + if (!codeDirectory.exists()) { + return; + } + + // Search all the directories + try (DirectoryStream<Path> stream = Files.newDirectoryStream(codeDirectory.toPath())) { + for (Path entry : stream) { + File file = entry.toFile(); + if (file.isDirectory()) { + recursivelyFindMigrationTests( + rootDirectory, + prefix + File.separator + file.getName(), + packageName.isEmpty() + ? file.getName() + : packageName + "." + file.getName(), + result); + } else { + Matcher m = CLASS_NAME_PATTERN.matcher(file.getName()); + if (m.matches()) { + String className = packageName + "." + m.group(CLASS_NAME_GROUP); Review Comment: Is this correct? We would generate a ".ABCTest" class name if `packageName` is empty. :thinking: ########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/SnapshotGeneratorUtils.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.ExternalResource; +import org.junit.rules.RunRules; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.Parameterized; +import org.junit.runners.model.Statement; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Array; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** A utility class to execute generateSnapshots method for migration tests. 
*/ +class SnapshotGeneratorUtils { + + static void executeGenerate(Class<?> migrationTestClass, FlinkVersion flinkVersion) + throws Throwable { + for (Method method : migrationTestClass.getMethods()) { + if (method.isAnnotationPresent(MigrationTest.SnapshotsGenerator.class)) { + executeGenerateMethods(migrationTestClass, method, flinkVersion); + } else if (method.isAnnotationPresent( + MigrationTest.ParameterizedSnapshotsGenerator.class)) { + String parametersMethodName = + method.getAnnotation(MigrationTest.ParameterizedSnapshotsGenerator.class) + .value(); + Method parametersMethod = + migrationTestClass.getMethod(parametersMethodName, FlinkVersion.class); + executeParameterizedGenerateMethods( + migrationTestClass, method, parametersMethod, flinkVersion); + } + } + } + + private static void executeGenerateMethods( + Class<?> migrationTestClass, Method method, FlinkVersion version) throws Throwable { + method.setAccessible(true); + List<TestRule> classRules = getRuleFields(migrationTestClass, ClassRule.class, null); + + executeWithRules( + classRules, + new Statement() { + @Override + public void evaluate() throws Throwable { + Object migrationTest = createMigrationTest(migrationTestClass); + List<TestRule> rules = + getRuleFields(migrationTestClass, Rule.class, migrationTest); + executeWithRules( + rules, + new Statement() { + @Override + public void evaluate() throws Throwable { + method.invoke(migrationTest, version); + } + }); + } + }); + } + + private static void executeParameterizedGenerateMethods( + Class<?> migrationTestClass, + Method method, + Method parametersMethod, + FlinkVersion version) + throws Throwable { + method.setAccessible(true); + parametersMethod.setAccessible(true); + List<TestRule> classRules = getRuleFields(migrationTestClass, ClassRule.class, null); + + executeWithRules( + classRules, + new Statement() { + @Override + public void evaluate() throws Throwable { + Object migrationTest = createMigrationTest(migrationTestClass); + List<TestRule> rules = + getRuleFields(migrationTestClass, Rule.class, migrationTest); + Collection<?> arguments = + (Collection<?>) parametersMethod.invoke(migrationTest, version); + for (Object argument : arguments) { + executeWithRules( + rules, + new Statement() { + @Override + public void evaluate() throws Throwable { + method.invoke(migrationTest, argument); + } + }); + } + } + }); + } + + private static void executeWithRules(List<TestRule> rules, Statement statement) + throws Throwable { + new RunRules(statement, rules, Description.EMPTY).evaluate(); + } + + private static List<TestRule> getRuleFields( + Class<?> migrationTestClass, Class<? extends Annotation> tagClass, Object migrationTest) + throws IllegalAccessException { + List<TestRule> rules = new ArrayList<>(); + + Field[] fields = migrationTestClass.getFields(); + for (Field field : fields) { + if (ExternalResource.class.isAssignableFrom(field.getType()) + && field.isAnnotationPresent(tagClass)) { + field.setAccessible(true); + rules.add((ExternalResource) field.get(migrationTest)); + } + } + + return rules; + } + + private static Object createMigrationTest(Class<?> migrationTestClass) throws Exception { + Constructor<?>[] constructors = migrationTestClass.getDeclaredConstructors(); + for (Constructor<?> constructor : constructors) { + if (constructor.getParameterCount() == 0) { + constructor.setAccessible(true); + return constructor.newInstance(); + } + } + + // This class does not have a constructor without argument. 
+ Constructor<?> constructor = constructors[0]; + constructor.setAccessible(true); + + // Check if we could find method labeled with @Parameterized.Parameters + for (Method method : migrationTestClass.getMethods()) { + if (Modifier.isStatic(method.getModifiers()) + && method.isAnnotationPresent(Parameterized.Parameters.class)) { + Object argumentLists = method.invoke(null); + if (argumentLists instanceof Collection) { + return constructor.newInstance( + ((Collection<?>) argumentLists).iterator().next()); + } else if (argumentLists.getClass().isArray()) { + return constructor.newInstance(Array.get(argumentLists, 0)); + } else { + throw new RuntimeException( + "Failed to create parameterized class object due to argument lists type not supported: " + + argumentLists.getClass()); + } + } + } + + throw new RuntimeException("Could not create the object for " + migrationTestClass); Review Comment: ```suggestion throw new RuntimeException("Could not create the object for " + migrationTestClass + ": No default constructor or @Parameterized.Parameters method found."); ``` ########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. + +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-migration-test-data</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <condition property="optional.classes" value="--classes '${generate.classes}'" + else=""> + <isset property="generate.classes"/> + </condition> + <condition property="optional.prefixes" + value="--prefixes '${generate.prefixes}'" else=""> + <isset property="generate.prefixes"/> + </condition> + <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator" + fork="true" failonerror="true" dir="${project.basedir}"> + <classpath refid="maven.test.classpath"/> + <arg value="--dir"/> + <arg line="${project.basedir}"/> + <arg value="--version"/> + <arg value="${generate.version}"/> + <arg line="${optional.classes}"/> + <arg line="${optional.prefixes}"/> + </java> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</profile> +``` + +To show the log during generating, add + +``` +logger.migration.name = org.apache.flink.test.migration +logger.migration.level = INFO +``` + +to the `log4j2-test.properties` of this module. + +The state migration tests should satisfy + +1. The tests are named like `*(Test|ITCase).(java|scala)`. +2. The test class name is the same with the file name. +3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods are labeled + with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`. Review Comment: ```suggestion with `@SnapshotsGenerator` or `@ParameterizedSnapshotsGenerator`. 
``` ########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. + +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-migration-test-data</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <condition property="optional.classes" value="--classes '${generate.classes}'" + else=""> + <isset property="generate.classes"/> + </condition> + <condition property="optional.prefixes" + value="--prefixes '${generate.prefixes}'" else=""> + <isset property="generate.prefixes"/> + </condition> + <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator" + fork="true" failonerror="true" dir="${project.basedir}"> + <classpath refid="maven.test.classpath"/> + <arg value="--dir"/> + <arg line="${project.basedir}"/> + <arg value="--version"/> + <arg value="${generate.version}"/> + <arg line="${optional.classes}"/> + <arg line="${optional.prefixes}"/> + </java> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</profile> +``` + +To show the log during generating, add + +``` +logger.migration.name = org.apache.flink.test.migration +logger.migration.level = INFO +``` + +to the `log4j2-test.properties` of this module. + +The state migration tests should satisfy + +1. The tests are named like `*(Test|ITCase).(java|scala)`. +2. The test class name is the same with the file name. +3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods are labeled + with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`. + +# Generating Snapshots + +To generating the snapshots for all the tests, execute + +```shell +mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.17 -nsu -Dfast -DskipTests +``` + +The version should be replaced with the target one. Review Comment: ```suggestion The version (in the command above `1.17`) should be replaced with the target one. ``` ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java: ########## @@ -52,9 +51,6 @@ public String getOperatorSnapshotPath(FlinkVersion version) { + "-snapshot"; } - @Ignore @Override - public void writeSnapshot() throws Exception { - throw new UnsupportedOperationException(); - } + public void writeSnapshot(FlinkVersion targetVersion) throws Exception {} Review Comment: Why do we need to override this method here? Is it because this class is inhering `MigrationTest` and therefore becomes eligible for test data generation as well? Overwriting this method would, therefore, disable the data generation because we're relying on the test data generation in `FlinkKafkaProducerMigrationTest`?! 
We might want to add a comment to the method's body here mentioning the reasoning. ########## flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationOperatorTest.java: ########## @@ -20,15 +20,14 @@ import org.apache.flink.FlinkVersion; -import org.junit.Ignore; import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; /** * Migration test from FlinkKafkaProducer011 operator. This test depends on the resource generated - * by {@link FlinkKafkaProducer011MigrationTest#writeSnapshot()}. + * by {@link FlinkKafkaProducerMigrationOperatorTest#writeSnapshot(FlinkVersion)}. Review Comment: This sounds wrong? :thinking: ########## flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java: ########## @@ -309,17 +288,34 @@ public void testMonitoringSourceRestore() throws Exception { testHarness.setup(); - testHarness.initializeState( - OperatorSnapshotUtil.getResourceFilename( - "monitoring-function-migration-test-" - + expectedModTime - + "-flink" - + testMigrateVersion - + "-snapshot")); + // Get the exact filename + Tuple2<String, Long> fileNameAndModTime = getResourceFilename(testMigrateVersion); + + testHarness.initializeState(fileNameAndModTime.f0); testHarness.open(); - Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime()); + Assert.assertEquals( + fileNameAndModTime.f1.longValue(), monitoringFunction.getGlobalModificationTime()); + } + + private Tuple2<String, Long> getResourceFilename(FlinkVersion version) throws IOException { + String resourceDirectory = OperatorSnapshotUtil.getResourceFilename(""); + Pattern fileNamePattern = + Pattern.compile( + "monitoring-function-migration-test-(\\d+)-flink" + + version.toString() + + "-snapshot"); + + for (java.nio.file.Path file : Files.newDirectoryStream(Paths.get(resourceDirectory))) { + String fileName = file.getFileName().toString(); + Matcher matcher = fileNamePattern.matcher(fileName); + if (matcher.matches()) { + return new Tuple2<>(file.toString(), Long.parseLong(matcher.group(1))); + } + } + + throw new RuntimeException("The snapshot for " + version + " not found"); Review Comment: ```suggestion throw new IllegalArgumentException("The snapshot for " + version + " not found"); ``` `IllegalArgumentException` seems to be a better fit here. WDYT? ########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class aims to generate the snapshots for all the state migration tests. A migration test + * should implement {@link MigrationTest} interface and its name should match {@code + * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class name is the same with + * the containing filename. + */ +public class MigrationTestsSnapshotGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class); + + private static final String DEFAULT_PATH_PREFIXES = "src/test/java,src/test/scala"; + + private static final Option OPTION_HELP = + Option.builder() + .longOpt("help") + .required(false) + .hasArg(false) + .desc("print this help information.") + .build(); + + private static final Option OPTION_DIR = + Option.builder() + .longOpt("dir") + .required() + .hasArg() + .desc("The root directory for scanning. Required.") + .build(); + + private static final Option OPTION_VERSION = + Option.builder() + .longOpt("version") + .required() + .hasArg() + .desc("The version to generate. Required.") + .build(); + + private static final Option OPTION_PREFIXES = + Option.builder() + .longOpt("prefixes") + .required(false) + .hasArg() + .desc( + "The prefix paths to scan under the root directory, separated by \",\". Default to \"" + + DEFAULT_PATH_PREFIXES + + "\"") + .build(); + + private static final Option OPTION_CLASSES = + Option.builder() + .longOpt("classes") + .required(false) + .hasArg() + .desc( + "The specified qualified class name to generate test data, " + + "separated by \",\". 
This option has a higher priority than the prefix option.") + .build(); + + private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)[._]([0-9]+)"); + + private static final String CLASS_NAME_GROUP = "className"; + private static final Pattern CLASS_NAME_PATTERN = + Pattern.compile("(?<" + CLASS_NAME_GROUP + ">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)"); + + public static void main(String[] args) { + for (String s : args) { + if (s.equals("--" + OPTION_HELP.getLongOpt())) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setOptionComparator(null); + helpFormatter.printHelp( + "java " + MigrationTestsSnapshotGenerator.class.getName(), createOptions()); + return; + } + } + + try { + Options options = createOptions(); + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(options, args); + + File rootDirectory = new File(commandLine.getOptionValue(OPTION_DIR)); + if (!rootDirectory.exists() || !rootDirectory.isDirectory()) { + throw new FileNotFoundException( + rootDirectory + " does not exist or is not a directory."); + } + + String versionName = commandLine.getOptionValue(OPTION_VERSION); + Matcher versionMatcher = VERSION_PATTERN.matcher(versionName); + if (!versionMatcher.matches()) { + throw new IllegalArgumentException( + "Version " + + versionName + + "could not be parsed, " + + "please specify the version with format like 1.17, 1_17, v1_17, v1.17"); + } + FlinkVersion version = + FlinkVersion.valueOf( + Integer.parseInt(versionMatcher.group(1)), + Integer.parseInt(versionMatcher.group(2))); + + LOG.info( + "Start test data generating for module {} and version {}", + rootDirectory, + version); + List<Class<?>> migrationTests; + if (commandLine.hasOption(OPTION_CLASSES)) { + migrationTests = + loadSpecifiedMigrationTests( + commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*")); + } else { + String[] prefixes = + commandLine + .getOptionValue(OPTION_PREFIXES, DEFAULT_PATH_PREFIXES) + .split(",\\s*"); + migrationTests = findMigrationTests(rootDirectory.getAbsolutePath(), prefixes); + } + + for (Class<?> migrationTestClass : migrationTests) { + LOG.info("Start test data generating for {}", migrationTestClass.getName()); + SnapshotGeneratorUtils.executeGenerate(migrationTestClass, version); + LOG.info("Finish test data generating for {}", migrationTestClass.getName()); + } + } catch (Throwable e) { + LOG.error("Failed to generate snapshots for the state migration tests", e); + throw new RuntimeException(e); + } + } + + private static Options createOptions() { + Options options = new Options(); + + // The root directory for the scanning + options.addOption(OPTION_HELP); + options.addOption(OPTION_DIR); + options.addOption(OPTION_VERSION); + options.addOption(OPTION_PREFIXES); + options.addOption(OPTION_CLASSES); + return options; + } + + private static List<Class<?>> loadSpecifiedMigrationTests(String[] specifiedClassNames) + throws Exception { + List<Class<?>> migrationTestClasses = new ArrayList<>(); + for (String name : specifiedClassNames) { + Class<?> clazz = Class.forName(name); + if (!MigrationTest.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("Class " + name + " is not migration test."); + } + migrationTestClasses.add(clazz); + } + return migrationTestClasses; + } + + private static List<Class<?>> findMigrationTests(String rootDirectory, String[] prefixes) + throws Exception { + List<Class<?>> migrationTestClasses = new ArrayList<>(); + for (String prefix : prefixes) { + 
recursivelyFindMigrationTests(rootDirectory, prefix, "", migrationTestClasses); + } + + return migrationTestClasses; + } + + private static void recursivelyFindMigrationTests( + String rootDirectory, String prefix, String packageName, List<Class<?>> result) + throws Exception { + File codeDirectory = new File(rootDirectory + File.separator + prefix); + if (!codeDirectory.exists()) { + return; + } + + // Search all the directories + try (DirectoryStream<Path> stream = Files.newDirectoryStream(codeDirectory.toPath())) { + for (Path entry : stream) { + File file = entry.toFile(); + if (file.isDirectory()) { + recursivelyFindMigrationTests( + rootDirectory, + prefix + File.separator + file.getName(), + packageName.isEmpty() + ? file.getName() + : packageName + "." + file.getName(), + result); + } else { + Matcher m = CLASS_NAME_PATTERN.matcher(file.getName()); + if (m.matches()) { + String className = packageName + "." + m.group(CLASS_NAME_GROUP); + + // TODO: For now we require the class name to match the file name + // for both java and scala. Review Comment: ```suggestion // For now we require the class name to match the file name for both java // and scala. ``` What's the purpose of the TODO here? Shall we create a ticket instead? Or would it be ok to just remove it? ########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This class aims to generate the snapshots for all the state migration tests. A migration test + * should implement {@link MigrationTest} interface and its name should match {@code + * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class name is the same with + * the containing filename. 
+ */ +public class MigrationTestsSnapshotGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class); + + private static final String DEFAULT_PATH_PREFIXES = "src/test/java,src/test/scala"; + + private static final Option OPTION_HELP = + Option.builder() + .longOpt("help") + .required(false) + .hasArg(false) + .desc("print this help information.") + .build(); + + private static final Option OPTION_DIR = + Option.builder() + .longOpt("dir") + .required() + .hasArg() + .desc("The root directory for scanning. Required.") + .build(); + + private static final Option OPTION_VERSION = + Option.builder() + .longOpt("version") + .required() + .hasArg() + .desc("The version to generate. Required.") + .build(); + + private static final Option OPTION_PREFIXES = + Option.builder() + .longOpt("prefixes") + .required(false) + .hasArg() + .desc( + "The prefix paths to scan under the root directory, separated by \",\". Default to \"" + + DEFAULT_PATH_PREFIXES + + "\"") + .build(); + + private static final Option OPTION_CLASSES = + Option.builder() + .longOpt("classes") + .required(false) + .hasArg() + .desc( + "The specified qualified class name to generate test data, " + + "separated by \",\". This option has a higher priority than the prefix option.") + .build(); + + private static final Pattern VERSION_PATTERN = Pattern.compile("v?([0-9]+)[._]([0-9]+)"); + + private static final String CLASS_NAME_GROUP = "className"; + private static final Pattern CLASS_NAME_PATTERN = + Pattern.compile("(?<" + CLASS_NAME_GROUP + ">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)"); + + public static void main(String[] args) { + for (String s : args) { + if (s.equals("--" + OPTION_HELP.getLongOpt())) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setOptionComparator(null); + helpFormatter.printHelp( + "java " + MigrationTestsSnapshotGenerator.class.getName(), createOptions()); + return; + } + } + + try { + Options options = createOptions(); + CommandLineParser parser = new DefaultParser(); + CommandLine commandLine = parser.parse(options, args); + + File rootDirectory = new File(commandLine.getOptionValue(OPTION_DIR)); + if (!rootDirectory.exists() || !rootDirectory.isDirectory()) { + throw new FileNotFoundException( + rootDirectory + " does not exist or is not a directory."); + } + + String versionName = commandLine.getOptionValue(OPTION_VERSION); + Matcher versionMatcher = VERSION_PATTERN.matcher(versionName); + if (!versionMatcher.matches()) { + throw new IllegalArgumentException( + "Version " + + versionName + + "could not be parsed, " + + "please specify the version with format like 1.17, 1_17, v1_17, v1.17"); + } + FlinkVersion version = + FlinkVersion.valueOf( + Integer.parseInt(versionMatcher.group(1)), + Integer.parseInt(versionMatcher.group(2))); + + LOG.info( + "Start test data generating for module {} and version {}", + rootDirectory, + version); + List<Class<?>> migrationTests; + if (commandLine.hasOption(OPTION_CLASSES)) { + migrationTests = + loadSpecifiedMigrationTests( + commandLine.getOptionValue(OPTION_CLASSES).split(",\\s*")); + } else { + String[] prefixes = + commandLine + .getOptionValue(OPTION_PREFIXES, DEFAULT_PATH_PREFIXES) + .split(",\\s*"); + migrationTests = findMigrationTests(rootDirectory.getAbsolutePath(), prefixes); + } + + for (Class<?> migrationTestClass : migrationTests) { + LOG.info("Start test data generating for {}", migrationTestClass.getName()); + SnapshotGeneratorUtils.executeGenerate(migrationTestClass, 
version); + LOG.info("Finish test data generating for {}", migrationTestClass.getName()); + } + } catch (Throwable e) { + LOG.error("Failed to generate snapshots for the state migration tests", e); + throw new RuntimeException(e); + } + } + + private static Options createOptions() { + Options options = new Options(); + + // The root directory for the scanning + options.addOption(OPTION_HELP); + options.addOption(OPTION_DIR); + options.addOption(OPTION_VERSION); + options.addOption(OPTION_PREFIXES); + options.addOption(OPTION_CLASSES); + return options; + } + + private static List<Class<?>> loadSpecifiedMigrationTests(String[] specifiedClassNames) + throws Exception { + List<Class<?>> migrationTestClasses = new ArrayList<>(); + for (String name : specifiedClassNames) { + Class<?> clazz = Class.forName(name); + if (!MigrationTest.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("Class " + name + " is not migration test."); + } + migrationTestClasses.add(clazz); + } + return migrationTestClasses; + } + + private static List<Class<?>> findMigrationTests(String rootDirectory, String[] prefixes) + throws Exception { + List<Class<?>> migrationTestClasses = new ArrayList<>(); + for (String prefix : prefixes) { + recursivelyFindMigrationTests(rootDirectory, prefix, "", migrationTestClasses); + } + + return migrationTestClasses; + } + + private static void recursivelyFindMigrationTests( + String rootDirectory, String prefix, String packageName, List<Class<?>> result) + throws Exception { + File codeDirectory = new File(rootDirectory + File.separator + prefix); + if (!codeDirectory.exists()) { + return; Review Comment: ```suggestion LOG.debug("{} doesn't exist and will be skipped.", codeDirectory); return; ``` nit: as a proposal ########## flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala: ########## @@ -38,78 +38,95 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode, SnapshotSpec, SnapshotType} +import org.apache.flink.test.util.MigrationTest +import org.apache.flink.test.util.MigrationTest.ParameterizedSnapshotsGenerator import org.apache.flink.util.Collector import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized +import javax.annotation.Nullable + import java.util -import java.util.stream.Collectors +import java.util.Collections +import java.util.function.BiFunction import scala.util.{Failure, Try} object StatefulJobSavepointMigrationITCase { - // TODO increase this to newer version to create and test snapshot migration for newer versions - val currentVersion = FlinkVersion.v1_16 - - // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots - // TODO Note: You should generate the snapshot based on the release branch instead of the - // master. 
- val executionMode = ExecutionMode.VERIFY_SNAPSHOT - @Parameterized.Parameters(name = "Test snapshot: {0}") - def parameters: util.Collection[SnapshotSpec] = { + def createSpecsForTestRuns: util.Collection[SnapshotSpec] = + internalParameters(null) + + def createSpecsForTestDataGeneration(version: FlinkVersion): util.Collection[SnapshotSpec] = + internalParameters(version) + + private def internalParameters( + @Nullable targetGeneratingVersion: FlinkVersion): util.Collection[SnapshotSpec] = { + val getFlinkVersions = + new BiFunction[FlinkVersion, FlinkVersion, util.Collection[FlinkVersion]] { + override def apply( + minInclVersion: FlinkVersion, + maxInclVersion: FlinkVersion): util.Collection[FlinkVersion] = if ( + targetGeneratingVersion != null + ) + Collections.singleton(targetGeneratingVersion) + else + FlinkVersion.rangeOf(minInclVersion, maxInclVersion) + } + // Note: It is not safe to restore savepoints created in a Scala applications with Flink // version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer // used names of anonymous classes that depend on the relative position/order in code, e.g., // if two anonymous classes, instantiated inside the same class and from the same base class, // change order in the code their names are switched. // As a consequence, changes in code may result in restore failures. // This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493 - var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]() + val parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]() parameters.addAll( SnapshotSpec.withVersions( StateBackendLoader.MEMORY_STATE_BACKEND_NAME, SnapshotType.SAVEPOINT_CANONICAL, - FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13))) + getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_13) Review Comment: Here as well: The old code filters for the most-recently published version. The new code doesn't do that anymore. ########## flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java: ########## @@ -104,16 +123,32 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) .build()); - private final boolean allowNonRestoredState; private final ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()); - protected AbstractOperatorRestoreTestBase() { - this(true); + protected AbstractOperatorRestoreTestBase(FlinkVersion flinkVersion) { + this.flinkVersion = flinkVersion; } - protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) { - this.allowNonRestoredState = allowNonRestoredState; + protected void internalGenerateSnapshots(FlinkVersion targetVersion) throws Exception { Review Comment: I'm a bit puzzled here: Where is the data generation been done in the old code? :thinking: ########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. 
+ +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-migration-test-data</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <condition property="optional.classes" value="--classes '${generate.classes}'" + else=""> + <isset property="generate.classes"/> + </condition> + <condition property="optional.prefixes" + value="--prefixes '${generate.prefixes}'" else=""> + <isset property="generate.prefixes"/> + </condition> + <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator" + fork="true" failonerror="true" dir="${project.basedir}"> + <classpath refid="maven.test.classpath"/> + <arg value="--dir"/> + <arg line="${project.basedir}"/> + <arg value="--version"/> + <arg value="${generate.version}"/> + <arg line="${optional.classes}"/> + <arg line="${optional.prefixes}"/> + </java> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</profile> +``` + +To show the log during generating, add + +``` +logger.migration.name = org.apache.flink.test.migration +logger.migration.level = INFO +``` + +to the `log4j2-test.properties` of this module. + +The state migration tests should satisfy + +1. The tests are named like `*(Test|ITCase).(java|scala)`. +2. The test class name is the same with the file name. +3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods are labeled + with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`. + +# Generating Snapshots + +To generating the snapshots for all the tests, execute Review Comment: ```suggestion To generate the snapshots for all the tests, execute ``` ########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/SnapshotGeneratorUtils.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.util.MigrationTest; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.ExternalResource; +import org.junit.rules.RunRules; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.Parameterized; +import org.junit.runners.model.Statement; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Array; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** A utility class to execute generateSnapshots method for migration tests. */ +class SnapshotGeneratorUtils { + + static void executeGenerate(Class<?> migrationTestClass, FlinkVersion flinkVersion) + throws Throwable { + for (Method method : migrationTestClass.getMethods()) { + if (method.isAnnotationPresent(MigrationTest.SnapshotsGenerator.class)) { + executeGenerateMethods(migrationTestClass, method, flinkVersion); + } else if (method.isAnnotationPresent( + MigrationTest.ParameterizedSnapshotsGenerator.class)) { + String parametersMethodName = + method.getAnnotation(MigrationTest.ParameterizedSnapshotsGenerator.class) + .value(); + Method parametersMethod = + migrationTestClass.getMethod(parametersMethodName, FlinkVersion.class); + executeParameterizedGenerateMethods( + migrationTestClass, method, parametersMethod, flinkVersion); + } + } + } + + private static void executeGenerateMethods( + Class<?> migrationTestClass, Method method, FlinkVersion version) throws Throwable { + method.setAccessible(true); + List<TestRule> classRules = getRuleFields(migrationTestClass, ClassRule.class, null); + + executeWithRules( + classRules, + new Statement() { + @Override + public void evaluate() throws Throwable { + Object migrationTest = createMigrationTest(migrationTestClass); + List<TestRule> rules = + getRuleFields(migrationTestClass, Rule.class, migrationTest); + executeWithRules( + rules, + new Statement() { + @Override + public void evaluate() throws Throwable { + method.invoke(migrationTest, version); + } + }); + } + }); + } + + private static void executeParameterizedGenerateMethods( + Class<?> migrationTestClass, + Method method, + Method parametersMethod, + FlinkVersion version) + throws Throwable { + method.setAccessible(true); + parametersMethod.setAccessible(true); + List<TestRule> classRules = getRuleFields(migrationTestClass, ClassRule.class, null); + + executeWithRules( + classRules, + new Statement() { + @Override + public void evaluate() throws Throwable { + Object migrationTest = createMigrationTest(migrationTestClass); + List<TestRule> rules = + getRuleFields(migrationTestClass, Rule.class, migrationTest); + Collection<?> arguments = + (Collection<?>) parametersMethod.invoke(migrationTest, version); + for (Object argument : arguments) { + executeWithRules( + rules, + new Statement() { + @Override + public void evaluate() throws Throwable { + method.invoke(migrationTest, argument); + } + }); + } + } + }); + } + + private static void executeWithRules(List<TestRule> rules, Statement statement) + throws Throwable { + new RunRules(statement, rules, Description.EMPTY).evaluate(); + } + + private static List<TestRule> getRuleFields( + Class<?> migrationTestClass, Class<? 
extends Annotation> tagClass, Object migrationTest) + throws IllegalAccessException { + List<TestRule> rules = new ArrayList<>(); + + Field[] fields = migrationTestClass.getFields(); + for (Field field : fields) { + if (ExternalResource.class.isAssignableFrom(field.getType()) + && field.isAnnotationPresent(tagClass)) { + field.setAccessible(true); + rules.add((ExternalResource) field.get(migrationTest)); + } + } + + return rules; + } + + private static Object createMigrationTest(Class<?> migrationTestClass) throws Exception { + Constructor<?>[] constructors = migrationTestClass.getDeclaredConstructors(); + for (Constructor<?> constructor : constructors) { + if (constructor.getParameterCount() == 0) { + constructor.setAccessible(true); + return constructor.newInstance(); + } + } + + // This class does not have a constructor without argument. + Constructor<?> constructor = constructors[0]; Review Comment: What if there are multiple constructors with arguments? :thinking: ########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/util/MigrationTest.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.test.migration.PublishedVersionUtils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** Interface for state migration tests. */ +public interface MigrationTest { + + static FlinkVersion getMostRecentlyPublishedVersion() { + return PublishedVersionUtils.getMostRecentlyPublishedVersion(); + } + + /** + * Marks a method as snapshots generator. The method should be like + * + * <pre> + * {@literal @}SnapshotsGenerator + * void function(FlinkVersion version) {} + * </pre> + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface SnapshotsGenerator {} + + /** + * Marks a method as parameterized snapshots generator. The value should be the method that + * returns Collection of arguments. + * + * <p>The method generating parameters should be like + * + * <pre> + * Collection<?> parameters(FlinkVersion version) {} + * </pre> + * + * <p>The generator method should be like + * + * <pre> + * {@literal @}ParameterizedSnapshotsGenerator Review Comment: ```suggestion * {@literal @}ParameterizedSnapshotsGenerator("parameters") ``` Even though, I'd say that using a different method name ("generateMethodParameters" instead of "parameters") might make the docs more explicit. 
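For illustration, a minimal sketch of a test class that wires up both annotations the way `SnapshotGeneratorUtils` expects (class name, method names, and the backend strings below are invented; the generator bodies are placeholders):

```java
import org.apache.flink.FlinkVersion;
import org.apache.flink.test.util.MigrationTest;

import java.util.Arrays;
import java.util.Collection;

/** Hypothetical example, not part of the PR: shows how the generator annotations are used. */
public class ExampleStateMigrationTest implements MigrationTest {

    // Invoked once with the target version: method.invoke(migrationTest, version).
    @MigrationTest.SnapshotsGenerator
    public void generateSnapshots(FlinkVersion version) throws Exception {
        // write the snapshot/test-data files for the given Flink version
    }

    // Supplies the arguments for the parameterized generator below; it must take a FlinkVersion,
    // because SnapshotGeneratorUtils resolves it via getMethod(name, FlinkVersion.class).
    public Collection<String> generateMethodParameters(FlinkVersion version) {
        return Arrays.asList("hashmap", "rocksdb");
    }

    // Invoked once per element returned by generateMethodParameters(version).
    @MigrationTest.ParameterizedSnapshotsGenerator("generateMethodParameters")
    public void generateSnapshotsFor(String stateBackend) throws Exception {
        // write the snapshot/test-data files for the given state backend
    }
}
```

A class shaped like this is also picked up by the directory scan as long as its name matches the `*(Test|ITCase)` pattern described in the README.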
########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. + +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-migration-test-data</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <condition property="optional.classes" value="--classes '${generate.classes}'" + else=""> + <isset property="generate.classes"/> + </condition> + <condition property="optional.prefixes" + value="--prefixes '${generate.prefixes}'" else=""> + <isset property="generate.prefixes"/> + </condition> + <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator" + fork="true" failonerror="true" dir="${project.basedir}"> + <classpath refid="maven.test.classpath"/> + <arg value="--dir"/> + <arg line="${project.basedir}"/> + <arg value="--version"/> + <arg value="${generate.version}"/> + <arg line="${optional.classes}"/> + <arg line="${optional.prefixes}"/> + </java> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</profile> +``` + +To show the log during generating, add + +``` +logger.migration.name = org.apache.flink.test.migration +logger.migration.level = INFO +``` + +to the `log4j2-test.properties` of this module. + +The state migration tests should satisfy + +1. The tests are named like `*(Test|ITCase).(java|scala)`. +2. The test class name is the same with the file name. +3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods are labeled + with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`. + +# Generating Snapshots + +To generating the snapshots for all the tests, execute Review Comment: ```suggestion To generating the snapshots for all the tests, execute from within the target version's release branch: ``` ########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. 
+ +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>generate-migration-test-data</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <condition property="optional.classes" value="--classes '${generate.classes}'" + else=""> + <isset property="generate.classes"/> + </condition> + <condition property="optional.prefixes" + value="--prefixes '${generate.prefixes}'" else=""> + <isset property="generate.prefixes"/> + </condition> + <java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator" + fork="true" failonerror="true" dir="${project.basedir}"> + <classpath refid="maven.test.classpath"/> + <arg value="--dir"/> + <arg line="${project.basedir}"/> + <arg value="--version"/> + <arg value="${generate.version}"/> + <arg line="${optional.classes}"/> + <arg line="${optional.prefixes}"/> + </java> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</profile> +``` + +To show the log during generating, add + +``` +logger.migration.name = org.apache.flink.test.migration +logger.migration.level = INFO +``` + +to the `log4j2-test.properties` of this module. + +The state migration tests should satisfy + +1. The tests are named like `*(Test|ITCase).(java|scala)`. +2. The test class name is the same with the file name. +3. The test implements `org.apache.flink.test.util.MigrationTest` and the snapshots generator methods are labeled + with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`. + +# Generating Snapshots + +To generating the snapshots for all the tests, execute + +```shell +mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=1.17 -nsu -Dfast -DskipTests +``` + +The version should be replaced with the target one. + +By default, it will search for the migration tests under `src/test/java` and `src/test/scala`. 
It is also supported +to change the default search paths or only generating for the specific classes: Review Comment: ```suggestion to change the default search paths or only generating for specific classes: ``` ########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerUpgradeTest.java: ########## @@ -38,87 +38,89 @@ class PojoSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Object, Ob testVersions.add(FlinkVersion.v1_7); testVersions.add(FlinkVersion.v1_8); testVersions.add(FlinkVersion.v1_9); - testVersions.addAll(MIGRATION_VERSIONS); - for (FlinkVersion flinkVersion : testVersions) { - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-identical-schema", - flinkVersion, - PojoSerializerUpgradeTestSpecifications.IdenticalPojoSchemaSetup.class, - PojoSerializerUpgradeTestSpecifications.IdenticalPojoSchemaVerifier - .class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-modified-schema", - flinkVersion, - PojoSerializerUpgradeTestSpecifications.ModifiedPojoSchemaSetup.class, - PojoSerializerUpgradeTestSpecifications.ModifiedPojoSchemaVerifier - .class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-different-field-types", - flinkVersion, - PojoSerializerUpgradeTestSpecifications - .DifferentFieldTypePojoSchemaSetup.class, - PojoSerializerUpgradeTestSpecifications - .DifferentFieldTypePojoSchemaVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-modified-schema-in-registered-subclass", - flinkVersion, - PojoSerializerUpgradeTestSpecifications - .ModifiedRegisteredPojoSubclassSchemaSetup.class, - PojoSerializerUpgradeTestSpecifications - .ModifiedRegisteredPojoSubclassSchemaVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-different-field-types-in-registered-subclass", - flinkVersion, - PojoSerializerUpgradeTestSpecifications - .DifferentFieldTypePojoSubclassSchemaSetup.class, - PojoSerializerUpgradeTestSpecifications - .DifferentFieldTypePojoSubclassSchemaVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-non-registered-subclass", - flinkVersion, - PojoSerializerUpgradeTestSpecifications.NonRegisteredPojoSubclassSetup - .class, - PojoSerializerUpgradeTestSpecifications - .NonRegisteredPojoSubclassVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-different-subclass-registration-order", - flinkVersion, - PojoSerializerUpgradeTestSpecifications - .DifferentPojoSubclassRegistrationOrderSetup.class, - PojoSerializerUpgradeTestSpecifications - .DifferentPojoSubclassRegistrationOrderVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-missing-registered-subclass", - flinkVersion, - PojoSerializerUpgradeTestSpecifications - .MissingRegisteredPojoSubclassSetup.class, - PojoSerializerUpgradeTestSpecifications - .MissingRegisteredPojoSubclassVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-new-registered-subclass", - flinkVersion, - PojoSerializerUpgradeTestSpecifications.NewRegisteredPojoSubclassSetup - .class, - PojoSerializerUpgradeTestSpecifications - .NewRegisteredPojoSubclassVerifier.class)); - testSpecifications.add( - new TestSpecification<>( - "pojo-serializer-with-new-and-missing-registered-subclasses", - flinkVersion, - PojoSerializerUpgradeTestSpecifications - 
.NewAndMissingRegisteredPojoSubclassesSetup.class, - PojoSerializerUpgradeTestSpecifications - .NewAndMissingRegisteredPojoSubclassesVerifier.class)); - } + testVersions.addAll(super.getMigrationVersions()); + return testVersions; + } + + public Collection<TestSpecification<?, ?>> createTestSpecifications(FlinkVersion flinkVersion) + throws Exception { + ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); Review Comment: ```suggestion Collection<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); ``` nit: We should use interfaces rather than classes as type in a variable declaration. ########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. + +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> Review Comment: What's the purpose of this property condition? In the documentation below, we explicitly enable the profile through the `-P` parameter. Do we want to support other means of enabling the profile or is this `<activation/>` added accidentally? ########## flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java: ########## @@ -465,23 +471,28 @@ private static <T> void assertSerializerIsValid( /** Paths to use during snapshot generation, which should only use the CURRENT_VERSION. */ private Path getGenerateSerializerSnapshotFilePath( TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) { - return Paths.get(getGenerateResourceDirectory(testSpecification) + "/serializer-snapshot"); + return Paths.get( + getGenerateResourceDirectory(testSpecification, testSpecification.flinkVersion) + + "/serializer-snapshot"); } /** Paths to use during snapshot generation, which should only use the CURRENT_VERSION. */ private Path getGenerateDataFilePath( TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) { - return Paths.get(getGenerateResourceDirectory(testSpecification) + "/test-data"); + return Paths.get( + getGenerateResourceDirectory(testSpecification, testSpecification.flinkVersion) + + "/test-data"); } /** Paths to use during snapshot generation, which should only use the CURRENT_VERSION. */ private String getGenerateResourceDirectory( - TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) { + TestSpecification<PreviousElementT, UpgradedElementT> testSpecification, + FlinkVersion currentVersion) { Review Comment: Isn't the `currentVersion` redundant here because it's provided by `testSpecification` as well? 
########## flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerUpgradeTest.java: ########## @@ -42,22 +42,26 @@ @VisibleForTesting public class RowSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<Row, Row> { - public Collection<TestSpecification<?, ?>> createTestSpecifications() throws Exception { - ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); + @Override + public Collection<FlinkVersion> getMigrationVersions() { // for RowSerializer we also test against 1.10 and newer because we have snapshots // for this which go beyond what we have for the usual subclasses of // TypeSerializerUpgradeTestBase List<FlinkVersion> testVersions = new ArrayList<>(); testVersions.add(FlinkVersion.v1_10); - testVersions.addAll(MIGRATION_VERSIONS); - for (FlinkVersion flinkVersion : testVersions) { - testSpecifications.add( - new TestSpecification<>( - "row-serializer", - flinkVersion, - RowSerializerSetup.class, - RowSerializerVerifier.class)); - } + testVersions.addAll(super.getMigrationVersions()); + return testVersions; + } + + public Collection<TestSpecification<?, ?>> createTestSpecifications(FlinkVersion flinkVersion) + throws Exception { + ArrayList<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); Review Comment: ```suggestion Collection<TestSpecification<?, ?>> testSpecifications = new ArrayList<>(); ``` Alternatively, you could also use `Collections.singleton`: ``` return Collections.singleton( new TestSpecification<>( "row-serializer", flinkVersion, RowSerializerSetup.class, RowSerializerVerifier.class)); ``` ########## flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java: ########## @@ -309,17 +288,34 @@ public void testMonitoringSourceRestore() throws Exception { testHarness.setup(); - testHarness.initializeState( - OperatorSnapshotUtil.getResourceFilename( - "monitoring-function-migration-test-" - + expectedModTime - + "-flink" - + testMigrateVersion - + "-snapshot")); + // Get the exact filename + Tuple2<String, Long> fileNameAndModTime = getResourceFilename(testMigrateVersion); + + testHarness.initializeState(fileNameAndModTime.f0); testHarness.open(); - Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime()); + Assert.assertEquals( + fileNameAndModTime.f1.longValue(), monitoringFunction.getGlobalModificationTime()); + } + + private Tuple2<String, Long> getResourceFilename(FlinkVersion version) throws IOException { + String resourceDirectory = OperatorSnapshotUtil.getResourceFilename(""); + Pattern fileNamePattern = + Pattern.compile( + "monitoring-function-migration-test-(\\d+)-flink" Review Comment: I'm kind of fascinated how complex this class was implemented. Why did we store the mod time in the class in the first place. Well done getting rid of the class fields, at least :+1: ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java: ########## @@ -60,62 +64,80 @@ * previous Flink versions, as well as for different state backends. 
*/ @RunWith(Parameterized.class) -public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase { +public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase + implements MigrationTest { private static final int NUM_SOURCE_ELEMENTS = 4; - // TODO increase this to newer version to create and test snapshot migration for newer versions - private static final FlinkVersion currentVersion = FlinkVersion.v1_16; + @Parameterized.Parameters(name = "Test snapshot: {0}") + public static Collection<SnapshotSpec> createSpecsForTestRuns() { + return internalParameters(null); + } + + public static Collection<SnapshotSpec> createSpecsForTestDataGeneration( + FlinkVersion targetVersion) { + return internalParameters(targetVersion); + } - // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots - // TODO Note: You should generate the snapshot based on the release branch instead of the - // master. - private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT; + private static Collection<SnapshotSpec> internalParameters( + @Nullable FlinkVersion targetGeneratingVersion) { + BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> getFlinkVersions = + (minInclVersion, maxInclVersion) -> { + if (targetGeneratingVersion != null) { + return Collections.singleton(targetGeneratingVersion); + } else { + return FlinkVersion.rangeOf(minInclVersion, maxInclVersion); + } + }; - @Parameterized.Parameters(name = "Test snapshot: {0}") - public static Collection<SnapshotSpec> parameters() { Collection<SnapshotSpec> parameters = new LinkedList<>(); parameters.addAll( SnapshotSpec.withVersions( StateBackendLoader.MEMORY_STATE_BACKEND_NAME, SnapshotType.SAVEPOINT_CANONICAL, - FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14))); + getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_14))); Review Comment: I noticed that we're doing something wrong in this specific case: The old code generated the `SnapshotSpec` instances for each version and then filtered out the once that have the "current version" (i.e. `getMostRecentlyPublishedVersion`). But for the first entries (i.e. `StateBackendLoader.MEMORY_STATE_BACKEND_NAME`/`SnapshotType.SAVEPOINT_CANONICAL`), we didn't add such a `SnapshotSpec` for the most-recently published version and, therefore, didn't create a `SnapshotSpec` for that version. With the new code, we do create it. WDYT? ########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobWBroadcastStateMigrationITCase.java: ########## @@ -36,81 +36,103 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.checkpointing.utils.MigrationTestUtils; import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase; +import org.apache.flink.test.util.MigrationTest; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import javax.annotation.Nullable; + import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; -import java.util.stream.Collectors; +import java.util.function.BiFunction; /** * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to * (potentially) cover migrating for multiple previous Flink versions, as well as for different * state backends. 
*/ @RunWith(Parameterized.class) -public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase { +public class StatefulJobWBroadcastStateMigrationITCase extends SnapshotMigrationTestBase + implements MigrationTest { private static final int NUM_SOURCE_ELEMENTS = 4; - // TODO increase this to newer version to create and test snapshot migration for newer versions - private static final FlinkVersion currentVersion = FlinkVersion.v1_16; + @Parameterized.Parameters(name = "Test snapshot: {0}") + public static Collection<SnapshotSpec> createSpecsForTestRuns() { + return internalParameters(null); + } + + public static Collection<SnapshotSpec> createSpecsForTestDataGeneration( + FlinkVersion targetVersion) { + return internalParameters(targetVersion); + } - // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots - // TODO Note: You should generate the snapshot based on the release branch instead of the - // master. - private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT; + private static Collection<SnapshotSpec> internalParameters( + @Nullable FlinkVersion targetGeneratingVersion) { + BiFunction<FlinkVersion, FlinkVersion, Collection<FlinkVersion>> getFlinkVersions = + (minInclVersion, maxInclVersion) -> { + if (targetGeneratingVersion != null) { + return Collections.singleton(targetGeneratingVersion); + } else { + return FlinkVersion.rangeOf(minInclVersion, maxInclVersion); + } + }; - @Parameterized.Parameters(name = "Test snapshot: {0}") - public static Collection<SnapshotSpec> parameters() { Collection<SnapshotSpec> parameters = new LinkedList<>(); parameters.addAll( SnapshotSpec.withVersions( StateBackendLoader.MEMORY_STATE_BACKEND_NAME, SnapshotType.SAVEPOINT_CANONICAL, - FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14))); + getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_14))); Review Comment: Here we have a behavioral change as well: The old code didn't generate the specs for the current version but the new code does. ########## flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java: ########## @@ -104,16 +123,32 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) .build()); - private final boolean allowNonRestoredState; Review Comment: I checked why we don't need that anymore. Apparently, this feature was added with FLINK-7595 and removed for Flink 1.4 with FLINK-8472. I'm gonna leave it like that. I guess it's fine to remove it :+1: ########## flink-test-utils-parent/flink-migration-test-utils/README.md: ########## @@ -0,0 +1,103 @@ +# Add State Migration Tests + +This module collects tools that help to generate test data for the state migration tests. + +The following dependency need to be added to the module's Maven config in case a +migration test is meant to be added to that module: + +```xml +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>fink-migration-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> +</dependency> +``` + +and the following profile + +```xml +<profile> + <id>generate-migration-test-data</id> + <activation> + <property> + <name>generate-migration-test-data</name> + </property> + </activation> Review Comment: ...as a consequence, we would have to update all the pom.xml files accordingly. 
########## flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala: ########## @@ -75,43 +91,44 @@ object StatefulJobWBroadcastStateMigrationITCase { SnapshotSpec.withVersions( StateBackendLoader.MEMORY_STATE_BACKEND_NAME, SnapshotType.SAVEPOINT_CANONICAL, - FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13))) + getFlinkVersions.apply(FlinkVersion.v1_8, FlinkVersion.v1_13)
Review Comment: here as well: We're filtering in the old code but don't do it in the new code. :thinking:
########## flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/PublishedVersionUtils.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.migration; + +import org.apache.flink.FlinkVersion; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Objects; + +/** Utilities for update and read the most-recently published flink version. */ +public class PublishedVersionUtils { + + private static final String MOST_RECENTLY_PUBLISHED_VERSION_FILE = + "most_recently_published_version"; + + public static FlinkVersion getMostRecentlyPublishedVersion() {
Review Comment: I'm wondering whether we should add some functionality for ignoring comments (e.g. lines starting with an `#`). That way we could add some comment into the file to give some context for the value. Just as an idea: You don't have to do it if you think that it's too much...
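As a rough sketch of that idea (class and method names invented here; parsing of the value into a `FlinkVersion` is left out and would stay as it is in `PublishedVersionUtils`), the read side only needs to skip blank lines and lines starting with `#`:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical sketch, not the PR's implementation: reads the first value line of a resource. */
final class CommentAwareVersionFileReader {

    /**
     * Returns the first trimmed line of the given classpath resource that is neither blank nor a
     * comment (a line starting with '#'), so the file can carry an explanatory header.
     */
    static String readFirstValueLine(String resourceName) throws IOException {
        try (InputStream in =
                        Objects.requireNonNull(
                                CommentAwareVersionFileReader.class
                                        .getClassLoader()
                                        .getResourceAsStream(resourceName),
                                resourceName + " not found on the classpath");
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String trimmed = line.trim();
                if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
                    return trimmed;
                }
            }
            throw new IOException("No value line found in " + resourceName);
        }
    }
}
```

`getMostRecentlyPublishedVersion()` could then hand the string returned for the `most_recently_published_version` resource to whatever parsing it already does, while the file itself gains a `#`-prefixed comment explaining when and how the value is bumped.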
