bvarghese1 commented on code in PR #23728:
URL: https://github.com/apache/flink/pull/23728#discussion_r1402821698
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java:
##########
@@ -200,17 +213,43 @@ void testRestore(TableTestProgram program,
ExecNodeMetadata metadata) throws Exc
.set(
TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
+
for (SourceTestStep sourceTestStep :
program.getSetupSourceTestSteps()) {
final String id =
TestValuesTableFactory.registerData(sourceTestStep.dataAfterRestore);
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("data-id", id);
options.put("disable-lookup", "true");
options.put("runtime-source", "NewSource");
+ if (afterRestoreState == AfterRestoreState.INFINITE_AFTER_RESTORE)
{
+ options.put("terminating", "false");
+ }
sourceTestStep.apply(tEnv, options);
}
+ final List<CompletableFuture<?>> futures = new ArrayList<>();
+
for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+ if (afterRestoreState == AfterRestoreState.INFINITE_AFTER_RESTORE)
{
+ final CompletableFuture<Object> future = new
CompletableFuture<>();
+ futures.add(future);
+ final String tableName = sinkTestStep.name;
+ TestValuesTableFactory.registerLocalRawResultsObserver(
+ tableName,
+ (integer, strings) -> {
+ List<String> results = new ArrayList<>();
+
results.addAll(sinkTestStep.getExpectedBeforeRestoreAsStrings());
+
results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
+ final boolean shouldComplete =
+ CollectionUtils.isEqualCollection(
+
TestValuesTableFactory.getRawResultsAsStrings(
+ tableName),
+ results);
+ if (shouldComplete) {
+ future.complete(null);
+ }
+ });
+ }
Review Comment:
I extracted this logic into a private method here:
https://github.com/apache/flink/pull/23728/files#diff-a815488ad1ad3ed866421e69b53f1c0c07cbcf16dac7e6fe357f3bb87d2e20baR132
Both the generate and restore paths now call that method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]