pvary commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3349516027
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java:
##########
@@ -118,13 +145,46 @@ public void close() throws IOException {
inputIterator.close();
}
+ @Override
+ public void wakeUp() {
+ wakeUp.set(true);
+ }
+
+ /**
+ * Gets a cached entry from the pool.
+ *
+ * <p>Flink's {@link Pool} only offers a blocking {@link
Pool#pollEntry()}, which cannot be
+ * interrupted cooperatively: Flink's shutdown path calls {@link
#wakeUp()} but does not
+ * interrupt the fetcher thread, so a thread parked in {@code pollEntry()}
would leak. Instead
+ * we poll with {@link Pool#tryPollEntry()} and a short sleep, re-checking
the {@link #wakeUp}
+ * flag each iteration.
+ *
+ * <p>While waiting for watermark alignment the {@code SourceOperator}
stops recycling batches,
+ * which exhausts the pool and keeps this loop polling. That is the
intended pause: it is driven
+ * by pool exhaustion, not by {@link #wakeUp()}. If {@link #wakeUp()} is
signalled (during
+ * shutdown, or potentially during split watermark alignment), this method
returns {@code null}
+ * so the caller can hand back an empty batch and stay reentrant.
+ *
+ * @return a cached array from the pool, or {@code null} if woken up
+ */
private T[] getCachedEntry() {
- try {
- return pool.pollEntry();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting for array pool
entry", e);
+ while (!wakeUp.get()) {
+ T[] entry = pool.tryPollEntry();
+ if (entry != null) {
+ return entry;
+ }
+
+ try {
+ Thread.sleep(POLL_TIMEOUT_MS);
Review Comment:
I'd rather avoid this sleep-based polling loop.
Maybe something like this:
```
class InterruptablePool<T> extends Pool<T> {
// Holds the thread currently blocked in pollEntry(), or null when no
thread is blocking. It is
// written by the fetcher thread and read by the thread calling wakeup(),
so it must be safely
// published across threads - an AtomicReference (like a volatile)
guarantees that visibility.
private final AtomicReference<Thread> blockedThread = new
AtomicReference<>();
InterruptablePool(int poolCapacity) {
super(poolCapacity);
}
@Override
public T pollEntry() throws InterruptedException {
blockedThread.set(Thread.currentThread());
try {
return super.pollEntry();
} finally {
blockedThread.set(null);
}
}
/**
* Interrupts the thread (if any) currently blocked in {@link
#pollEntry()}. This is safe to call
* from a thread other than the one blocked in {@code pollEntry()}.
*/
void wakeup() {
Thread thread = blockedThread.get();
if (thread != null) {
thread.interrupt();
}
}
}
```
Or, even better, a pool whose wakeup unblocks the waiting thread without interrupting it:
```
/**
 * A bounded pool of reusable entries whose blocking {@link #pollEntry()} can be released from
 * another thread via {@link #wakeup()} without interrupting the waiting thread.
 *
 * <p>All mutable state is guarded by a single {@link ReentrantLock}, so every method may be called
 * from any thread.
 */
class PoolWithWakeup<T> {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Deque<T> entries;
  // The method reference only captures "this"; addBack is not invoked during construction.
  private final Pool.Recycler<T> recycler = this::addBack;
  private final int poolCapacity;

  // Number of entries registered via add(). Guarded by lock.
  private int poolSize;
  // Set by wakeup() and consumed by pollEntry(). Guarded by lock.
  private boolean wokenUp;

  /**
   * Creates an empty pool.
   *
   * @param poolCapacity maximum number of entries the pool can hold; must be positive
   * @throws IllegalArgumentException if {@code poolCapacity} is not positive
   */
  PoolWithWakeup(int poolCapacity) {
    if (poolCapacity <= 0) {
      throw new IllegalArgumentException("Pool capacity must be positive: " + poolCapacity);
    }
    this.poolCapacity = poolCapacity;
    this.entries = new ArrayDeque<>(poolCapacity);
  }

  /**
   * Adds a new entry to the pool. Used to populate the pool initially.
   *
   * @param entry the entry to add; must not be null
   * @throws IllegalStateException if {@code poolCapacity} entries have already been added
   * @throws NullPointerException if {@code entry} is null
   */
  void add(T entry) {
    lock.lock();
    try {
      if (poolSize >= poolCapacity) {
        throw new IllegalStateException("No space left in pool");
      }
      entries.addLast(entry);
      poolSize++;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns a recycler that adds entries back to the pool once they are no longer in use. A blocked
   * {@link #pollEntry()} is woken up when an entry is recycled.
   */
  Pool.Recycler<T> recycler() {
    return recycler;
  }

  /**
   * Retrieves the next available entry, blocking until one is recycled or until {@link #wakeup()}
   * is called.
   *
   * @return the next entry, or {@code null} if the pool was woken up while no entry was available
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  T pollEntry() throws InterruptedException {
    lock.lock();
    try {
      while (entries.isEmpty()) {
        if (wokenUp) {
          // Consume the wakeup signal and return without an entry.
          wokenUp = false;
          return null;
        }
        notEmpty.await();
      }
      return entries.pollFirst();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Unblocks a thread currently waiting in {@link #pollEntry()} without interrupting it. If no
   * thread is currently blocked, the next {@link #pollEntry()} call that would otherwise block
   * returns {@code null} once. Safe to call from a thread other than the one blocked in {@code
   * pollEntry()}.
   */
  void wakeup() {
    lock.lock();
    try {
      wokenUp = true;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /** Returns a previously polled entry to the pool and wakes one waiter. */
  private void addBack(T entry) {
    lock.lock();
    try {
      // Entries enter the pool through add(), which caps them at poolCapacity; holding more than
      // that means an entry was recycled twice or did not come from this pool.
      if (entries.size() >= poolCapacity) {
        throw new IllegalStateException("Recycled more entries than the pool capacity");
      }
      entries.addLast(entry);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]