XComp commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1098832279
##########
flink-annotations/src/main/java/org/apache/flink/FlinkVersion.java:
##########
@@ -86,8 +90,12 @@ public static Optional<FlinkVersion> byCode(String code) {
return Optional.ofNullable(CODE_MAP.get(code));
}
+ public static FlinkVersion getMostRecentlyPublishedVersion() {
+ return values()[values().length - 2];
+ }
+
/** Returns the current version. */
- public static FlinkVersion current() {
+ public static FlinkVersion getCurrentMasterVersion() {
Review Comment:
I guess that's a method we cannot change because it's a public interface.
Updating the JavaDoc would be sufficient. That's also what's causing the CI
failure.
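A minimal sketch of what keeping the public method could look like (illustrative stand-in enum, not the actual `FlinkVersion` class): deprecate `current()` and delegate to the newly named accessor, so external callers keep compiling.

```java
// Hedged sketch, not the real FlinkVersion enum: preserve the public
// current() method by deprecating it and delegating to the new name.
public class VersionCompatSketch {

    enum FlinkVersion {
        v1_15, v1_16, v1_17;

        /** New, more descriptive accessor. */
        static FlinkVersion getCurrentMasterVersion() {
            return values()[values().length - 1];
        }

        /**
         * Kept unchanged because it is part of the public interface.
         *
         * @deprecated use {@link #getCurrentMasterVersion()} instead
         */
        @Deprecated
        static FlinkVersion current() {
            return getCurrentMasterVersion();
        }
    }

    public static void main(String[] args) {
        // Both names resolve to the same version.
        System.out.println(FlinkVersion.current());                 // v1_17
        System.out.println(FlinkVersion.getCurrentMasterVersion()); // v1_17
    }
}
```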
##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
Review Comment:
```suggestion
```
`mvn spotless:apply` might help here
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +61,85 @@
 * previous Flink versions, as well as for different state backends.
 */
@RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase
+        implements MigrationTest {
    private static final int NUM_SOURCE_ELEMENTS = 4;
-    // TODO increase this to newer version to create and test snapshot migration for newer versions
-    private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+    @Parameterized.Parameters(name = "Test snapshot: {0}")
+    public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+        return internalParameters(null);
+    }
-    // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-    // TODO Note: You should generate the snapshot based on the release branch instead of the
-    // master.
-    private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+    public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+            FlinkVersion targetVersion) {
+        return internalParameters(targetVersion);
+    }
-    @Parameterized.Parameters(name = "Test snapshot: {0}")
-    public static Collection<SnapshotSpec> parameters() {
+    private static Collection<SnapshotSpec> internalParameters(
+            /* Nullable */ FlinkVersion targetGeneratingVersion) {
Review Comment:
```suggestion
@Nullable FlinkVersion targetGeneratingVersion) {
```
There's `javax.annotation.Nullable` that can be used instead of the comment.
Using the annotation enables useful IDE features like nullability warnings.
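A self-contained sketch of the suggested pattern. In Flink the annotation would be `javax.annotation.Nullable` (JSR-305); a stand-in annotation is declared here only so the snippet compiles without that dependency, and the method body is illustrative.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class NullableParamSketch {

    // Stand-in for javax.annotation.Nullable (JSR-305), declared locally
    // so this sketch has no external dependency.
    @Retention(RetentionPolicy.CLASS)
    @Target(ElementType.PARAMETER)
    @interface Nullable {}

    // Unlike a /* Nullable */ comment, the annotation lets IDEs and static
    // checkers warn when the parameter is dereferenced without a null check.
    static String describeTarget(@Nullable String targetGeneratingVersion) {
        return targetGeneratingVersion == null
                ? "all released versions"
                : targetGeneratingVersion;
    }

    public static void main(String[] args) {
        System.out.println(describeTarget(null));    // all released versions
        System.out.println(describeTarget("v1_17")); // v1_17
    }
}
```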
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -63,19 +63,11 @@
 * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
 * done using previous Flink versions' {@link FlinkKafkaConsumerBase}.
 *
- * <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
- * Flink release-* branch.
+ * <p>For regenerating the binary snapshot files run {@link #writeSnapshot(FlinkVersion)} on the
+ * corresponding Flink release-* branch.
Review Comment:
Shouldn't we rather refer to the test data generation framework here?
##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
Review Comment:
```suggestion
The following dependency needs to be added to the module's Maven config in
case a migration test is meant to be added to that module:
```
nit: proposed rewording, since the original sentence mixed singular and
plural.
##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+
Review Comment:
```suggestion
```
nit: to keep the README as short as possible
##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/SnapshotGeneratorExecutor.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.RunRules;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.Parameterized;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/** A utility class to execute generateSnapshots method for migration tests. */
+class SnapshotGeneratorExecutor {
+
+ void executeGenerate(Class<?> migrationTestClass, FlinkVersion
flinkVersion) throws Throwable {
+ for (Method method : migrationTestClass.getMethods()) {
+ if
(method.isAnnotationPresent(MigrationTest.SnapshotsGenerator.class)) {
+ executeGenerateMethods(migrationTestClass, method,
flinkVersion);
+ } else if (method.isAnnotationPresent(
+ MigrationTest.ParameterizedSnapshotsGenerator.class)) {
+ String parametersMethodName =
+
method.getAnnotation(MigrationTest.ParameterizedSnapshotsGenerator.class)
+ .value();
+ Method parametersMethod =
+ migrationTestClass.getMethod(parametersMethodName,
FlinkVersion.class);
+ executeParameterizedGenerateMethods(
+ migrationTestClass, method, parametersMethod,
flinkVersion);
+ }
+ }
+ }
+
+ private void executeGenerateMethods(
+ Class<?> migrationTestClass, Method method, FlinkVersion version)
throws Throwable {
+ method.setAccessible(true);
+ List<TestRule> classRules = getRuleFields(migrationTestClass,
ClassRule.class, null);
+
+ executeWithRules(
+ classRules,
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ Object migrationTest =
createMigrationTest(migrationTestClass);
+ List<TestRule> rules =
+ getRuleFields(migrationTestClass, Rule.class,
migrationTest);
+ executeWithRules(
+ rules,
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ method.invoke(migrationTest, version);
+ }
+ });
+ }
+ });
+ }
+
+ private void executeParameterizedGenerateMethods(
+ Class<?> migrationTestClass,
+ Method method,
+ Method parametersMethod,
+ FlinkVersion version)
+ throws Throwable {
+ method.setAccessible(true);
+ parametersMethod.setAccessible(true);
+ List<TestRule> classRules = getRuleFields(migrationTestClass,
ClassRule.class, null);
+
+ executeWithRules(
+ classRules,
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ Object migrationTest =
createMigrationTest(migrationTestClass);
+ List<TestRule> rules =
+ getRuleFields(migrationTestClass, Rule.class,
migrationTest);
+ Collection<?> arguments =
+ (Collection<?>)
parametersMethod.invoke(migrationTest, version);
+ for (Object argument : arguments) {
+ executeWithRules(
+ rules,
+ new Statement() {
+ @Override
+ public void evaluate() throws
Throwable {
+ method.invoke(migrationTest,
version, argument);
Review Comment:
I'm not sure whether we actually need the version as a parameter here.
Almost all of the test data generation methods have the FlinkVersion as an
unused parameter. The only class where we utilize this parameter is
`TypeSerializerUpgradeTestBase`. But there, we pass this version to
`createTestSpecifications` as well, where it is used as an input parameter to
each of the `TestSpecification` instances that are then used to call the actual
test data generation method (in the marked line, it's the `argument`
parameter). Therefore, even in `TypeSerializerUpgradeTestBase` the Flink
version is available through the `TestSpecification` parameter. tbh, I haven't
checked all the `TypeSerializerUpgradeTestBase` subclasses, but it is true for
the ~5 classes I checked. Could we get rid of this parameter?
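A hypothetical before/after sketch of that signature change. `TestSpec` is a stand-in for `TypeSerializerUpgradeTestBase`'s `TestSpecification`, which already carries the target Flink version, so dropping the extra version parameter loses no information.

```java
public class GeneratorParamSketch {

    // Stand-in for TestSpecification: already knows its target version.
    static final class TestSpec {
        final String flinkVersion;

        TestSpec(String flinkVersion) {
            this.flinkVersion = flinkVersion;
        }
    }

    // Before: the version is passed alongside the spec but goes unused.
    static String generateBefore(String version, TestSpec spec) {
        return "generated for " + spec.flinkVersion;
    }

    // After: the spec alone is sufficient.
    static String generateAfter(TestSpec spec) {
        return "generated for " + spec.flinkVersion;
    }

    public static void main(String[] args) {
        TestSpec spec = new TestSpec("v1_17");
        // Both variants produce the same result; the version argument is redundant.
        System.out.println(generateBefore("v1_17", spec));
        System.out.println(generateAfter(spec));
    }
}
```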
##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java:
##########
@@ -64,4 +64,8 @@ public void createRestoredJob(StreamExecutionEnvironment env)
{
SingleOutputStreamOperator<Integer> third =
createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
}
+
+ /** Will use the state generate by {@link ChainBreakTest}. */
+ @Override
+ public void generateSnapshots(FlinkVersion targetVersion) throws Exception
{}
Review Comment:
Having all these empty implementations feels odd. I didn't go through the
code in detail, but it looks like the method needs to be implemented by
`ChainBreakTest` and disabled by all the others so that
`AbstractOperatorRestoreTestBase.generateSnapshots` is only called once?
Could we extract the data generation out of `AbstractOperatorRestoreTestBase`
instead? :thinking:
##########
flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobWBroadcastStateMigrationITCase.scala:
##########
@@ -75,43 +75,65 @@ object StatefulJobWBroadcastStateMigrationITCase {
SnapshotSpec.withVersions(
StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
SnapshotType.SAVEPOINT_CANONICAL,
- FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+ if (targetGeneratingVersion != null)
Review Comment:
Defining a local `BiFunction` would shorten the code even more:
```scala
val getFlinkVersions =
  new BiFunction[FlinkVersion, FlinkVersion, util.Collection[FlinkVersion]] {
    override def apply(
        minInclVersion: FlinkVersion,
        maxInclVersion: FlinkVersion): util.Collection[FlinkVersion] =
      if (targetGeneratingVersion != null)
        Collections.singleton(targetGeneratingVersion)
      else
        FlinkVersion.rangeOf(minInclVersion, maxInclVersion)
  }
```
All the changes below could then be replaced with a call to
`getFlinkVersions`, which reduces the redundant code.
##########
flink-core/pom.xml:
##########
@@ -18,148 +18,203 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-parent</artifactId>
- <version>1.17-SNAPSHOT</version>
- </parent>
-
- <artifactId>flink-core</artifactId>
- <name>Flink : Core</name>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-annotations</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-metrics-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-asm-9</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-jackson</artifactId>
- </dependency>
-
- <!-- standard utilities -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <!-- managed version -->
- </dependency>
-
- <!-- standard utilities -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-text</artifactId>
- <!-- managed version -->
- </dependency>
-
- <!-- for the fallback generic serializer -->
- <dependency>
- <groupId>com.esotericsoftware.kryo</groupId>
- <artifactId>kryo</artifactId>
- <!-- managed version -->
- </dependency>
-
- <!-- The common collections are needed for some hash tables
used in the collection execution -->
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <!-- managed version -->
- </dependency>
-
- <!-- Commons compression, for additional decompressors -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <!-- managed version -->
- </dependency>
- <dependency>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- <version>1.4.9-1</version>
- <scope>test</scope>
- </dependency>
-
-
- <!-- Ratelimiting dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded-guava</artifactId>
- </dependency>
-
- <!-- ================== test dependencies ================== -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils-junit</artifactId>
- </dependency>
-
- <!-- Joda, jackson, and lombok are used to test that
serialization and type extraction
- work with types from those libraries -->
-
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.joda</groupId>
- <artifactId>joda-convert</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.22</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
-
- <configuration>
- <suppressionsLocation
combine.self="override">/tools/maven/suppressions-core.xml</suppressionsLocation>
- </configuration>
- </plugin>
-
- <!-- publish some test base classes -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- </build>
-
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>1.17-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-core</artifactId>
+ <name>Flink : Core</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-annotations</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-asm-9</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ </dependency>
+
+ <!-- standard utilities -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <!-- managed version -->
+ </dependency>
+
+ <!-- standard utilities -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <!-- managed version -->
+ </dependency>
+
+ <!-- for the fallback generic serializer -->
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <!-- managed version -->
+ </dependency>
+
+ <!-- The common collections are needed for some hash tables used in
the collection execution -->
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <!-- managed version -->
+ </dependency>
+
+ <!-- Commons compression, for additional decompressors -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <!-- managed version -->
+ </dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>1.4.9-1</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <!-- Ratelimiting dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-guava</artifactId>
+ </dependency>
+
+ <!-- ================== test dependencies ================== -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ </dependency>
+
+ <!-- Joda, jackson, and lombok are used to test that serialization and
type extraction
+ work with types from those libraries -->
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.joda</groupId>
+ <artifactId>joda-convert</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.22</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>fink-migration-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+
+ <configuration>
+ <suppressionsLocation
combine.self="override">/tools/maven/suppressions-core.xml
+ </suppressionsLocation>
Review Comment:
Why this change?
##########
flink-test-utils-parent/flink-migration-test-utils/src/main/java/org/apache/flink/test/migration/MigrationTestsSnapshotGenerator.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.migration;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.test.util.MigrationTest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class aims to generate the snapshots for all the state migration tests. A migration tests
+ * should implement {@link MigrationTest} interface and its name should match {@code
+ * *(Test|ITCase)(.java|.scala)}. For scala tests, we also require the class name is the same with
+ * the containing filename.
+ */
+public class MigrationTestsSnapshotGenerator {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MigrationTestsSnapshotGenerator.class);
+
+ private static final String[] DEFAULT_PATH_PREFIXES =
+ new String[] {"src/test/java", "src/test/scala"};
+
+ private static final Pattern VERSION_PATTERN = Pattern.compile("" +
"v?([0-9]+)[._]([0-9]+)");
+
+ private static final String CLASS_NAME_GROUP = "className";
+ private static final Pattern CLASS_NAME_PATTERN =
+ Pattern.compile("(?<" + CLASS_NAME_GROUP +
">[a-zA-Z0-9]*(Test|ITCase))(.java|.scala)");
+
+ public static void main(String[] args) {
+ try {
+ if (args.length < 2) {
+ throw new IllegalArgumentException(
+ "Usage: java MigrationTestsSnapshotGenerator [project
root path] [version]");
+ }
+
+ File rootDirectory = new File(args[0]);
+ if (!rootDirectory.exists() || !rootDirectory.isDirectory()) {
+ throw new FileNotFoundException(
+ rootDirectory + " does not exist or is not a
directory.");
+ }
+
+ String versionName = args[1];
+ Matcher versionMatcher = VERSION_PATTERN.matcher(versionName);
+ if (!versionMatcher.matches()) {
+ throw new IllegalArgumentException(
+ "Version "
+ + versionName
+ + "could not be parsed, "
+ + "please specify the version with format like
1.17, 1_17, v1_17, v1.17");
+ }
+ String normalizedVersionName =
+ "v" + versionMatcher.group(1) + "_" +
versionMatcher.group(2);
+ FlinkVersion version = FlinkVersion.valueOf(normalizedVersionName);
+
+ LOG.info("Start generating for module {} and version {}",
rootDirectory, version);
+ SnapshotGeneratorExecutor executor = new
SnapshotGeneratorExecutor();
+ List<Class<?>> migrationTests =
findMigrationTests(rootDirectory.getAbsolutePath());
+ for (Class<?> migrationTestClass : migrationTests) {
+ LOG.info("Start generating for {}",
migrationTestClass.getName());
+ executor.executeGenerate(migrationTestClass, version);
+ LOG.info("Finish generating for {}",
migrationTestClass.getName());
+ }
+
+ // Avoids leaking threads blocks the process.
+ System.exit(0);
Review Comment:
Is the `${generated.classes}` property which you use in the pom files a Maven
property? I struggled to find anything about it online. :thinking:
##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
Review Comment:
```suggestion
This module collects tools that help to generate test data for the state
migration tests.
```
I'm not sure whether we should mention here when the tool should be used.
That is a subject for the release documentation. We should avoid having
redundant documentation in various places, because it makes it easy to end up
with inconsistent documentation when not all locations are updated.
##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>fink-migration-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+ <id>generate-snapshots</id>
Review Comment:
```suggestion
<id>generate-migration-test-data</id>
```
I feel like `generate-snapshots` is quite generic (especially in the context
of Maven). WDYT? Renaming it here would also imply renaming it in the other
code locations.
##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>fink-migration-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+ <id>generate-snapshots</id>
+ <activation>
+ <property>
+ <name>generate-snapshots</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-snapshots</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <condition property="optional.classes"
value="--classes '${generate.classes}'"
+ else="">
+ <isset property="generate.classes"/>
+ </condition>
+ <condition property="optional.prefixes"
+ value="--prefixes
'${generate.prefixes}'" else="">
+ <isset property="generate.prefixes"/>
+ </condition>
+ <java
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+ fork="true" failonerror="true"
dir="${project.basedir}">
+ <classpath refid="maven.test.classpath"/>
+ <arg value="--dir"/>
+ <arg line="${project.basedir}"/>
+ <arg value="--version"/>
+ <arg value="${generate.version}"/>
+ <arg line="${optional.classes}"/>
+ <arg line="${optional.prefixes}"/>
+ </java>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</profile>
+```
+
+To show the log during generating, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same with the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the
snapshots generator methods is labeled
+ with `@SnapshotsGenerato` or `@ParameterizedSnapshotsGenerator`.
+
+# Generating Snapshots
+
+To generating the snapshots for all the tests, execute
+
+```shell
+mvn clean package -Pgenerate-snapshots -Dgenerate.version=1.17 -nsu -DskipRat -Dcheckstyle.skip -Drat.ignoreErrors=true -DspotlessFiles=ineffective -Dfast -DskipTests
Review Comment:
```suggestion
mvn clean package -Pgenerate-snapshots -Dgenerate.version=1.17 -nsu -Dfast -DskipTests
```
`-Dfast` skips spotless, rat, and checkstyle (see the `fast` profile
definition in
[pom.xml:1079ff](https://github.com/apache/flink/blob/a9151c42100ec09388d8052c7aa9f77f82efe469/pom.xml#L1079)).
##########
flink-test-utils-parent/flink-migration-test-utils/README.md:
##########
@@ -0,0 +1,93 @@
+# Add State Migration Tests
+
+This module is the tools that helps the state migration tests
+generate the snapshots of the current version before cutting branch.
+
+To add a state migration tests in one module, add the following dependency
+
+```xml
+
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>fink-migration-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+</dependency>
+```
+
+and the following profile
+
+```xml
+<profile>
+ <id>generate-snapshots</id>
+ <activation>
+ <property>
+ <name>generate-snapshots</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-snapshots</id>
+ <phase>package</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <condition property="optional.classes"
value="--classes '${generate.classes}'"
+ else="">
+ <isset property="generate.classes"/>
+ </condition>
+ <condition property="optional.prefixes"
+ value="--prefixes
'${generate.prefixes}'" else="">
+ <isset property="generate.prefixes"/>
+ </condition>
+ <java
classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
+ fork="true" failonerror="true"
dir="${project.basedir}">
+ <classpath refid="maven.test.classpath"/>
+ <arg value="--dir"/>
+ <arg line="${project.basedir}"/>
+ <arg value="--version"/>
+ <arg value="${generate.version}"/>
+ <arg line="${optional.classes}"/>
+ <arg line="${optional.prefixes}"/>
+ </java>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</profile>
+```
+
+To show the log during generating, add
+
+```
+logger.migration.name = org.apache.flink.test.migration
+logger.migration.level = INFO
+```
+
+to the `log4j2-test.properties` of this module.
+
+The state migration tests should satisfy
+
+1. The tests are named like `*(Test|ITCase).(java|scala)`.
+2. The test class name is the same with the file name.
+3. The test implements `org.apache.flink.test.util.MigrationTest` and the
snapshots generator methods is labeled
Review Comment:
```suggestion
3. The test implements `org.apache.flink.test.util.MigrationTest` and the
snapshots generator method is labeled
```
##########
flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala:
##########
@@ -38,78 +38,101 @@ import
org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase
import
org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase.{ExecutionMode,
SnapshotSpec, SnapshotType}
+import org.apache.flink.test.util.MigrationTest
+import org.apache.flink.test.util.MigrationTest.ParameterizedSnapshotsGenerator
import org.apache.flink.util.Collector
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util
-import java.util.stream.Collectors
+import java.util.Collections
import scala.util.{Failure, Try}
object StatefulJobSavepointMigrationITCase {
-  // TODO increase this to newer version to create and test snapshot migration for newer versions
- val currentVersion = FlinkVersion.v1_16
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ def createSpecsForTestRuns: util.Collection[SnapshotSpec] =
+ internalParameters(null)
- // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
-  // TODO Note: You should generate the snapshot based on the release branch instead of the
- // master.
- val executionMode = ExecutionMode.VERIFY_SNAPSHOT
+ def createSpecsForTestDataGeneration(version: FlinkVersion): util.Collection[SnapshotSpec] =
+ internalParameters(version)
- @Parameterized.Parameters(name = "Test snapshot: {0}")
- def parameters: util.Collection[SnapshotSpec] = {
+ private def internalParameters(
+ targetGeneratingVersion: FlinkVersion): util.Collection[SnapshotSpec] = {
// Note: It is not safe to restore savepoints created in a Scala applications with Flink
// version 1.7 or below. The reason is that up to version 1.7 the underlying Scala serializer
// used names of anonymous classes that depend on the relative position/order in code, e.g.,
// if two anonymous classes, instantiated inside the same class and from the same base class,
// change order in the code their names are switched.
// As a consequence, changes in code may result in restore failures.
// This was fixed in version 1.8, see: https://issues.apache.org/jira/browse/FLINK-10493
- var parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
+ val parameters: util.List[SnapshotSpec] = new util.LinkedList[SnapshotSpec]()
parameters.addAll(
SnapshotSpec.withVersions(
StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
SnapshotType.SAVEPOINT_CANONICAL,
- FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_13)))
+ if (targetGeneratingVersion != null)
Review Comment:
A local callback would help here as well to reduce the code redundancy.
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StatefulJobSnapshotMigrationITCase.java:
##########
@@ -60,62 +61,85 @@
* previous Flink versions, as well as for different state backends.
*/
@RunWith(Parameterized.class)
-public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase {
+public class StatefulJobSnapshotMigrationITCase extends SnapshotMigrationTestBase
+ implements MigrationTest {
private static final int NUM_SOURCE_ELEMENTS = 4;
- // TODO increase this to newer version to create and test snapshot migration for newer versions
- private static final FlinkVersion currentVersion = FlinkVersion.v1_16;
+ @Parameterized.Parameters(name = "Test snapshot: {0}")
+ public static Collection<SnapshotSpec> createSpecsForTestRuns() {
+ return internalParameters(null);
+ }
- // TODO change this to CREATE_SNAPSHOT to (re)create binary snapshots
- // TODO Note: You should generate the snapshot based on the release branch instead of the
- // master.
- private static final ExecutionMode executionMode = ExecutionMode.VERIFY_SNAPSHOT;
+ public static Collection<SnapshotSpec> createSpecsForTestDataGeneration(
+ FlinkVersion targetVersion) {
+ return internalParameters(targetVersion);
+ }
- @Parameterized.Parameters(name = "Test snapshot: {0}")
- public static Collection<SnapshotSpec> parameters() {
+ private static Collection<SnapshotSpec> internalParameters(
+ /* Nullable */ FlinkVersion targetGeneratingVersion) {
Collection<SnapshotSpec> parameters = new LinkedList<>();
parameters.addAll(
SnapshotSpec.withVersions(
StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
SnapshotType.SAVEPOINT_CANONICAL,
- FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_14)));
+ targetGeneratingVersion != null
Review Comment:
This could be a local BiFunction callback; all of the ternary-operator calls could then be replaced.
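Restating the suggestion as a compilable sketch (with a stand-in `Version` enum and `EnumSet.range` in place of Flink's `FlinkVersion.rangeOf`, so this is illustrative only): the null-check ternary moves into one local `BiFunction`, and each `SnapshotSpec.withVersions`-style call site then passes just its default version range.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.function.BiFunction;

public class TernaryCallbackSketch {
    // Stand-in for FlinkVersion (the real enum lives in flink-annotations).
    enum Version { v1_8, v1_13, v1_14, v1_16 }

    static Collection<Version> internalParameters(Version targetGeneratingVersion) {
        // The local callback the review suggests: the null-check ternary is
        // written once here instead of at every call site.
        BiFunction<Version, Version, Collection<Version>> versionsFor =
                (from, to) -> targetGeneratingVersion != null
                        ? Collections.singletonList(targetGeneratingVersion)
                        : EnumSet.range(from, to);

        List<Version> parameters = new LinkedList<>();
        // Each call site now just states its default version range.
        parameters.addAll(versionsFor.apply(Version.v1_8, Version.v1_13));
        parameters.addAll(versionsFor.apply(Version.v1_8, Version.v1_14));
        return parameters;
    }

    public static void main(String[] args) {
        // Verification mode: the full default ranges.
        System.out.println(internalParameters(null));
        // Generation mode: only the requested version, once per call site.
        System.out.println(internalParameters(Version.v1_16));
    }
}
```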
##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java:
##########
@@ -57,4 +57,10 @@ public void createRestoredJob(StreamExecutionEnvironment env) {
SingleOutputStreamOperator<Integer> third =
createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
}
+
+ /** Will use the state generated by {@link ChainBreakTest}. */
+ @Override
+ public void generateSnapshots(FlinkVersion targetVersion) throws Exception {
+ System.out.println("I replace the old one");
Review Comment:
```suggestion
```
##########
flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java:
##########
@@ -66,7 +58,22 @@
* Creates a collection of {@link TestSpecification} which will be used as input for
* parametrized tests.
*/
- public abstract Collection<TestSpecification<?, ?>> createTestSpecifications() throws Exception;
+ public abstract Collection<TestSpecification<?, ?>> createTestSpecifications(
Review Comment:
Are we sure that we're not changing the behavior here? It looks like the old implementation created specifications for each MIGRATION_VERSIONS entry. With the new implementation, we only do it for the version specified as a parameter in `MigrationTestsSnapshotGenerator`. :thinking: I might be wrong here; I just tried to reason about it based on `CompositeTypeSerializerUpgradeTest`.
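The concern can be restated with a stand-in sketch (version strings instead of real `TestSpecification` objects; helper names are hypothetical): if the old `createTestSpecifications()` expanded every `MIGRATION_VERSIONS` entry while the new variant only receives one version from `MigrationTestsSnapshotGenerator`, the test matrix shrinks unless the generator is invoked once per version.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SpecGenerationSketch {
    // Stand-in for the MIGRATION_VERSIONS constant; real code holds FlinkVersions.
    static final List<String> MIGRATION_VERSIONS = Arrays.asList("1.14", "1.15", "1.16");

    // One reading of the old behavior: one specification per known version.
    static List<String> oldSpecs() {
        return new ArrayList<>(MIGRATION_VERSIONS);
    }

    // One reading of the new behavior: only the single requested version.
    static List<String> newSpecs(String requestedVersion) {
        return Collections.singletonList(requestedVersion);
    }

    public static void main(String[] args) {
        System.out.println(oldSpecs());       // covers every entry
        System.out.println(newSpecs("1.16")); // covers one entry per invocation
    }
}
```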
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]