[ https://issues.apache.org/jira/browse/FLINK-38870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050315#comment-18050315 ]

ConfX commented on FLINK-38870:
-------------------------------

I'm happy to send a PR for this if you think it's reasonable.

> Uninformative error message when job is suspended due to JobManager leadership loss
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-38870
>                 URL: https://issues.apache.org/jira/browse/FLINK-38870
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.2.0
>            Reporter: ConfX
>            Priority: Major
>
> When a JobManager loses leadership (e.g., in HA scenarios), running jobs are 
> suspended. The job's `ExecutionGraph` transitions to `JobStatus.SUSPENDED` 
> and is archived. When a client later queries the job result via 
> `Dispatcher.requestJobResult()`, the system attempts to create a `JobResult` 
> from the archived execution graph.
> The issue is that `ApplicationStatus.fromJobStatus()` only maps globally 
> terminal states:
>  - `FINISHED` → `SUCCEEDED`
>  - `FAILED` → `FAILED`
>  - `CANCELED` → `CANCELED`
> But `SUSPENDED` is a locally terminal state (not globally terminal), so it 
> has no mapping and defaults to `UNKNOWN`:
> {code:java}
> // ApplicationStatus.java
> public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
>     return JOB_STATUS_APPLICATION_STATUS_BI_MAP.getOrDefault(jobStatus, UNKNOWN);
> } {code}
> This causes `JobResult.toJobExecutionResult()` to throw:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN. {code}
> A better exception message would report the actual SUSPENDED job status and explain its likely cause, instead of only showing UNKNOWN.
> One way to do this:
>  
> Changes to JobResult.java
> 1. Add a `jobStatus` field to preserve the original state:
> {code:java}
> @Nullable private final JobStatus jobStatus; {code}
> 2. Update `toJobExecutionResult()` to use the actual `JobStatus` when the application status is `UNKNOWN`:
> {code:java}
> } else if (applicationStatus == ApplicationStatus.UNKNOWN) {
>     // jobId is a JobResult field; cause and exception are declared earlier in
>     // toJobExecutionResult(). jobStatus is the new @Nullable field from step 1.
>     if (jobStatus == JobStatus.SUSPENDED) {
>         exception = new JobExecutionException(
>                 jobId,
>                 "Job is in state SUSPENDED. This commonly happens when the "
>                         + "JobManager lost leadership. The job may recover "
>                         + "automatically if High Availability and a persistent "
>                         + "job store are configured. If recovery is not possible "
>                         + "(e.g., non-persistent ExecutionPlanStore), the job "
>                         + "needs to be resubmitted.",
>                 cause);
>     } else {
>         exception = new JobExecutionException(
>                 jobId,
>                 "Job reached a terminal state without a corresponding "
>                         + "ApplicationStatus. JobStatus=" + jobStatus
>                         + ", ApplicationStatus=" + applicationStatus + ".",
>                 cause);
>     }
> } {code}
> Before the fix:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
> {code}
> After the fix:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job is in state SUSPENDED. This commonly happens when the JobManager lost leadership. The job may recover automatically if High Availability and a persistent job store are configured. If recovery is not possible (e.g., non-persistent ExecutionPlanStore), the job needs to be resubmitted.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:161)
> {code}
>  
> Besides this fix, is anything else needed to handle the UNKNOWN <-> SUSPENDED mapping?
>  
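To make the failure mode and the proposed fix concrete, here is a minimal, self-contained sketch. It assumes simplified stand-in enums (`JobStatus`, `ApplicationStatus`, `TerminalState`) and a plain `EnumMap` in place of Flink's bimap; the class `SuspendedStatusSketch` and its methods are hypothetical and do not match Flink's real API. It models `SUSPENDED` as locally but not globally terminal, shows it falling back to `UNKNOWN` in the status mapping, and then uses the preserved job status to pick the more informative message.

```java
import java.util.EnumMap;
import java.util.Map;

/**
 * Hypothetical sketch of the SUSPENDED -> UNKNOWN issue and the proposed fix.
 * All types are simplified stand-ins, not Flink's actual classes.
 */
final class SuspendedStatusSketch {

    /** Simplified stand-in for the locally/globally terminal distinction. */
    enum TerminalState { NON_TERMINAL, LOCALLY, GLOBALLY }

    enum JobStatus {
        RUNNING(TerminalState.NON_TERMINAL),
        FINISHED(TerminalState.GLOBALLY),
        FAILED(TerminalState.GLOBALLY),
        CANCELED(TerminalState.GLOBALLY),
        SUSPENDED(TerminalState.LOCALLY);

        private final TerminalState terminalState;

        JobStatus(TerminalState terminalState) {
            this.terminalState = terminalState;
        }

        boolean isGloballyTerminalState() {
            return terminalState == TerminalState.GLOBALLY;
        }
    }

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Only the three globally terminal states have an entry.
    private static final Map<JobStatus, ApplicationStatus> JOB_TO_APP =
            new EnumMap<>(JobStatus.class);

    static {
        JOB_TO_APP.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
        JOB_TO_APP.put(JobStatus.FAILED, ApplicationStatus.FAILED);
        JOB_TO_APP.put(JobStatus.CANCELED, ApplicationStatus.CANCELED);
    }

    /** Same fallback behavior as fromJobStatus(): unmapped states become UNKNOWN. */
    static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
        return JOB_TO_APP.getOrDefault(jobStatus, ApplicationStatus.UNKNOWN);
    }

    /**
     * Message for the UNKNOWN branch, chosen from the preserved (possibly null)
     * JobStatus as proposed for JobResult. FAILED/CANCELED are not modeled.
     */
    static String unknownStatusMessage(JobStatus jobStatus) {
        if (jobStatus == JobStatus.SUSPENDED) {
            return "Job is in state SUSPENDED. This commonly happens when the "
                    + "JobManager lost leadership. The job may recover "
                    + "automatically if High Availability and a persistent "
                    + "job store are configured. If recovery is not possible "
                    + "(e.g., non-persistent ExecutionPlanStore), the job "
                    + "needs to be resubmitted.";
        }
        return "Job reached a terminal state without a corresponding "
                + "ApplicationStatus. JobStatus=" + jobStatus
                + ", ApplicationStatus=" + ApplicationStatus.UNKNOWN + ".";
    }

    public static void main(String[] args) {
        JobStatus archived = JobStatus.SUSPENDED;
        System.out.println(archived + " globally terminal: " + archived.isGloballyTerminalState());
        ApplicationStatus app = fromJobStatus(archived);
        System.out.println(archived + " -> " + app);
        if (app == ApplicationStatus.UNKNOWN) {
            System.out.println(unknownStatusMessage(archived));
        }
    }
}
```

With the `SUSPENDED` input, `main` reports that the state is not globally terminal, that it maps to `UNKNOWN`, and then prints the SUSPENDED-specific message. Passing `null` to `unknownStatusMessage` (for example, a result built without the new field) falls back to the generic message containing `JobStatus=null`, which is why the proposed field is marked `@Nullable`.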



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
