This is an automated email from the ASF dual-hosted git repository.

wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d671c276261a test(flink): de-flake testStreamReadMorTableWithCompactionFromEarliest (#19019)
d671c276261a is described below

commit d671c276261a7ac8fdda88f6169df571624637c9
Author: Vova Kolmakov <[email protected]>
AuthorDate: Wed Jun 17 10:42:08 2026 +0700

    test(flink): de-flake testStreamReadMorTableWithCompactionFromEarliest (#19019)
---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e1a0606373d1..56d082be1f7a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3941,6 +3941,15 @@ public class ITTestHoodieDataSource {
       //      fail the job on stream-closed-mid-read (the right behavior for real I/O
       //      failures), so this tolerance is scoped to the SuccessException-based test
       //      pattern below and is NOT mirrored in production code.
+      //   3. NullPointerException from ParquetColumnarRowSplitReader#readNextRowGroup: the
+      //      same benign teardown race as (2), observed with different timing. When the
+      //      SplitFetcher's close() fully completes first, ParquetColumnarRowSplitReader#close
+      //      nulls out its `reader` field, so the in-flight row-group read on the task thread
+      //      surfaces as a NullPointerException (reader.readNextRowGroup() on a null reader)
+      //      instead of an IOException("Stream is closed!"). Same functional outcome - the
+      //      sink has already collected the expected rows - only the error symptom differs.
+      //      Tolerated narrowly (an NPE originating from that exact frame) for the same
+      //      reason as (2), and likewise NOT mirrored in production code.
       if (!isAcceptableTerminalFailure(e)) {
         throw new AssertionError("Unexpected job failure", e);
       }
@@ -3966,8 +3975,41 @@ public class ITTestHoodieDataSource {
       if (msg != null && msg.contains("Stream is closed")) {
         return true;
       }
+      // The NPE twin of the "Stream is closed!" teardown race (cause #3 at the call site):
+      // a NullPointerException whose own stack trace originates from
+      // ParquetColumnarRowSplitReader#readNextRowGroup, i.e. reader.readNextRowGroup() ran on a
+      // null `reader` that ParquetColumnarRowSplitReader#close had just nulled out. Scoped to
+      // that exact frame so genuine NPEs - and the legitimate IOException("expecting more
+      // rows...") thrown from the same method - still fail the test.
+      if (isNullPointerException(cur) && containsReadNextRowGroupFrame(cur)) {
+        return true;
+      }
       cur = cur.getCause();
     }
     return false;
   }
+
+  /**
+   * True for a real {@link NullPointerException} as well as one wrapped in Flink's
+   * {@code SerializedThrowable} when the failure is propagated back from the cluster (its
+   * {@code toString()} preserves the original {@code java.lang.NullPointerException} prefix).
+   */
+  private static boolean isNullPointerException(Throwable t) {
+    return t instanceof NullPointerException
+        || t.toString().startsWith(NullPointerException.class.getName());
+  }
+
+  /**
+   * Whether {@code t}'s stack trace (preserved even through {@code SerializedThrowable})
+   * contains a {@code ParquetColumnarRowSplitReader#readNextRowGroup} frame.
+   */
+  private static boolean containsReadNextRowGroupFrame(Throwable t) {
+    for (StackTraceElement frame : t.getStackTrace()) {
+      if (frame.getClassName().endsWith("ParquetColumnarRowSplitReader")
+          && "readNextRowGroup".equals(frame.getMethodName())) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

Reply via email to