shanzi commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3355031944


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java:
##########
@@ -118,13 +145,46 @@ public void close() throws IOException {
       inputIterator.close();
     }
 
+    @Override
+    public void wakeUp() {
+      wakeUp.set(true);
+    }
+
+    /**
+     * Gets a cached entry from the pool.
+     *
+     * <p>Flink's {@link Pool} only offers a blocking {@link Pool#pollEntry()}, which cannot be
+     * interrupted cooperatively: Flink's shutdown path calls {@link #wakeUp()} but does not
+     * interrupt the fetcher thread, so a thread parked in {@code pollEntry()} would leak. Instead
+     * we poll with {@link Pool#tryPollEntry()} and a short sleep, re-checking the {@link #wakeUp}
+     * flag each iteration.
+     *
+     * <p>While waiting for watermark alignment the {@code SourceOperator} stops recycling batches,
+     * which exhausts the pool and keeps this loop polling. That is the intended pause: it is driven
+     * by pool exhaustion, not by {@link #wakeUp()}. If {@link #wakeUp()} is signalled (during
+     * shutdown, or potentially during split watermark alignment), this method returns {@code null}
+     * so the caller can hand back an empty batch and stay reentrant.
+     *
+     * @return a cached array from the pool, or {@code null} if woken up
+     */
     private T[] getCachedEntry() {
-      try {
-        return pool.pollEntry();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted while waiting for array pool entry", e);
+      while (!wakeUp.get()) {
+        T[] entry = pool.tryPollEntry();
+        if (entry != null) {
+          return entry;
+        }
+
+        try {
+          Thread.sleep(POLL_TIMEOUT_MS);

Review Comment:
   Thanks, let me try.
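For readers following the thread, the polling pattern described in the Javadoc can be illustrated with a minimal, Flink-free sketch. This is not the PR's code, and several pieces are assumptions: `WakeablePoolPoller` is a hypothetical name, a `java.util.concurrent.ArrayBlockingQueue` stands in for Flink's `Pool` (its non-blocking `poll()` plays the role of `Pool#tryPollEntry()`), the 10 ms `POLL_TIMEOUT_MS` value is made up because the diff does not show it, and the handling of an interrupted `Thread.sleep` is guessed since the hunk ends at that line.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical stand-in for the fetcher-side logic: a non-blocking pool plus a wake-up flag
 * that lets a waiting thread give up without being interrupted.
 */
final class WakeablePoolPoller<T> {
  // Assumed poll interval; the PR's actual POLL_TIMEOUT_MS value is not shown in the diff.
  static final long POLL_TIMEOUT_MS = 10L;

  // Stand-in for Flink's Pool; poll() returns null immediately when empty.
  private final ArrayBlockingQueue<T> pool;
  private final AtomicBoolean wakeUp = new AtomicBoolean(false);

  WakeablePoolPoller(int capacity) {
    this.pool = new ArrayBlockingQueue<>(capacity);
  }

  /** Returns an entry to the pool, playing the role of the pool's recycler. */
  void recycle(T entry) {
    pool.add(entry);
  }

  /** Signals any thread waiting in getCachedEntry() to stop waiting. */
  void wakeUp() {
    wakeUp.set(true);
  }

  /** Returns a pooled entry, or {@code null} if woken up before one became available. */
  T getCachedEntry() {
    while (!wakeUp.get()) {
      T entry = pool.poll(); // non-blocking, analogous to Pool#tryPollEntry()
      if (entry != null) {
        return entry;
      }
      try {
        // Short sleep so the loop re-checks the flag instead of parking indefinitely.
        Thread.sleep(POLL_TIMEOUT_MS);
      } catch (InterruptedException e) {
        // Assumed handling: restore the interrupt status and stop waiting.
        Thread.currentThread().interrupt();
        return null;
      }
    }
    return null;
  }
}
```

One consequence of this shape: once the flag is set, every later call returns `null` straight away. If the fetcher is expected to resume after an alignment wake-up, the flag would need to be cleared somewhere, which this sketch deliberately leaves out because the diff does not show how the PR handles it.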



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
