pnowojski commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r876796118
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +255,171 @@ private boolean processPriorityBuffer(BufferConsumer
bufferConsumer, int partial
inflightBuffers.toArray(new Buffer[0]));
}
}
- return numPriorityElements == 1
- && !isBlocked; // if subpartition is blocked then downstream
doesn't expect any
- // notifications
+ return needNotifyPriorityEvent();
+ }
+
+ // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() {
+ assert Thread.holdsLock(buffers);
+ // if subpartition is blocked then downstream doesn't expect any
notifications
+ return buffers.getNumPriorityElements() == 1 && !isBlocked;
+ }
+
+ private void receiveTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ checkState(
+ !channelStateFutures.containsKey(barrier.getId()),
+ "%s has received the checkpoint barrier %d, it maybe a bug.",
+ toString(),
+ barrier.getId());
+
+ checkChannelStateFutures(barrier.getId());
+ CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+ channelStateFutures.put(barrier.getId(), dataFuture);
+ channelStateWriter.addOutputDataFuture(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ dataFuture);
+ }
+
+ private void checkChannelStateFutures(long currentCheckpointId) {
+ assert Thread.holdsLock(buffers);
+
+ while (!channelStateFutures.isEmpty()) {
+ Long checkpointId = channelStateFutures.firstKey();
+ if (checkpointId >= currentCheckpointId) {
+ break;
+ }
+ String exceptionMessage =
+ String.format(
+ "Received the barrier of checkpointId=%d, complete
checkpointId=%d "
+ + "future by exception due to currently
does not support "
+ + "concurrent unaligned checkpoints.",
+ currentCheckpointId, checkpointId);
+ channelStateFutures
+ .pollFirstEntry()
+ .getValue()
+ .completeExceptionally(new
IllegalStateException(exceptionMessage));
+ LOG.info(exceptionMessage);
+ }
+ }
Review Comment:
> I guess it might be supported in the future.
There are no plans for that. We can always reimplement it in the future,
while for the time being we would have a simpler easier to read/maintain code.
We just have to make sure that we put some `checkState()` or fail concurrent
checkpoint here, that would ensure if someone implements/enables concurrent
checkpoints, he won't accidentally miss place to modify.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]