pnowojski commented on code in PR #20137:
URL: https://github.com/apache/flink/pull/20137#discussion_r1032479797


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +64,46 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, BlockingQueue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);

Review Comment:
   Is this used for anything other than `checkState`/`checkArgument` calls?
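   A minimal sketch of the alternative, assuming the flag really only backs precondition checks: a plain `boolean` that is read and written under the same lock as the registered-subtask set would likely make the `AtomicBoolean` unnecessary. `RegistrationWindow`, `register`, `start` and `isRegistered` are hypothetical names, and subtasks are modeled as plain strings rather than `SubtaskID`.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch: a registration window guarded by one lock. The
 * "registering" flag only backs precondition checks, so a plain boolean
 * guarded by the same lock as the subtask set is sufficient.
 */
class RegistrationWindow {
    private final Object lock = new Object();
    private final Set<String> subtasks = new HashSet<>(); // guarded by lock
    private boolean registering = true; // guarded by lock

    void register(String subtaskId) {
        synchronized (lock) {
            if (!registering) {
                throw new IllegalStateException("Registration is closed, cannot register " + subtaskId);
            }
            if (!subtasks.add(subtaskId)) {
                throw new IllegalArgumentException("Subtask " + subtaskId + " is already registered");
            }
        }
    }

    /** Closes the registration window; later register() calls fail. */
    void start() {
        synchronized (lock) {
            if (!registering) {
                throw new IllegalStateException("Already started");
            }
            registering = false;
        }
    }

    boolean isRegistered(String subtaskId) {
        synchronized (lock) {
            return subtasks.contains(subtaskId);
        }
    }
}
```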



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -125,11 +162,72 @@ public void start() throws IllegalStateException {
 
     @Override
     public void submit(ChannelStateWriteRequest request) throws Exception {
-        submitInternal(request, () -> deque.add(request));
+        BlockingQueue<ChannelStateWriteRequest> unreadyQueue =
+                unreadyQueues.get(
+                        SubtaskID.of(
+                                request.getJobID(),
+                                request.getJobVertexID(),
+                                request.getSubtaskIndex()));
+        checkState(unreadyQueue != null, "The subtask %s is not yet registered");
+        submitInternal(
+                request,
+                () -> {
+                    synchronized (unreadyQueue) {
+                        // 1. unreadyQueue isn't empty, the new request must keep the order, so add
+                        // the new request to the unreadyQueue tail.
+                        if (!unreadyQueue.isEmpty()) {
+                            unreadyQueue.add(request);
+                            return;
+                        }
+                        // 2. unreadyQueue is empty, and new request is ready, so add it to the
+                        // readyQueue.
+                        if (request.getReadyFuture().isDone()) {
+                            deque.add(request);
+                            return;
+                        }
+                        // 3. unreadyQueue is empty, and new request isn't ready, so add it to the
+                        // unreadyQueue,
+                        // and register it as the first request.
+                        unreadyQueue.add(request);
+                        registerFirstRequestFuture(request, unreadyQueue);
+                    }
+                });
+    }
+
+    private void registerFirstRequestFuture(
+            @Nonnull ChannelStateWriteRequest firstRequest,
+            BlockingQueue<ChannelStateWriteRequest> unreadyQueue) {
+        assert Thread.holdsLock(unreadyQueue);
+        checkState(firstRequest == unreadyQueue.peek(), "The request isn't the first request.");
+        firstRequest
+                .getReadyFuture()
+                .thenAccept(o -> moveReadyRequestToReadyQueue(unreadyQueue, firstRequest))
+                .exceptionally(
+                        throwable -> {
+                            moveReadyRequestToReadyQueue(unreadyQueue, firstRequest);
+                            return null;
+                        });

Review Comment:
   Shouldn't you handle this error somehow? Fail this executor and propagate it to the subtasks?
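   One hedged way to do that, sketched under simplifying assumptions: the per-subtask ordering of `submit` (the three cases in the hunk above) is kept, but a ready future that completes exceptionally is recorded as the executor's failure, and every later `submit` or `pollReady` call rethrows it, so each subtask sharing the executor observes it. `Request`, `OrderedExecutorSketch`, `register`, `pollReady`, `watchHead` and `onHeadCompleted` are hypothetical stand-ins, not the PR's classes or Flink API, and all state sits under a single lock.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ChannelStateWriteRequest: owner subtask, name, readiness future. */
final class Request {
    final String subtask;
    final String name;
    final CompletableFuture<?> ready;

    Request(String subtask, String name, CompletableFuture<?> ready) {
        this.subtask = subtask;
        this.name = name;
        this.ready = ready;
    }
}

/** Hypothetical sketch: per-subtask ordering where a failed readiness future fails the executor. */
class OrderedExecutorSketch {
    private final Object lock = new Object();
    private final Map<String, Queue<Request>> unready = new HashMap<>(); // guarded by lock
    private final Queue<Request> ready = new ArrayDeque<>(); // guarded by lock
    private Throwable failure; // guarded by lock

    void register(String subtask) {
        synchronized (lock) {
            unready.putIfAbsent(subtask, new ArrayDeque<>());
        }
    }

    void submit(Request request) {
        synchronized (lock) {
            throwIfFailed();
            Queue<Request> queue = unready.get(request.subtask);
            if (queue == null) {
                throw new IllegalStateException("Subtask " + request.subtask + " is not registered");
            }
            if (!queue.isEmpty()) {
                queue.add(request); // 1. keep order behind an earlier unready request
            } else if (request.ready.isDone() && !request.ready.isCompletedExceptionally()) {
                ready.add(request); // 2. nothing pending and already ready
            } else {
                queue.add(request); // 3. becomes the head the queue waits on
                watchHead(queue, request);
            }
        }
    }

    /** What the worker thread would consume; returns null when nothing is ready. */
    Request pollReady() {
        synchronized (lock) {
            throwIfFailed();
            return ready.poll();
        }
    }

    private void watchHead(Queue<Request> queue, Request head) {
        // If the future is already complete, the callback runs right away on this thread
        // (re-entering the lock is fine); otherwise it runs on the completing thread.
        head.ready.whenComplete((ignored, error) -> onHeadCompleted(queue, head, error));
    }

    private void onHeadCompleted(Queue<Request> queue, Request head, Throwable error) {
        synchronized (lock) {
            if (error != null) {
                if (failure == null) {
                    failure = error; // fail the executor instead of forwarding the request
                }
                return;
            }
            if (queue.peek() != head) {
                return; // defensive: only the current head may trigger a drain
            }
            ready.add(queue.poll());
            // Move followers that are already ready; wait on the first one that is not.
            Request next;
            while ((next = queue.peek()) != null) {
                if (!next.ready.isDone() || next.ready.isCompletedExceptionally()) {
                    watchHead(queue, next); // a failed future fires the callback at once
                    return;
                }
                ready.add(queue.poll());
            }
        }
    }

    private void throwIfFailed() {
        if (failure != null) {
            throw new IllegalStateException("Channel state executor failed", failure);
        }
    }
}
```

Because the failure is stored once and checked on every entry point, it reaches all subtasks that share the executor, not only the one whose request failed.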



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +64,46 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, BlockingQueue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);

Review Comment:
   There are a `BlockingQueue`, two `volatile` fields, a `ConcurrentHashMap` and an `AtomicBoolean`. I think that's a bit too much, and it should all be replaced with a single `synchronized (this)`, or `private final Object lock = new Object();` and later `synchronized (lock)`, combined with a non-thread-safe queue, `Exception`, `boolean` and `Map`.
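   A rough sketch of what that could look like, assuming a single private lock object: the queue, the failure, the closed flag and the registered-subtask set become plain fields guarded by `lock`, and `lock.wait()`/`lock.notifyAll()` take over the blocking that `BlockingQueue` gave the worker thread. `SingleLockExecutor` and its methods are hypothetical; requests are plain `Runnable`s and subtasks are strings.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch: one private lock guards every mutable field, and
 * wait()/notifyAll() replaces BlockingQueue#take for the worker thread.
 */
class SingleLockExecutor {
    private final Object lock = new Object();
    private final ArrayDeque<Runnable> deque = new ArrayDeque<>(); // guarded by lock
    private final Set<String> subtasks = new HashSet<>(); // guarded by lock
    private Exception thrown; // guarded by lock, no longer volatile
    private boolean closed; // guarded by lock, no longer volatile
    private final Thread thread = new Thread(this::workLoop, "channel-state-writer-sketch");

    void register(String subtask) {
        synchronized (lock) {
            subtasks.add(subtask);
        }
    }

    void start() {
        thread.start();
    }

    void submit(String subtask, Runnable request) {
        synchronized (lock) {
            if (thrown != null) {
                throw new IllegalStateException("Executor failed", thrown);
            }
            if (closed) {
                throw new IllegalStateException("Executor is closed");
            }
            if (!subtasks.contains(subtask)) {
                throw new IllegalStateException("Subtask " + subtask + " is not registered");
            }
            deque.add(request);
            lock.notifyAll(); // wake the worker if it waits on an empty deque
        }
    }

    /** Lets the worker drain already-submitted requests, then waits for it to exit. */
    void close() {
        synchronized (lock) {
            closed = true;
            lock.notifyAll();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    Exception failure() {
        synchronized (lock) {
            return thrown;
        }
    }

    private void workLoop() {
        while (true) {
            Runnable next;
            synchronized (lock) {
                while (deque.isEmpty() && !closed) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (deque.isEmpty()) {
                    return; // closed and fully drained
                }
                next = deque.poll();
            }
            try {
                next.run(); // outside the lock, so submit() never waits for a write
            } catch (RuntimeException e) {
                synchronized (lock) {
                    thrown = e;
                    closed = true;
                    deque.clear();
                }
                return;
            }
        }
    }
}
```

Running each request outside the lock keeps `submit()` from blocking on the worker's I/O; the trade-off is that a failure only surfaces on the next call into the executor.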



-- 
This is an automated message from the Apache Git Service.