1996fanrui commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r880013345
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer
bufferConsumer, int partial
inflightBuffers.toArray(new Buffer[0]));
}
}
- return numPriorityElements == 1
- && !isBlocked; // if subpartition is blocked then downstream
doesn't expect any
- // notifications
+ return needNotifyPriorityEvent();
+ }
+
+ // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() {
+ assert Thread.holdsLock(buffers);
+ // if subpartition is blocked then downstream doesn't expect any
notifications
+ return buffers.getNumPriorityElements() == 1 && !isBlocked;
+ }
+
+ private void processTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ channelStateWriter.addOutputDataFuture(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ createChannelStateFuture(barrier.getId()));
+ }
+
+ private void completeTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ if (channelStateFutureIsAvailable(barrier.getId())) {
+ completeChannelStateFuture(Collections.emptyList(), null);
+ }
+ }
Review Comment:
Actually, it may also be false here.
You mentioned in a [previous
comment](https://github.com/apache/flink/pull/19723#discussion_r875800201). It
happens when a checkpoint that has been aborted previously.
BTW, for these 2 comments, I'll add a brief description to the code
explaining why we can't checkState, to prevent it from being corrected later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]