pnowojski commented on code in PR #20137:
URL: https://github.com/apache/flink/pull/20137#discussion_r1032479797

##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +64,46 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, BlockingQueue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);

Review Comment:
   Is this used for anything other than `checkState`/`checkArgument` calls?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -125,11 +162,72 @@ public void start() throws IllegalStateException {
     @Override
     public void submit(ChannelStateWriteRequest request) throws Exception {
-        submitInternal(request, () -> deque.add(request));
+        BlockingQueue<ChannelStateWriteRequest> unreadyQueue =
+                unreadyQueues.get(
+                        SubtaskID.of(
+                                request.getJobID(),
+                                request.getJobVertexID(),
+                                request.getSubtaskIndex()));
+        checkState(unreadyQueue != null, "The subtask %s is not yet registered");
+        submitInternal(
+                request,
+                () -> {
+                    synchronized (unreadyQueue) {
+                        // 1. unreadyQueue isn't empty, the new request must keep the order, so add
+                        // the new request to the unreadyQueue tail.
+                        if (!unreadyQueue.isEmpty()) {
+                            unreadyQueue.add(request);
+                            return;
+                        }
+                        // 2. unreadyQueue is empty, and new request is ready, so add it to the
+                        // readyQueue.
+                        if (request.getReadyFuture().isDone()) {
+                            deque.add(request);
+                            return;
+                        }
+                        // 3. unreadyQueue is empty, and new request isn't ready, so add it to the
+                        // readyQueue,
+                        // and register it as the first request.
+                        unreadyQueue.add(request);
+                        registerFirstRequestFuture(request, unreadyQueue);
+                    }
+                });
+    }
+
+    private void registerFirstRequestFuture(
+            @Nonnull ChannelStateWriteRequest firstRequest,
+            BlockingQueue<ChannelStateWriteRequest> unreadyQueue) {
+        assert Thread.holdsLock(unreadyQueue);
+        checkState(firstRequest == unreadyQueue.peek(), "The request isn't the first request.");
+        firstRequest
+                .getReadyFuture()
+                .thenAccept(o -> moveReadyRequestToReadyQueue(unreadyQueue, firstRequest))
+                .exceptionally(
+                        throwable -> {
+                            moveReadyRequestToReadyQueue(unreadyQueue, firstRequest);
+                            return null;
+                        });

Review Comment:
   Shouldn't this error be handled somehow? For example, fail this executor and propagate the error to the subtasks?


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +64,46 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, BlockingQueue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);

Review Comment:
   There are a `BlockingQueue`, two `volatile` fields, a `ConcurrentHashMap` and an `AtomicBoolean`. I think that's a bit too much. It should all be replaced with a single `synchronized (this)`, or with `private final Object lock = new Object();` and later `synchronized (lock)`, combined with a non-thread-safe queue, Exception, boolean and Map.
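
A minimal sketch of what the single-lock variant might look like, also covering the earlier point about failing the executor when a ready future completes exceptionally. Everything here is an assumption made for illustration: `SingleLockExecutorSketch`, `Request`, `registerSubtask`, `pollReady` and `checkUsable` are hypothetical stand-ins, subtasks are identified by plain strings, and none of this is the actual Flink code or the change that was eventually merged.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ChannelStateWriteRequestExecutorImpl; not Flink code.
class SingleLockExecutorSketch {

    // Hypothetical stand-in for ChannelStateWriteRequest.
    static final class Request {
        final String subtask;
        final String name;
        final CompletableFuture<Void> ready;

        Request(String subtask, String name, CompletableFuture<Void> ready) {
            this.subtask = subtask;
            this.name = name;
            this.ready = ready;
        }
    }

    private final Object lock = new Object();

    // Everything below is guarded by `lock`, so plain collections and fields suffice.
    private final Map<String, Queue<Request>> unreadyQueues = new HashMap<>();
    private final Queue<Request> readyQueue = new ArrayDeque<>();
    private Throwable thrown;
    private boolean wasClosed;

    void registerSubtask(String subtask) {
        synchronized (lock) {
            unreadyQueues.putIfAbsent(subtask, new ArrayDeque<>());
        }
    }

    void submit(Request request) {
        synchronized (lock) {
            checkUsable();
            Queue<Request> unready = unreadyQueues.get(request.subtask);
            if (unready == null) {
                throw new IllegalStateException(
                        "The subtask " + request.subtask + " is not yet registered");
            }
            // Nothing is waiting and the request is ready: it can be written immediately.
            if (unready.isEmpty()
                    && request.ready.isDone()
                    && !request.ready.isCompletedExceptionally()) {
                readyQueue.add(request);
                return;
            }
            // Otherwise keep per-subtask order; only the head of the queue gets a callback.
            boolean isFirst = unready.isEmpty();
            unready.add(request);
            if (isFirst) {
                request.ready.whenComplete((ignored, error) -> onHeadCompleted(unready, error));
            }
        }
    }

    private void onHeadCompleted(Queue<Request> unready, Throwable error) {
        synchronized (lock) {
            if (wasClosed) {
                return;
            }
            if (error != null) {
                // Fail the executor instead of swallowing the error.
                if (thrown == null) {
                    thrown = error;
                }
                return;
            }
            readyQueue.add(unready.remove());
            Request next = unready.peek();
            if (next != null) {
                // Runs immediately (re-entrantly) if `next` is already complete.
                next.ready.whenComplete((ignored, e) -> onHeadCompleted(unready, e));
            }
        }
    }

    Request pollReady() {
        synchronized (lock) {
            checkUsable();
            return readyQueue.poll();
        }
    }

    void close() {
        synchronized (lock) {
            wasClosed = true;
            unreadyQueues.clear();
            readyQueue.clear();
        }
    }

    // Must be called with `lock` held.
    private void checkUsable() {
        if (wasClosed) {
            throw new IllegalStateException("executor is closed");
        }
        if (thrown != null) {
            throw new IllegalStateException("executor failed", thrown);
        }
    }
}
```

Because `synchronized` is re-entrant, a callback that fires synchronously inside `submit` (when the future completes between the `isDone` check and `whenComplete`) does not deadlock. Whether one coarse lock is acceptable depends on contention between subtasks sharing the executor, which this sketch does not measure.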