XComp commented on code in PR #21736:
URL: https://github.com/apache/flink/pull/21736#discussion_r1147503902


##########
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java:
##########
@@ -71,7 +80,17 @@
  * AbstractOperatorRestoreTestBase#migrateJob}, please create the corresponding test resource
  * directory and copy the _metadata file by hand.
  */
-public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
+public abstract class AbstractOperatorRestoreTestBase extends TestLogger implements MigrationTest {
+
+    private static final FlinkVersion MIGRATION_BASE_VERSION = FlinkVersion.v1_16;

Review Comment:
   Can't we add a utility method `FlinkVersion.previousMinorVersion()` that 
returns the preceding minor version? That way we could derive the base version 
from the target version instead of keeping a fixed field that has to be 
updated with every release.
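
A minimal sketch of what such a helper could look like, assuming the enum constants are declared in ascending release order. `SketchFlinkVersion` is a hypothetical stand-in with only three constants, not Flink's actual `FlinkVersion`, and the `Optional` return type is just one possible way to handle the oldest version:

```java
import java.util.Optional;

/** Hypothetical stand-in for FlinkVersion, reduced to three constants for illustration. */
enum SketchFlinkVersion {
    v1_15,
    v1_16,
    v1_17;

    /**
     * Returns the constant declared directly before this one, or an empty Optional for the
     * first constant. This relies on the constants being declared in release order.
     */
    Optional<SketchFlinkVersion> previousMinorVersion() {
        int index = ordinal();
        return index == 0 ? Optional.empty() : Optional.of(values()[index - 1]);
    }
}
```

With something like this, the base version could be computed from the target version at the call site, so the fixed `MIGRATION_BASE_VERSION` field would no longer need a manual bump per release.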



##########
flink-test-utils-parent/flink-migration-test-utils/src/main/resources/most_recently_published_version:
##########
@@ -0,0 +1 @@
+v1.16

Review Comment:
   This one needs to be updated to v1.17 now that FLINK-31593 is merged.



##########
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java:
##########
@@ -309,17 +288,34 @@ public void testMonitoringSourceRestore() throws Exception {
 
         testHarness.setup();
 
-        testHarness.initializeState(
-                OperatorSnapshotUtil.getResourceFilename(
-                        "monitoring-function-migration-test-"
-                                + expectedModTime
-                                + "-flink"
-                                + testMigrateVersion
-                                + "-snapshot"));
+        // Get the exact filename
+        Tuple2<String, Long> fileNameAndModTime = getResourceFilename(testMigrateVersion);
+
+        testHarness.initializeState(fileNameAndModTime.f0);
 
         testHarness.open();
 
-        Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
+        Assert.assertEquals(
+                fileNameAndModTime.f1.longValue(), monitoringFunction.getGlobalModificationTime());
+    }
+
+    private Tuple2<String, Long> getResourceFilename(FlinkVersion version) throws IOException {
+        String resourceDirectory = OperatorSnapshotUtil.getResourceFilename("");
+        Pattern fileNamePattern =
+                Pattern.compile(
+                        "monitoring-function-migration-test-(\\d+)-flink"

Review Comment:
   I did some digging through the code. My impression is that we don't even 
need the modification time, do we? It only ends up in the filename :thinking: 
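
For context, a rough sketch of how the modification time gets recovered from the snapshot filename. The filename layout follows the string concatenation removed in the hunk above (`monitoring-function-migration-test-<modTime>-flink<version>-snapshot`). The class name, the method names, and the second capture group are assumptions made for illustration, since the actual pattern in the PR is cut off in this hunk:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; not part of the PR. */
final class SnapshotFileNameSketch {

    // Group 1: modification time in digits. Group 2: version suffix (assumed layout).
    private static final Pattern FILE_NAME =
            Pattern.compile("monitoring-function-migration-test-(\\d+)-flink(.+)-snapshot");

    private SnapshotFileNameSketch() {}

    /** Extracts the modification time encoded in the snapshot filename. */
    static long parseModTime(String fileName) {
        return Long.parseLong(match(fileName).group(1));
    }

    /** Extracts the version part encoded in the snapshot filename. */
    static String parseVersion(String fileName) {
        return match(fileName).group(2);
    }

    private static Matcher match(String fileName) {
        Matcher matcher = FILE_NAME.matcher(fileName);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected snapshot filename: " + fileName);
        }
        return matcher;
    }
}
```

If the modification time were dropped from the filename, the lookup would reduce to building the name from the version alone and none of this parsing would be needed.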



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.