This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c4426265e1f4 test(flink): retry short CollectSink reads to de-flake
stream-read ITs (#19030)
c4426265e1f4 is described below
commit c4426265e1f4f349dca53671a0826e7e68cb6513
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Jun 18 09:33:12 2026 +0700
test(flink): retry short CollectSink reads to de-flake stream-read ITs
(#19030)
* test(flink): retry short CollectSink reads to de-flake stream-read ITs
ITTestHoodieDataSource streaming reads are collected with CollectTableSink
and terminated by a forced SuccessException at the expected row count. A
tolerated teardown race (Stream-is-closed / readNextRowGroup NPE, broadened in
#19019) can fire before the read completes, so the job ends with fewer rows
than expected and the test asserts on an incomplete result.
Wrap submit-and-collect in submitAndFetchWithRetry, which re-submits the read
(up to 3 attempts in total) when the collected row count is below the expected
count; re-reading the already committed table is idempotent. Log the swallowed
non-SuccessException cause for diagnosability. Test-only change; no production
code is touched.
* addressed review comments: fix stale comment reference and shorten
terminal-cause log
---------
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../apache/hudi/table/ITTestHoodieDataSource.java | 72 ++++++++++++++++++++--
1 file changed, 66 insertions(+), 6 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 56d082be1f7a..749da56c8a16 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -79,6 +79,8 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -126,6 +128,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
@ExtendWith(FlinkMiniCluster.class)
public class ITTestHoodieDataSource {
+ private static final Logger LOG =
LoggerFactory.getLogger(ITTestHoodieDataSource.class);
+
+ // A streaming read collected via CollectTableSink is terminated by a forced
SuccessException once it
+ // reaches its expected row count. A benign teardown race (see
isAcceptableTerminalFailure) can instead
+ // close the source stream mid-read and terminate the job before all rows
are emitted, leaving an
+ // incomplete result. Re-reading from the same (already committed) table is
idempotent, so retry a few
+ // times before giving up. See submitAndFetchWithRetry.
+ private static final int MAX_STREAM_READ_ATTEMPTS = 3;
+
private TableEnvironment streamTableEnv;
private TableEnvironment batchTableEnv;
@@ -560,7 +571,7 @@ public class ITTestHoodieDataSource {
+ " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+ " 'sink-expected-row-num' = '2'"
+ ")";
- List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select
name, sum(age) from t1 group by name", sinkDDL);
+ List<Row> result = submitAndFetchWithRetry(streamTableEnv, "select name,
sum(age) from t1 group by name", sinkDDL, 2);
final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
assertRowsEquals(result, expected, true);
}
@@ -3888,15 +3899,32 @@ public class ITTestHoodieDataSource {
} else {
sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink",
expectedNum);
}
- return execSelectSqlWithExpectedNum(tEnv, select, sinkDDL);
+ return submitAndFetchWithRetry(tEnv, select, sinkDDL, expectedNum);
}
/**
-   * Use CollectTableSink to collect results with expected row number.
+   * Submits a streaming select that collects into the {@link CollectSinkTableFactory} sink and returns
+   * the collected rows.
+   *
+   * <p>The streaming job is terminated by a forced {@link CollectSinkTableFactory.SuccessException} once
+   * {@code expectedNum} rows are collected. A benign teardown race (see {@link #isAcceptableTerminalFailure})
+   * can instead end the job before all rows are emitted, leaving a short result. Re-reading the already
+   * committed table is idempotent, so the read is submitted up to {@link #MAX_STREAM_READ_ATTEMPTS} times
+   * in total while the result is short; this keeps the race from surfacing as a confusing row-count
+   * assertion failure.
+   *
+   * @param tEnv        table environment the sink DDL and select are executed in
+   * @param select      the streaming select statement
+   * @param sinkDDL     DDL creating the collect sink
+   * @param expectedNum expected row count; a value {@code <= 0} accepts the first attempt's result
+   * @return the rows of the first attempt that collected at least {@code expectedNum} rows; otherwise the
+   *         (short) rows of the last attempt, left for the caller's assertion to report
+   */
-  private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sinkDDL) {
-    TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL);
-    return fetchResultWithExpectedNum(tEnv, tableResult);
+  private List<Row> submitAndFetchWithRetry(TableEnvironment tEnv, String select, String sinkDDL, int expectedNum) {
+    List<Row> rows = Collections.emptyList();
+    for (int attempt = 1; attempt <= MAX_STREAM_READ_ATTEMPTS; attempt++) {
+      TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL);
+      rows = fetchResultWithExpectedNum(tEnv, tableResult);
+      if (expectedNum <= 0 || rows.size() >= expectedNum) {
+        return rows;
+      }
+      // Only announce a retry when one will actually happen; on the final attempt the short result is
+      // returned and the caller's assertion reports the failure.
+      boolean willRetry = attempt < MAX_STREAM_READ_ATTEMPTS;
+      LOG.warn("Streaming read collected {} of {} expected rows on attempt {}/{} (likely a tolerated teardown "
+          + "race ended the job before the read completed). {} select=[{}]",
+          rows.size(), expectedNum, attempt, MAX_STREAM_READ_ATTEMPTS,
+          willRetry ? "Retrying." : "Giving up; returning the short result.", select);
+    }
+    return rows;
}
private TableResult submitSelectSql(TableEnvironment tEnv, String select,
String sinkDDL) {
@@ -3953,6 +3981,14 @@ public class ITTestHoodieDataSource {
if (!isAcceptableTerminalFailure(e)) {
throw new AssertionError("Unexpected job failure", e);
}
+ // The races (2)/(3) usually fire after the sink has collected its
expected rows, but can also fire
+ // before - ending the read with a short result. Log the tolerated cause
so an incomplete read is
+ // diagnosable; submitAndFetchWithRetry re-reads when the collected
count is below the expectation.
+ if (!isSuccessException(e)) {
+ LOG.warn("Streaming read terminated by a tolerated teardown race ({});
collected {} rows so far.",
+ describeTerminalCause(e),
+
CollectSinkTableFactory.RESULT.values().stream().mapToInt(List::size).sum());
+ }
}
tEnv.executeSql("DROP TABLE IF EXISTS sink");
return CollectSinkTableFactory.RESULT.values().stream()
@@ -4012,4 +4048,28 @@ public class ITTestHoodieDataSource {
}
return false;
}
+
+  /**
+   * Whether {@code e} (or any of its causes) is the normal {@link CollectSinkTableFactory.SuccessException}
+   * terminator (the happy path), as opposed to one of the tolerated teardown-race symptoms.
+   *
+   * <p>The cause chain is walked with an identity-based visited set. {@link Throwable#initCause} only
+   * rejects direct self-causation, so a longer cycle (A -> B -> A) is possible; the set makes such a
+   * chain terminate instead of hanging the test.
+   */
+  private static boolean isSuccessException(Throwable e) {
+    java.util.Set<Throwable> visited = Collections.newSetFromMap(new java.util.IdentityHashMap<>());
+    for (Throwable cur = e; cur != null && visited.add(cur); cur = cur.getCause()) {
+      if (cur instanceof CollectSinkTableFactory.SuccessException) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Short description ({@code SimpleClassName: message}) of {@code e}'s root cause, for logging which
+   * tolerated terminal failure fired.
+   *
+   * <p>The root is the last throwable reached by following {@link Throwable#getCause()}. An
+   * identity-based visited set stops the walk if the chain loops back on itself, so a cyclic cause
+   * chain cannot hang the test; in that case the last distinct throwable before the cycle is reported.
+   */
+  private static String describeTerminalCause(Throwable e) {
+    java.util.Set<Throwable> visited = Collections.newSetFromMap(new java.util.IdentityHashMap<>());
+    visited.add(e);
+    Throwable root = e;
+    while (root.getCause() != null && visited.add(root.getCause())) {
+      root = root.getCause();
+    }
+    return root.getClass().getSimpleName() + ": " + root.getMessage();
+  }
}