1996fanrui commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1012728556


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   If checkpoint 42 has completed, then `ongoingCheckpointId=42` and `writer == null`. Now suppose that:
   
   - `start(43)` has been added to the queue but has not been executed yet.
   - `abort(44)` is added to both the head and the tail of the queue.
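A minimal sketch of the resulting dispatch order follows. It assumes the request queue behaves like a `java.util.Deque` that the dispatcher drains from the head. The class `QueueOrderSketch` and the use of plain strings for requests are hypothetical simplifications, not Flink's actual queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the request queue in the scenario; requests are plain strings. */
final class QueueOrderSketch {
    static List<String> dispatchOrder() {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast("start(43)");  // enqueued normally, not executed yet
        queue.addFirst("abort(44)"); // abort is put at the head ...
        queue.addLast("abort(44)");  // ... and at the tail of the queue
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.pollFirst()); // the dispatcher consumes from the head
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(dispatchOrder()); // [abort(44), start(43), abort(44)]
    }
}
```

So the dispatcher sees `abort(44)` before it ever sees `start(43)`.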
   
   When the dispatcher receives `abort(44)`, is `abortedCheckpointId` 42? Or will `isAborted` be true?
   If so, I'd prefer to use `maxAbortedCheckpointId=44` instead. Then `start(43)` can be discarded directly, because 43 < 44.
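The `maxAbortedCheckpointId` idea can be sketched as follows, assuming the dispatch order `abort(44)`, `start(43)`, `abort(44)` from the scenario. `MaxAbortedSketch` and its `start`/`abort` methods are hypothetical stand-ins that track only checkpoint IDs; they do not create real channel state writers and are not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the proposed maxAbortedCheckpointId bookkeeping.
 * Only checkpoint IDs are tracked; no real channel state writer is created.
 */
final class MaxAbortedSketch {
    private long ongoingCheckpointId;
    private long maxAbortedCheckpointId = -1L;
    private final List<String> log = new ArrayList<>();

    MaxAbortedSketch(long lastCompletedCheckpointId) {
        // After checkpoint 42 completes: ongoingCheckpointId=42 and there is no writer.
        this.ongoingCheckpointId = lastCompletedCheckpointId;
    }

    void start(long checkpointId) {
        if (checkpointId <= maxAbortedCheckpointId) {
            // An equal or newer checkpoint was already aborted, so this start is stale.
            log.add("discard start(" + checkpointId + ")");
            return;
        }
        ongoingCheckpointId = checkpointId;
        log.add("start(" + checkpointId + ")");
    }

    void abort(long checkpointId) {
        if (checkpointId > maxAbortedCheckpointId) {
            // Only ever move the watermark forward.
            maxAbortedCheckpointId = checkpointId;
            log.add("abort(" + checkpointId + ")");
        } else {
            // The duplicate abort(44) from the queue tail lands here.
            log.add("ignore abort(" + checkpointId + ")");
        }
    }

    long ongoingCheckpointId() { return ongoingCheckpointId; }

    List<String> log() { return log; }

    /** Dispatch order from the scenario: abort(44) at the head, start(43), abort(44) at the tail. */
    static MaxAbortedSketch replayScenario() {
        MaxAbortedSketch d = new MaxAbortedSketch(42L);
        d.abort(44L);
        d.start(43L);
        d.abort(44L);
        return d;
    }

    public static void main(String[] args) {
        MaxAbortedSketch d = replayScenario();
        System.out.println(d.log()); // [abort(44), discard start(43), ignore abort(44)]
        System.out.println(d.ongoingCheckpointId()); // 42
    }
}
```

A single monotonically increasing watermark avoids needing to know which checkpoint is "current" when the abort arrives: any start at or below it is stale, and repeated aborts are harmless.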
   
   
   Also, `maxAbortedCheckpointId` is friendlier to channel state file merging. In the future, multiple subtasks will share the same `ChannelStateWriteRequestDispatcherImpl`, and the order of requests across different subtasks is not guaranteed. If one subtask's checkpoint 44 is aborted, the other subtasks' checkpoint 44 should also be aborted here, yet other subtasks may still send `start(42)`.
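A rough sketch of the shared-dispatcher case follows. It applies the rule proposed above, that any start at or below the shared `maxAbortedCheckpointId` is discarded, to interleaved requests from two subtasks. `SharedDispatcherSketch`, its methods, and the request sequence are all hypothetical; they model the future file-merging setup only at the level of checkpoint IDs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one dispatcher shared by several subtasks, applying the
 * proposed rule: a start at or below maxAbortedCheckpointId is discarded.
 */
final class SharedDispatcherSketch {
    private long maxAbortedCheckpointId = -1L;
    private final List<String> started = new ArrayList<>();
    private final List<String> discarded = new ArrayList<>();

    void start(int subtask, long checkpointId) {
        String label = "subtask" + subtask + "/cp" + checkpointId;
        if (checkpointId <= maxAbortedCheckpointId) {
            discarded.add(label);
        } else {
            started.add(label);
        }
    }

    void abort(int subtask, long checkpointId) {
        // An abort from any subtask raises the shared watermark for every subtask.
        maxAbortedCheckpointId = Math.max(maxAbortedCheckpointId, checkpointId);
    }

    List<String> started() { return started; }

    List<String> discarded() { return discarded; }

    /** Requests from two subtasks interleave; nothing orders subtask 1 relative to subtask 0. */
    static SharedDispatcherSketch replay() {
        SharedDispatcherSketch d = new SharedDispatcherSketch();
        d.start(0, 44L);
        d.abort(0, 44L); // subtask 0 aborts checkpoint 44
        d.start(1, 42L); // subtask 1 is behind and only now starts 42
        d.start(1, 44L);
        d.start(1, 45L);
        return d;
    }

    public static void main(String[] args) {
        SharedDispatcherSketch d = replay();
        System.out.println("started:   " + d.started());   // [subtask0/cp44, subtask1/cp45]
        System.out.println("discarded: " + d.discarded()); // [subtask1/cp42, subtask1/cp44]
    }
}
```

By contrast, if a single shared `ongoingCheckpointId` were set to 44 by subtask 0's start, the diff's `checkState(request.getCheckpointId() > ongoingCheckpointId, ...)` would reject subtask 1's late `start(42)` instead of quietly discarding it.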
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.