pnowojski commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r709876746



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -46,18 +48,24 @@
 class CheckpointSubsumeHelper {
     private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
 
-    public static void subsume(
+    @Nullable
+    public static CompletedCheckpoint subsume(
             Deque<CompletedCheckpoint> checkpoints, int numRetain, 
SubsumeAction subsumeAction)
             throws Exception {
         if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
-            return;
+            return null;

Review comment:
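      Could this return `Optional<CompletedCheckpoint>` instead of a `@Nullable` value, so that callers are forced by the compiler to handle the "nothing subsumed" case?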

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -339,78 +343,107 @@ public void checkpointState(
     public void notifyCheckpointComplete(
             long checkpointId, OperatorChain<?, ?> operatorChain, 
Supplier<Boolean> isRunning)
             throws Exception {
-        Exception previousException = null;
-        if (!isRunning.get()) {
-            LOG.debug(
-                    "Ignoring notification of complete checkpoint {} for 
not-running task {}",
-                    checkpointId,
-                    taskName);
-        } else if (operatorChain.isFinishedOnRestore()) {
-            LOG.debug(
-                    "Ignoring notification of complete checkpoint {} for 
finished on restore task {}",
-                    checkpointId,
-                    taskName);
-        } else {
-            LOG.debug(
-                    "Notification of completed checkpoint {} for task {}", 
checkpointId, taskName);
 
-            for (StreamOperatorWrapper<?, ?> operatorWrapper :
-                    operatorChain.getAllOperators(true)) {
-                try {
-                    operatorWrapper.notifyCheckpointComplete(checkpointId);
-                } catch (Exception e) {
-                    previousException = ExceptionUtils.firstOrSuppressed(e, 
previousException);
-                }
-            }
-        }
-        env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
-        ExceptionUtils.tryRethrowException(previousException);
+        notifyCheckpoint(
+                checkpointId,
+                operatorChain,
+                isRunning,
+                "Ignoring notification of complete checkpoint {} for 
not-running task {}",
+                "Ignoring notification of complete checkpoint {} for finished 
on restore task {}",
+                "Notification of aborted checkpoint {} for task {}",
+                null,
+                (opw) -> opw.notifyCheckpointComplete(checkpointId),
+                (tsm) -> tsm.notifyCheckpointComplete(checkpointId));
     }
 
     @Override
     public void notifyCheckpointAborted(
             long checkpointId, OperatorChain<?, ?> operatorChain, 
Supplier<Boolean> isRunning)
             throws Exception {
 
+        notifyCheckpoint(
+                checkpointId,
+                operatorChain,
+                isRunning,
+                "Ignoring notification of aborted checkpoint {} for 
not-running task {}",
+                "Ignoring notification of aborted checkpoint {} for finished 
on restore task {}",
+                "Notification of aborted checkpoint {} for task {}",
+                () -> {
+                    boolean canceled = 
cancelAsyncCheckpointRunnable(checkpointId);
+
+                    if (!canceled) {
+                        if (checkpointId > lastCheckpointId) {
+                            // only record checkpoints that have not triggered 
on task side.
+                            abortedCheckpointIds.add(checkpointId);
+                        }
+                    }
+
+                    channelStateWriter.abort(
+                            checkpointId,
+                            new CancellationException("checkpoint aborted via 
notification"),
+                            false);
+                },
+                (opw) -> 
opw.getStreamOperator().notifyCheckpointAborted(checkpointId),
+                (tsm) -> tsm.notifyCheckpointAborted(checkpointId));
+    }
+
+    @Override
+    public void notifyCheckpointSubsumed(
+            long checkpointId, OperatorChain<?, ?> operatorChain, 
Supplier<Boolean> isRunning)
+            throws Exception {
+        notifyCheckpoint(
+                checkpointId,
+                operatorChain,
+                isRunning,
+                "Ignoring notification of subsumed checkpoint {} for 
not-running task {}",
+                "Ignoring notification of subsumed checkpoint {} for finished 
on restore task {}",
+                "Notification of subsumed checkpoint {} for task {}",
+                null,
+                (opw) -> opw.notifyCheckpointSubsumed(checkpointId),
+                null);
+    }
+
+    private void notifyCheckpoint(
+            long checkpointId,
+            OperatorChain<?, ?> operatorChain,
+            Supplier<Boolean> isRunning,
+            String logFormatIfNotRunning,
+            String logFormatIfFinishedOnRestore,
+            String logFormatIfNotificationStart,
+            @Nullable Runnable runnableBeforeNotifyOperator,
+            @Nonnull ThrowingConsumer<StreamOperatorWrapper<?, ?>, Exception> 
operatorConsumer,
+            @Nullable ThrowingConsumer<TaskStateManager, Exception> 
taskStateManagerConsumer)
+            throws Exception {

Review comment:
      Instead of using `Runnable` and `ThrowingConsumer` to deduplicate the code, I would suggest deduplicating it by decomposing this `notifyCheckpoint` method into smaller parts. For example:
   
   ```
   
   public void notifyCheckpointAborted(...) {
     if (notifyCheckpointOperationShouldBeExecuted(...)) {
       // inlined content of the `runnableBeforeNotifyOperator`
       
       operatorChain.forEach((operatorWrapper) -> operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId));
       
       env.getTaskStateManager().notifyCheckpointAborted(checkpointId);
     }
   }
   
   public void notifyCheckpointSubsumed(...) {
     if (notifyCheckpointOperationShouldBeExecuted(...)) {
       operatorChain.forEach((operatorWrapper) -> operatorWrapper.notifyCheckpointSubsumed(checkpointId));
     }
   }
   
   private boolean notifyCheckpointOperationShouldBeExecuted(...) {
     // isRunning/isFinishedOnRestore checks + logging
   }
   ```
   
   Yes, we would still have a bit of code duplication, but it would avoid the problem of generic `Runnable`s and `ThrowingConsumer`s obscuring what the code is doing. In your current version, to understand what this code does, you have to keep jumping back and forth between the `notifyCheckpointAborted()` method (where those `Runnable`s and `ThrowingConsumer`s are defined) and the actual `notifyCheckpoint` implementation. What makes it even worse is that you cannot use "find implementations" in the IDE to help you, as there are so many `Runnable`s in the project. So maybe a bit of code duplication is not that bad in this case? (I'm not 100% sure.)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -339,78 +343,107 @@ public void checkpointState(
     public void notifyCheckpointComplete(
             long checkpointId, OperatorChain<?, ?> operatorChain, 
Supplier<Boolean> isRunning)
             throws Exception {
-        Exception previousException = null;
-        if (!isRunning.get()) {
-            LOG.debug(
-                    "Ignoring notification of complete checkpoint {} for 
not-running task {}",
-                    checkpointId,
-                    taskName);
-        } else if (operatorChain.isFinishedOnRestore()) {
-            LOG.debug(
-                    "Ignoring notification of complete checkpoint {} for 
finished on restore task {}",
-                    checkpointId,
-                    taskName);
-        } else {
-            LOG.debug(
-                    "Notification of completed checkpoint {} for task {}", 
checkpointId, taskName);
 
-            for (StreamOperatorWrapper<?, ?> operatorWrapper :
-                    operatorChain.getAllOperators(true)) {
-                try {
-                    operatorWrapper.notifyCheckpointComplete(checkpointId);
-                } catch (Exception e) {
-                    previousException = ExceptionUtils.firstOrSuppressed(e, 
previousException);
-                }
-            }
-        }
-        env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
-        ExceptionUtils.tryRethrowException(previousException);
+        notifyCheckpoint(
+                checkpointId,
+                operatorChain,
+                isRunning,
+                "Ignoring notification of complete checkpoint {} for 
not-running task {}",
+                "Ignoring notification of complete checkpoint {} for finished 
on restore task {}",
+                "Notification of aborted checkpoint {} for task {}",
+                null,
+                (opw) -> opw.notifyCheckpointComplete(checkpointId),
+                (tsm) -> tsm.notifyCheckpointComplete(checkpointId));
     }
 
     @Override
     public void notifyCheckpointAborted(
             long checkpointId, OperatorChain<?, ?> operatorChain, 
Supplier<Boolean> isRunning)
             throws Exception {
 
+        notifyCheckpoint(
+                checkpointId,
+                operatorChain,
+                isRunning,
+                "Ignoring notification of aborted checkpoint {} for 
not-running task {}",
+                "Ignoring notification of aborted checkpoint {} for finished 
on restore task {}",
+                "Notification of aborted checkpoint {} for task {}",
+                () -> {
+                    boolean canceled = 
cancelAsyncCheckpointRunnable(checkpointId);
+
+                    if (!canceled) {
+                        if (checkpointId > lastCheckpointId) {
+                            // only record checkpoints that have not triggered 
on task side.
+                            abortedCheckpointIds.add(checkpointId);
+                        }
+                    }
+
+                    channelStateWriter.abort(
+                            checkpointId,
+                            new CancellationException("checkpoint aborted via 
notification"),
+                            false);
+                },
+                (opw) -> 
opw.getStreamOperator().notifyCheckpointAborted(checkpointId),
+                (tsm) -> tsm.notifyCheckpointAborted(checkpointId));

Review comment:
      Please avoid abbreviations. Looking at this code, I have no idea what `opw` or `tsm` are. Please use full names such as `operatorWrapper` and `taskStateManager`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -67,6 +75,7 @@ public static void subsume(
             }
             // Don't break out from the loop to subsume intermediate savepoints
         }
+        return lastSubsumedCheckpoint;

Review comment:
       > In general, we should make the minimal changes to existing interfaces.
   
   I think the changes in both of those proposals are quite comparable: adding one getter vs. changing the return type of a method to substitute for that getter.
   
   > Adding method of CompletedCheckpointStore#getLatestSubsumedCheckpointID 
would introduce another method and would also make us lost the timing when the 
checkpoint is subsumed.
   
   What is the problem here? Could you elaborate on what you are worried about?
   
   On the other hand, I like @rkhachatryan's approach, in which there seems to be a clear owner of this knowledge (which checkpoint was/is the last subsumed one), and as a result we don't need to modify `CheckpointCoordinator` as much. The approach of returning this information from the `addCheckpoint` method, which then needs to be stored/cached externally in the `CheckpointCoordinator`, introduces more dependencies in the code.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1352,84 +1352,73 @@ public void triggerCheckpointBarrier(
     }
 
     public void notifyCheckpointComplete(final long checkpointID) {
-        final AbstractInvokable invokable = this.invokable;
-
-        if (executionState == ExecutionState.RUNNING && invokable != null) {
-            try {
-                invokable.notifyCheckpointCompleteAsync(checkpointID);
-            } catch (RejectedExecutionException ex) {
-                // This may happen if the mailbox is closed. It means that the 
task is shutting
-                // down, so we just ignore it.
-                LOG.debug(
-                        "Notify checkpoint complete {} for {} ({}) was 
rejected by the mailbox",
-                        checkpointID,
-                        taskNameWithSubtask,
-                        executionId);
-            } catch (Throwable t) {
-                if (getExecutionState() == ExecutionState.RUNNING) {
-                    // fail task if checkpoint confirmation failed.
-                    failExternally(new RuntimeException("Error while 
confirming checkpoint", t));
-                }
-            }
-        } else {
-            LOG.debug(
-                    "Ignoring checkpoint commit notification for non-running 
task {}.",
-                    taskNameWithSubtask);
-        }
+        notifyCheckpoint(
+                checkpointID,
+                (invokable) -> 
invokable.notifyCheckpointCompleteAsync(checkpointID),
+                "Notify checkpoint complete {} for {} ({}) was rejected by the 
mailbox",
+                "Error while confirming checkpoint {}.",
+                (t) -> failExternally(new RuntimeException("Error while 
confirming checkpoint", t)),
+                "Ignoring checkpoint commit notification for non-running task 
{}.");
     }
 
     public void notifyCheckpointAborted(
             final long checkpointID, final long latestCompletedCheckpointId) {
-        final AbstractInvokable invokable = this.invokable;
+        notifyCheckpoint(
+                checkpointID,
+                (invokable) ->
+                        invokable.notifyCheckpointAbortAsync(
+                                checkpointID, latestCompletedCheckpointId),
+                "Notify checkpoint abort {} for {} ({}) was rejected by the 
mailbox",
+                "Error while aborting checkpoint {}.",
+                (t) -> failExternally(new RuntimeException("Error while 
aborting checkpoint", t)),
+                "Ignoring checkpoint aborted notification for non-running task 
{}.");
+    }
 
-        if (executionState == ExecutionState.RUNNING && invokable != null) {
-            try {
-                invokable.notifyCheckpointAbortAsync(checkpointID, 
latestCompletedCheckpointId);
-            } catch (RejectedExecutionException ex) {
-                // This may happen if the mailbox is closed. It means that the 
task is shutting
-                // down, so we just ignore it.
-                LOG.debug(
-                        "Notify checkpoint abort {} for {} ({}) was rejected 
by the mailbox",
-                        checkpointID,
-                        taskNameWithSubtask,
-                        executionId);
-            } catch (Throwable t) {
-                if (getExecutionState() == ExecutionState.RUNNING) {
-                    // fail task if checkpoint aborted notification failed.
-                    failExternally(new RuntimeException("Error while aborting 
checkpoint", t));
-                }
-            }
-        } else {
-            LOG.info(
-                    "Ignoring checkpoint aborted notification for non-running 
task {}.",
-                    taskNameWithSubtask);
-        }
+    public void notifyCheckpointSubsumed(long checkpointID) {
+        notifyCheckpoint(
+                checkpointID,
+                (invokable) -> 
invokable.notifyCheckpointSubsumedAsync(checkpointID),
+                "Notify checkpoint subsume {} for {} ({}) was rejected by the 
mailbox",
+                "Error while subsuming checkpoint {}.",
+                null,
+                "Ignoring checkpoint subsume notification for non-running task 
{}.");
     }
 
-    public void notifyCheckpointSubsumed(long checkpointId) {
+    private void notifyCheckpoint(
+            long checkpointId,
+            Consumer<AbstractInvokable> invokableNotifier,
+            String logFormatIfRejected,
+            String logFormatIfFailedToNotify,
+            @Nullable Consumer<Throwable> handleThrowableIfTaskRunning,
+            String logFormatIfNotRunning) {

Review comment:
      To me, this method has the same problem as described in my previous comment about generic `Consumer`s. Here, however, I would propose adding an `enum NotifyCheckpointOperation` with three values, `ABORT`, `COMPLETE` and `SUBSUME`, and simply switching on it to replace `Consumer<AbstractInvokable> invokableNotifier`.
   
   Secondly, `handleThrowableIfTaskRunning` could be replaced by a boolean flag, `boolean shouldFailExternallyOnException`, so the signature of this method could be:
   ```
       private void notifyCheckpoint(
               long checkpointId,
               NotifyCheckpointOperation notifyCheckpointOperation,
               boolean shouldFailExternallyOnException)
   ```
   (The `ABORT` case additionally needs `latestCompletedCheckpointId`, e.g. as one more parameter.)
   
   Thirdly, I think the log/error formats could also be handled more simply, with something like:
   ```
   LOG.debug("Notify checkpoint {} {} for {} ({}) was rejected by the mailbox", notifyCheckpointOperation, checkpointId, ...);
   ```
   instead of
   ```
   LOG.debug("Notify checkpoint abort {} for {} ({}) was rejected by the mailbox", checkpointId, ...);
   ```
   (Note that the `{}` placeholders are SLF4J syntax. If a plain `String.format` is needed, for example for an exception message, use `%s` instead.)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to