pnowojski commented on a change in pull request #13827:
URL: https://github.com/apache/flink/pull/13827#discussion_r514425412
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -517,6 +517,21 @@ public void checkpointStopped(long checkpointId) {
         }
     }
+    @Override
+    public void convertToPriorityEvent(int sequenceNumber) throws IOException {
+        boolean firstPriorityEvent;
+        synchronized (receivedBuffers) {
+            checkState(!channelStatePersister.hasBarrierReceived());
+            SequenceBuffer toPrioritize = receivedBuffers.getAndRemove(
+                sequenceBuffer -> sequenceBuffer.sequenceNumber == sequenceNumber);
+            checkState(!toPrioritize.buffer.isBuffer());
+            firstPriorityEvent = addPriorityBuffer(toPrioritize);
+        }
+        if (firstPriorityEvent) {
+            notifyPriorityEvent(sequenceNumber);
+        }
+    }
+
Review comment:
This differs from what we discussed offline, @AHeise. Instead of
switching to the pattern of keeping the `CheckpointBarrier` at the tail of the
input queue and spilling everything that falls between the `CheckpointBarrier`
and its announcement, I decided, for the sake of simplicity, to keep the
current approach of using priority events. I'm just prioritizing the
previously announced aligned `CheckpointBarrier`.
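
To illustrate the idea outside of Flink, here is a minimal, self-contained
sketch of "prioritize an element that is already queued". The class
`PrioritizedQueueSketch` and its methods are hypothetical stand-ins, not the
actual `receivedBuffers` implementation; only the
get-and-remove-then-add-as-priority shape mirrors the diff above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Predicate;

/** Hypothetical sketch; not Flink's actual queue class. */
final class PrioritizedQueueSketch<T> {
    // Elements in 'priority' are always polled before elements in 'regular'.
    private final Deque<T> priority = new ArrayDeque<>();
    private final Deque<T> regular = new ArrayDeque<>();

    /** Appends a regular (non-priority) element at the tail. */
    void add(T element) {
        regular.addLast(element);
    }

    /**
     * Appends a priority element behind any existing priority elements.
     * Returns true if it is the only priority element, i.e. the consumer
     * would need to be notified.
     */
    boolean addPriority(T element) {
        boolean first = priority.isEmpty();
        priority.addLast(element);
        return first;
    }

    /** Removes and returns the first regular element matching the predicate, or null. */
    T getAndRemove(Predicate<T> predicate) {
        Iterator<T> it = regular.iterator();
        while (it.hasNext()) {
            T candidate = it.next();
            if (predicate.test(candidate)) {
                it.remove();
                return candidate;
            }
        }
        return null;
    }

    /** Moves an already queued regular element into the priority section. */
    boolean prioritize(Predicate<T> predicate) {
        T element = getAndRemove(predicate);
        if (element == null) {
            throw new IllegalStateException("No queued element matches");
        }
        return addPriority(element);
    }

    /** Polls priority elements first, then regular ones; null when empty. */
    T poll() {
        T element = priority.pollFirst();
        return element != null ? element : regular.pollFirst();
    }
}
```

With `data-1`, `barrier`, `data-2` queued in that order, calling
`prioritize("barrier"::equals)` makes the next `poll()` return `barrier`, while
the data elements keep their relative order. Unlike this sketch, the diff does
not null-check the result of `getAndRemove` before dereferencing it.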