This is an automated email from the ASF dual-hosted git repository.
wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d671c276261a test(flink): de-flake testStreamReadMorTableWithCompactionFromEarliest (#19019)
d671c276261a is described below
commit d671c276261a7ac8fdda88f6169df571624637c9
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed Jun 17 10:42:08 2026 +0700
test(flink): de-flake testStreamReadMorTableWithCompactionFromEarliest (#19019)
---
.../apache/hudi/table/ITTestHoodieDataSource.java | 42 ++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e1a0606373d1..56d082be1f7a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3941,6 +3941,15 @@ public class ITTestHoodieDataSource {
// fail the job on stream-closed-mid-read (the right behavior for
real I/O
// failures), so this tolerance is scoped to the
SuccessException-based test
// pattern below and is NOT mirrored in production code.
+ // 3. NullPointerException from
ParquetColumnarRowSplitReader#readNextRowGroup: the
+ // same benign teardown race as (2), observed with different
timing. When the
+ // SplitFetcher's close() fully completes first,
ParquetColumnarRowSplitReader#close
+ // nulls out its `reader` field, so the in-flight row-group read on
the task thread
+ // surfaces as a NullPointerException (reader.readNextRowGroup() on
a null reader)
+ // instead of an IOException("Stream is closed!"). Same functional
outcome - the
+ // sink has already collected the expected rows - only the error
symptom differs.
+ // Tolerated narrowly (an NPE originating from that exact frame)
for the same
+ // reason as (2), and likewise NOT mirrored in production code.
if (!isAcceptableTerminalFailure(e)) {
throw new AssertionError("Unexpected job failure", e);
}
@@ -3966,8 +3975,41 @@ public class ITTestHoodieDataSource {
if (msg != null && msg.contains("Stream is closed")) {
return true;
}
+ // The NPE twin of the "Stream is closed!" teardown race (cause #3 at
the call site):
+ // a NullPointerException whose own stack trace originates from
+ // ParquetColumnarRowSplitReader#readNextRowGroup, i.e.
reader.readNextRowGroup() ran on a
+ // null `reader` that ParquetColumnarRowSplitReader#close had just
nulled out. Scoped to
+ // that exact frame so genuine NPEs - and the legitimate
IOException("expecting more
+ // rows...") thrown from the same method - still fail the test.
+ if (isNullPointerException(cur) && containsReadNextRowGroupFrame(cur)) {
+ return true;
+ }
cur = cur.getCause();
}
return false;
}
+
+ /**
+ * True for a real {@link NullPointerException} as well as one wrapped in
Flink's
+ * {@code SerializedThrowable} when the failure is propagated back from the
cluster (its
+ * {@code toString()} preserves the original {@code
java.lang.NullPointerException} prefix).
+ */
+ private static boolean isNullPointerException(Throwable t) {
+ return t instanceof NullPointerException
+ || t.toString().startsWith(NullPointerException.class.getName());
+ }
+
+ /**
+ * Whether {@code t}'s stack trace (preserved even through {@code
SerializedThrowable})
+ * contains a {@code ParquetColumnarRowSplitReader#readNextRowGroup} frame.
+ */
+ private static boolean containsReadNextRowGroupFrame(Throwable t) {
+ for (StackTraceElement frame : t.getStackTrace()) {
+ if (frame.getClassName().endsWith("ParquetColumnarRowSplitReader")
+ && "readNextRowGroup".equals(frame.getMethodName())) {
+ return true;
+ }
+ }
+ return false;
+ }
}