shanzi commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3346087642


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java:
##########
@@ -118,13 +145,46 @@ public void close() throws IOException {
       inputIterator.close();
     }
 
+    @Override
+    public void wakeUp() {
+      wakeUp.set(true);
+    }
+
+    /**
+     * Gets a cached entry from the pool.
+     *
+     * <p>Flink's {@link Pool} only offers a blocking {@link Pool#pollEntry()}, which cannot be
+     * interrupted cooperatively: Flink's shutdown path calls {@link #wakeUp()} but does not
+     * interrupt the fetcher thread, so a thread parked in {@code pollEntry()} would leak. Instead
+     * we poll with {@link Pool#tryPollEntry()} and a short sleep, re-checking the {@link #wakeUp}
+     * flag each iteration.
+     *
+     * <p>While waiting for watermark alignment the {@code SourceOperator} stops recycling batches,
+     * which exhausts the pool and keeps this loop polling. That is the intended pause: it is driven
+     * by pool exhaustion, not by {@link #wakeUp()}. If {@link #wakeUp()} is signalled (during
+     * shutdown, or potentially during split watermark alignment), this method returns {@code null}
+     * so the caller can hand back an empty batch and stay reentrant.
+     *
+     * @return a cached array from the pool, or {@code null} if woken up
+     */
     private T[] getCachedEntry() {
-      try {
-        return pool.pollEntry();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted while waiting for array pool entry", e);
+      while (!wakeUp.get()) {
+        T[] entry = pool.tryPollEntry();
+        if (entry != null) {
+          return entry;
+        }
+
+        try {
+          Thread.sleep(POLL_TIMEOUT_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while waiting for array pool entry", e);
+        }

Review Comment:
   Can you elaborate?
   
   The `org.apache.flink.connector.file.src.util.Pool` is already interruptible:
   `pollEntry()` throws `InterruptedException` if the waiting thread is
   interrupted. But in Flink's SourceFetcher, the `executor` that runs the thread
   blocked on the pool is stopped with `shutdown` (no thread interruption) instead
   of `shutdownNow` (threads are interrupted), so that exception is never raised
   during shutdown.
   
   That's why we need to make use of the `wakeUp` mechanism to break the loop.
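
To illustrate the difference, here is a minimal standalone sketch that uses only `java.util.concurrent`, not Flink's classes. `ShutdownVsShutdownNow`, `blockedTaskExits`, and the `ArrayBlockingQueue` standing in for the pool are hypothetical names chosen for the example; the 500 ms wait is an arbitrary assumption.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Flink or Iceberg.
class ShutdownVsShutdownNow {

  /**
   * Parks a task on an empty queue (a stand-in for a blocking pool poll), shuts the executor
   * down, and reports whether the task exited within 500 ms.
   */
  static boolean blockedTaskExits(boolean useShutdownNow) {
    BlockingQueue<Object> emptyPool = new ArrayBlockingQueue<>(1);
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();

    executor.execute(
        () -> {
          started.countDown();
          try {
            emptyPool.take(); // blocks until an element arrives or the thread is interrupted
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            finished.countDown();
          }
        });

    try {
      started.await();
      if (useShutdownNow) {
        executor.shutdownNow(); // interrupts the running task
      } else {
        executor.shutdown(); // only stops accepting new tasks
      }
      return finished.await(500, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow(); // cleanup so the worker thread does not outlive the demo
    }
  }

  public static void main(String[] args) {
    System.out.println("shutdown():    task exited = " + blockedTaskExits(false));
    System.out.println("shutdownNow(): task exited = " + blockedTaskExits(true));
  }
}
```

With `shutdown()` the parked task is still blocked when the wait expires; only `shutdownNow()` releases it.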
   
   If we build such an interruptible pool, the best we can do seems to be no
   better than a wrapper that moves the same loop and `sleep` inside it.
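
For reference, a rough sketch of what such a wrapper might look like. `WakeablePool` is a hypothetical class; an `ArrayBlockingQueue` stands in for Flink's `Pool`, `poll()` plays the role of `tryPollEntry()`, and the `POLL_TIMEOUT_MS` value of 10 is assumed for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper, not an existing Flink or Iceberg class.
class WakeablePool<T> {
  private static final long POLL_TIMEOUT_MS = 10L; // assumed value for the sketch

  private final BlockingQueue<T> entries; // stand-in for Flink's Pool
  private final AtomicBoolean wakeUp = new AtomicBoolean(false);

  WakeablePool(int size) {
    this.entries = new ArrayBlockingQueue<>(size);
  }

  void add(T entry) {
    entries.add(entry);
  }

  void wakeUp() {
    wakeUp.set(true);
  }

  /** Returns an entry, or null if wakeUp() was signalled before one became available. */
  T pollEntry() {
    while (!wakeUp.get()) {
      T entry = entries.poll(); // non-blocking, like tryPollEntry()
      if (entry != null) {
        return entry;
      }

      try {
        Thread.sleep(POLL_TIMEOUT_MS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for pool entry", e);
      }
    }

    return null;
  }

  public static void main(String[] args) {
    WakeablePool<String> pool = new WakeablePool<>(1);
    pool.add("batch");
    System.out.println(pool.pollEntry()); // the entry that was just added
    pool.wakeUp();
    System.out.println(pool.pollEntry()); // null, because the pool was woken up
  }
}
```

The loop is the same one as in `getCachedEntry()`, only relocated, so the wrapper adds a class without changing the behavior.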



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

