pnowojski commented on code in PR #19723:
URL: https://github.com/apache/flink/pull/19723#discussion_r879507376
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -112,6 +115,20 @@ public class PipelinedSubpartition extends
ResultSubpartition
private int bufferSize = Integer.MAX_VALUE;
+ /**
+ * The channelState Future of unaligned checkpoint. Access to the
channelStateFutures is
+ * synchronized on buffers.
Review Comment:
I would drop:
> Access to the channelStateFutures is synchronized on buffers.
It duplicates the `@GuardedBy` annotation.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer
bufferConsumer, int partial
inflightBuffers.toArray(new Buffer[0]));
}
}
- return numPriorityElements == 1
- && !isBlocked; // if subpartition is blocked then downstream
doesn't expect any
- // notifications
+ return needNotifyPriorityEvent();
+ }
+
+ // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() {
+ assert Thread.holdsLock(buffers);
+ // if subpartition is blocked then downstream doesn't expect any
notifications
+ return buffers.getNumPriorityElements() == 1 && !isBlocked;
+ }
+
+ private void processTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ channelStateWriter.addOutputDataFuture(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ createChannelStateFuture(barrier.getId()));
+ }
+
+ private void completeTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ if (channelStateFutureIsAvailable(barrier.getId())) {
+ completeChannelStateFuture(Collections.emptyList(), null);
+ }
+ }
+
+ private CompletableFuture<List<Buffer>> createChannelStateFuture(long
checkpointId) {
+ assert Thread.holdsLock(buffers);
+ if (channelStateFuture != null) {
+ completeChannelStateFuture(
+ null,
+ new IllegalStateException(
+ String.format(
+ "%s has uncompleted channelStateFuture of
checkpointId=%s, but it received "
+ + "a new timeoutable checkpoint
barrier of checkpointId=%s, it maybe "
+ + "a bug due to currently does not
support concurrent unaligned checkpoints.",
Review Comment:
nit:
> a bug due to currently not supported concurrent unaligned checkpoint
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer
bufferConsumer, int partial
inflightBuffers.toArray(new Buffer[0]));
}
}
- return numPriorityElements == 1
- && !isBlocked; // if subpartition is blocked then downstream
doesn't expect any
- // notifications
+ return needNotifyPriorityEvent();
+ }
+
+ // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() {
+ assert Thread.holdsLock(buffers);
+ // if subpartition is blocked then downstream doesn't expect any
notifications
+ return buffers.getNumPriorityElements() == 1 && !isBlocked;
+ }
+
+ private void processTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ channelStateWriter.addOutputDataFuture(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ createChannelStateFuture(barrier.getId()));
+ }
+
+ private void completeTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ if (channelStateFutureIsAvailable(barrier.getId())) {
+ completeChannelStateFuture(Collections.emptyList(), null);
+ }
+ }
Review Comment:
Shouldn't we actually `checkState` here that
`channelStateFutureIsAvailable(barrier.getId())` is true? Is there a valid
scenario where this method should return false?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer
bufferConsumer, int partial
inflightBuffers.toArray(new Buffer[0]));
}
}
- return numPriorityElements == 1
- && !isBlocked; // if subpartition is blocked then downstream
doesn't expect any
- // notifications
+ return needNotifyPriorityEvent();
+ }
+
+ // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() {
+ assert Thread.holdsLock(buffers);
+ // if subpartition is blocked then downstream doesn't expect any
notifications
+ return buffers.getNumPriorityElements() == 1 && !isBlocked;
+ }
+
+ private void processTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ channelStateWriter.addOutputDataFuture(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ createChannelStateFuture(barrier.getId()));
+ }
+
+ private void completeTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ if (channelStateFutureIsAvailable(barrier.getId())) {
+ completeChannelStateFuture(Collections.emptyList(), null);
+ }
+ }
Review Comment:
nit: move `completeTimeoutableCheckpointBarrier()` below `pollBuffer()`
(where it's used)?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -103,18 +104,21 @@ static ChannelStateWriteRequest buildFutureWriteRequest(
checkBufferIsBuffer(buffer);
bufferConsumer.accept(writer, buffer);
}
- } catch (Throwable e) {
+ } catch (ExecutionException e) {
writer.fail(e);
}
},
throwable -> {
- List<Buffer> buffers = dataFuture.get();
- if (buffers == null || buffers.isEmpty()) {
- return;
+ try {
+ List<Buffer> buffers = dataFuture.get();
+ if (buffers == null || buffers.isEmpty()) {
+ return;
+ }
+ CloseableIterator<Buffer> iterator =
+ CloseableIterator.fromList(buffers,
Buffer::recycleBuffer);
+ iterator.close();
+ } catch (ExecutionException ignored) {
Review Comment:
Ohhh, ok. Thanks for the detailed explanation. I think either option B or
option C should be fine.
Maybe just add a comment
```
} catch (ExecutionException e) {
// If dataFuture fails, fail only the single related writer
writer.fail(e);
}
```
?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -112,6 +115,20 @@ public class PipelinedSubpartition extends
ResultSubpartition
private int bufferSize = Integer.MAX_VALUE;
+ /**
+ * The channelState Future of unaligned checkpoint. Access to the
channelStateFutures is
+ * synchronized on buffers.
+ */
+ @GuardedBy("buffers")
+ private CompletableFuture<List<Buffer>> channelStateFuture;
+
+ /**
+ * It is the checkpointId corresponding to channelStateFuture. And It
should be always update
+ * with channelStateFuture.
Review Comment:
nitty nit:
`{@link #channelStateFuture}`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java:
##########
@@ -240,9 +260,158 @@ private boolean processPriorityBuffer(BufferConsumer
bufferConsumer, int partial
inflightBuffers.toArray(new Buffer[0]));
}
}
- return numPriorityElements == 1
- && !isBlocked; // if subpartition is blocked then downstream
doesn't expect any
- // notifications
+ return needNotifyPriorityEvent();
+ }
+
+ // It just be called after add priorityEvent.
+ private boolean needNotifyPriorityEvent() {
+ assert Thread.holdsLock(buffers);
+ // if subpartition is blocked then downstream doesn't expect any
notifications
+ return buffers.getNumPriorityElements() == 1 && !isBlocked;
+ }
+
+ private void processTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ channelStateWriter.addOutputDataFuture(
+ barrier.getId(),
+ subpartitionInfo,
+ ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+ createChannelStateFuture(barrier.getId()));
+ }
+
+ private void completeTimeoutableCheckpointBarrier(BufferConsumer
bufferConsumer) {
+ CheckpointBarrier barrier =
parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+ if (channelStateFutureIsAvailable(barrier.getId())) {
+ completeChannelStateFuture(Collections.emptyList(), null);
+ }
+ }
+
+ private CompletableFuture<List<Buffer>> createChannelStateFuture(long
checkpointId) {
+ assert Thread.holdsLock(buffers);
+ if (channelStateFuture != null) {
+ completeChannelStateFuture(
+ null,
+ new IllegalStateException(
+ String.format(
+ "%s has uncompleted channelStateFuture of
checkpointId=%s, but it received "
+ + "a new timeoutable checkpoint
barrier of checkpointId=%s, it maybe "
+ + "a bug due to currently does not
support concurrent unaligned checkpoints.",
+ this, channelStateCheckpointId,
checkpointId)));
+ }
+ channelStateFuture = new CompletableFuture<>();
+ channelStateCheckpointId = checkpointId;
+ return channelStateFuture;
+ }
+
+ private void completeChannelStateFuture(List<Buffer> channelResult,
Throwable e) {
+ assert Thread.holdsLock(buffers);
+ if (e != null) {
+ channelStateFuture.completeExceptionally(e);
+ } else {
+ channelStateFuture.complete(channelResult);
+ }
+ channelStateFuture = null;
+ }
+
+ private boolean channelStateFutureIsAvailable(long checkpointId) {
Review Comment:
nit: `isChannelStateFutureAvailable`
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java:
##########
@@ -335,6 +380,34 @@ public void checkpointState(
}
}
+ private void registerAlignmentTimer(
+ long checkpointId,
+ OperatorChain<?, ?> operatorChain,
+ CheckpointBarrier checkpointBarrier) {
+ if (alignmentTimer != null) {
Review Comment:
shouldn't this be `checkState(alignmentTimer == null)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]