[
https://issues.apache.org/jira/browse/FLINK-23471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yun Tang updated FLINK-23471:
-----------------------------
Summary: Try best to ensure all operators and state manager handle the
checkpoint notification complete (was: Try best to ensure all operators and
state manager handle the checkpoint notification)
> Try best to ensure all operators and state manager handle the checkpoint
> notification complete
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-23471
> URL: https://issues.apache.org/jira/browse/FLINK-23471
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / Task
> Reporter: Yun Tang
> Assignee: Yun Tang
> Priority: Major
> Fix For: 1.14.0
>
>
> The current {{SubtaskCheckpointCoordinatorImpl#notifyCheckpointComplete}} is
> implemented as follows:
> {code:java}
> @Override
> public void notifyCheckpointComplete(
>         long checkpointId, OperatorChain<?, ?> operatorChain, Supplier<Boolean> isRunning)
>         throws Exception {
>     if (!isRunning.get()) {
>         LOG.debug(
>                 "Ignoring notification of complete checkpoint {} for not-running task {}",
>                 checkpointId,
>                 taskName);
>     } else if (operatorChain.isFinishedOnRestore()) {
>         LOG.debug(
>                 "Ignoring notification of complete checkpoint {} for finished on restore task {}",
>                 checkpointId,
>                 taskName);
>     } else {
>         LOG.debug(
>                 "Notification of completed checkpoint {} for task {}",
>                 checkpointId,
>                 taskName);
>         for (StreamOperatorWrapper<?, ?> operatorWrapper :
>                 operatorChain.getAllOperators(true)) {
>             operatorWrapper.notifyCheckpointComplete(checkpointId);
>         }
>     }
>     env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
> }
> {code}
> If one operator in the operator chain throws an exception, the remaining
> operators and the {{TaskStateManager}} never receive the notification.
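
One possible way to make the notification best-effort is to catch each failure, keep notifying the remaining receivers, and rethrow only at the end. The sketch below is not the actual fix for this issue and does not use Flink classes: {{BestEffortNotifier}}, {{Listener}}, {{firstOrSuppressed}} and {{notifyBestEffort}} are hypothetical names introduced only for illustration.

```java
import java.util.List;

public class BestEffortNotifier {

    /** Hypothetical stand-in for an operator or state manager that receives notifications. */
    public interface Listener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** Keeps the first exception and attaches any later one to it as suppressed. */
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    /**
     * Notifies every operator and then the state manager, even if some of them throw.
     * The first failure (with later ones suppressed) is rethrown after all were notified.
     */
    public static void notifyBestEffort(
            long checkpointId, List<Listener> operators, Listener stateManager)
            throws Exception {
        Exception failure = null;
        for (Listener operator : operators) {
            try {
                operator.notifyCheckpointComplete(checkpointId);
            } catch (Exception e) {
                // Remember the failure, but keep going so later operators are still notified.
                failure = firstOrSuppressed(e, failure);
            }
        }
        try {
            stateManager.notifyCheckpointComplete(checkpointId);
        } catch (Exception e) {
            failure = firstOrSuppressed(e, failure);
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

With this shape the caller still sees the failure, but only after every operator and the state manager have had a chance to handle the notification.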