pnowojski commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r709876746
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -46,18 +48,24 @@
class CheckpointSubsumeHelper {
private static final Logger LOG =
LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
- public static void subsume(
+ @Nullable
+ public static CompletedCheckpoint subsume(
Deque<CompletedCheckpoint> checkpoints, int numRetain,
SubsumeAction subsumeAction)
throws Exception {
if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
- return;
+ return null;
Review comment:
Would returning `Optional` be better here, to get compile-time checks?
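A minimal sketch of what an `Optional`-returning variant might look like. `SubsumeSketch` is a hypothetical stand-in, checkpoints are modelled as plain `Long` IDs, and the loop is deliberately simplified (it ignores the savepoint handling and the `SubsumeAction` of the real helper):

```java
import java.util.Deque;
import java.util.Optional;

/** Hypothetical stand-in for CheckpointSubsumeHelper; checkpoints are modelled as plain IDs. */
final class SubsumeSketch {

    /**
     * Drops the oldest entries until at most numRetain remain and returns the
     * last entry dropped, or Optional.empty() if nothing was dropped.
     */
    static Optional<Long> subsume(Deque<Long> checkpoints, int numRetain) {
        Long lastSubsumed = null;
        while (checkpoints.size() > numRetain) {
            lastSubsumed = checkpoints.pollFirst();
        }
        // Optional.empty() takes the place of the `return null` early exit,
        // so callers have to handle the "nothing subsumed" case explicitly.
        return Optional.ofNullable(lastSubsumed);
    }
}
```

With this shape a caller cannot dereference the result without going through `ifPresent`, `orElse` or similar, which is the check the `@Nullable` annotation only hints at.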
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -339,78 +343,107 @@ public void checkpointState(
public void notifyCheckpointComplete(
long checkpointId, OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
throws Exception {
- Exception previousException = null;
- if (!isRunning.get()) {
- LOG.debug(
- "Ignoring notification of complete checkpoint {} for
not-running task {}",
- checkpointId,
- taskName);
- } else if (operatorChain.isFinishedOnRestore()) {
- LOG.debug(
- "Ignoring notification of complete checkpoint {} for
finished on restore task {}",
- checkpointId,
- taskName);
- } else {
- LOG.debug(
- "Notification of completed checkpoint {} for task {}",
checkpointId, taskName);
- for (StreamOperatorWrapper<?, ?> operatorWrapper :
- operatorChain.getAllOperators(true)) {
- try {
- operatorWrapper.notifyCheckpointComplete(checkpointId);
- } catch (Exception e) {
- previousException = ExceptionUtils.firstOrSuppressed(e,
previousException);
- }
- }
- }
- env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
- ExceptionUtils.tryRethrowException(previousException);
+ notifyCheckpoint(
+ checkpointId,
+ operatorChain,
+ isRunning,
+ "Ignoring notification of complete checkpoint {} for
not-running task {}",
+ "Ignoring notification of complete checkpoint {} for finished
on restore task {}",
+ "Notification of aborted checkpoint {} for task {}",
+ null,
+ (opw) -> opw.notifyCheckpointComplete(checkpointId),
+ (tsm) -> tsm.notifyCheckpointComplete(checkpointId));
}
@Override
public void notifyCheckpointAborted(
long checkpointId, OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
throws Exception {
+ notifyCheckpoint(
+ checkpointId,
+ operatorChain,
+ isRunning,
+ "Ignoring notification of aborted checkpoint {} for
not-running task {}",
+ "Ignoring notification of aborted checkpoint {} for finished
on restore task {}",
+ "Notification of aborted checkpoint {} for task {}",
+ () -> {
+ boolean canceled =
cancelAsyncCheckpointRunnable(checkpointId);
+
+ if (!canceled) {
+ if (checkpointId > lastCheckpointId) {
+ // only record checkpoints that have not triggered
on task side.
+ abortedCheckpointIds.add(checkpointId);
+ }
+ }
+
+ channelStateWriter.abort(
+ checkpointId,
+ new CancellationException("checkpoint aborted via
notification"),
+ false);
+ },
+ (opw) ->
opw.getStreamOperator().notifyCheckpointAborted(checkpointId),
+ (tsm) -> tsm.notifyCheckpointAborted(checkpointId));
+ }
+
+ @Override
+ public void notifyCheckpointSubsumed(
+ long checkpointId, OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
+ throws Exception {
+ notifyCheckpoint(
+ checkpointId,
+ operatorChain,
+ isRunning,
+ "Ignoring notification of subsumed checkpoint {} for
not-running task {}",
+ "Ignoring notification of subsumed checkpoint {} for finished
on restore task {}",
+ "Notification of subsumed checkpoint {} for task {}",
+ null,
+ (opw) -> opw.notifyCheckpointSubsumed(checkpointId),
+ null);
+ }
+
+ private void notifyCheckpoint(
+ long checkpointId,
+ OperatorChain<?, ?> operatorChain,
+ Supplier<Boolean> isRunning,
+ String logFormatIfNotRunning,
+ String logFormatIfFinishedOnRestore,
+ String logFormatIfNotificationStart,
+ @Nullable Runnable runnableBeforeNotifyOperator,
+ @Nonnull ThrowingConsumer<StreamOperatorWrapper<?, ?>, Exception>
operatorConsumer,
+ @Nullable ThrowingConsumer<TaskStateManager, Exception>
taskStateManagerConsumer)
+ throws Exception {
Review comment:
Instead of using `Runnable` and `ThrowingConsumer` to deduplicate the code,
I would suggest deduplicating it by decomposing this `notifyCheckpoint`
method into smaller parts. For example:
```
public void notifyCheckpointAborted(...) {
  if (notifyCheckpointOperationShouldBeExecuted(...)) {
    // inlined content of the `runnableBeforeNotifyOperator`
    operatorChain.forEach((operatorWrapper) -> operatorWrapper.notifyCheckpointAborted(checkpointId));
    env.getTaskStateManager().notifyCheckpointAborted(checkpointId);
  }
}
public void notifyCheckpointSubsumed(...) {
  if (notifyCheckpointOperationShouldBeExecuted(...)) {
    operatorChain.forEach((operatorWrapper) -> operatorWrapper.notifyCheckpointSubsumed(checkpointId));
  }
}
private boolean notifyCheckpointOperationShouldBeExecuted(...) {
  // isRunning/isFinishedOnRestore checks + logging
}
```
Yes, we would still have a bit of code duplication, but it would avoid the
problem of the generic `Runnable` and `ThrowingConsumer` parameters obscuring
what the code is doing. In your current version, to understand what this code
is doing, you have to keep jumping back and forth between the
`notifyCheckpointAborted()` method (where those `Runnable`s and
`ThrowingConsumer`s are defined) and the actual `notifyCheckpoint`
implementation. What makes it even worse is that you cannot use "find
implementations" in the IDE to help you, as there are just so many
`Runnable`s in the project. So maybe a bit of code duplication is not that bad
in this case? (I'm not 100% sure)
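To make the trade-off concrete, here is a compilable sketch of the decomposed shape. Everything in it is a hypothetical simplification: `CoordinatorSketch`, its `Operator` interface and the in-memory `log` are not Flink types, and the exception aggregation and finished-on-restore check of the real coordinator are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical, heavily simplified stand-in for SubtaskCheckpointCoordinatorImpl. */
final class CoordinatorSketch {

    /** Hypothetical callback surface; the real operator wrapper differs. */
    interface Operator {
        void notifyCheckpointAborted(long checkpointId);

        void notifyCheckpointSubsumed(long checkpointId);
    }

    private final List<Operator> operators;
    final List<String> log = new ArrayList<>(); // stands in for LOG.debug(...)

    CoordinatorSketch(List<Operator> operators) {
        this.operators = operators;
    }

    void notifyCheckpointAborted(long checkpointId, BooleanSupplier isRunning) {
        if (shouldNotify("aborted", checkpointId, isRunning)) {
            // Abort-specific preparation would be inlined here instead of
            // being passed in as a Runnable.
            for (Operator operator : operators) {
                operator.notifyCheckpointAborted(checkpointId);
            }
        }
    }

    void notifyCheckpointSubsumed(long checkpointId, BooleanSupplier isRunning) {
        if (shouldNotify("subsumed", checkpointId, isRunning)) {
            for (Operator operator : operators) {
                operator.notifyCheckpointSubsumed(checkpointId);
            }
        }
    }

    /** Shared guard: the only part that is actually deduplicated. */
    private boolean shouldNotify(String operation, long checkpointId, BooleanSupplier isRunning) {
        if (!isRunning.getAsBoolean()) {
            log.add("Ignoring notification of " + operation + " checkpoint " + checkpointId
                    + " for not-running task");
            return false;
        }
        log.add("Notification of " + operation + " checkpoint " + checkpointId);
        return true;
    }
}
```

Each public method now reads top to bottom without lambdas, and "find usages" on `notifyCheckpointSubsumed` leads directly to the call site.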
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -339,78 +343,107 @@ public void checkpointState(
public void notifyCheckpointComplete(
long checkpointId, OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
throws Exception {
- Exception previousException = null;
- if (!isRunning.get()) {
- LOG.debug(
- "Ignoring notification of complete checkpoint {} for
not-running task {}",
- checkpointId,
- taskName);
- } else if (operatorChain.isFinishedOnRestore()) {
- LOG.debug(
- "Ignoring notification of complete checkpoint {} for
finished on restore task {}",
- checkpointId,
- taskName);
- } else {
- LOG.debug(
- "Notification of completed checkpoint {} for task {}",
checkpointId, taskName);
- for (StreamOperatorWrapper<?, ?> operatorWrapper :
- operatorChain.getAllOperators(true)) {
- try {
- operatorWrapper.notifyCheckpointComplete(checkpointId);
- } catch (Exception e) {
- previousException = ExceptionUtils.firstOrSuppressed(e,
previousException);
- }
- }
- }
- env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
- ExceptionUtils.tryRethrowException(previousException);
+ notifyCheckpoint(
+ checkpointId,
+ operatorChain,
+ isRunning,
+ "Ignoring notification of complete checkpoint {} for
not-running task {}",
+ "Ignoring notification of complete checkpoint {} for finished
on restore task {}",
+ "Notification of aborted checkpoint {} for task {}",
+ null,
+ (opw) -> opw.notifyCheckpointComplete(checkpointId),
+ (tsm) -> tsm.notifyCheckpointComplete(checkpointId));
}
@Override
public void notifyCheckpointAborted(
long checkpointId, OperatorChain<?, ?> operatorChain,
Supplier<Boolean> isRunning)
throws Exception {
+ notifyCheckpoint(
+ checkpointId,
+ operatorChain,
+ isRunning,
+ "Ignoring notification of aborted checkpoint {} for
not-running task {}",
+ "Ignoring notification of aborted checkpoint {} for finished
on restore task {}",
+ "Notification of aborted checkpoint {} for task {}",
+ () -> {
+ boolean canceled =
cancelAsyncCheckpointRunnable(checkpointId);
+
+ if (!canceled) {
+ if (checkpointId > lastCheckpointId) {
+ // only record checkpoints that have not triggered
on task side.
+ abortedCheckpointIds.add(checkpointId);
+ }
+ }
+
+ channelStateWriter.abort(
+ checkpointId,
+ new CancellationException("checkpoint aborted via
notification"),
+ false);
+ },
+ (opw) ->
opw.getStreamOperator().notifyCheckpointAborted(checkpointId),
+ (tsm) -> tsm.notifyCheckpointAborted(checkpointId));
Review comment:
Please, no abbreviations. Looking at this code, I have no idea what
`opw` or `tsm` are.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -67,6 +75,7 @@ public static void subsume(
}
// Don't break out from the loop to subsume intermediate savepoints
}
+ return lastSubsumedCheckpoint;
Review comment:
> In general, we should make the minimal changes to existing interfaces.
I think the changes in both of those proposals are quite comparable: adding
one getter vs. changing the return type of a method to substitute for this getter.
> Adding method of CompletedCheckpointStore#getLatestSubsumedCheckpointID
would introduce another method and would also make us lost the timing when the
checkpoint is subsumed.
What is the problem here? Could you elaborate on what you are worried about?
On the other hand, I like @rkhachatryan's approach because there is a clear
owner of the knowledge of what the last subsumed checkpoint was/is, and as a
result we don't need to modify `CheckpointCoordinator` that much.
The approach of returning this information from the `addCheckpoint` method,
which then needs to be stored/cached externally in the `CheckpointCoordinator`,
introduces more dependencies in the code.
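A rough sketch of the "store owns the knowledge" variant, for comparison. `CheckpointStoreSketch` is a hypothetical, simplified stand-in for a completed checkpoint store (plain `long` IDs, no savepoints, no discarding of state); only the getter name follows the one quoted above, and the `-1` sentinel is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

/** Hypothetical, simplified stand-in for a completed checkpoint store. */
final class CheckpointStoreSketch {
    private final Deque<Long> retained = new ArrayDeque<>();
    private final int numRetain;
    private long latestSubsumedId = -1L; // -1 means "nothing subsumed yet" (assumption)

    CheckpointStoreSketch(int numRetain) {
        this.numRetain = numRetain;
    }

    /** The return type stays void; subsumption is recorded inside the store. */
    void addCheckpoint(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > numRetain) {
            latestSubsumedId = retained.removeFirst();
        }
    }

    /** Single owner of the "last subsumed checkpoint" knowledge. */
    OptionalLong getLatestSubsumedCheckpointId() {
        return latestSubsumedId < 0 ? OptionalLong.empty() : OptionalLong.of(latestSubsumedId);
    }
}
```

The caller asks the store when it needs the ID, rather than caching the value returned from `addCheckpoint` on its own side.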
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1352,84 +1352,73 @@ public void triggerCheckpointBarrier(
}
public void notifyCheckpointComplete(final long checkpointID) {
- final AbstractInvokable invokable = this.invokable;
-
- if (executionState == ExecutionState.RUNNING && invokable != null) {
- try {
- invokable.notifyCheckpointCompleteAsync(checkpointID);
- } catch (RejectedExecutionException ex) {
- // This may happen if the mailbox is closed. It means that the
task is shutting
- // down, so we just ignore it.
- LOG.debug(
- "Notify checkpoint complete {} for {} ({}) was
rejected by the mailbox",
- checkpointID,
- taskNameWithSubtask,
- executionId);
- } catch (Throwable t) {
- if (getExecutionState() == ExecutionState.RUNNING) {
- // fail task if checkpoint confirmation failed.
- failExternally(new RuntimeException("Error while
confirming checkpoint", t));
- }
- }
- } else {
- LOG.debug(
- "Ignoring checkpoint commit notification for non-running
task {}.",
- taskNameWithSubtask);
- }
+ notifyCheckpoint(
+ checkpointID,
+ (invokable) ->
invokable.notifyCheckpointCompleteAsync(checkpointID),
+ "Notify checkpoint complete {} for {} ({}) was rejected by the
mailbox",
+ "Error while confirming checkpoint {}.",
+ (t) -> failExternally(new RuntimeException("Error while
confirming checkpoint", t)),
+ "Ignoring checkpoint commit notification for non-running task
{}.");
}
public void notifyCheckpointAborted(
final long checkpointID, final long latestCompletedCheckpointId) {
- final AbstractInvokable invokable = this.invokable;
+ notifyCheckpoint(
+ checkpointID,
+ (invokable) ->
+ invokable.notifyCheckpointAbortAsync(
+ checkpointID, latestCompletedCheckpointId),
+ "Notify checkpoint abort {} for {} ({}) was rejected by the
mailbox",
+ "Error while aborting checkpoint {}.",
+ (t) -> failExternally(new RuntimeException("Error while
aborting checkpoint", t)),
+ "Ignoring checkpoint aborted notification for non-running task
{}.");
+ }
- if (executionState == ExecutionState.RUNNING && invokable != null) {
- try {
- invokable.notifyCheckpointAbortAsync(checkpointID,
latestCompletedCheckpointId);
- } catch (RejectedExecutionException ex) {
- // This may happen if the mailbox is closed. It means that the
task is shutting
- // down, so we just ignore it.
- LOG.debug(
- "Notify checkpoint abort {} for {} ({}) was rejected
by the mailbox",
- checkpointID,
- taskNameWithSubtask,
- executionId);
- } catch (Throwable t) {
- if (getExecutionState() == ExecutionState.RUNNING) {
- // fail task if checkpoint aborted notification failed.
- failExternally(new RuntimeException("Error while aborting
checkpoint", t));
- }
- }
- } else {
- LOG.info(
- "Ignoring checkpoint aborted notification for non-running
task {}.",
- taskNameWithSubtask);
- }
+ public void notifyCheckpointSubsumed(long checkpointID) {
+ notifyCheckpoint(
+ checkpointID,
+ (invokable) ->
invokable.notifyCheckpointSubsumedAsync(checkpointID),
+ "Notify checkpoint subsume {} for {} ({}) was rejected by the
mailbox",
+ "Error while subsuming checkpoint {}.",
+ null,
+ "Ignoring checkpoint subsume notification for non-running task
{}.");
}
- public void notifyCheckpointSubsumed(long checkpointId) {
+ private void notifyCheckpoint(
+ long checkpointId,
+ Consumer<AbstractInvokable> invokableNotifier,
+ String logFormatIfRejected,
+ String logFormatIfFailedToNotify,
+ @Nullable Consumer<Throwable> handleThrowableIfTaskRunning,
+ String logFormatIfNotRunning) {
Review comment:
For me this method has the same problem as in my previous comment about
generic `Consumer`s. However, here I would propose adding an `enum
NotifyCheckpointOperation` with three values, `ABORT`, `COMPLETE` and
`SUBSUME`, and just switching on it to replace `Consumer<AbstractInvokable>
invokableNotifier`.
In addition, `handleThrowableIfTaskRunning` could be replaced by a
boolean flag `boolean shouldFailExternallyOnException`, so the signature of
this method could be:
```
private void notifyCheckpoint(
long checkpointId,
NotifyCheckpointOperation notifyCheckpointOperation,
boolean shouldFailExternallyOnException)
```
Thirdly, I think the log/error formats could also be handled more easily, by
just using something like:
```
String.format("Notify checkpoint %s %s for %s (%s) was rejected by the
mailbox", notifyCheckpointOperation.toString(), ...);
```
instead of
```
String.format("Notify checkpoint abort %s for %s (%s) was rejected by the
mailbox", ...);
```
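Putting the three suggestions together, a minimal sketch could look like the following. `TaskSketch`, its `Invokable` interface, the `log` list and the `failedExternally` field are hypothetical stand-ins for `Task`, `AbstractInvokable`, the logger and `failExternally(...)`; the running-state and mailbox-rejection handling of the real method is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for Task. */
final class TaskSketch {

    enum NotifyCheckpointOperation { ABORT, COMPLETE, SUBSUME }

    /** Hypothetical stand-in for the invokable's async notification methods. */
    interface Invokable {
        void notifyComplete(long checkpointId);

        void notifyAbort(long checkpointId);

        void notifySubsume(long checkpointId);
    }

    private final Invokable invokable;
    final List<String> log = new ArrayList<>(); // stands in for LOG
    boolean failedExternally;                   // stands in for failExternally(...)

    TaskSketch(Invokable invokable) {
        this.invokable = invokable;
    }

    void notifyCheckpoint(
            long checkpointId,
            NotifyCheckpointOperation operation,
            boolean shouldFailExternallyOnException) {
        try {
            // The switch makes every possible notification visible in one place.
            switch (operation) {
                case ABORT:
                    invokable.notifyAbort(checkpointId);
                    break;
                case COMPLETE:
                    invokable.notifyComplete(checkpointId);
                    break;
                case SUBSUME:
                    invokable.notifySubsume(checkpointId);
                    break;
            }
        } catch (RuntimeException e) {
            // One shared format; the enum name supplies the operation.
            log.add(String.format("Error while notifying checkpoint %s %d.", operation, checkpointId));
            if (shouldFailExternallyOnException) {
                failedExternally = true;
            }
        }
    }
}
```

A caller would then write, for example, `notifyCheckpoint(checkpointId, NotifyCheckpointOperation.SUBSUME, false)`, which states the intent without any lambdas or format strings at the call site.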
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.