SteNicholas commented on a change in pull request #13827:
URL: https://github.com/apache/flink/pull/13827#discussion_r514985679



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -517,6 +517,21 @@ public void checkpointStopped(long checkpointId) {
                }
        }
 
+       @Override
+       public void convertToPriorityEvent(int sequenceNumber) throws 
IOException {
+               boolean firstPriorityEvent;
+               synchronized (receivedBuffers) {
+                       checkState(!channelStatePersister.hasBarrierReceived());
+                       SequenceBuffer toPrioritize = 
receivedBuffers.getAndRemove(
+                               sequenceBuffer -> sequenceBuffer.sequenceNumber 
== sequenceNumber);
+                       checkState(!toPrioritize.buffer.isBuffer());
+                       firstPriorityEvent = addPriorityBuffer(toPrioritize);
+               }
+               if (firstPriorityEvent) {
+                       notifyPriorityEvent(sequenceNumber);
+               }
+       }
+

Review comment:
       @pnowojski , I thought that `firstPriorityEvent` variable is unused and 
could be removed like following:
   ```
   public void convertToPriorityEvent(int sequenceNumber) throws IOException {
                synchronized (receivedBuffers) {
                        checkState(!channelStatePersister.hasBarrierReceived());
                        SequenceBuffer toPrioritize = 
receivedBuffers.getAndRemove(
                                sequenceBuffer -> sequenceBuffer.sequenceNumber 
== sequenceNumber);
                        checkState(!toPrioritize.buffer.isBuffer());
               if (addPriorityBuffer(toPrioritize)) {
                           notifyPriorityEvent(sequenceNumber);
                    }
                }
        }
   ```
   And I have a question about the firstPriorityEvent whether to mean the 
recently aligned `CheckpointBarrier `.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to