pvary commented on code in PR #16545:
URL: https://github.com/apache/iceberg/pull/16545#discussion_r3356908779
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/ArrayPoolDataIteratorBatcher.java:
##########
@@ -118,6 +140,24 @@ public void close() throws IOException {
inputIterator.close();
}
+ @Override
+ public void wakeUp() {
+ pool.wakeUp();
+ }
+
+ /**
+ * Gets a cached entry from the pool, blocking until an entry is recycled
or the reader is woken
+ * up.
+ *
+ * <p>While waiting for watermark alignment the {@code SourceOperator}
stops recycling batches,
+ * which exhausts the pool and blocks this call. That is the intended
pause: it is driven by
+ * pool exhaustion, not by {@link #wakeUp()}. If {@link #wakeUp()} is
signalled (during
+ * shutdown, or potentially during split watermark alignment), {@link
+ * PoolWithWakeup#pollEntry()} returns {@code null} so the caller can hand
back an empty batch
+ * and stay reentrant.
+ *
+ * @return a cached array from the pool, or {@code null} if woken up
Review Comment:
```suggestion
* Gets a cached entry from the pool, blocking until an entry is
recycled or the reader is woken
* up.
*
* @return a cached array from the pool, or {@code null} if woken up
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]