[ 
https://issues.apache.org/jira/browse/FLINK-27569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17535814#comment-17535814
 ] 

Yang Wang commented on FLINK-27569:
-----------------------------------

cc @[~mapohl] Would you like to share some insights on this ticket? It seems 
that the job result store could not work together with 
{{execution.shutdown-on-application-finish = false}}.

> Terminated Flink job restarted from empty state when 
> execution.shutdown-on-application-finish is false
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27569
>                 URL: https://issues.apache.org/jira/browse/FLINK-27569
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Gyula Fora
>            Priority: Critical
>         Attachments: Screenshot 2022-05-11 at 08.46.51.png, Screenshot 
> 2022-05-11 at 08.50.03.png
>
>
> When JobManager HA is enabled and execution.shutdown-on-application-finish = 
> false, terminated jobs (failed, cancelled, etc.) will be resubmitted from a 
> completely empty state on JobManager failover.
> Please see the following situation. Flink 1.15, HA enabled, shutdown on app 
> finish off:
> 1. Submit Flink application cluster
> 2. Call cancel with savepoint -> see logs below
> The job successfully finishes with a savepoint:
> {noformat}
> 2022-05-11 06:42:48,562 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 
> 00000000000000000000000000000000 reached terminal state FINISHED.
> 2022-05-11 06:42:48,624 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 
> 00000000000000000000000000000000 has been registered for cleanup in the 
> JobResultStore after reaching a terminal state.
> 2022-05-11 06:42:48,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Stopping the JobMaster for job 'State machine job' 
> (00000000000000000000000000000000).
> 2022-05-11 06:42:48,629 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Shutting down
> 2022-05-11 06:42:48,647 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] 
> - Shutting down.
> 2022-05-11 06:42:48,647 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] 
> - Removing counter from ConfigMap 
> basic-checkpoint-ha-example-00000000000000000000000000000000-config-map
> 2022-05-11 06:42:48,652 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Releasing slot [0cdb18eefcb2133049223214d4716fa0].
> 2022-05-11 06:42:48,653 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - 
> Releasing slot [bf5ece74692d786f6ba2b067c76ee1d9].
> 2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Close ResourceManager connection 
> 220ea961c86ea8042fde2151fd05a5c9: Stopping JobMaster for job 'State machine 
> job' (00000000000000000000000000000000).
> 2022-05-11 06:42:48,653 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,653 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 
> [] - Stopping 
> KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Stopped to watch for 
> default/basic-checkpoint-ha-example-cluster-config-map, watching 
> id:9a1bc36b-6a76-4970-96a0-945e9a12b66d
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver 
> [] - Stopping 
> KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Stopped to watch for 
> default/basic-checkpoint-ha-example-cluster-config-map, watching 
> id:5facec4c-d888-43b4-88d0-d1f34912d35a
> 2022-05-11 06:42:48,655 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Disconnect job manager 
> [email protected]://[email protected]:6123/user/rpc/jobmanager_2
>  for job 00000000000000000000000000000000 from the resource manager.
> 2022-05-11 06:42:48,660 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2022-05-11 06:42:48,723 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices
>  [] - Clean up the high availability data for job 
> 00000000000000000000000000000000.
> 2022-05-11 06:42:48,753 INFO  
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job 
> graph 00000000000000000000000000000000 from 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:42:48,758 INFO  
> org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices
>  [] - Finished cleaning up the high availability data for job 
> 00000000000000000000000000000000.
> 2022-05-11 06:42:50,321 INFO  
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
> [] - Application completed SUCCESSFULLY{noformat}
> !Screenshot 2022-05-11 at 08.46.51.png|width=882,height=106!
> 3. Trigger JobManager failover
> The JobManager recovers, but resubmits the job from an empty state:
> {noformat}
> 2022-05-11 06:48:04,535 INFO  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 00000000000000000000000000000000 is submitted.
> 2022-05-11 06:48:04,535 INFO  
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=00000000000000000000000000000000.
> 2022-05-11 06:48:04,629 INFO  
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
> pods from previous attempts, current attempt id is 1.
> 2022-05-11 06:48:04,629 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Recovered 0 workers from previous attempt.
> 2022-05-11 06:48:04,650 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received 
> JobGraph submission 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,652 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting 
> job 'State machine job' (00000000000000000000000000000000).
> 2022-05-11 06:48:04,746 INFO  
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Added 
> JobGraph(jobId: 00000000000000000000000000000000) to 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:04,826 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Starting DefaultLeaderElectionService with 
> org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@370a1b27.
> 2022-05-11 06:48:04,838 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting 
> RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
> akka://flink/user/rpc/jobmanager_2 .
> 2022-05-11 06:48:04,843 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Initializing job 'State machine job' 
> (00000000000000000000000000000000).
> 2022-05-11 06:48:04,926 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using restart back off time strategy 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
> backoffTimeMS=1000) for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:04,955 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Recovering checkpoints from 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Found 0 checkpoints in 
> KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
> 2022-05-11 06:48:04,959 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - 
> Trying to fetch 0 checkpoints from storage.
> 2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Running initialization on master for job State machine job 
> (00000000000000000000000000000000).
> 2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Successfully ran initialization on master in 0 ms.
> 2022-05-11 06:48:05,032 INFO  
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
> Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
> 2022-05-11 06:48:05,035 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - No state backend has been configured, using default 
> (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@670a312c
> 2022-05-11 06:48:05,035 INFO  
> org.apache.flink.runtime.state.StateBackendLoader            [] - State 
> backend loader loads the state backend as HashMapStateBackend
> 2022-05-11 06:48:05,036 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Checkpoint storage is set to 'jobmanager'
> 2022-05-11 06:48:05,053 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No 
> checkpoint found during restore.
> 2022-05-11 06:48:05,058 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using failover strategy 
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@bdfaf5f
>  for State machine job (00000000000000000000000000000000).
> 2022-05-11 06:48:05,065 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Starting DefaultLeaderRetrievalService with 
> KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
> 2022-05-11 06:48:05,066 INFO  
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
>  [] - Starting to watch for 
> default/basic-checkpoint-ha-example-cluster-config-map, watching 
> id:83411d91-3094-46c5-b2cc-0576bf5cc161
> 2022-05-11 06:48:05,126 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Starting execution of job 'State machine job' 
> (00000000000000000000000000000000) under job master id 
> 9c63401786b3856e5c8a0cf069e44198.
> 2022-05-11 06:48:05,132 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Starting scheduling with scheduling strategy 
> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> {noformat}
> !Screenshot 2022-05-11 at 08.50.03.png|width=861,height=79!
>  
> In addition, the checkpoint history is lost (which is probably the main 
> cause of the issue).
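>  
> A minimal configuration sketch of the setup described above, not taken from 
> the reporter's actual deployment. The option keys are standard Flink 
> settings; the storage path is a hypothetical placeholder, and the cluster id 
> is only inferred from the ConfigMap names in the logs:
> {noformat}
> # Assumption: cluster id inferred from 
> # 'basic-checkpoint-ha-example-cluster-config-map' in the logs above
> kubernetes.cluster-id: basic-checkpoint-ha-example
> # Kubernetes-based JobManager HA
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> # Hypothetical placeholder path, replace with a durable storage location
> high-availability.storageDir: file:///flink-data/ha
> # Keep the application cluster running after the job reaches a terminal state
> execution.shutdown-on-application-finish: false
> {noformat}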



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
