AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r493044470
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -62,9 +65,37 @@
*/
public CheckpointedInputGate(
InputGate inputGate,
- CheckpointBarrierHandler barrierHandler) {
+ CheckpointBarrierHandler barrierHandler,
+ MailboxExecutor mailboxExecutor) {
this.inputGate = inputGate;
this.barrierHandler = barrierHandler;
+ this.mailboxExecutor = mailboxExecutor;
+
+ waitForPriorityEvents(inputGate, mailboxExecutor);
+ }
+
+ /**
+ * Eagerly pulls and processes all priority events. Must be called from task thread.
+ *
+ * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+ */
+ private void processPriorityEvents() throws IOException, InterruptedException {
+ // check if the priority event is still not processed (could have been pulled before mail was being executed)
+ if (inputGate.getPriorityEventAvailableFuture().isDone()) {
+ // process as many priority events as possible
+ while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+ }
+ }
+
Review comment:
Good idea, I solved it in the following way:
```java
// check if the priority event is still not processed (could have been pulled before mail was being executed)
boolean hasPriorityEvent = inputGate.getPriorityEventAvailableFuture().isDone();
while (hasPriorityEvent) {
    // process as many priority events as possible
    final Optional<BufferOrEvent> bufferOrEventOpt = pollNext();
    bufferOrEventOpt.ifPresent(bufferOrEvent ->
            checkState(bufferOrEvent.hasPriority(), "Should only poll priority events"));
    hasPriorityEvent = bufferOrEventOpt.map(BufferOrEvent::morePriorityEvents).orElse(false);
}
```
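To make the loop's contract easier to see, here is a minimal single-threaded model of it. It assumes, as the `checkState` in the snippet implies, that the gate hands out all queued priority events before any regular buffer. `FakeGate`, `Event`, and the local `checkState` helper are hypothetical stand-ins for Flink's input gate, `BufferOrEvent`, and precondition utility; this is a sketch, not Flink's API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical stand-ins for Flink's BufferOrEvent and input gate; NOT Flink's API.
public class PriorityDrainSketch {

    /** Minimal stand-in for BufferOrEvent: a name plus the two priority flags. */
    static final class Event {
        private final String name;
        private final boolean hasPriority;
        private final boolean morePriorityEvents;

        Event(String name, boolean hasPriority, boolean morePriorityEvents) {
            this.name = name;
            this.hasPriority = hasPriority;
            this.morePriorityEvents = morePriorityEvents;
        }

        String name() { return name; }
        boolean hasPriority() { return hasPriority; }
        boolean morePriorityEvents() { return morePriorityEvents; }
    }

    /** Assumed behaviour: queued priority events are always returned before regular buffers. */
    static final class FakeGate {
        private final Deque<String> priority = new ArrayDeque<>();
        private final Deque<String> regular = new ArrayDeque<>();

        void addPriority(String name) { priority.addLast(name); }
        void addRegular(String name) { regular.addLast(name); }
        boolean hasPriorityEvent() { return !priority.isEmpty(); }

        Optional<Event> pollNext() {
            if (!priority.isEmpty()) {
                String name = priority.pollFirst();
                // morePriorityEvents reflects the queue state right after this poll
                return Optional.of(new Event(name, true, !priority.isEmpty()));
            }
            return Optional.ofNullable(regular.pollFirst()).map(n -> new Event(n, false, false));
        }
    }

    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Same loop shape as the revised snippet; returns the number of events drained. */
    static int processPriorityEvents(FakeGate gate) {
        int processed = 0;
        boolean hasPriorityEvent = gate.hasPriorityEvent();
        while (hasPriorityEvent) {
            Optional<Event> next = gate.pollNext();
            next.ifPresent(e -> checkState(e.hasPriority(), "Should only poll priority events"));
            processed += next.isPresent() ? 1 : 0;
            hasPriorityEvent = next.map(Event::morePriorityEvents).orElse(false);
        }
        return processed;
    }

    public static void main(String[] args) {
        FakeGate gate = new FakeGate();
        gate.addRegular("buffer-1");
        gate.addPriority("barrier-1");
        gate.addPriority("barrier-2");
        System.out.println(processPriorityEvents(gate)); // prints 2
        System.out.println(gate.pollNext().map(Event::name).orElse("none")); // prints buffer-1
    }
}
```

Driving the loop off `morePriorityEvents`, rather than re-checking availability before every poll, stops right after the last priority event, so in this model the regular buffer stays queued for the normal processing path.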
A `checkState(!inputGate.getPriorityEventAvailableFuture().isDone())` after the loop might fail if Netty receives a new priority event and completes this availability future while the task thread is polling the last priority event. This should happen quite often: the first barrier to arrive is, at that moment, the only priority event (`morePriorityEvents = false`), and it triggers the whole checkpointing process. The second barrier would then complete `getPriorityEventAvailableFuture`, causing a more or less immediate re-execution of this method.
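The race described above can be reproduced in a small, self-contained model. `FakeGate`, its `afterPoll` hook, and the "reset the future once the queue is drained" behaviour are hypothetical simplifications, not Flink's `InputGate` implementation; the hook forces the problematic interleaving deterministically (a second thread standing in for Netty enqueues `barrier-2` right after the task thread polled `barrier-1`) instead of relying on timing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the race; NOT Flink's InputGate implementation.
public class PriorityRaceSketch {

    static final class FakeGate {
        private final Deque<String> priority = new ArrayDeque<>();
        private CompletableFuture<Void> priorityAvailable = new CompletableFuture<>();
        // Hypothetical test hook: runs on the task thread right after each poll,
        // used to force the "network event arrives just after the last poll" interleaving.
        volatile Runnable afterPoll = () -> {};

        /** Called by the "network" thread: enqueue and signal availability. */
        synchronized void addPriority(String name) {
            priority.addLast(name);
            priorityAvailable.complete(null);
        }

        synchronized CompletableFuture<Void> getPriorityEventAvailableFuture() {
            return priorityAvailable;
        }

        /** Called by the task thread; the return value plays the role of morePriorityEvents. */
        boolean pollPriority() {
            boolean more;
            synchronized (this) {
                priority.pollFirst();
                more = !priority.isEmpty();
                if (!more) {
                    // drained: install a fresh, incomplete future for the next arrival
                    priorityAvailable = new CompletableFuture<>();
                }
            }
            afterPoll.run(); // runs outside the lock so the "network" thread can enqueue
            return more;
        }
    }

    /** Simplified drain loop from the revised snippet; returns events processed. */
    static int drain(FakeGate gate) {
        int processed = 0;
        boolean hasPriorityEvent = gate.getPriorityEventAvailableFuture().isDone();
        while (hasPriorityEvent) {
            hasPriorityEvent = gate.pollPriority();
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        FakeGate gate = new FakeGate();
        gate.addPriority("barrier-1");
        // Simulate Netty delivering barrier-2 right after the task thread polled barrier-1.
        gate.afterPoll = () -> {
            gate.afterPoll = () -> {}; // fire only once
            Thread netty = new Thread(() -> gate.addPriority("barrier-2"));
            netty.start();
            try {
                netty.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int first = drain(gate);
        // A post-loop checkState(!future.isDone()) would throw at this point:
        boolean futureDoneAfterLoop = gate.getPriorityEventAvailableFuture().isDone();
        int second = drain(gate); // the "immediate re-execution" picks up barrier-2
        System.out.println(first + " " + futureDoneAfterLoop + " " + second); // prints 1 true 1
    }
}
```

In this model the late barrier leaves the future completed after the loop exits, so the correct reaction is not an assertion failure but simply another run of the drain logic, which the second `drain` call stands in for.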