Yun Tang created FLINK-23471:
--------------------------------
Summary: Try best to ensure all operators and state manager handle
the checkpoint notification
Key: FLINK-23471
URL: https://issues.apache.org/jira/browse/FLINK-23471
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing, Runtime / Task
Reporter: Yun Tang
Assignee: Yun Tang
Fix For: 1.14.0
The current implementation of
{{SubtaskCheckpointCoordinatorImpl#notifyCheckpointComplete}} is shown below:
{code:java}
@Override
public void notifyCheckpointComplete(
        long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
        throws Exception {
    if (!isRunning.get()) {
        LOG.debug(
                "Ignoring notification of complete checkpoint {} for not-running task {}",
                checkpointId,
                taskName);
    } else if (operatorChain.isFinishedOnRestore()) {
        LOG.debug(
                "Ignoring notification of complete checkpoint {} for finished on restore task {}",
                checkpointId,
                taskName);
    } else {
        LOG.debug(
                "Notification of completed checkpoint {} for task {}",
                checkpointId,
                taskName);
        for (StreamOperatorWrapper<?, ?> operatorWrapper :
                operatorChain.getAllOperators(true)) {
            operatorWrapper.notifyCheckpointComplete(checkpointId);
        }
    }
    env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
}
{code}
If one operator in the operator chain throws an exception, the remaining
operators in the chain and the {{TaskStateManager}} never receive the
notification.
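
One possible shape for a best-effort notification is sketched below. It is a
standalone illustration, not the actual patch: {{CheckpointListenerSketch}},
{{BestEffortNotifier}} and the local {{firstOrSuppressed}} helper are
hypothetical names introduced here, and the real operator and state manager
types are replaced by a single minimal interface. Each callee is notified
inside its own try/catch, the first exception is remembered, later ones are
attached as suppressed, and the remembered exception is rethrown only after
everyone has been notified.

```java
import java.util.List;

/** Hypothetical stand-in for anything that accepts the notification. */
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

/** Hypothetical helper, not part of Flink. */
final class BestEffortNotifier {

    private BestEffortNotifier() {}

    /**
     * Notifies every operator and then the state manager, even if some of
     * them throw. The first failure is rethrown at the end; later failures
     * are attached to it as suppressed exceptions.
     */
    static void notifyAll(
            long checkpointId,
            List<CheckpointListenerSketch> operators,
            CheckpointListenerSketch stateManager)
            throws Exception {
        Exception first = null;
        for (CheckpointListenerSketch operator : operators) {
            try {
                operator.notifyCheckpointComplete(checkpointId);
            } catch (Exception e) {
                // Remember the failure but keep notifying the rest.
                first = firstOrSuppressed(e, first);
            }
        }
        try {
            stateManager.notifyCheckpointComplete(checkpointId);
        } catch (Exception e) {
            first = firstOrSuppressed(e, first);
        }
        if (first != null) {
            throw first;
        }
    }

    /** Keeps the earliest exception and records later ones as suppressed. */
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        if (previous != newException) {
            // addSuppressed rejects self-suppression, hence the identity check.
            previous.addSuppressed(newException);
        }
        return previous;
    }
}
```

With three operators where the second one throws, the first and third
operators and the state manager are still notified, and the caller sees the
second operator's exception afterwards. Whether failures should be rethrown
or only logged is a design decision this sketch leaves open.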
--
This message was sent by Atlassian Jira
(v8.3.4#803005)