AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476659669



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer 
bufferConsumer, boolean insertAs
                        checkState(inflightBufferSnapshot.isEmpty(), 
"Supporting only one concurrent checkpoint in unaligned " +
                                "checkpoints");
 
-                       // Meanwhile prepare the collection of in-flight 
buffers which would be fetched in the next step later.
-                       for (BufferConsumer buffer : buffers) {
-                               try (BufferConsumer bc = buffer.copy()) {
-                                       if (bc.isBuffer()) {
-                                               
inflightBufferSnapshot.add(bc.build());
+                       final int pos = buffers.getNumPriorityElements();
+                       buffers.addPriorityElement(bufferConsumer);
+
+                       boolean unalignedCheckpoint = 
isUnalignedCheckpoint(bufferConsumer);
+                       if (unalignedCheckpoint) {
+                               final Iterator<BufferConsumer> iterator = 
buffers.iterator();
+                               Iterators.advance(iterator, pos + 1);
+                               while (iterator.hasNext()) {
+                                       BufferConsumer buffer = iterator.next();
+
+                                       if (buffer.isBuffer()) {
+                                               try (BufferConsumer bc = 
buffer.copy()) {
+                                                       
inflightBufferSnapshot.add(bc.build());
+                                               }
                                        }
                                }
                        }
+                       return;
+               }

Review comment:
       In general, I wanted to drop the assumption that there is only one 
priority event going on at any given time. That's especially true when we make 
cancellation events also a priority and we have a more or less fully blocked 
channel.
   
   Specifically, this change had following motivations:
   * drop the assumption that all priority events are unaligned checkpoints.
   * drop the assumption that the new priority event is always at position 0.
   * a small performance improvement where buffers are only copied after it's 
clear that they are not containing an event.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to