pnowojski commented on code in PR #20137:
URL: https://github.com/apache/flink/pull/20137#discussion_r1033474743


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -125,11 +162,72 @@ public void start() throws IllegalStateException {
 
     @Override
     public void submit(ChannelStateWriteRequest request) throws Exception {
-        submitInternal(request, () -> deque.add(request));
+        BlockingQueue<ChannelStateWriteRequest> unreadyQueue =
+                unreadyQueues.get(
+                        SubtaskID.of(
+                                request.getJobID(),
+                                request.getJobVertexID(),
+                                request.getSubtaskIndex()));
+        checkState(unreadyQueue != null, "The subtask %s is not yet 
registered");
+        submitInternal(
+                request,
+                () -> {
+                    synchronized (unreadyQueue) {
+                        // 1. unreadyQueue isn't empty, the new request must keep the order, so add
+                        // the new request to the unreadyQueue tail.
+                        if (!unreadyQueue.isEmpty()) {
+                            unreadyQueue.add(request);
+                            return;
+                        }
+                        // 2. unreadyQueue is empty, and new request is ready, so add it to the
+                        // readyQueue.
+                        if (request.getReadyFuture().isDone()) {
+                            deque.add(request);
+                            return;
+                        }
+                        // 3. unreadyQueue is empty, and new request isn't ready, so add it to the
+                        // unreadyQueue, and register it as the first request.
+                        unreadyQueue.add(request);
+                        registerFirstRequestFuture(request, unreadyQueue);
+                    }
+                });
+    }
+
+    private void registerFirstRequestFuture(
+            @Nonnull ChannelStateWriteRequest firstRequest,
+            BlockingQueue<ChannelStateWriteRequest> unreadyQueue) {
+        assert Thread.holdsLock(unreadyQueue);
+        checkState(firstRequest == unreadyQueue.peek(), "The request isn't the 
first request.");
+        firstRequest
+                .getReadyFuture()
+                .thenAccept(o -> moveReadyRequestToReadyQueue(unreadyQueue, 
firstRequest))
+                .exceptionally(
+                        throwable -> {
+                            moveReadyRequestToReadyQueue(unreadyQueue, 
firstRequest);
+                            return null;
+                        });

Review Comment:
   Thanks for the explanation. Can you add a comment explaining that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to