pvary commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3334649297


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java:
##########
@@ -123,7 +124,16 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
   }
 
   @Override
-  public void wakeUp() {}
+  public void wakeUp() {
+    // Flink calls wakeUp() to unblock a fetcher parked in fetch(). The only place fetch() can block
+    // is in ArrayPoolDataIteratorBatcher waiting for a pool entry (e.g. while the pool is exhausted
+    // during watermark alignment). Relay the signal so that thread can return control and exit
+    // cleanly on shutdown. fetch() stays reentrant: a woken read just returns an empty batch.
+    CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = currentReader;
+    if (reader instanceof WakeableIterator) {
+      ((WakeableIterator<?>) reader).wakeUp();
+    }

Review Comment:
   ```suggestion
       if (currentReader instanceof WakeableIterator wakeableIterator) {
         wakeableIterator.wakeUp();
       }
   ```
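
   For context, a minimal self-contained sketch of the suggested form, assuming Java 16 or
   newer (pattern matching for `instanceof`). The `WakeableIterator` interface, the
   `BlockingReader` class, and the `Object`-typed field below are hypothetical stand-ins,
   not the PR's actual types. The pattern evaluates `currentReader` once and binds the
   result, so the separate local copy and the explicit cast are likely unnecessary:

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   public class WakeUpSketch {
     // Hypothetical stand-in for the PR's WakeableIterator; the real interface may differ.
     interface WakeableIterator<T> {
       void wakeUp();
     }

     // Hypothetical reader that only records whether it has been woken.
     static final class BlockingReader implements WakeableIterator<String> {
       final AtomicBoolean woken = new AtomicBoolean(false);

       @Override
       public void wakeUp() {
         woken.set(true);
       }
     }

     // Assumption: the field may be read from a thread other than the one that sets it.
     private volatile Object currentReader;

     void setReader(Object reader) {
       this.currentReader = reader;
     }

     void wakeUp() {
       // The field is read once here; the binding is a local, so no cast is needed.
       if (currentReader instanceof WakeableIterator<?> wakeableIterator) {
         wakeableIterator.wakeUp();
       }
     }

     public static void main(String[] args) {
       WakeUpSketch sketch = new WakeUpSketch();
       BlockingReader reader = new BlockingReader();
       sketch.setReader(reader);
       sketch.wakeUp();
       System.out.println("woken=" + reader.woken.get());
     }
   }
   ```

   A reader that does not implement the interface is simply skipped, which matches the
   behavior of the original cast-based check.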



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

