[
https://issues.apache.org/jira/browse/FLINK-38883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias Pohl updated FLINK-38883:
----------------------------------
Description:
We noticed an issue where the REST API reported a job as globally terminated
({{FAILED}}) while the JobResultStore (JRS) entry wasn't created (due to some
object store problems). The external monitor marked the job as terminal based on
the REST API response, but the job recovered anyway: because no JRS entry existed
and the job data wasn't cleaned up, the job was picked up again during the
recovery of the JobManager.
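For illustration, this is roughly the kind of external monitoring logic that runs into the race. It is a hypothetical sketch (the monitor class and its wiring are made up), but the {{/jobs/<jobId>}} endpoint and its {{state}} field are the documented way to query the job status: the monitor treats a globally terminal {{state}} as proof that the job is done, even though the dirty JRS entry might not have been written yet.
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Set;

/** Hypothetical external monitor illustrating the flawed assumption. */
public class NaiveJobMonitor {

    private static final Set<String> GLOBALLY_TERMINAL =
            Set.of("FINISHED", "CANCELED", "FAILED");

    private final HttpClient client = HttpClient.newHttpClient();
    private final String restBaseUrl; // e.g. "http://jobmanager:8081"

    public NaiveJobMonitor(String restBaseUrl) {
        this.restBaseUrl = restBaseUrl;
    }

    /**
     * Returns true as soon as the REST API reports a globally terminal state.
     * This is exactly where the race bites: the state is already terminal,
     * but the dirty JRS entry (and the cleanup) might not exist yet.
     */
    public boolean isTerminated(String jobId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restBaseUrl + "/jobs/" + jobId))
                .GET()
                .build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        // Crude JSON probing for brevity; a real monitor would parse the payload.
        return GLOBALLY_TERMINAL.stream()
                .anyMatch(state -> body.contains("\"state\":\"" + state + "\""));
    }
}
{code}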
Conceptually, the problem stems from the fact that the JRS entry is only written
after the job has reached the globally terminal state (which is what the REST API
reports). Instead, it should be written before reaching that state, i.e. as part
of the {{CANCELLING}}, {{FAILING}} and {{FINISHING}} job states (where the latter
doesn't even exist in the job state transition graph yet).
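To make the intended ordering explicit, here is a minimal sketch; the interfaces are simplified placeholders rather than the actual Flink types. The point is that the dirty JRS entry has to be persisted before the globally terminal state becomes observable, so that no observer can see a terminal state without a corresponding JRS entry existing.
{code:java}
import java.util.concurrent.CompletableFuture;

/** Simplified stand-ins to illustrate the ordering; not the actual Flink interfaces. */
interface DirtyResultStore {
    CompletableFuture<Void> createDirtyEntry(String jobId);
}

interface JobStateAnnouncer {
    void announceGloballyTerminal(String jobId, String terminalState);
}

class TerminationOrdering {

    /** Current behavior (simplified): the terminal state becomes observable first. */
    static CompletableFuture<Void> writeAfterTerminalState(
            DirtyResultStore store, JobStateAnnouncer announcer, String jobId) {
        announcer.announceGloballyTerminal(jobId, "FAILED"); // REST API already reports FAILED
        return store.createDirtyEntry(jobId);                // JRS write may still fail afterwards
    }

    /** Proposed behavior: only announce the terminal state once the dirty entry is persisted. */
    static CompletableFuture<Void> writeBeforeTerminalState(
            DirtyResultStore store, JobStateAnnouncer announcer, String jobId) {
        return store.createDirtyEntry(jobId)
                .thenRun(() -> announcer.announceGloballyTerminal(jobId, "FAILED"));
    }
}
{code}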
There are different options to handle that problem:
* Extend the {{JobDetails}} endpoint to include a flag that states whether the
JRS entry was written for the job. The logic would live in the {{Dispatcher}},
and this might be the least invasive option. The REST API endpoint would then
have to become the only means to determine whether a job actually terminated
globally. This is worth pointing out because of the {{stopWithSavepoint}}
feature, whose result might be misinterpreted as the job being finished even
though it doesn't include any information about the JRS entry.
** The JRS lookup would need to be cached somehow because the REST endpoints
would be accessed far more regularly than what the JRS was originally meant to
handle (a single lookup during job termination); see the caching sketch after
this list.
* Derive the {{JobStatus}} from whether the JRS entry was written in all
{{JobStatus}}-returning REST endpoints on the {{Dispatcher}} side.
** Would require some additional logic to store the job's previous status.
** Downside: the API is not necessarily aligned with the Flink logs.
* Handle the dirty JRS entry write in the scheduler before the job finishes.
For the {{AdaptiveScheduler}}, we could handle such a callback in
{{AdaptiveScheduler#onFinished}}. For the {{DefaultScheduler}}, the
implementation is a bit trickier because the {{DefaultScheduler}} is closely
coupled to the {{ExecutionGraph}}: the {{JobStatus}} transitions to
{{FINISHED}} in {{DefaultExecutionGraph#jobFinished}}.
** An easier approach is to use a wrapping class that owns its own
{{terminationFuture}} and forwards the job's {{terminationFuture}} in the
{{DefaultScheduler}}. That way, we could inject the dirty JRS entry write
before actually completing the termination future of the wrapper class; see the
wrapper sketch after this list.
** Another approach is to refactor the {{DefaultScheduler}} code towards a
state-machine approach similar to what the {{AdaptiveScheduler}} uses, which
would also improve the overall code quality and decouple the
{{ExecutionGraph}} from the scheduler. That would be a larger effort, though.
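Regarding the caching remark in the first option: a minimal sketch of how the per-job JRS lookup could be cached on the {{Dispatcher}} side. The lookup interface is a simplified placeholder for the actual JRS access, not a Flink API.
{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified placeholder for the JRS existence check; not the actual Flink API. */
interface ResultEntryLookup {
    CompletableFuture<Boolean> hasEntry(String jobId);
}

/**
 * Caches positive JRS lookups per job so that frequently polled REST endpoints
 * don't hit the object store on every request. Negative answers are not cached
 * and are re-checked on the next request.
 */
class CachedResultEntryLookup implements ResultEntryLookup {

    private final ResultEntryLookup delegate;
    private final Map<String, Boolean> knownEntries = new ConcurrentHashMap<>();

    CachedResultEntryLookup(ResultEntryLookup delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletableFuture<Boolean> hasEntry(String jobId) {
        if (Boolean.TRUE.equals(knownEntries.get(jobId))) {
            return CompletableFuture.completedFuture(true);
        }
        return delegate.hasEntry(jobId).thenApply(exists -> {
            if (exists) {
                knownEntries.put(jobId, true);
            }
            return exists;
        });
    }
}
{code}
Caching only positive answers keeps the cache trivially consistent, assuming that an entry which was written once stays visible for as long as the REST layer cares about the job.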
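And for the {{terminationFuture}} wrapper idea from the last option: a minimal sketch of a wrapper that owns its own termination future, forwards the wrapped scheduler's termination future and injects the dirty JRS entry write in between. The scheduler and writer interfaces are simplified placeholders, not the actual Flink signatures.
{code:java}
import java.util.concurrent.CompletableFuture;

/** Simplified placeholders; not the actual Flink interfaces. */
interface SchedulerLike {
    CompletableFuture<String> getJobTerminationFuture(); // completes with the terminal status name
}

interface DirtyEntryWriter {
    CompletableFuture<Void> createDirtyEntry(String jobId, String terminalState);
}

/**
 * Owns its own termination future and forwards the wrapped scheduler's termination
 * future, injecting the dirty JRS entry write in between. Observers waiting on
 * {@link #getJobTerminationFuture()} therefore only see the job as terminated once
 * the JRS entry exists.
 */
class JrsAwareSchedulerWrapper {

    private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    JrsAwareSchedulerWrapper(SchedulerLike delegate, DirtyEntryWriter writer, String jobId) {
        delegate.getJobTerminationFuture()
                .thenCompose(terminalState ->
                        writer.createDirtyEntry(jobId, terminalState)
                                .thenApply(ignored -> terminalState))
                .whenComplete((terminalState, failure) -> {
                    if (failure != null) {
                        terminationFuture.completeExceptionally(failure);
                    } else {
                        terminationFuture.complete(terminalState);
                    }
                });
    }

    CompletableFuture<String> getJobTerminationFuture() {
        return terminationFuture;
    }
}
{code}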
> Race condition of REST API and JRS entry might lead to inconsistent state
> -------------------------------------------------------------------------
>
> Key: FLINK-38883
> URL: https://issues.apache.org/jira/browse/FLINK-38883
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.0.1, 1.20.3, 2.2.0, 2.1.1
> Reporter: Matthias Pohl
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)