1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1049806879


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -90,33 +103,62 @@ public void dispatch(ChannelStateWriteRequest request) 
throws Exception {
     }
 
     private void dispatchInternal(ChannelStateWriteRequest request) throws 
Exception {
+        if (request instanceof SubtaskRegisterRequest) {
+            SubtaskRegisterRequest req = (SubtaskRegisterRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), 
req.getSubtaskIndex());
+            subtasks.add(subtaskID);
+            return;
+        } else if (request instanceof SubtaskReleaseRequest) {
+            SubtaskReleaseRequest req = (SubtaskReleaseRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), 
req.getSubtaskIndex());
+            subtasks.remove(subtaskID);
+            if (writer == null) {
+                return;
+            }
+            writer.releaseSubtask(subtaskID);
+            return;
+        }
         if (isAbortedCheckpoint(request.getCheckpointId())) {
-            if (request.getCheckpointId() == maxAbortedCheckpointId) {
+            if (request.getCheckpointId() != maxAbortedCheckpointId) {
+                request.cancel(new 
CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+                return;
+            }
+
+            SubtaskID requestSubtask =
+                    SubtaskID.of(
+                            request.getJobID(),
+                            request.getJobVertexID(),
+                            request.getSubtaskIndex());
+            if (requestSubtask.equals(abortedSubtaskID)) {
                 request.cancel(abortedCause);
             } else {
-                request.cancel(new 
CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+                request.cancel(
+                        new CheckpointException(
+                                CHANNEL_STATE_SHARED_STREAM_EXCEPTION, 
abortedCause));
             }
             return;
         }
 
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    request.getCheckpointId() > ongoingCheckpointId,
+                    request.getCheckpointId() >= ongoingCheckpointId,
                     String.format(
                             "Checkpoint must be incremented, 
ongoingCheckpointId is %s, but the request is %s.",
                             ongoingCheckpointId, request));
-            failAndClearWriter(
-                    new IllegalStateException(
-                            String.format(
-                                    "Task[name=%s, subtaskIndex=%s] has 
uncompleted channelState writer of checkpointId=%s, "
-                                            + "but it received a new 
checkpoint start request of checkpointId=%s, it maybe "
-                                            + "a bug due to currently not 
supported concurrent unaligned checkpoint.",
-                                    taskName,
-                                    subtaskIndex,
-                                    ongoingCheckpointId,
-                                    request.getCheckpointId())));
-            this.writer = buildWriter((CheckpointStartRequest) request);
-            this.ongoingCheckpointId = request.getCheckpointId();
+            if (request.getCheckpointId() > ongoingCheckpointId) {
+                // Clear the previous writer.
+                failAndClearWriter(new 
CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+            }
+            CheckpointStartRequest req = (CheckpointStartRequest) request;
+            if (writer == null) {

Review Comment:
   > This can not be checkState(writer == null) because single dispatcher will 
handle 5 CheckpointStartRequests from 5 subtasks (assuming 5 subtasks are 
configured to share the same file?).
   
   Yes, totally right. I will add a comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to