pnowojski commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476480862



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -133,14 +136,14 @@ public boolean isAvailable() {
         * @param bufferAndBacklog
         *              current buffer and backlog including information about 
the next buffer
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+       @Nullable

Review comment:
       nit: add a javadoc explaining the returned value?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
                        checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
                                "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-                       for (BufferConsumer buffer : buffers) {
-                               try (BufferConsumer bc = buffer.copy()) {
-                                       if (bc.isBuffer()) {
-                                               
inflightBufferSnapshot.add(bc.build());
+                       final int pos = buffers.getNumPriorityElements();
+                       buffers.addPriorityElement(bufferConsumer);
+
+                       boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
+                       if (unalignedCheckpoint) {
+                               final Iterator<BufferConsumer> iterator = 
buffers.iterator();
+                               Iterators.advance(iterator, pos + 1);
+                               while (iterator.hasNext()) {
+                                       BufferConsumer buffer = iterator.next();
+
+                                       if (buffer.isBuffer()) {
+                                               try (BufferConsumer bc = 
buffer.copy()) {
+                                                       
inflightBufferSnapshot.add(bc.build());
+                                               }
                                        }
                                }
                        }
+                       return;
+               }

Review comment:
       Why do we need this change? In what scenarios do you expect more than 
one priority event in the output buffer? (If there is a reason that I'm 
forgetting about, please add it to the commit message.)
   
   edit (after reading the commit message a couple of times): or are you just 
re-using a class here that you mostly intend to use later, on the inputs? If 
so, maybe the commit message needs some more explanation?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) 
throws IOException {
        }
 
        @Override
-       public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) throws IOException {
+       public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) {
                synchronized (receivedBuffers) {
-                       checkState(checkpointId > lastRequestedCheckpointId, 
"Need to request the next checkpointId");
-
-                       final List<Buffer> inflightBuffers = new 
ArrayList<>(receivedBuffers.size());
-                       for (Buffer buffer : receivedBuffers) {
-                               CheckpointBarrier checkpointBarrier = 
parseCheckpointBarrierOrNull(buffer);
-                               if (checkpointBarrier != null && 
checkpointBarrier.getId() >= checkpointId) {
-                                       break;
+                       final Integer numRecords = 
numRecordsOvertaken.remove(checkpointId);

Review comment:
       Shouldn't we also remove obsolete values from this map (to prevent a 
potential memory leak)?
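
   The cleanup asked about above could look roughly like the following. This is a minimal sketch, assuming the map is keyed by checkpoint id and that entries of older checkpoints are never needed again; `OvertakenBuffersTracker`, `record` and `removeUpTo` are hypothetical names, not code from the PR:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the per-channel bookkeeping; not the PR's actual class.
final class OvertakenBuffersTracker {
    // checkpointId -> number of buffers overtaken by that checkpoint's barrier
    private final NavigableMap<Long, Integer> overtaken = new TreeMap<>();

    void record(long checkpointId, int count) {
        overtaken.put(checkpointId, count);
    }

    /** Returns the count for checkpointId (or null) and drops all older entries. */
    Integer removeUpTo(long checkpointId) {
        Integer count = overtaken.remove(checkpointId);
        // Assumption: entries with a smaller id belong to obsolete checkpoints.
        overtaken.headMap(checkpointId, false).clear();
        return count;
    }

    int size() {
        return overtaken.size();
    }
}
```

   A sorted map keeps the pruning to a single `headMap(...).clear()` call; with a plain `HashMap` the same effect would need an explicit `removeIf` over the key set.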

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
                        checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
                                "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-                       for (BufferConsumer buffer : buffers) {
-                               try (BufferConsumer bc = buffer.copy()) {
-                                       if (bc.isBuffer()) {
-                                               
inflightBufferSnapshot.add(bc.build());
+                       final int pos = buffers.getNumPriorityElements();
+                       buffers.addPriorityElement(bufferConsumer);
+
+                       boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
+                       if (unalignedCheckpoint) {
+                               final Iterator<BufferConsumer> iterator = 
buffers.iterator();
+                               Iterators.advance(iterator, pos + 1);
+                               while (iterator.hasNext()) {
+                                       BufferConsumer buffer = iterator.next();
+
+                                       if (buffer.isBuffer()) {
+                                               try (BufferConsumer bc = 
buffer.copy()) {
+                                                       
inflightBufferSnapshot.add(bc.build());
+                                               }
                                        }
                                }
                        }
+                       return;
+               }
+               buffers.add(bufferConsumer);

Review comment:
       nit: for a moment it was a bit confusing to me which code path does 
what. IMO it would be easier to follow if the shorter branch came first and 
the more complicated part had one level of nesting less:
   ```
   if (!insertAsHead) {
     buffers.add(bufferConsumer);
     return;
   }
   //rest of the code
   ```
   in that case it's more obvious that `!insertAsHead` is a trivial case and 
that it doesn't interact with the other branch at all.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) 
throws IOException {
        }
 
        @Override
-       public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) throws IOException {
+       public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) {
                synchronized (receivedBuffers) {
-                       checkState(checkpointId > lastRequestedCheckpointId, 
"Need to request the next checkpointId");
-
-                       final List<Buffer> inflightBuffers = new 
ArrayList<>(receivedBuffers.size());
-                       for (Buffer buffer : receivedBuffers) {
-                               CheckpointBarrier checkpointBarrier = 
parseCheckpointBarrierOrNull(buffer);
-                               if (checkpointBarrier != null && 
checkpointBarrier.getId() >= checkpointId) {
-                                       break;
+                       final Integer numRecords = 
numRecordsOvertaken.remove(checkpointId);

Review comment:
       Do I understand it correctly? Currently there is a fragile contract 
that the `numRecordsOvertaken` value doesn't change between `onBuffer(...)`, 
where we are setting it, and this `spillInflightBuffers(...)` call? In other 
words, it assumes that between the enqueueing of the priority event and the 
`spillInflightBuffers(...)` call, the task thread is not allowed to process 
any buffers?
   
   Maybe it would be better to embed the `numRecordsOvertaken` value in the 
priority event that would be processed by the task thread?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -74,34 +106,34 @@ public CheckpointedInputGate(
        }
 
        @Override
-       public Optional<BufferOrEvent> pollNext() throws Exception {
-               while (true) {

Review comment:
       Isn't this changing the semantics slightly? Am I right that the only 
case on the master branch that actually causes another iteration of this loop is
   ```
   barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
   ```
   and that all of the other cases were exiting the loop? Are all of the cases 
now always exiting?
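
   The difference in question can be modelled in isolation. This is a minimal sketch, assuming a plain queue of strings in place of the input gate and a `CANCEL` token in place of `CancelCheckpointMarker`; `PollLoopModel`, `pollLooping` and `pollOnce` are hypothetical names, and `pollOnce` is only an assumed shape of a non-looping variant, not the PR's code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Toy model of the two control flows; it does not use any Flink classes.
final class PollLoopModel {
    static final String CANCEL = "CANCEL";

    // Mirrors the removed while (true) loop: a cancellation marker is
    // handled and skipped, so the caller never sees it.
    static Optional<String> pollLooping(Deque<String> input) {
        while (true) {
            String next = input.poll();
            if (next == null) {
                return Optional.empty();
            }
            if (CANCEL.equals(next)) {
                continue; // the only branch that iterates again
            }
            return Optional.of(next);
        }
    }

    // Assumed non-looping variant: every element, including the marker,
    // ends the call.
    static Optional<String> pollOnce(Deque<String> input) {
        return Optional.ofNullable(input.poll());
    }
}
```

   In this model a caller of the looping variant gets the next non-marker element in one call, while a caller of the single-poll variant has to call again after a marker.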

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -195,28 +193,27 @@ void retriggerSubpartitionRequest(int subpartitionIndex) 
throws IOException {
        }
 
        @Override
-       public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) throws IOException {
+       public void spillInflightBuffers(long checkpointId, ChannelStateWriter 
channelStateWriter) {
                synchronized (receivedBuffers) {
-                       checkState(checkpointId > lastRequestedCheckpointId, 
"Need to request the next checkpointId");
-
-                       final List<Buffer> inflightBuffers = new 
ArrayList<>(receivedBuffers.size());
-                       for (Buffer buffer : receivedBuffers) {
-                               CheckpointBarrier checkpointBarrier = 
parseCheckpointBarrierOrNull(buffer);
-                               if (checkpointBarrier != null && 
checkpointBarrier.getId() >= checkpointId) {
-                                       break;
+                       final Integer numRecords = 
numRecordsOvertaken.remove(checkpointId);

Review comment:
       `numRecordsOvertaken` -> `numBuffersOvertaken`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -770,34 +808,50 @@ void triggerPartitionStateCheck(ResultPartitionID 
partitionId) {
                        }));
        }
 
-       private void queueChannel(InputChannel channel) {
-               int availableChannels;
+       private void queueChannel(InputChannel channel, boolean priority) {

Review comment:
       Heh, there are quite a few more corner/edge cases now.
   
   I wonder if there is maybe some other way to express the priority events 
that would simplify the input gates code?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
##########
@@ -159,10 +158,12 @@ public InputStatus emitNext(DataOutput<T> output) throws 
Exception {
                        if (bufferOrEvent.isPresent()) {
                                // return to the mailbox after receiving a 
checkpoint barrier to avoid processing of
                                // data after the barrier before checkpoint is 
performed for unaligned checkpoint mode
-                               if (bufferOrEvent.get().isEvent() && 
bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
+                               if (bufferOrEvent.get().isBuffer()) {
+                                       processBuffer(bufferOrEvent.get());
+                               } else {
+                                       processEvent(bufferOrEvent.get());

Review comment:
       Is this split into `processBuffer` and `processEvent` relevant?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
##########
@@ -133,14 +136,14 @@ public boolean isAvailable() {
         * @param bufferAndBacklog
         *              current buffer and backlog including information about 
the next buffer
         */
-       private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+       @Nullable
+       private Buffer.DataType getNextDataType(BufferAndBacklog 
bufferAndBacklog) {
                // BEWARE: this must be in sync with #isAvailable()!
-               if (numCreditsAvailable > 0) {
-                       return bufferAndBacklog.isDataAvailable();
-               }
-               else {
-                       return bufferAndBacklog.isEventAvailable();
+               final Buffer.DataType nextDataType = 
bufferAndBacklog.getNextDataType();
+               if (numCreditsAvailable > 0 || (nextDataType != null && 
nextDataType.isEvent())) {
+                       return nextDataType;
                }
+               return null;

Review comment:
       hmmm, maybe add another enum type for this purpose? (I'm not sure, just 
brainstorming)
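
   One way to read this suggestion is to replace the `null` return with an explicit "nothing available" constant. This is a minimal sketch, assuming a stand-alone enum; `NextDataType`, `NONE` and `NextDataTypeResolver` are hypothetical names and not Flink's `Buffer.DataType`:

```java
// Hypothetical enum with an explicit "nothing to send" value instead of null.
enum NextDataType {
    NONE(false),
    DATA_BUFFER(false),
    EVENT_BUFFER(true);

    private final boolean event;

    NextDataType(boolean event) {
        this.event = event;
    }

    boolean isEvent() {
        return event;
    }

    boolean isAvailable() {
        return this != NONE;
    }
}

final class NextDataTypeResolver {
    // Same decision as in the diff above, but without a nullable result:
    // data needs credit, events can always be announced.
    static NextDataType resolve(int numCreditsAvailable, NextDataType next) {
        if (numCreditsAvailable > 0 || next.isEvent()) {
            return next;
        }
        return NextDataType.NONE;
    }
}
```

   With such a constant the `@Nullable` annotation and the `nextDataType != null` check would no longer be needed, at the cost of one more enum value that every `switch` has to handle.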

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -621,61 +626,84 @@ public boolean isFinished() {
                return Optional.of(transformToBufferOrEvent(
                        inputWithData.data.buffer(),
                        inputWithData.moreAvailable,
-                       inputWithData.input));
+                       inputWithData.input,
+                       inputWithData.morePriorityEvents));
        }
 
        private Optional<InputWithData<InputChannel, BufferAndAvailability>> 
waitAndGetNextData(boolean blocking)
                        throws IOException, InterruptedException {
                while (true) {
-                       Optional<InputChannel> inputChannel = 
getChannel(blocking);
-                       if (!inputChannel.isPresent()) {
+                       Optional<InputChannel> inputChannelOpt = 
getChannel(blocking);
+                       if (!inputChannelOpt.isPresent()) {
                                return Optional.empty();
                        }
 
                        // Do not query inputChannel under the lock, to avoid 
potential deadlocks coming from
                        // notifications.
-                       Optional<BufferAndAvailability> result = 
inputChannel.get().getNextBuffer();
+                       final InputChannel inputChannel = inputChannelOpt.get();
+                       Optional<BufferAndAvailability> 
bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
 
                        synchronized (inputChannelsWithData) {
-                               if (result.isPresent() && 
result.get().moreAvailable()) {
+                               if (!bufferAndAvailabilityOpt.isPresent()) {
+                                       if (inputChannelsWithData.isEmpty()) {
+                                               
availabilityHelper.resetUnavailable();
+                                       }
+                                       continue;

Review comment:
       Maybe if the `result` variable rename and the added `continue` branch 
had happened in an independent "refactor" commit, it would have saved me a 
couple of minutes while I was trying to understand the change :( 
   
   Only maybe, as I can see how the changes are a bit interconnected.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
         */
        public CheckpointedInputGate(
                        InputGate inputGate,
-                       CheckpointBarrierHandler barrierHandler) {
+                       CheckpointBarrierHandler barrierHandler,
+                       MailboxExecutor mailboxExecutor) {
                this.inputGate = inputGate;
                this.barrierHandler = barrierHandler;
+               this.mailboxExecutor = mailboxExecutor;
+
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       /**
+        * Eagerly pulls and processes all priority events. Must be called from 
task thread.
+        *
+        * <p>Basic assumption is that no priority event needs to be handled by 
the {@link StreamTaskNetworkInput}.
+        */
+       private void processPriorityEvents() throws IOException, 
InterruptedException {
+               // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
+               final boolean hasPriorityEvents = 
inputGate.getPriorityEventAvailableFuture().isDone();
+               if (hasPriorityEvents) {
+                       // process as many priority events as possible
+                       while 
(pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+                       }
+               }
+
+               // re-enqueue mail to process priority events
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor 
mailboxExecutor) {
+               final CompletableFuture<?> priorityEventAvailableFuture = 
inputGate.getPriorityEventAvailableFuture();
+               priorityEventAvailableFuture.thenRun(() -> {
+                       mailboxExecutor.execute(this::processPriorityEvents, 
"process priority even @ gate %s", inputGate);
+               });

Review comment:
       1. Again, do I understand this correctly? Is this assuming that nobody 
polls anything between completing `getPriorityEventAvailableFuture` and 
executing `this::processPriorityEvents`? Isn't that a bit fragile?
   
   2. What was the motivation for not passing the priority events to the 
`StreamTaskNetworkInput`?
   
   3. What about processing priority events as part of `pollNext()`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -74,34 +106,34 @@ public CheckpointedInputGate(
        }
 
        @Override
-       public Optional<BufferOrEvent> pollNext() throws Exception {
-               while (true) {
-                       Optional<BufferOrEvent> next = inputGate.pollNext();
+       public Optional<BufferOrEvent> pollNext() throws IOException, 
InterruptedException {
+               Optional<BufferOrEvent> next = inputGate.pollNext();
 
-                       if (!next.isPresent()) {
-                               return handleEmptyBuffer();
-                       }
+               if (!next.isPresent()) {
+                       return handleEmptyBuffer();
+               }
 
-                       BufferOrEvent bufferOrEvent = next.get();
-                       
checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
+               BufferOrEvent bufferOrEvent = next.get();
+               
checkState(!barrierHandler.isBlocked(bufferOrEvent.getChannelInfo()));
 
-                       if (bufferOrEvent.isBuffer()) {
-                               return next;
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
-                               CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
-                               
barrierHandler.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelInfo());
-                               return next;
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               
barrierHandler.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent());
-                       }
-                       else {
-                               if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                       barrierHandler.processEndOfPartition();
-                               }
-                               return next;
-                       }
+               if (bufferOrEvent.isEvent()) {
+                       handleEvent(bufferOrEvent);
+               } else {
+                       barrierHandler.processBuffer(bufferOrEvent.getBuffer(), 
bufferOrEvent.getChannelInfo());

Review comment:
       for now in this commit, this is just a NO-OP call?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -63,9 +66,38 @@
         */
        public CheckpointedInputGate(
                        InputGate inputGate,
-                       CheckpointBarrierHandler barrierHandler) {
+                       CheckpointBarrierHandler barrierHandler,
+                       MailboxExecutor mailboxExecutor) {
                this.inputGate = inputGate;
                this.barrierHandler = barrierHandler;
+               this.mailboxExecutor = mailboxExecutor;
+
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       /**
+        * Eagerly pulls and processes all priority events. Must be called from 
task thread.
+        *
+        * <p>Basic assumption is that no priority event needs to be handled by 
the {@link StreamTaskNetworkInput}.
+        */
+       private void processPriorityEvents() throws IOException, 
InterruptedException {
+               // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
+               final boolean hasPriorityEvents = 
inputGate.getPriorityEventAvailableFuture().isDone();
+               if (hasPriorityEvents) {
+                       // process as many priority events as possible
+                       while 
(pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+                       }

Review comment:
       As I understand it, this assumes that this `pollNext()` cannot return 
anything other than a priority event?
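
   A draining loop that does not rely on that assumption could check the head of the queue before polling. This is a minimal sketch, assuming a plain in-memory queue instead of the input gate; `PriorityDrainModel`, `Element` and `drainPriority` are hypothetical names, not part of the PR:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: priority elements are expected at the head of the queue.
final class PriorityDrainModel {
    static final class Element {
        final String name;
        final boolean priority;

        Element(String name, boolean priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Polls only while the head is a priority element; returns how many were taken. */
    static int drainPriority(Deque<Element> queue) {
        int processed = 0;
        while (!queue.isEmpty() && queue.peek().priority) {
            queue.poll(); // a real handler would process the event here
            processed++;
        }
        return processed;
    }
}
```

   Peeking first means a regular buffer at the head is left in place for the normal processing path instead of being consumed by the priority mail.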

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -92,306 +90,173 @@
                super(toNotifyOnCheckpoint);
 
                this.taskName = taskName;
-               hasInflightBuffers = Arrays.stream(inputGates)
+               this.inputGates = inputGates;
+               storeNewBuffers = Arrays.stream(inputGates)
                        .flatMap(gate -> gate.getChannelInfos().stream())
                        .collect(Collectors.toMap(Function.identity(), info -> 
false));
-               threadSafeUnaligner = new 
ThreadSafeUnaligner(checkNotNull(checkpointCoordinator), this, inputGates);
+               numOpenChannels = storeNewBuffers.size();
+               this.checkpointCoordinator = checkpointCoordinator;
        }
 
-       /**
-        * We still need to trigger checkpoint via {@link 
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
-        * while reading the first barrier from one channel, because this might 
happen
-        * earlier than the previous async trigger via mailbox by netty thread.
-        *
-        * <p>Note this is also suitable for the trigger case of local input 
channel.
-        */
        @Override
-       public void processBarrier(CheckpointBarrier receivedBarrier, 
InputChannelInfo channelInfo) throws IOException {
-               long barrierId = receivedBarrier.getId();
-               if (currentConsumedCheckpointId > barrierId || 
(currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
+       public void processBarrier(CheckpointBarrier barrier, InputChannelInfo 
channelInfo) throws IOException {
+               long barrierId = barrier.getId();
+               if (currentCheckpointId > barrierId || (currentCheckpointId == 
barrierId && !isCheckpointPending())) {
                        // ignore old and cancelled barriers
                        return;
                }
-               if (currentConsumedCheckpointId < barrierId) {
-                       currentConsumedCheckpointId = barrierId;
-                       numBarrierConsumed = 0;
-                       hasInflightBuffers.entrySet().forEach(hasInflightBuffer 
-> hasInflightBuffer.setValue(true));
+               if (currentCheckpointId < barrierId) {
+                       handleNewCheckpoint(barrier);
+                       notifyCheckpoint(barrier, 0);
                }
-               if (currentConsumedCheckpointId == barrierId) {
-                       hasInflightBuffers.put(channelInfo, false);
-                       numBarrierConsumed++;
+               if (currentCheckpointId == barrierId) {
+                       if (storeNewBuffers.put(channelInfo, false)) {
+                               LOG.debug("{}: Received barrier from channel {} 
@ {}.", taskName, channelInfo, barrierId);
+
+                               
inputGates[channelInfo.getGateIdx()].getChannel(channelInfo.getInputChannelIdx())
+                                       .spillInflightBuffers(barrierId, 
checkpointCoordinator.getChannelStateWriter());
+
+                               if (++numBarriersReceived == numOpenChannels) {
+                                       
allBarriersReceivedFuture.complete(null);
+                               }
+                       }
                }
-               threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, 
channelInfo);
        }
 
        @Override
        public void abortPendingCheckpoint(long checkpointId, 
CheckpointException exception) throws IOException {
-               threadSafeUnaligner.tryAbortPendingCheckpoint(checkpointId, 
exception);
+               tryAbortPendingCheckpoint(checkpointId, exception);
 
-               if (checkpointId > currentConsumedCheckpointId) {
-                       resetPendingCheckpoint(checkpointId);
+               if (checkpointId > currentCheckpointId) {
+                       resetPendingCheckpoint();
                }
        }
 
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws IOException {
                final long cancelledId = cancelBarrier.getCheckpointId();
-               boolean shouldAbort = 
threadSafeUnaligner.setCancelledCheckpointId(cancelledId);
+               boolean shouldAbort = setCancelledCheckpointId(cancelledId);
                if (shouldAbort) {
                        notifyAbort(
                                cancelledId,
                                new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
                }
 
-               if (cancelledId >= currentConsumedCheckpointId) {
-                       resetPendingCheckpoint(cancelledId);
-                       currentConsumedCheckpointId = cancelledId;
+               if (cancelledId >= currentCheckpointId) {
+                       resetPendingCheckpoint();
+                       currentCheckpointId = cancelledId;
                }
        }
 
        @Override
        public void processEndOfPartition() throws IOException {
-               threadSafeUnaligner.onChannelClosed();
-               resetPendingCheckpoint(-1L);
+               numOpenChannels--;
+
+               resetPendingCheckpoint();
+               notifyAbort(
+                       currentCheckpointId,
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
        }
 
-       private void resetPendingCheckpoint(long checkpointId) {
-               if (isCheckpointPending()) {
-                       LOG.warn("{}: Received barrier or EndOfPartition(-1) {} 
before completing current checkpoint {}. " +
-                                       "Skipping current checkpoint.",
-                               taskName,
-                               checkpointId,
-                               currentConsumedCheckpointId);
+       private void resetPendingCheckpoint() {
+               LOG.warn("{}: Received barrier or EndOfPartition(-1) before 
completing current checkpoint {}. " +
+                               "Skipping current checkpoint.",
+                       taskName,
+                       currentCheckpointId);
 
-                       hasInflightBuffers.entrySet().forEach(hasInflightBuffer 
-> hasInflightBuffer.setValue(false));
-                       numBarrierConsumed = 0;
-               }
+               storeNewBuffers.entrySet().forEach(storeNewBuffer -> 
storeNewBuffer.setValue(false));
+               numBarriersReceived = 0;
        }
 
        @Override
        public long getLatestCheckpointId() {
-               return currentConsumedCheckpointId;
+               return currentCheckpointId;
        }
 
        @Override
        public String toString() {
-               return String.format("%s: last checkpoint: %d", taskName, 
currentConsumedCheckpointId);
+               return String.format("%s: last checkpoint: %d", taskName, 
currentCheckpointId);
        }
 
        @Override
        public void close() throws IOException {
                super.close();
-               threadSafeUnaligner.close();
-       }
-
-       @Override
-       public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
-               if (checkpointId < currentConsumedCheckpointId) {
-                       return false;
-               }
-               if (checkpointId > currentConsumedCheckpointId) {
-                       return true;
-               }
-               return hasInflightBuffers.get(channelInfo);
-       }
-
-       @Override
-       public CompletableFuture<Void> getAllBarriersReceivedFuture(long 
checkpointId) {
-               return 
threadSafeUnaligner.getAllBarriersReceivedFuture(checkpointId);
-       }
-
-       @Override
-       public Optional<BufferReceivedListener> getBufferReceivedListener() {
-               return Optional.of(threadSafeUnaligner);
+               allBarriersReceivedFuture.cancel(false);
        }
 
        @Override
        protected boolean isCheckpointPending() {
-               return numBarrierConsumed > 0;
-       }
-
-       @VisibleForTesting
-       int getNumOpenChannels() {
-               return threadSafeUnaligner.getNumOpenChannels();
-       }
-
-       @VisibleForTesting
-       ThreadSafeUnaligner getThreadSafeUnaligner() {
-               return threadSafeUnaligner;
+               return numBarriersReceived > 0;
        }
 
-       private void notifyCheckpoint(CheckpointBarrier barrier) throws 
IOException {
-               // ignore the previous triggered checkpoint by netty thread if 
it was already canceled or aborted before.
-               if (barrier.getId() >= 
threadSafeUnaligner.getCurrentCheckpointId()) {
-                       super.notifyCheckpoint(barrier, 0);
+       @Override
+       public void processBuffer(Buffer buffer, InputChannelInfo channelInfo) {
+               if (storeNewBuffers.get(channelInfo)) {
+                       
checkpointCoordinator.getChannelStateWriter().addInputData(
+                               currentCheckpointId,
+                               channelInfo,
+                               ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                               ofElement(buffer.retainBuffer(), 
Buffer::recycleBuffer));

Review comment:
       Here we are persisting in-flight buffers only as they are being 
processed? Doesn't that mean that an unaligned checkpoint will be completed 
only after we process all of the buffers, making an unaligned checkpoint no 
quicker than an aligned one?





