[
https://issues.apache.org/jira/browse/FLINK-34518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820679#comment-17820679
]
Matthias Pohl edited comment on FLINK-34518 at 2/26/24 12:38 PM:
-----------------------------------------------------------------
{quote}
I would have expected a bit more content after the log line about "Shutting
KubernetesApplicationClusterEntrypoint down with [...]", e.g. the info that the
currently running job was suspended in DefaultExecutionGraph:1060.
{quote}
Ok, never mind. The cancellation of the {{ExecutionGraph}} in the
{{Restarting}} state's constructor
([Restarting:65|https://github.com/apache/flink/blob/676e8e5528f99eb8ba5747f7489b0f02ee025dd6/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java#L65])
eventually switches the {{ExecutionGraph}} into a globally terminal state
before the graph is suspended as part of the JM failover, which prevents the
suspend logic from kicking in
([DefaultExecutionGraph:1029|https://github.com/apache/flink/blob/c3c836216eaaaf24c1add3b490c8f425fda01d7c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java#L1029]).
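To make that guard explicit: once the graph has already reached a terminal
state, the suspend call keeps it there instead of moving it to {{SUSPENDED}}.
A minimal, self-contained model of that behavior (illustrative names and
transitions, not the actual Flink classes) could look like this:
{code:java}
// Simplified model of the ExecutionGraph state handling around suspend();
// names and transitions are illustrative, not the real Flink implementation.
public class ExecutionGraphSuspendGuard {

    enum JobStatus {
        RUNNING(false, false),
        RESTARTING(false, false),
        CANCELLING(false, false),
        SUSPENDED(true, false),  // terminal, but not globally terminal -> HA metadata is kept
        CANCELED(true, true);    // globally terminal -> HA metadata is cleaned up

        final boolean terminal;
        final boolean globallyTerminal;

        JobStatus(boolean terminal, boolean globallyTerminal) {
            this.terminal = terminal;
            this.globallyTerminal = globallyTerminal;
        }
    }

    private JobStatus state = JobStatus.RUNNING;

    /** Models the guard discussed above: a graph that is already terminal stays terminal. */
    void suspend(Throwable suspensionCause) {
        if (state.terminal) {
            // In the scenario at hand this is the globally terminal CANCELED state,
            // so the job never transitions to SUSPENDED during the JM failover.
            return;
        }
        state = JobStatus.SUSPENDED;
    }
}
{code}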
The job suspension is still executed in the {{Restarting}} state (see
[StateWithExecutionGraph:168|https://github.com/apache/flink/blob/16bac7802284563c95cfe18fcf153e91dc06216e/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java#L168]),
resulting in the {{AdaptiveScheduler}} reaching the {{Finished}} state with an
{{ArchivedExecutionGraph}} in {{CANCELED}} state (which is [globally
terminal|https://github.com/apache/flink/blob/582941b0f13d1cc51077e0e69fd100afe080779f/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java#L48]).
The {{ExecutionGraph}}'s status is then translated into a successful
{{JobManagerRunnerResult}} instance in
[DefaultJobMasterServiceProcess:219|https://github.com/apache/flink/blob/4882fbd9744d456e09ca60b6c7cf7a5b60326c73/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcess.java#L219],
which takes precedence over the completion of the {{JobManagerRunner}}'s
result in
[JobMasterServiceLeadershipRunner:143|https://github.com/apache/flink/blob/c9fcb0c74b1354f4f0f1b7c7f62191b8cc6b5725/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L143]
that would otherwise happen when closing the runner as part of the JM failover.
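In other words, there are two code paths racing to complete the runner's
result future, and completing a {{CompletableFuture}} is first-come,
first-served. A rough, self-contained model of that precedence (illustrative
names, not the actual {{DefaultJobMasterServiceProcess}} /
{{JobMasterServiceLeadershipRunner}} code):
{code:java}
import java.util.concurrent.CompletableFuture;

// Rough model of the two competing completions of the JobManagerRunner result;
// class, method, and result names are illustrative, not the real Flink code.
public class RunnerResultPrecedence {

    /** Either a regular job result (globally terminal graph) or a "job not finished" marker. */
    record RunnerResult(String kind, String jobStatus) {}

    private final CompletableFuture<RunnerResult> resultFuture = new CompletableFuture<>();

    /** Path 1: the scheduler hands over a globally terminal ArchivedExecutionGraph (here CANCELED). */
    void jobReachedGloballyTerminalState(String archivedStatus) {
        resultFuture.complete(new RunnerResult("success", archivedStatus));
    }

    /** Path 2: the runner is closed during the JM failover; only wins if path 1 has not completed yet. */
    void closeRunner() {
        resultFuture.complete(new RunnerResult("jobNotFinished", "SUSPENDED"));
    }

    public static void main(String[] args) {
        RunnerResultPrecedence runner = new RunnerResultPrecedence();
        // In the observed failure the cancellation finishes first, so the CANCELED
        // result wins and the later completion attempt from closing the runner is a no-op.
        runner.jobReachedGloballyTerminalState("CANCELED");
        runner.closeRunner();
        System.out.println(runner.resultFuture.join()); // RunnerResult[kind=success, jobStatus=CANCELED]
    }
}
{code}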
But the actual race condition happens between the {{ExecutionGraph#cancel}}
call in the constructor of the {{Restarting}} state and the
{{ExecutionGraph#suspend}} call in {{StateWithExecutionGraph#suspend}}. The
behavior we're seeing can only happen if the cancellation completes before the
suspend call is triggered, because only then does the {{ExecutionGraph}} reach
the globally terminal JobStatus {{CANCELED}}.
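A compact way to see the two possible orderings (again a sketch with made-up
names, not the real classes): only if the asynchronous cancellation has
already completed by the time {{suspend}} is called does the graph end up in
the globally terminal {{CANCELED}} state; otherwise it becomes {{SUSPENDED}}
and the HA metadata survives the failover.
{code:java}
// Minimal model of the race between ExecutionGraph#cancel (triggered in the
// Restarting constructor) and ExecutionGraph#suspend (triggered by the JM
// shutdown); names and transitions are illustrative only.
public class CancelSuspendRace {

    enum Status { RUNNING, CANCELLING, CANCELED, SUSPENDED }

    static class GraphModel {
        Status status = Status.RUNNING;
        Runnable pendingCancellation = () -> {}; // models the asynchronous part of cancel()

        void cancel() {
            status = Status.CANCELLING;
            // The transition to CANCELED only happens once all tasks acknowledged the cancellation.
            pendingCancellation = () -> status = Status.CANCELED;
        }

        void suspend() {
            if (status == Status.CANCELED) {
                return; // same guard as sketched above: terminal graphs stay terminal
            }
            status = Status.SUSPENDED;
        }
    }

    public static void main(String[] args) {
        // Ordering A: cancellation completes before suspend -> CANCELED
        // (globally terminal, HA metadata is removed).
        GraphModel a = new GraphModel();
        a.cancel();
        a.pendingCancellation.run();
        a.suspend();
        System.out.println("cancellation finishes first: " + a.status); // CANCELED

        // Ordering B: suspend arrives while the cancellation is still pending
        // -> SUSPENDED (HA metadata is kept for the failover).
        GraphModel b = new GraphModel();
        b.cancel();
        b.suspend();
        System.out.println("suspend happens first: " + b.status); // SUSPENDED
    }
}
{code}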
> Adaptive Scheduler restores from empty state if JM fails during restarting
> state
> --------------------------------------------------------------------------------
>
> Key: FLINK-34518
> URL: https://issues.apache.org/jira/browse/FLINK-34518
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Coordination
> Affects Versions: 1.18.1
> Reporter: Gyula Fora
> Priority: Critical
>
> If a JobManager failover occurs while the Job is in a Restarting state, the
> HA metadata is deleted (as if it was a globally terminal state) and the job
> restarts from an empty state after the JM comes back up:
> Jobmanager killed after killing Taskmanager (restarting phase):
> {noformat}
> 2024-02-26 10:10:12,147 DEBUG org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Ignore TaskManager pod that is already added: autoscaling-example-taskmanager-3-2
> 2024-02-26 10:10:13,799 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.
> 2024-02-26 10:10:13,799 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Trigger heartbeat request.
> 2024-02-26 10:10:13,799 DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Received heartbeat request from 9b7e17b75812ab60ecf028e02368d0c2.
> 2024-02-26 10:10:13,799 DEBUG org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Received heartbeat from 251c25cf794e3c9396fc02306613507b.
> 2024-02-26 10:10:14,091 DEBUG org.apache.pekko.remote.transport.netty.NettyTransport [] - Remote connection to [/10.244.0.120:55647] was disconnected because of [id: 0x4a61a791, /10.244.0.120:55647 :> /10.244.0.118:6123] DISCONNECTED
> 2024-02-26 10:10:14,091 DEBUG org.apache.pekko.remote.transport.ProtocolStateActor [] - Association between local [tcp://[email protected]:6123] and remote [tcp://[email protected]:55647] was disassociated because the ProtocolStateActor failed: Unknown
> 2024-02-26 10:10:14,092 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
> 2024-02-26 10:10:14,094 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> 2024-02-26 10:10:14,095 INFO  org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
> 2024-02-26 10:10:14,095 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
> 2024-02-26 10:10:14,315 DEBUG org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher [] - Watcher closed
> 2024-02-26 10:10:14,511 DEBUG org.apache.pekko.actor.CoordinatedShutdown [] - Performing task [terminate-system] in CoordinatedShutdown phase [actor-system-terminate]
> 2024-02-26 10:10:14,595 INFO  org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
> 2024-02-26 10:10:14,596 INFO  org.apache.pekko.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.{noformat}
> Then, when the new JM comes back up, it doesn't find any checkpoints because
> the HA metadata was deleted (we couldn't see this in the logs of the
> shutting-down JM):
> {noformat}
> 2024-02-26 10:10:30,294 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map'}.
> 2024-02-26 10:10:30,394 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in KubernetesStateHandleStore{configMapName='autoscaling-example-5ddd0b1ba346d3bfd5ef53a63772e43c-config-map'}.{noformat}
> Even the main method is re-run and the JobGraph is regenerated (which is
> expected, given that the HA metadata was removed incorrectly).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)