[ 
https://issues.apache.org/jira/browse/FLINK-38883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-38883:
----------------------------------
    Description: 
We noticed an issue where the REST API reported a job as globally terminated ({{FAILED}}) but the JobResultStore (JRS) entry wasn't created (due to some object store problems). The external monitor treated the job as terminal based on the REST API response, but the job recovered because no JRS entry existed and the job data wasn't cleaned up, i.e. during recovery of the JobManager the job was picked up again.

Conceptually (and theoretically), the problem stems from the fact that the JRS entry is only written after the job has reached the globally terminal state (which is reported via the REST API). Instead, it should be written before reaching that state (i.e. as part of the {{CANCELLING}}, {{FAILING}} and {{FINISHING}} job states; the latter doesn't even exist in the job state transition graph).
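
A minimal sketch of the two orderings (all types and method names below are placeholders, not Flink's actual internals):

{code:java}
/**
 * Simplified sketch of the ordering issue described above; all types and
 * method names are placeholders and do not correspond to Flink's internals.
 */
public class TerminationOrderingSketch {

    enum Status { RUNNING, FAILING, FAILED }

    /** Placeholder for the JobResultStore; the write may fail, e.g. due to object store problems. */
    interface DirtyResultStore {
        void createDirtyEntry(String jobId);
    }

    private final DirtyResultStore jrs;
    private volatile Status restVisibleStatus = Status.RUNNING; // what the REST API would report

    TerminationOrderingSketch(DirtyResultStore jrs) {
        this.jrs = jrs;
    }

    /** Current ordering: the terminal state becomes observable before the JRS entry exists. */
    void failJobToday(String jobId) {
        restVisibleStatus = Status.FAILED;  // REST API already reports a globally terminal state
        // If the JobManager dies here, an external monitor has already seen FAILED,
        // yet no JRS entry exists, so the job is picked up again on recovery.
        jrs.createDirtyEntry(jobId);
    }

    /** Desired ordering: the dirty JRS entry is persisted before the terminal state is reported. */
    void failJobDesired(String jobId) {
        restVisibleStatus = Status.FAILING; // still non-terminal while the write is pending
        jrs.createDirtyEntry(jobId);        // if this fails, the job never looks terminal
        restVisibleStatus = Status.FAILED;
    }
}
{code}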

There are different options to handle that problem:
 * Extend the {{JobDetails}} endpoint to include a flag that states whether the JRS entry was written for the job. The logic would live in the {{Dispatcher}}, and this might be the least invasive option. The REST API endpoint would need to become the only means of determining whether a job actually terminated globally. This is worth pointing out because of the {{stopWithSavepoint}} feature, whose result might be misinterpreted as the job being finished even though it doesn't include any information about the JRS entry.
 ** The JRS lookup would need to be cached somehow because the REST endpoints would be accessed far more frequently than what the JRS was originally meant to handle (a single lookup during job termination); see the caching sketch after the list.
 * Retrieve the {{JobStatus}} based on whether the JRS entry was written, in all {{JobStatus}}-returning REST endpoints on the {{Dispatcher}} side (see the status-mapping sketch after the list).
 ** Would require some additional logic to store the job's previous status.
 ** Downside: The API is not necessarily aligned with the Flink logs
 * Handle the dirty JRS entry write in the scheduler before the job finishes. For the {{AdaptiveScheduler}}, we could handle such a callback in {{AdaptiveScheduler#onFinished}}. For the {{DefaultScheduler}}, the implementation is a bit trickier because the {{DefaultScheduler}} is closely coupled to the {{ExecutionGraph}}: the {{JobStatus}} transitions to {{FINISHED}} in {{DefaultExecutionGraph#jobFinished}}.
 ** An easier approach is to use a wrapping class in the {{DefaultScheduler}} that owns its own {{terminationFuture}} and forwards the job's {{terminationFuture}}. That way, we could inject the dirty JRS entry write before actually completing the wrapper's termination future (see the wrapper sketch after the list).
 ** Another approach is to refactor the {{DefaultScheduler}} code into a state-machine approach similar to what the {{AdaptiveScheduler}} uses, which would improve the overall code quality and decouple the {{ExecutionGraph}} from the scheduler. That would be a larger effort, though.
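
Regarding the first option, a minimal sketch of how the JRS lookup could be cached on the {{Dispatcher}} side, under the assumption that an entry, once written, stays in place until the job's artifacts are cleaned up. The {{JobResultStoreLookup}} interface below is a placeholder, not Flink's actual {{JobResultStore}} API:

{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a cached JRS lookup; all non-JDK types are placeholders. */
public class CachedJrsLookup {

    /** Placeholder for the actual JobResultStore lookup. */
    public interface JobResultStoreLookup {
        CompletableFuture<Boolean> hasEntry(String jobId);
    }

    private final JobResultStoreLookup delegate;

    // Positive results can be cached indefinitely: once an entry exists, it is not
    // removed before the job's artifacts are cleaned up. Negative results are not
    // cached and always hit the underlying store.
    private final Map<String, Boolean> knownEntries = new ConcurrentHashMap<>();

    public CachedJrsLookup(JobResultStoreLookup delegate) {
        this.delegate = delegate;
    }

    public CompletableFuture<Boolean> hasEntry(String jobId) {
        if (Boolean.TRUE.equals(knownEntries.get(jobId))) {
            return CompletableFuture.completedFuture(true);
        }
        return delegate.hasEntry(jobId)
                .thenApply(
                        exists -> {
                            if (exists) {
                                knownEntries.put(jobId, true); // cache only positive results
                            }
                            return exists;
                        });
    }
}
{code}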
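
Regarding the second option, a minimal sketch of how the reported status could be derived from the scheduler's status plus the JRS lookup result. The enum is a reduced placeholder for Flink's {{JobStatus}}, and the direct terminal-to-non-terminal mapping is a simplification; a real implementation would rather store the job's previous status, as noted above:

{code:java}
/**
 * Sketch of deriving the REST-visible status from the scheduler's status and
 * the JRS lookup; the enum is a reduced placeholder for Flink's JobStatus.
 */
public class JrsAwareStatusSketch {

    enum Status { RUNNING, CANCELLING, CANCELED, FAILING, FAILED, FINISHED }

    /** Status to expose via REST, given the scheduler's status and the JRS lookup result. */
    static Status reportedStatus(Status schedulerStatus, boolean jrsEntryExists) {
        if (jrsEntryExists) {
            return schedulerStatus; // the globally terminal state is safe to report
        }
        switch (schedulerStatus) {
            case FAILED:
                return Status.FAILING;   // keep reporting the non-terminal counterpart
            case CANCELED:
                return Status.CANCELLING;
            case FINISHED:
                return Status.RUNNING;   // no FINISHING state exists, see above
            default:
                return schedulerStatus;  // non-terminal states are reported as-is
        }
    }
}
{code}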
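
Regarding the wrapper idea from the last option, a minimal sketch: the wrapper exposes its own termination future that only completes after the dirty JRS entry has been written. {{DirtyResultWriter}} and the use of {{String}} for the job id and terminal state are placeholders for Flink's {{JobResultStore}}, {{JobID}} and {{JobStatus}}:

{code:java}
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of the wrapper idea for the DefaultScheduler: the wrapper owns its own
 * termination future and only completes it after the dirty JRS entry was written.
 */
public class TerminationFutureWrapper {

    /** Placeholder for writing the dirty JRS entry; may complete exceptionally. */
    public interface DirtyResultWriter {
        CompletableFuture<Void> writeDirtyResult(String jobId, String terminalState);
    }

    private final CompletableFuture<String> wrappedTerminationFuture = new CompletableFuture<>();

    public TerminationFutureWrapper(
            String jobId,
            CompletableFuture<String> schedulerTerminationFuture,
            DirtyResultWriter dirtyResultWriter) {
        // Forward the scheduler's termination future, but inject the JRS write in between:
        // the wrapper's future only completes once the dirty entry is persisted.
        schedulerTerminationFuture
                .thenCompose(
                        terminalState ->
                                dirtyResultWriter
                                        .writeDirtyResult(jobId, terminalState)
                                        .thenApply(ignored -> terminalState))
                .whenComplete(
                        (terminalState, failure) -> {
                            if (failure != null) {
                                wrappedTerminationFuture.completeExceptionally(failure);
                            } else {
                                wrappedTerminationFuture.complete(terminalState);
                            }
                        });
    }

    /** The future callers should observe instead of the scheduler's own termination future. */
    public CompletableFuture<String> getTerminationFuture() {
        return wrappedTerminationFuture;
    }
}
{code}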

> Race condition of REST API and JRS entry might lead to inconsistent state
> -------------------------------------------------------------------------
>
>                 Key: FLINK-38883
>                 URL: https://issues.apache.org/jira/browse/FLINK-38883
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.0.1, 1.20.3, 2.2.0, 2.1.1
>            Reporter: Matthias Pohl
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
