hudi-agent commented on code in PR #18701:
URL: https://github.com/apache/hudi/pull/18701#discussion_r3210349685
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java:
##########
@@ -32,15 +32,37 @@
public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> {
private final ParquetColumnarRowSplitReader reader;
+ // Cached end-of-stream signal once the underlying Hadoop input stream is closed externally
+ // (see hasNext() for the rationale). Avoids re-invoking the reader after we've already
+ // observed the close.
+ private boolean drained = false;
+
public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
this.reader = reader;
}
@Override
public boolean hasNext() {
+ if (drained) {
+ return false;
+ }
try {
return !reader.reachedEnd();
} catch (IOException e) {
+ // The underlying Hadoop FSDataInputStream can be closed externally during streaming
+ // source teardown: the SourceV2 SplitFetcher thread can close the stream after enqueueing
+ // a BatchRecords, then the mailbox thread polls and tries to read another row group on
+ // the now-closed stream. The well-known stable Hadoop signal for this is
+ // BufferedFSInputStream / FSInputChecker throwing IOException("Stream is closed!"). Since
+ // the streaming source runs with restart-strategy.maxNumberRestartAttempts=0, surfacing
+ // this as a fatal HoodieIOException permanently fails the job and turns
+ // ITTestHoodieDataSource#testStreamReadFromSpecifiedCommitWithChangelog into a flake.
+ // Treat it as end-of-stream and cache the state so subsequent hasNext() calls on the
+ // now-broken reader short-circuit instead of re-throwing on every poll.
+ if (isStreamClosedSignal(e)) {
Review Comment:
🤖 This iterator is wired into batch readers too (`CopyOnWriteInputFormat`,
`MergeOnReadInputFormat`, `HoodieRowDataParquetReader`), not just the SourceV2
streaming path. In a batch read, an `IOException("Stream is closed!")` from a
partially-read split would silently truncate the result instead of failing.
Could the suppression be scoped to the SourceV2/CDC caller (e.g. a flag on the
iterator, or done inside `HoodieCdcSplitReaderFunction`) so batch reads still
surface the error? @danny0405 could you weigh in on whether this is the right
layer for the fix?
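
A minimal sketch of the iterator-level flag idea, assuming a second constructor parameter (the flag name `suppressStreamClosedAsEof` and the rethrow message are hypothetical; `isStreamClosedSignal` is the helper this PR adds):

```java
// Sketch only, not the PR's code: scope the suppression to callers that opt in,
// so batch readers (CopyOnWriteInputFormat, MergeOnReadInputFormat, etc.) keep failing fast.
public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> {
  private final ParquetColumnarRowSplitReader reader;
  private final boolean suppressStreamClosedAsEof; // hypothetical flag
  private boolean drained = false;

  public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
    this(reader, false); // existing callers keep today's fail-fast behavior
  }

  public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader,
                                    boolean suppressStreamClosedAsEof) {
    this.reader = reader;
    this.suppressStreamClosedAsEof = suppressStreamClosedAsEof;
  }

  @Override
  public boolean hasNext() {
    if (drained) {
      return false;
    }
    try {
      return !reader.reachedEnd();
    } catch (IOException e) {
      if (suppressStreamClosedAsEof && isStreamClosedSignal(e)) {
        // Only the opted-in streaming/CDC caller treats the externally closed stream as EOF.
        drained = true;
        return false;
      }
      throw new HoodieIOException("Failed to check if the reader reached end", e);
    }
  }

  // next(), close(), and isStreamClosedSignal(e) unchanged from the PR.
}
```

The streaming/CDC caller (e.g. `HoodieCdcSplitReaderFunction`) would pass `true`; every batch caller keeps the current behavior without code changes.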
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java:
##########
@@ -32,15 +32,37 @@
public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> {
private final ParquetColumnarRowSplitReader reader;
+ // Cached end-of-stream signal once the underlying Hadoop input stream is closed externally
+ // (see hasNext() for the rationale). Avoids re-invoking the reader after we've already
+ // observed the close.
+ private boolean drained = false;
+
public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
this.reader = reader;
}
@Override
public boolean hasNext() {
+ if (drained) {
+ return false;
+ }
try {
return !reader.reachedEnd();
Review Comment:
🤖 If `reader.reachedEnd()` throws "Stream is closed!" mid-split (rather than
at the boundary between the last consumed batch and the next), how do we know
the remaining rows of the current, already-buffered batch were actually
delivered to the consumer? The fix assumes the already-fetched BatchRecords
buffer is complete, but `reachedEnd()` calls `ensureBatch()`, which can throw
both when refilling AND when the current batch index is past the last buffered
row. Is there a way to verify we haven't dropped the tail of an in-flight
batch?
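
Not a full answer to that question, but one way to at least make any truncation visible: a hedged sketch (the row counter, logger, and log message are illustrative, not part of this PR) that records how many rows the iterator handed out before the signal is swallowed:

```java
// Hypothetical instrumentation: count rows handed to the caller so that, when a
// closed stream is treated as EOF, the log shows how far into the split we got.
// LOG is assumed to be an org.slf4j.Logger for this class.
private long rowsReturned = 0L;

@Override
public RowData next() {
  RowData row = reader.nextRecord(); // nextRecord() as used by the existing next() (assumed)
  rowsReturned++;
  return row;
}

@Override
public boolean hasNext() {
  if (drained) {
    return false;
  }
  try {
    return !reader.reachedEnd();
  } catch (IOException e) {
    if (isStreamClosedSignal(e)) {
      // Surfacing the row count makes a silently truncated batch visible in the logs.
      LOG.warn("Input stream closed externally; treating as EOF after {} rows", rowsReturned, e);
      drained = true;
      return false;
    }
    throw new HoodieIOException("Failed to check if the reader reached end", e);
  }
}
```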
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]