AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476673355



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
         */
        public CheckpointedInputGate(
                        InputGate inputGate,
-                       CheckpointBarrierHandler barrierHandler) {
+                       CheckpointBarrierHandler barrierHandler,
+                       MailboxExecutor mailboxExecutor) {
                this.inputGate = inputGate;
                this.barrierHandler = barrierHandler;
+               this.mailboxExecutor = mailboxExecutor;
+
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       /**
+        * Eagerly pulls and processes all priority events. Must be called from 
task thread.
+        *
+        * <p>Basic assumption is that no priority event needs to be handled by 
the {@link StreamTaskNetworkInput}.
+        */
+       private void processPriorityEvents() throws IOException, 
InterruptedException {
+               // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
+               final boolean hasPriorityEvents = 
inputGate.getPriorityEventAvailableFuture().isDone();
+               if (hasPriorityEvents) {
+                       // process as many priority events as possible
+                       while 
(pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+                       }
+               }
+
+               // re-enqueue mail to process priority events
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor 
mailboxExecutor) {
+               final CompletableFuture<?> priorityEventAvailableFuture = 
inputGate.getPriorityEventAvailableFuture();
+               priorityEventAvailableFuture.thenRun(() -> {
+                       mailboxExecutor.execute(this::processPriorityEvents, 
"process priority even @ gate %s", inputGate);
+               });

Review comment:
       1. Nope, this assumption does not hold. That's why the first thing that 
`processPriorityEvents` does is to check if the future is still completed. If 
the task polled the only priority event in the meantime, the future has been 
reset. During the execution of `processPriorityEvents` in the task thread, the 
task cannot concurrently pull the priority event, so this is safe.
   2.+3. The basic idea of not involving `StreamTaskNetworkInput#emitNext` or 
using `pollNext()` is to not make non-blocking output more complicated. 
Currently, `emitNext` or `pollNext` are only called when an output buffer is 
available. In the meantime only mails are processed. Hence, I used a mail to 
perform `processPriorityEvents`.
   Note that the assumption here is that no priority event ever need to be 
handled in `emitNext` (which currently only handles `EndOfPartitionEvent`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to