AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476675096
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
*/
public CheckpointedInputGate(
InputGate inputGate,
- CheckpointBarrierHandler barrierHandler) {
+ CheckpointBarrierHandler barrierHandler,
+ MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+ this.mailboxExecutor = mailboxExecutor;
+
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ /**
+ * Eagerly pulls and processes all priority events. Must be called from task thread.
+ *
+ * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+ */
+ private void processPriorityEvents() throws IOException, InterruptedException {
+ // check if the priority event is still not processed (could have been pulled before mail was being executed)
+ final boolean hasPriorityEvents = inputGate.getPriorityEventAvailableFuture().isDone();
+ if (hasPriorityEvents) {
+ // process as many priority events as possible
+ while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+ }
Review comment:
Yes. The method first checks whether at least one priority event is available
(i.e. the priority future has completed). If so, it starts processing the
first one. From that point on, it relies on `BufferOrEvent::morePriorityEvents`
being correct in both directions: no false positives and no false negatives
(although a false negative would just be a tad slower).
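
To make the control flow described above concrete, here is a minimal,
self-contained sketch of the same "check once, then drain while the flag says
more" pattern. `PriorityDrainSketch`, `Gate`, `Event` and `priorityAvailable()`
are hypothetical stand-ins invented for illustration, not Flink classes; only
the loop shape is taken from the diff. The sketch assumes the flag is computed
exactly, which is the property the loop depends on.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Illustration only: hypothetical stand-ins, not the Flink implementation. */
final class PriorityDrainSketch {

    /** Simplified stand-in for BufferOrEvent: a name plus the "more priority events" flag. */
    static final class Event {
        final String name;
        final boolean morePriorityEvents;

        Event(String name, boolean morePriorityEvents) {
            this.name = name;
            this.morePriorityEvents = morePriorityEvents;
        }

        boolean morePriorityEvents() {
            return morePriorityEvents;
        }
    }

    /** Simplified stand-in for the input gate, holding only priority events. */
    static final class Gate {
        private final Deque<String> priority = new ArrayDeque<>();
        final List<String> processed = new ArrayList<>();

        void addPriority(String name) {
            priority.add(name);
        }

        /** Stand-in for getPriorityEventAvailableFuture().isDone(). */
        boolean priorityAvailable() {
            return !priority.isEmpty();
        }

        /** Polls one event; the flag is computed after removal, so it is exact here. */
        Optional<Event> pollNext() {
            String name = priority.poll();
            if (name == null) {
                return Optional.empty();
            }
            processed.add(name);
            return Optional.of(new Event(name, !priority.isEmpty()));
        }
    }

    /** Same shape as the loop in the diff; returns how many events were drained. */
    static int processPriorityEvents(Gate gate) {
        int drained = 0;
        if (gate.priorityAvailable()) {
            Optional<Event> next;
            do {
                next = gate.pollNext();
                if (next.isPresent()) {
                    drained++;
                }
                // Keep going only while the last event reports more priority events.
            } while (next.map(Event::morePriorityEvents).orElse(false));
        }
        return drained;
    }
}
```

In this sketch, calling `processPriorityEvents` on a gate holding three
priority events drains all three in one call, and calling it on an empty gate
does nothing. If the flag were ever wrongly `true`, the loop would poll once
more and could pull something that is not a priority event; if it were wrongly
`false`, the loop would stop early and leave events behind.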