AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r476659669
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -171,19 +175,42 @@ private void handleAddingBarrier(BufferConsumer
bufferConsumer, boolean insertAs
checkState(inflightBufferSnapshot.isEmpty(),
"Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");
- // Meanwhile prepare the collection of in-flight
buffers which would be fetched in the next step later.
- for (BufferConsumer buffer : buffers) {
- try (BufferConsumer bc = buffer.copy()) {
- if (bc.isBuffer()) {
-
inflightBufferSnapshot.add(bc.build());
+ final int pos = buffers.getNumPriorityElements();
+ buffers.addPriorityElement(bufferConsumer);
+
+ boolean unalignedCheckpoint =
isUnalignedCheckpoint(bufferConsumer);
+ if (unalignedCheckpoint) {
+ final Iterator<BufferConsumer> iterator =
buffers.iterator();
+ Iterators.advance(iterator, pos + 1);
+ while (iterator.hasNext()) {
+ BufferConsumer buffer = iterator.next();
+
+ if (buffer.isBuffer()) {
+ try (BufferConsumer bc =
buffer.copy()) {
+
inflightBufferSnapshot.add(bc.build());
+ }
}
}
}
+ return;
+ }
Review comment:
In general, I wanted to drop the assumption that there is only one
priority event going on at any given time. That's especially true when we make
cancellation events also a priority and we have a more or less fully blocked
channel.
Specifically, this change had following motivations:
* drop the assumption that all priority events are unaligned checkpoints.
* drop the assumption that the new priority event is always at position 0.
* a small performance improvement where buffers are only copied after it's
clear that they are not containing an event.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]