[ 
https://issues.apache.org/jira/browse/FLINK-39891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lrsb updated FLINK-39891:
-------------------------
    Description: 
Background

FlinkSessionJob submission generates a random JobID, records it in status as
RECONCILING, then submits to the session cluster.
FLINK-38858 hardened this against the common crash window: on a retry the
operator reuses the recorded JobID while the state is RECONCILING, and Flink
rejects a same-JobID resubmit with DuplicateJobSubmissionException. This
prevents duplicates when the recorded JobID survives.

Problem

A residual window remains where the operator generates a new JobID while a
job from a previous submission is still running on the session cluster, leaving
the old job orphaned. Two jobs then run concurrently against the same
sources/sinks, breaking exactly-once semantics and double-committing to
external systems. This happens when the recorded JobID is not reused. For
example, the submit REST call succeeds on the cluster, but the operator crashes
before the status patch persisting the JobID is durably recorded. On retry,
existingJobId == null, so a new JobID is generated.

The existing checkIfAlreadyUpgraded only probes the currently recorded JobID,
so it cannot detect a job that is running under a JobID that was never recorded.
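
The window described above can be illustrated with a small, simplified model.
This is only a sketch: FakeCluster, FakeStatus and attempt() are hypothetical
names invented for illustration, not operator or Flink classes, and a random
UUID string stands in for a real JobID. It assumes one attempt consists of
picking a JobID, submitting it, and then persisting it unless the status patch
is lost.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Simplified model; FakeCluster, FakeStatus and attempt() are hypothetical names.
class OrphanWindowDemo {

    /** Stand-in for the session cluster: the set of JobIDs currently running. */
    static final class FakeCluster {
        final Set<String> running = new HashSet<>();

        /** Returns false for a same-JobID resubmit, mimicking a duplicate rejection. */
        boolean submit(String jobId) {
            return running.add(jobId);
        }
    }

    /** Stand-in for the persisted FlinkSessionJob status. */
    static final class FakeStatus {
        String recordedJobId; // null until a status patch is durably recorded
    }

    /** One submission attempt: reuse the recorded JobID if present, else generate one. */
    static String attempt(FakeCluster cluster, FakeStatus status, boolean statusPatchLost) {
        String existingJobId = status.recordedJobId;
        String jobId = existingJobId != null ? existingJobId : UUID.randomUUID().toString();
        cluster.submit(jobId);
        if (!statusPatchLost) {
            status.recordedJobId = jobId; // the patch survived, so a retry can reuse it
        }
        return jobId;
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        FakeStatus status = new FakeStatus();
        attempt(cluster, status, true);  // submit succeeds, status patch is lost
        attempt(cluster, status, false); // retry sees existingJobId == null
        System.out.println("running jobs: " + cluster.running.size());
    }
}
```

In this model the second attempt cannot see the first job, so the fake cluster
ends up with two running entries. When the patch survives, the retry reuses the
recorded JobID and the set stays at one entry.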

Proposed change

Add an opt-in safeguard: before submitting (only when *not* reusing an existing
JobID), list non-terminal jobs on the session cluster, cancel any that belong to
this FlinkSessionJob, wait for them to reach a terminal state, then submit. Gate
this behind a new dynamic config option,
kubernetes.operator.session-job.cancel-orphaned-on-submit, which defaults to
false.
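
The proposed sequence might look roughly like the sketch below. It is not the
actual patch: OrphanGuard, JobInfo, SessionCluster, InMemoryCluster and
cancelOrphansBeforeSubmit are hypothetical names, and the issue does not say
how a job is recognised as belonging to this FlinkSessionJob, so that decision
is passed in as a predicate. The wait is modelled as a bounded number of polls;
a real implementation would presumably sleep between polls and use a timeout.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch only: none of these types are operator or Flink APIs.
class OrphanGuard {

    /** Hypothetical view of one job on the session cluster. */
    static final class JobInfo {
        final String jobId;
        final String jobName;
        final boolean terminal;

        JobInfo(String jobId, String jobName, boolean terminal) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.terminal = terminal;
        }
    }

    /** Hypothetical minimal client for the session cluster. */
    interface SessionCluster {
        List<JobInfo> listJobs();

        void cancel(String jobId);
    }

    /** Cancels matching non-terminal jobs and returns their IDs; a no-op when gated off. */
    static List<String> cancelOrphansBeforeSubmit(
            SessionCluster cluster,
            boolean cancelOrphanedOnSubmit, // value of the proposed config option
            String existingJobId,           // non-null means a recorded JobID is reused
            Predicate<JobInfo> belongsToThisSessionJob, // assumption: caller decides ownership
            int maxPolls) {
        List<String> cancelled = new ArrayList<>();
        if (!cancelOrphanedOnSubmit || existingJobId != null) {
            return cancelled;
        }
        for (JobInfo job : cluster.listJobs()) {
            if (!job.terminal && belongsToThisSessionJob.test(job)) {
                cluster.cancel(job.jobId);
                cancelled.add(job.jobId);
            }
        }
        if (cancelled.isEmpty()) {
            return cancelled;
        }
        // Poll until every cancelled job is terminal (no sleep in this sketch).
        for (int poll = 0; poll < maxPolls; poll++) {
            boolean pending = cluster.listJobs().stream()
                    .anyMatch(j -> cancelled.contains(j.jobId) && !j.terminal);
            if (!pending) {
                return cancelled;
            }
        }
        throw new IllegalStateException("Orphaned jobs did not reach a terminal state");
    }

    /** In-memory fake whose cancel() takes effect immediately, for trying the sketch out. */
    static final class InMemoryCluster implements SessionCluster {
        private final Map<String, JobInfo> jobs = new LinkedHashMap<>();

        void addRunning(String jobId, String jobName) {
            jobs.put(jobId, new JobInfo(jobId, jobName, false));
        }

        @Override
        public List<JobInfo> listJobs() {
            return new ArrayList<>(jobs.values());
        }

        @Override
        public void cancel(String jobId) {
            JobInfo job = jobs.get(jobId);
            if (job != null) {
                jobs.put(jobId, new JobInfo(jobId, job.jobName, true));
            }
        }
    }
}
```

Returning early when existingJobId is non-null keeps the reuse path from
FLINK-38858 untouched, so the guard only runs in the case where a new JobID is
about to be generated.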

  was:
Background

FlinkSessionJob submission generates a random JobID, records it in status as
RECONCILING, then submits to the session cluster.
FLINK-38858 hardened this against the common crash window: on a retry the 
operator
reuses the recorded JobID while state is RECONCILING, and Flink rejects a
same JobID resubmit with DuplicateJobSubmissionException. This prevents
duplicates when the recorded JobID survives.

Problem

A residual window remains where the operator generates a new JobID while a
job from a previous submission is still running on the session cluster, leaving 
the
old job orphaned. Two jobs then run concurrently against the same sources/sinks,
breaking exactly-once semantics and double-committing to external systems. This
happens when the recorded JobID is not reused, e.g.: the submit REST call 
succeeds on the cluster but the status patch persisting the JobID was lost 
(operator crash) before it was durably recorded -> on retry existingJobId == 
null -> new JobID generated.

The existing checkIfAlreadyUpgraded only probes the ** currently recorded JobID.

Proposed change

Add an opt-in safeguard: before submitting (only when *not* reusing an existing
JobID), list non-terminal jobs on the session cluster, cancel any that belong to
this FlinkSessionJob, wait for them to reach a terminal state, then submit. Gate
behind a new dynamic config option, default false 
(kubernetes.operator.session-job.cancel-orphaned-on-submit).


> Cancel orphaned session jobs on the session cluster before (re)submitting a FlinkSessionJob
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39891
>                 URL: https://issues.apache.org/jira/browse/FLINK-39891
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>    Affects Versions: 1.12.0, 1.14.0, 1.15.0, 1.13
>            Reporter: lrsb
>            Priority: Minor
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)