shanzi commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3346087642
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java:
##########
@@ -118,13 +145,46 @@ public void close() throws IOException {
inputIterator.close();
}
+ @Override
+ public void wakeUp() {
+ wakeUp.set(true);
+ }
+
+ /**
+ * Gets a cached entry from the pool.
+ *
+ * <p>Flink's {@link Pool} only offers a blocking {@link Pool#pollEntry()}, which cannot be
+ * interrupted cooperatively: Flink's shutdown path calls {@link #wakeUp()} but does not
+ * interrupt the fetcher thread, so a thread parked in {@code pollEntry()} would leak. Instead
+ * we poll with {@link Pool#tryPollEntry()} and a short sleep, re-checking the {@link #wakeUp}
+ * flag each iteration.
+ *
+ * <p>While waiting for watermark alignment the {@code SourceOperator} stops recycling batches,
+ * which exhausts the pool and keeps this loop polling. That is the intended pause: it is driven
+ * by pool exhaustion, not by {@link #wakeUp()}. If {@link #wakeUp()} is signalled (during
+ * shutdown, or potentially during split watermark alignment), this method returns {@code null}
+ * so the caller can hand back an empty batch and stay reentrant.
+ * @return a cached array from the pool, or {@code null} if woken up
+ */
private T[] getCachedEntry() {
- try {
- return pool.pollEntry();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting for array pool entry", e);
+ while (!wakeUp.get()) {
+ T[] entry = pool.tryPollEntry();
+ if (entry != null) {
+ return entry;
+ }
+
+ try {
+ Thread.sleep(POLL_TIMEOUT_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for array pool entry", e);
+ }
Review Comment:
Can you elaborate?
`org.apache.flink.connector.file.src.util.Pool#pollEntry()` is interruptible: it throws
`InterruptedException` if the waiting thread is interrupted. But in Flink's split fetcher
(`SplitFetcherManager`), the executor that runs the fetcher thread is stopped with `shutdown()`
(no thread interruption) rather than `shutdownNow()` (which interrupts running threads).
That's why we need the `wakeUp` mechanism to break the loop.
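To make the shutdown argument concrete, here is a minimal standalone sketch using only `java.util.concurrent`, assuming the fetcher executor behaves like a plain `ExecutorService` stopped with `shutdown()`, as described above. An empty `ArrayBlockingQueue` stands in for the exhausted array pool, and the class and method names are hypothetical. It contrasts a thread parked in a blocking `take()` with a poll-and-sleep loop that re-checks a wake-up flag.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo: a BlockingQueue stands in for an exhausted array pool. */
final class ShutdownVsWakeUpDemo {

    /** Waits for termination; an interrupt of the caller counts as "not terminated". */
    private static boolean awaitQuietly(ExecutorService executor, long timeoutMs) {
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns whether a thread parked in a blocking take() exits after shutdown(). */
    static boolean blockingTakeExitsAfterShutdown(long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<Object> pool = new ArrayBlockingQueue<>(1); // stays empty: nothing is recycled
        executor.submit(() -> {
            try {
                pool.take(); // parks until an entry arrives or the thread is interrupted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown(); // lets the running task continue; sends no interrupt
        boolean exited = awaitQuietly(executor, timeoutMs);
        executor.shutdownNow(); // demo cleanup only: interrupt the parked thread
        awaitQuietly(executor, 1_000);
        return exited;
    }

    /** Returns whether a poll-and-sleep loop exits after the flag is set and shutdown() is called. */
    static boolean cooperativeLoopExitsAfterWakeUp(long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<Object> pool = new ArrayBlockingQueue<>(1);
        AtomicBoolean wakeUp = new AtomicBoolean(false);
        executor.submit(() -> {
            while (!wakeUp.get()) {
                if (pool.poll() != null) {
                    return; // got an entry
                }
                try {
                    Thread.sleep(10); // short back-off before re-checking the flag
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        wakeUp.set(true); // what a wakeUp() call would do
        executor.shutdown(); // still no interrupt
        return awaitQuietly(executor, timeoutMs);
    }

    public static void main(String[] args) {
        System.out.println("blocking take() exited: " + blockingTakeExitsAfterShutdown(200));
        System.out.println("cooperative loop exited: " + cooperativeLoopExitsAfterWakeUp(2_000));
    }
}
```

Running `main` should print `false` for the blocking case and `true` for the cooperative one. The `shutdownNow()` call in the first method exists only so the demo JVM can exit; under the assumption above, nothing in the real shutdown path would issue it.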
Even if we built such an interruptible pool, the best we could do seems no better than a
wrapper that moves this loop and the `sleep` inside it.
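For comparison, here is a rough sketch of such a wrapper, written as a hypothetical `WakeablePoller` (not an Iceberg or Flink class). It accepts any non-blocking poll as a `Supplier<T>`, so it could in principle wrap `pool::tryPollEntry`; the standalone `main` uses a `ConcurrentLinkedQueue` instead. The loop, the `sleep`, and the interrupt policy mirror the diff; they just move behind a `poll()`/`wakeUp()` pair.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
 * Hypothetical wrapper hiding the poll-and-sleep loop behind a small API.
 * tryPoll is any non-blocking poll that returns null when nothing is available.
 */
final class WakeablePoller<T> {
    private final Supplier<T> tryPoll;
    private final long pollIntervalMs;
    private final AtomicBoolean wakeUp = new AtomicBoolean(false);

    WakeablePoller(Supplier<T> tryPoll, long pollIntervalMs) {
        this.tryPoll = tryPoll;
        this.pollIntervalMs = pollIntervalMs;
    }

    /** May be called from any thread; a waiting poll() notices it after at most one sleep. */
    void wakeUp() {
        wakeUp.set(true);
    }

    /** Returns the next entry, or null once wakeUp() has been signalled. */
    T poll() {
        while (!wakeUp.get()) {
            T entry = tryPoll.get();
            if (entry != null) {
                return entry;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Same policy as the diff: restore the interrupt flag and fail loudly.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while polling for an entry", e);
            }
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("batch-0");
        WakeablePoller<String> poller = new WakeablePoller<>(queue::poll, 10);
        System.out.println(poller.poll()); // batch-0

        // The queue is now empty: wake the poller from another thread after a short delay.
        Thread signaller = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                // demo only: wake up regardless
            }
            poller.wakeUp();
        });
        signaller.start();
        System.out.println(poller.poll()); // null
        signaller.join();
    }
}
```

The wrapper does not remove the polling itself: a signalled `wakeUp()` is still only observed after up to one sleep interval, which is why it is arguably no better than keeping the loop in the batcher.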
--
This is an automated message from the Apache Git Service.