This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4426265e1f4 test(flink): retry short CollectSink reads to de-flake stream-read ITs (#19030)
c4426265e1f4 is described below

commit c4426265e1f4f349dca53671a0826e7e68cb6513
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Jun 18 09:33:12 2026 +0700

    test(flink): retry short CollectSink reads to de-flake stream-read ITs (#19030)
    
    * test(flink): retry short CollectSink reads to de-flake stream-read ITs
    
    ITTestHoodieDataSource streaming reads are collected with CollectTableSink and terminated by a forced SuccessException at the expected row count. A tolerated teardown race (Stream-is-closed / readNextRowGroup NPE, broadened in #19019) can fire before the read completes, so the job ends with fewer rows than expected and the test asserts on an incomplete result.
    
    Wrap submit-and-collect in submitAndFetchWithRetry, which re-reads (up to 3 times) when the collected count is below the expectation; re-reading the already committed table is idempotent. Log the swallowed non-SuccessException cause for diagnosability. Test-only change, no production code touched.
    
    * addressed review comments: fix stale comment reference and shorten terminal-cause log
    
    ---------
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 72 ++++++++++++++++++++--
 1 file changed, 66 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 56d082be1f7a..749da56c8a16 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -79,6 +79,8 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -126,6 +128,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @ExtendWith(FlinkMiniCluster.class)
 public class ITTestHoodieDataSource {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ITTestHoodieDataSource.class);
+
+  // A streaming read collected via CollectTableSink is terminated by a forced SuccessException once it
+  // reaches its expected row count. A benign teardown race (see isAcceptableTerminalFailure) can instead
+  // close the source stream mid-read and terminate the job before all rows are emitted, leaving an
+  // incomplete result. Re-reading from the same (already committed) table is idempotent, so retry a few
+  // times before giving up. See submitAndFetchWithRetry.
+  private static final int MAX_STREAM_READ_ATTEMPTS = 3;
+
   private TableEnvironment streamTableEnv;
   private TableEnvironment batchTableEnv;
 
@@ -560,7 +571,7 @@ public class ITTestHoodieDataSource {
         + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
         + "  'sink-expected-row-num' = '2'"
         + ")";
-    List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select 
name, sum(age) from t1 group by name", sinkDDL);
+    List<Row> result = submitAndFetchWithRetry(streamTableEnv, "select name, 
sum(age) from t1 group by name", sinkDDL, 2);
     final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
     assertRowsEquals(result, expected, true);
   }
@@ -3888,15 +3899,32 @@ public class ITTestHoodieDataSource {
     } else {
       sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", 
expectedNum);
     }
-    return execSelectSqlWithExpectedNum(tEnv, select, sinkDDL);
+    return submitAndFetchWithRetry(tEnv, select, sinkDDL, expectedNum);
   }
 
   /**
-   * Use CollectTableSink to collect results with expected row number.
+   * Submits a streaming select that collects into the {@link CollectSinkTableFactory} sink and returns
+   * the collected rows.
+   *
+   * <p>The streaming job is terminated by a forced {@link CollectSinkTableFactory.SuccessException} once
+   * {@code expectedNum} rows are collected. A benign teardown race (see {@link #isAcceptableTerminalFailure})
+   * can instead end the job before all rows are emitted, leaving a short result. Re-reading the already
+   * committed table is idempotent, so retry up to {@link #MAX_STREAM_READ_ATTEMPTS} times when the result
+   * is short; this keeps the race from surfacing as a confusing row-count assertion failure.
    */
-  private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String 
select, String sinkDDL) {
-    TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL);
-    return fetchResultWithExpectedNum(tEnv, tableResult);
+  private List<Row> submitAndFetchWithRetry(TableEnvironment tEnv, String 
select, String sinkDDL, int expectedNum) {
+    List<Row> rows = Collections.emptyList();
+    for (int attempt = 1; attempt <= MAX_STREAM_READ_ATTEMPTS; attempt++) {
+      TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL);
+      rows = fetchResultWithExpectedNum(tEnv, tableResult);
+      if (expectedNum <= 0 || rows.size() >= expectedNum) {
+        return rows;
+      }
+      LOG.warn("Streaming read collected {} of {} expected rows on attempt 
{}/{}; a tolerated teardown "
+              + "race ended the job before the read completed. Retrying. 
select=[{}]",
+          rows.size(), expectedNum, attempt, MAX_STREAM_READ_ATTEMPTS, select);
+    }
+    return rows;
   }
 
   private TableResult submitSelectSql(TableEnvironment tEnv, String select, 
String sinkDDL) {
@@ -3953,6 +3981,14 @@ public class ITTestHoodieDataSource {
       if (!isAcceptableTerminalFailure(e)) {
         throw new AssertionError("Unexpected job failure", e);
       }
+      // The races (2)/(3) usually fire after the sink has collected its expected rows, but can also fire
+      // before - ending the read with a short result. Log the tolerated cause so an incomplete read is
+      // diagnosable; submitAndFetchWithRetry re-reads when the collected count is below the expectation.
+      if (!isSuccessException(e)) {
+        LOG.warn("Streaming read terminated by a tolerated teardown race ({}); 
collected {} rows so far.",
+            describeTerminalCause(e),
+            
CollectSinkTableFactory.RESULT.values().stream().mapToInt(List::size).sum());
+      }
     }
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     return CollectSinkTableFactory.RESULT.values().stream()
@@ -4012,4 +4048,28 @@ public class ITTestHoodieDataSource {
     }
     return false;
   }
+
+  /**
+   * Whether {@code e} (or any of its causes) is the normal {@link CollectSinkTableFactory.SuccessException}
+   * terminator (the happy path), as opposed to one of the tolerated teardown-race symptoms.
+   */
+  private static boolean isSuccessException(Throwable e) {
+    for (Throwable cur = e; cur != null; cur = cur.getCause()) {
+      if (cur instanceof CollectSinkTableFactory.SuccessException) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Short description of {@code e}'s root cause, for logging which tolerated terminal failure fired.
+   */
+  private static String describeTerminalCause(Throwable e) {
+    Throwable root = e;
+    while (root.getCause() != null) {
+      root = root.getCause();
+    }
+    return root.getClass().getSimpleName() + ": " + root.getMessage();
+  }
 }

Reply via email to