[
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cai Liuyang updated FLINK-32316:
--------------------------------
Description:
When we try SourceAlignment feature, we found there will be a duplicated
announceCombinedWatermark task will be scheduled after JobManager failover and
auto recover job from checkpoint.
The reason i think is we should schedule announceCombinedWatermark task during
SourceCoordinator::start function not in SourceCoordinator construct function
(see
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
), because when jobManager encounter failover and auto recover job, it will
create SourceCoordinator twice:
* The first one is when JobMaster is created it will create the
DefaultExecutionGraph, this will init the first sourceCoordinator but will not
start it.
* The Second one is JobMaster call restoreLatestCheckpointedStateInternal
method, which will be reset old sourceCoordinator and initialize a new one, but
because the first sourceCoordinator is not started(SourceCoordinator will be
started before SchedulerBase::startScheduling), so the first SourceCoordinator
will not be fully closed.
And we also found another problem see jira:
https://issues.apache.org/jira/browse/FLINK-32362
was:
When we try SourceAlignment feature, we found there will be a duplicated
announceCombinedWatermark task will be scheduled after JobManager failover and
auto recover job from checkpoint.
The reason i think is we should schedule announceCombinedWatermark task during
SourceCoordinator::start function not in SourceCoordinator construct function
(see
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
), because when jobManager encounter failover and auto recover job, it will
create SourceCoordinator twice:
* The first one is when JobMaster is created it will create the
DefaultExecutionGraph, this will init the first sourceCoordinator but will not
start it.
* The Second one is JobMaster call restoreLatestCheckpointedStateInternal
method, which will be reset old sourceCoordinator and initialize a new one, but
because the first sourceCoordinator is not started(SourceCoordinator will be
started before SchedulerBase::startScheduling), so the first SourceCoordinator
will not be fully closed.
-And we also found there is another problem that announceCombinedWatermark may
throw a exception (like "subtask 25 is not ready yet to receive events" , this
subtask maybe under failover) lead the period task not running any more
(ThreadPoolExecutor will not schedule the period task if it throw a exception),
i think we should increase the robustness of announceCombinedWatermark function
to cover this case (see
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L199]
)-
(see jira: https://issues.apache.org/jira/browse/FLINK-32362)
> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager
> failover
> --------------------------------------------------------------------------------
>
> Key: FLINK-32316
> URL: https://issues.apache.org/jira/browse/FLINK-32316
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.16.0
> Reporter: Cai Liuyang
> Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated
> announceCombinedWatermark task will be scheduled after JobManager failover
> and auto recover job from checkpoint.
> The reason i think is we should schedule announceCombinedWatermark task
> during SourceCoordinator::start function not in SourceCoordinator construct
> function (see
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
> ), because when jobManager encounter failover and auto recover job, it will
> create SourceCoordinator twice:
> * The first one is when JobMaster is created it will create the
> DefaultExecutionGraph, this will init the first sourceCoordinator but will
> not start it.
> * The Second one is JobMaster call restoreLatestCheckpointedStateInternal
> method, which will be reset old sourceCoordinator and initialize a new one,
> but because the first sourceCoordinator is not started(SourceCoordinator will
> be started before SchedulerBase::startScheduling), so the first
> SourceCoordinator will not be fully closed.
>
> And we also found another problem see jira:
> https://issues.apache.org/jira/browse/FLINK-32362
--
This message was sent by Atlassian Jira
(v8.20.10#820010)