[ https://issues.apache.org/jira/browse/FLINK-38870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ConfX updated FLINK-38870:
--------------------------
Description:
When a JobManager loses leadership (e.g., in HA scenarios), running jobs are
suspended. The job's `ExecutionGraph` transitions to `JobStatus.SUSPENDED` and
is archived. When a client later queries the job result via
`Dispatcher.requestJobResult()`, the system attempts to create a `JobResult`
from the archived execution graph.
The issue is that `ApplicationStatus.fromJobStatus()` only maps globally
terminal states:
- `FINISHED` → `SUCCEEDED`
- `FAILED` → `FAILED`
- `CANCELED` → `CANCELED`
But `SUSPENDED` is a locally terminal state (not globally terminal), so it has
no mapping and defaults to `UNKNOWN`:
{code:java}
// ApplicationStatus.java
public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
    return JOB_STATUS_APPLICATION_STATUS_BI_MAP.getOrDefault(jobStatus, UNKNOWN);
}
{code}
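The following is a minimal, self-contained sketch of the mapping behavior described above. `StatusMappingSketch` and its two enums are hypothetical stand-ins for Flink's `JobStatus` and `ApplicationStatus`: only a subset of constants is modeled, and a plain `EnumMap` replaces the real bi-map. The sketch assumes only that the lookup falls back to `UNKNOWN`, as in the quoted `fromJobStatus()`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Flink's JobStatus and ApplicationStatus.
final class StatusMappingSketch {

    enum JobStatus { RUNNING, FINISHED, FAILED, CANCELED, SUSPENDED }

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Only globally terminal job states get an entry, mirroring the mapping in the issue.
    private static final Map<JobStatus, ApplicationStatus> JOB_TO_APP =
            new EnumMap<>(JobStatus.class);

    static {
        JOB_TO_APP.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
        JOB_TO_APP.put(JobStatus.FAILED, ApplicationStatus.FAILED);
        JOB_TO_APP.put(JobStatus.CANCELED, ApplicationStatus.CANCELED);
    }

    // Any state without an entry (including SUSPENDED) falls back to UNKNOWN.
    static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
        return JOB_TO_APP.getOrDefault(jobStatus, ApplicationStatus.UNKNOWN);
    }

    public static void main(String[] args) {
        for (JobStatus status : JobStatus.values()) {
            System.out.println(status + " -> " + fromJobStatus(status));
        }
    }
}
```

Running `main` prints `RUNNING -> UNKNOWN`, `FINISHED -> SUCCEEDED`, `FAILED -> FAILED`, `CANCELED -> CANCELED` and `SUSPENDED -> UNKNOWN`, one per line. Once converted, SUSPENDED is indistinguishable from any other unmapped state.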
This causes `JobResult.toJobExecutionResult()` to throw:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
{code}
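Here is a hypothetical, simplified model of the fallback branch that produces this message. It shows why the message is uninformative. `PreFixMessageSketch` and `fallbackMessage` are illustrative names, not Flink API, and the dedicated handling of `SUCCEEDED`, `FAILED` and `CANCELED` is deliberately not modeled. Because the method receives only the `ApplicationStatus`, it cannot tell a suspended job apart from any other unmapped state.

```java
import java.util.Optional;

// Hypothetical model of the pre-fix fallback: only ApplicationStatus is available here.
final class PreFixMessageSketch {

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // SUCCEEDED, FAILED and CANCELED have dedicated handling (not modeled, so empty);
    // every other status reaches the generic fallback message quoted in the issue.
    static Optional<String> fallbackMessage(ApplicationStatus applicationStatus) {
        switch (applicationStatus) {
            case SUCCEEDED:
            case FAILED:
            case CANCELED:
                return Optional.empty();
            default:
                // A SUSPENDED job arrives here as UNKNOWN, with no trace of its real state.
                return Optional.of(
                        "Job completed with illegal application status: "
                                + applicationStatus + ".");
        }
    }

    public static void main(String[] args) {
        System.out.println(
                fallbackMessage(ApplicationStatus.UNKNOWN).orElse("handled elsewhere"));
    }
}
```

`main` prints `Job completed with illegal application status: UNKNOWN.`, the same text as the exception above.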
A better exception message would include the SUSPENDED job status and more informative context about why the job ended up in that state.
One way to do this is with two changes to `JobResult.java`:
1. Add a `jobStatus` field to preserve the original state:
{code:java}
@Nullable private final JobStatus jobStatus; {code}
2. Update `toJobExecutionResult()` to handle `UNKNOWN` with the actual
`JobStatus`:
{code:java}
// Excerpt: continues the existing if/else chain in toJobExecutionResult()
} else if (applicationStatus == ApplicationStatus.UNKNOWN) {
    if (jobStatus == JobStatus.SUSPENDED) {
        // SUSPENDED is only locally terminal: explain the likely cause and next steps
        exception = new JobExecutionException(
                jobId,
                "Job is in state SUSPENDED. This commonly happens when the "
                        + "JobManager lost leadership. The job may recover "
                        + "automatically if High Availability and a persistent "
                        + "job store are configured. If recovery is not possible "
                        + "(e.g., non-persistent ExecutionPlanStore), the job "
                        + "needs to be resubmitted.",
                cause);
    } else {
        // Any other unmapped state: report both statuses to aid diagnosis
        exception = new JobExecutionException(
                jobId,
                "Job reached a terminal state without a corresponding "
                        + "ApplicationStatus. JobStatus=" + jobStatus
                        + ", ApplicationStatus=" + applicationStatus + ".",
                cause);
    }
}
{code}
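The following sketch exercises the two proposed changes in isolation. `ProposedFixSketch`, `ResultSketch` and `unknownStatusMessage()` are hypothetical names. The sketch only builds the message strings from the proposal rather than constructing a real `JobExecutionException`. It assumes the `jobStatus` field may be null, as the `@Nullable` annotation in step 1 suggests.

```java
import java.util.Objects;

// Hypothetical model of the proposal: keep the original JobStatus next to the ApplicationStatus.
final class ProposedFixSketch {

    enum JobStatus { FINISHED, FAILED, CANCELED, SUSPENDED }

    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Stand-in for JobResult; jobStatus may be null (mirrors the @Nullable field).
    static final class ResultSketch {
        final ApplicationStatus applicationStatus;
        final JobStatus jobStatus;

        ResultSketch(ApplicationStatus applicationStatus, JobStatus jobStatus) {
            this.applicationStatus = Objects.requireNonNull(applicationStatus);
            this.jobStatus = jobStatus;
        }

        // Mirrors the UNKNOWN branch; returns null for statuses handled by other branches.
        String unknownStatusMessage() {
            if (applicationStatus != ApplicationStatus.UNKNOWN) {
                return null;
            }
            if (jobStatus == JobStatus.SUSPENDED) {
                return "Job is in state SUSPENDED. This commonly happens when the "
                        + "JobManager lost leadership. The job may recover "
                        + "automatically if High Availability and a persistent "
                        + "job store are configured. If recovery is not possible "
                        + "(e.g., non-persistent ExecutionPlanStore), the job "
                        + "needs to be resubmitted.";
            }
            // String concatenation renders a missing jobStatus as "null".
            return "Job reached a terminal state without a corresponding "
                    + "ApplicationStatus. JobStatus=" + jobStatus
                    + ", ApplicationStatus=" + applicationStatus + ".";
        }
    }

    public static void main(String[] args) {
        System.out.println(
                new ResultSketch(ApplicationStatus.UNKNOWN, JobStatus.SUSPENDED).unknownStatusMessage());
        System.out.println(
                new ResultSketch(ApplicationStatus.UNKNOWN, null).unknownStatusMessage());
    }
}
```

The first line printed is the SUSPENDED message. The second line, for a result without a recorded `JobStatus`, is `Job reached a terminal state without a corresponding ApplicationStatus. JobStatus=null, ApplicationStatus=UNKNOWN.` Keeping the original `JobStatus` next to the `ApplicationStatus` is what lets the fallback branch distinguish the two cases.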
Before the fix:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
{code}
After:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job is in state SUSPENDED. This commonly happens when the JobManager lost leadership. The job may recover automatically if High Availability and a persistent job store are configured. If recovery is not possible (e.g., non-persistent ExecutionPlanStore), the job needs to be resubmitted.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:161)
{code}
Besides this fix, is anything else needed to handle the UNKNOWN <-> SUSPENDED mapping?
> Uninformative error message when job is suspended due to JobManager leadership loss
> -----------------------------------------------------------------------------------
>
> Key: FLINK-38870
> URL: https://issues.apache.org/jira/browse/FLINK-38870
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.2.0
> Reporter: ConfX
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)