pvary commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3334649297
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java:
##########
@@ -123,7 +124,16 @@ public void
handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
}
@Override
- public void wakeUp() {}
+ public void wakeUp() {
+ // Flink calls wakeUp() to unblock a fetcher parked in fetch(). The only
place fetch() can block
+ // is in ArrayPoolDataIteratorBatcher waiting for a pool entry (e.g. while
the pool is exhausted
+ // during watermark alignment). Relay the signal so that thread can return
control and exit
+ // cleanly on shutdown. fetch() stays reentrant: a woken read just returns
an empty batch.
+ CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader =
currentReader;
+ if (reader instanceof WakeableIterator) {
+ ((WakeableIterator<?>) reader).wakeUp();
+ }
Review Comment:
```suggestion
if (currentReader instanceof WakeableIterator wakeableIterator) {
wakeableIterator.wakeUp();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]