pnowojski commented on code in PR #21131:
URL: https://github.com/apache/flink/pull/21131#discussion_r1009147524


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -73,11 +79,27 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    !writers.containsKey(request.getCheckpointId()),
-                    "writer not found for request " + request);
-            writers.put(request.getCheckpointId(), buildWriter((CheckpointStartRequest) request));
+                    request.getCheckpointId() > ongoingCheckpointId,
+                    String.format(
+                            "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
+                            ongoingCheckpointId, request));
+            if (writer != null && !writer.isDone()) {
+                writer.fail(
+                        new IllegalStateException(
+                                String.format(
+                                        "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
+                                                + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
+                                                + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                        taskName,
+                                        subtaskIndex,
+                                        ongoingCheckpointId,
+                                        request.getCheckpointId())));
+            }

Review Comment:
   I think this might currently fail. I don't see `writer` being set to `null` 
when its checkpoint is aborted, so I think the following sequence will trigger 
this `IllegalStateException`:
   
   1. start checkpoint 42
   2. abort checkpoint 42
   3. start checkpoint 43
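
To make the sequence above concrete, here is a minimal toy model of it. This is only a sketch: `ToyWriter`, `ToyDispatcher` and the `resetWriterOnAbort` flag are hypothetical stand-ins, not Flink classes, and the assumption that a start request installs a new writer and updates `ongoingCheckpointId` is mine, since that part of the diff is not shown.

```java
import java.util.Optional;

public class AbortThenStartSketch {

    /** Hypothetical stand-in for the channel state writer; tracks only completion and failure. */
    static final class ToyWriter {
        final long checkpointId;
        boolean done;
        Throwable failure;

        ToyWriter(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        boolean isDone() {
            return done;
        }

        void fail(Throwable cause) {
            done = true;
            failure = cause;
        }
    }

    /** Hypothetical stand-in for the dispatcher, holding a single writer as in the diff. */
    static final class ToyDispatcher {
        private final boolean resetWriterOnAbort;
        long ongoingCheckpointId = -1;
        ToyWriter writer;

        ToyDispatcher(boolean resetWriterOnAbort) {
            this.resetWriterOnAbort = resetWriterOnAbort;
        }

        /** Returns the previous writer if it had to be failed, mirroring the branch in the diff. */
        Optional<ToyWriter> start(long checkpointId) {
            if (checkpointId <= ongoingCheckpointId) {
                throw new IllegalStateException("Checkpoint must be incremented");
            }
            ToyWriter failed = null;
            if (writer != null && !writer.isDone()) {
                writer.fail(
                        new IllegalStateException(
                                "uncompleted writer of checkpointId=" + ongoingCheckpointId));
                failed = writer;
            }
            // Assumption: the start request then installs a fresh writer and records the new id.
            writer = new ToyWriter(checkpointId);
            ongoingCheckpointId = checkpointId;
            return Optional.ofNullable(failed);
        }

        /** Abort only touches the writer when the (hypothetical) reset flag is enabled. */
        void abort(long checkpointId) {
            if (resetWriterOnAbort && writer != null && writer.checkpointId == checkpointId) {
                writer.fail(new RuntimeException("checkpoint " + checkpointId + " aborted"));
                writer = null;
            }
        }
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {false, true}) {
            ToyDispatcher dispatcher = new ToyDispatcher(reset);
            dispatcher.start(42);
            dispatcher.abort(42);
            Optional<ToyWriter> failed = dispatcher.start(43);
            System.out.println(
                    "resetWriterOnAbort=" + reset + ", stale writer failed=" + failed.isPresent());
        }
    }
}
```

In this model, the stale writer for checkpoint 42 is only failed with the `IllegalStateException` when the abort leaves it in place, which is the situation described above.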



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -98,18 +120,26 @@ private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest request)
                 streamFactoryResolver.resolveCheckpointStorageLocation(
                         request.getCheckpointId(), request.getLocationReference()),
                 serializer,
-                () -> writers.remove(request.getCheckpointId()));
+                () -> {
+                    checkState(
+                            request.getCheckpointId() == ongoingCheckpointId,
+                            "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.",
+                            ongoingCheckpointId,
+                            request.getCheckpointId());
+                    this.writer = null;
+                });

Review Comment:
   I think this `checkState` only works now because we do not fail/reset the 
writer when a checkpoint is aborted. Otherwise it would fail when there is a 
race between:
   1. receiving the last of the checkpoint barriers from the input 
(`ChannelStateWriter#finishInput`) for checkpoint `42` in 
`CheckpointBarrierHandler`, and
   2. aborting checkpoint `43`, for example.
   
   
   Maybe the best equivalent of the old code would be something like:
   ```
   if (request.getCheckpointId() < ongoingCheckpointId) {
     // ignore obsolete request, do nothing
   } else if (request.getCheckpointId() == ongoingCheckpointId) {
     this.writer = null;
   } else {
     throw new IllegalStateException(
         String.format(
             "This should never happen, trying to complete checkpoint[%s] while the ongoingCheckpointId[%s] is older",
             request.getCheckpointId(), ongoingCheckpointId));
   }
   ```
   
   After all, the code on master also doesn't check whether 
`writers.remove(42)` actually removed anything, does it?
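
A self-contained sketch of the three-way completion callback suggested above, to show how each branch would behave. The names `ToyDispatcher`, `startCheckpoint` and `onWriterComplete` are hypothetical and only model the idea; they are not the actual Flink classes or methods.

```java
public class CompletionCallbackSketch {

    /** Hypothetical stand-in for the dispatcher; the writer is just an opaque object here. */
    static final class ToyDispatcher {
        long ongoingCheckpointId = -1;
        Object writer;

        void startCheckpoint(long checkpointId) {
            ongoingCheckpointId = checkpointId;
            writer = new Object();
        }

        /**
         * Models the completion callback of a writer.
         *
         * @return true if the current writer was cleared, false if the callback was obsolete
         */
        boolean onWriterComplete(long completedCheckpointId) {
            if (completedCheckpointId < ongoingCheckpointId) {
                // Obsolete callback from an older checkpoint: ignore it, like a no-op remove.
                return false;
            } else if (completedCheckpointId == ongoingCheckpointId) {
                writer = null;
                return true;
            } else {
                throw new IllegalStateException(
                        String.format(
                                "Trying to complete checkpoint[%s] while the ongoingCheckpointId[%s] is older",
                                completedCheckpointId, ongoingCheckpointId));
            }
        }
    }

    public static void main(String[] args) {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.startCheckpoint(43);

        System.out.println("late callback for 42 cleared writer: " + dispatcher.onWriterComplete(42));
        System.out.println("callback for 43 cleared writer: " + dispatcher.onWriterComplete(43));
        try {
            dispatcher.onWriterComplete(44);
        } catch (IllegalStateException e) {
            System.out.println("callback for 44 rejected: " + e.getMessage());
        }
    }
}
```

The point of the first branch is that a late callback for an older checkpoint becomes a silent no-op instead of tripping a `checkState`, which is closer to how a `Map#remove` of an absent key behaves.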



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
