dawidwys commented on code in PR #23728:
URL: https://github.com/apache/flink/pull/23728#discussion_r1401992358


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##########
@@ -81,9 +81,22 @@
 public abstract class RestoreTestBase implements TableTestProgramRunner {
 
     private final Class<? extends ExecNode> execNodeUnderTest;
+    private final AfterRestoreState afterRestoreState;
 
     protected RestoreTestBase(Class<? extends ExecNode> execNodeUnderTest) {
         this.execNodeUnderTest = execNodeUnderTest;
+        this.afterRestoreState = AfterRestoreState.FINITE_AFTER_RESTORE;
+    }
+
+    protected RestoreTestBase(
+            Class<? extends ExecNode> execNodeUnderTest, AfterRestoreState 
state) {
+        this.execNodeUnderTest = execNodeUnderTest;
+        this.afterRestoreState = state;
+    }
+
+    protected enum AfterRestoreState {

Review Comment:
   nit: `AfterRestoreState` -> `AfterRestoreSource`
   
   The enum name already contains the `AfterRestore` prefix, we can drop it 
from the enum values:
   
   ```
   AfterRestoreSource.FINITE or AfterRestoreSource.INFINITE
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##########
@@ -200,17 +213,43 @@ void testRestore(TableTestProgram program, 
ExecNodeMetadata metadata) throws Exc
                 .set(
                         TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                         TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
+
         for (SourceTestStep sourceTestStep : 
program.getSetupSourceTestSteps()) {
             final String id = 
TestValuesTableFactory.registerData(sourceTestStep.dataAfterRestore);
             final Map<String, String> options = new HashMap<>();
             options.put("connector", "values");
             options.put("data-id", id);
             options.put("disable-lookup", "true");
             options.put("runtime-source", "NewSource");
+            if (afterRestoreState == AfterRestoreState.INFINITE_AFTER_RESTORE) 
{
+                options.put("terminating", "false");
+            }
             sourceTestStep.apply(tEnv, options);
         }
 
+        final List<CompletableFuture<?>> futures = new ArrayList<>();
+
         for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+            if (afterRestoreState == AfterRestoreState.INFINITE_AFTER_RESTORE) 
{
+                final CompletableFuture<Object> future = new 
CompletableFuture<>();
+                futures.add(future);
+                final String tableName = sinkTestStep.name;
+                TestValuesTableFactory.registerLocalRawResultsObserver(
+                        tableName,
+                        (integer, strings) -> {
+                            List<String> results = new ArrayList<>();
+                            
results.addAll(sinkTestStep.getExpectedBeforeRestoreAsStrings());
+                            
results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
+                            final boolean shouldComplete =
+                                    CollectionUtils.isEqualCollection(
+                                            
TestValuesTableFactory.getRawResultsAsStrings(
+                                                    tableName),
+                                            results);
+                            if (shouldComplete) {
+                                future.complete(null);
+                            }
+                        });
+            }

Review Comment:
   could we extract parts of that to a private method? It's a) duplicated b) 
the method becomes very long and hard to read



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to