[ 
https://issues.apache.org/jira/browse/FLINK-32316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cai Liuyang updated FLINK-32316:
--------------------------------
    Description: 
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover, and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is created it will create the 
DefaultExecutionGraph, this will init the first sourceCoordinator but will not 
start it.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling), so the first SourceCoordinator 
will not be fully closed.

 

  was:
When we try SourceAlignment feature, we found there will be a duplicated 
announceCombinedWatermark task will be scheduled after JobManager failover, and 
auto recover job from checkpoint.

The reason i think is  we should schedule announceCombinedWatermark task during 
SourceCoordinator::start function not in SourceCoordinator construct function 
(see  
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when jobManager encounter failover and auto recover job, it will 
create SourceCoordinator twice:
 * The first one is  when JobMaster is create it will create the 
DefaultExecutionGraph.
 * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
method, which will be reset old sourceCoordinator and initialize a new one, but 
because the first sourceCoordinator is not started(SourceCoordinator will be 
started before SchedulerBase::startScheduling, so the first SourceCoordinator 
will not be fully closed).

 


> Duplicated announceCombinedWatermark task maybe scheduled if jobmanager 
> failover
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32316
>                 URL: https://issues.apache.org/jira/browse/FLINK-32316
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.16.0
>            Reporter: Cai Liuyang
>            Priority: Major
>
> When we try SourceAlignment feature, we found there will be a duplicated 
> announceCombinedWatermark task will be scheduled after JobManager failover, 
> and auto recover job from checkpoint.
> The reason i think is  we should schedule announceCombinedWatermark task 
> during SourceCoordinator::start function not in SourceCoordinator construct 
> function (see  
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
>  ), because when jobManager encounter failover and auto recover job, it will 
> create SourceCoordinator twice:
>  * The first one is  when JobMaster is created it will create the 
> DefaultExecutionGraph, this will init the first sourceCoordinator but will 
> not start it.
>  * The Second one is JobMaster call restoreLatestCheckpointedStateInternal 
> method, which will be reset old sourceCoordinator and initialize a new one, 
> but because the first sourceCoordinator is not started(SourceCoordinator will 
> be started before SchedulerBase::startScheduling), so the first 
> SourceCoordinator will not be fully closed.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to