AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476673355
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
*/
public CheckpointedInputGate(
InputGate inputGate,
- CheckpointBarrierHandler barrierHandler) {
+ CheckpointBarrierHandler barrierHandler,
+ MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+ this.mailboxExecutor = mailboxExecutor;
+
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ /**
+ * Eagerly pulls and processes all priority events. Must be called from task thread.
+ *
+ * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+ */
+ private void processPriorityEvents() throws IOException, InterruptedException {
+ // check if the priority event is still not processed (could have been pulled before mail was being executed)
+ final boolean hasPriorityEvents = inputGate.getPriorityEventAvailableFuture().isDone();
+ if (hasPriorityEvents) {
+ // process as many priority events as possible
+ while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+ }
+ }
+
+ // re-enqueue mail to process priority events
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor mailboxExecutor) {
+ final CompletableFuture<?> priorityEventAvailableFuture = inputGate.getPriorityEventAvailableFuture();
+ priorityEventAvailableFuture.thenRun(() -> {
+ mailboxExecutor.execute(this::processPriorityEvents, "process priority event @ gate %s", inputGate);
+ });
Review comment:
1. Nope, this assumption does not hold. That's why the first thing
`processPriorityEvents` does is check whether the future is still completed. If
the task has polled the only priority event in the meantime, the future has been
reset. While `processPriorityEvents` executes in the task thread, the task
cannot concurrently pull the priority event, so this is safe.
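To make point 1 concrete, here is a minimal single-threaded sketch of the re-check pattern. `SketchGate`, `SketchInput`, and the `ArrayDeque`-based mailbox are hypothetical stand-ins, not Flink's `InputGate` or `MailboxExecutor`; the "task thread" is simply whichever thread calls `runMails()`. The future's callback only enqueues a mail, and the mail re-checks `isDone()` before pulling anything, because the task may have consumed the event (and reset the future) first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an input gate that holds priority events. */
class SketchGate {
    private final Queue<String> priorityEvents = new ArrayDeque<>();
    // Completed while a priority event is queued; replaced ("reset") once all are polled.
    private CompletableFuture<Void> priorityAvailable = new CompletableFuture<>();

    CompletableFuture<Void> getPriorityEventAvailableFuture() {
        return priorityAvailable;
    }

    void addPriorityEvent(String event) {
        priorityEvents.add(event);
        priorityAvailable.complete(null); // no-op if already completed
    }

    /** Polls one priority event (or null) and resets the future when none are left. */
    String pollPriorityEvent() {
        String event = priorityEvents.poll();
        if (priorityEvents.isEmpty() && priorityAvailable.isDone()) {
            priorityAvailable = new CompletableFuture<>();
        }
        return event;
    }
}

/** Hypothetical input that handles priority events via mails on the "task thread". */
class SketchInput {
    final SketchGate gate = new SketchGate();
    // Simplified mailbox: only the thread calling runMails() executes these.
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();
    int staleMails = 0;

    SketchInput() {
        waitForPriorityEvents();
    }

    private void waitForPriorityEvents() {
        // The callback only enqueues a mail; events are handled later by the task thread.
        gate.getPriorityEventAvailableFuture()
                .thenRun(() -> mailbox.add(this::processPriorityEvents));
    }

    private void processPriorityEvents() {
        // Re-check: the task may have polled the event after this mail was enqueued,
        // in which case the future has been reset and is no longer completed.
        if (gate.getPriorityEventAvailableFuture().isDone()) {
            String event;
            while ((event = gate.pollPriorityEvent()) != null) {
                processed.add(event);
            }
        } else {
            staleMails++;
        }
        waitForPriorityEvents(); // re-register on the current (fresh) future
    }

    void runMails() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

In this model, if the task thread polls an event itself before the pending mail runs, the mail finds the reset future, does nothing except re-register, and the next priority event is still picked up.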
2.+3. The basic idea behind not involving `StreamTaskNetworkInput#emitNext` or
`pollNext()` is to avoid making non-blocking output more complicated.
Currently, `emitNext` and `pollNext` are only called when an output buffer is
available; in the meantime, only mails are processed. Hence, I used a mail to
perform `processPriorityEvents`.
Note that the assumption here is that no priority event ever needs to be
handled in `emitNext` (which currently only handles `EndOfPartitionEvent`).
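The scheduling constraint behind points 2+3 can be pictured with a toy mailbox loop: queued mails always run, while the default action standing in for `emitNext` runs only when output is available. `SketchMailboxLoop`, `outputAvailable`, and `pendingRecords` are hypothetical names; Flink's actual mailbox processor is more involved. Under this simplified model, work submitted as a mail (such as `processPriorityEvents`) still runs while output is blocked, whereas work routed through `emitNext` would wait for an output buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical single-threaded mailbox loop; not Flink's MailboxProcessor. */
class SketchMailboxLoop {
    final Queue<Runnable> mails = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    boolean outputAvailable = false; // stand-in for "an output buffer is available"
    int pendingRecords;

    SketchMailboxLoop(int pendingRecords) {
        this.pendingRecords = pendingRecords;
    }

    /** Stand-in for the default action (emitNext/pollNext). */
    private void emitNext() {
        pendingRecords--;
        log.add("emitNext");
    }

    /** One iteration: run all queued mails, then the default action only if output is available. */
    void runOnce() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
        if (outputAvailable && pendingRecords > 0) {
            emitNext();
        }
    }
}
```

For example, with `outputAvailable == false` a queued mail that processes priority events runs on the first `runOnce()`, and `emitNext` only runs on a later iteration once output becomes available.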
----------------------------------------------------------------
This is an automated message from the Apache Git Service.