1996fanrui commented on code in PR #20137:
URL: https://github.com/apache/flink/pull/20137#discussion_r1032795523
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -125,11 +162,72 @@ public void start() throws IllegalStateException {
@Override
public void submit(ChannelStateWriteRequest request) throws Exception {
- submitInternal(request, () -> deque.add(request));
+ BlockingQueue<ChannelStateWriteRequest> unreadyQueue =
+ unreadyQueues.get(
+ SubtaskID.of(
+ request.getJobID(),
+ request.getJobVertexID(),
+ request.getSubtaskIndex()));
+ checkState(unreadyQueue != null, "The subtask %s is not yet
registered");
+ submitInternal(
+ request,
+ () -> {
+ synchronized (unreadyQueue) {
+ // 1. unreadyQueue isn't empty, the new request must
keep the order, so add
+ // the new request to the unreadyQueue tail.
+ if (!unreadyQueue.isEmpty()) {
+ unreadyQueue.add(request);
+ return;
+ }
+ // 2. unreadyQueue is empty, and new request is ready,
so add it to the
+ // readyQueue.
+ if (request.getReadyFuture().isDone()) {
+ deque.add(request);
+ return;
+ }
+ // 3. unreadyQueue is empty, and new request isn't
ready, so add it to the
+ // readyQueue,
+ // and register it as the first request.
+ unreadyQueue.add(request);
+ registerFirstRequestFuture(request, unreadyQueue);
+ }
+ });
+ }
+
+ private void registerFirstRequestFuture(
+ @Nonnull ChannelStateWriteRequest firstRequest,
+ BlockingQueue<ChannelStateWriteRequest> unreadyQueue) {
+ assert Thread.holdsLock(unreadyQueue);
+ checkState(firstRequest == unreadyQueue.peek(), "The request isn't the
first request.");
+ firstRequest
+ .getReadyFuture()
+ .thenAccept(o -> moveReadyRequestToReadyQueue(unreadyQueue,
firstRequest))
+ .exceptionally(
+ throwable -> {
+ moveReadyRequestToReadyQueue(unreadyQueue,
firstRequest);
+ return null;
+ });
Review Comment:
When the dataFuture is completed, just move the request to the readyQueue.
If the dataFuture is completed exceptionally, `writer.fail` will be
called later. You can take a look at
`ChannelStateWriteRequest#buildFutureWriteRequest`, so I don't think the
exception needs to be handled here.
```
static ChannelStateWriteRequest buildFutureWriteRequest(
JobID jobID,
JobVertexID jobVertexID,
int subtaskIndex,
long checkpointId,
String name,
CompletableFuture<List<Buffer>> dataFuture,
BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
return new CheckpointInProgressRequest(
name,
jobID,
jobVertexID,
subtaskIndex,
checkpointId,
writer -> {
checkState(
dataFuture.isDone(), "It should be executed when
dataFuture is done.");
List<Buffer> buffers;
try {
buffers = dataFuture.get();
} catch (ExecutionException e) {
// If dataFuture fails, fail only the single related
writer
writer.fail(jobID, jobVertexID, subtaskIndex, e);
return;
}
for (Buffer buffer : buffers) {
checkBufferIsBuffer(buffer);
bufferConsumer.accept(writer, buffer);
}
},
throwable ->
dataFuture.thenAccept(
buffers -> {
try {
CloseableIterator.fromList(buffers,
Buffer::recycleBuffer)
.close();
} catch (Exception e) {
LOG.error(
"Failed to recycle the output
buffer of channel state.",
e);
}
}),
dataFuture);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]