[
https://issues.apache.org/jira/browse/FLINK-39891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lrsb updated FLINK-39891:
-------------------------
Description:
Background
FlinkSessionJob submission generates a random JobID, records it in status as
RECONCILING, then submits to the session cluster.
FLINK-38858 hardened this against the common crash window: on a retry the
operator reuses the recorded JobID while the state is RECONCILING, and Flink
rejects a same-JobID resubmission with DuplicateJobSubmissionException. This
prevents duplicates as long as the recorded JobID survives.
Problem
A residual window remains where the operator generates a new JobID while a
job from a previous submission is still running on the session cluster, leaving
the old job orphaned. Two jobs then run concurrently against the same
sources/sinks, breaking exactly-once semantics and double-committing to
external systems. This happens whenever the recorded JobID is not reused. For
example: the submit REST call succeeds on the cluster, but the operator crashes
before the status patch persisting the JobID is durably recorded. On retry,
existingJobId == null, so a new JobID is generated.
The existing checkIfAlreadyUpgraded only probes the currently recorded JobID,
so it cannot detect a job that was submitted under an unrecorded JobID.
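The window can be illustrated with a small, self-contained simulation. This is
only a sketch, not operator code: the Cluster class, chooseJobId and
jobsAfterRetry are hypothetical names, the cluster is modelled as an in-memory
set, and IllegalStateException stands in for DuplicateJobSubmissionException.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

public class CrashWindowSketch {
    /** Hypothetical stand-in for the session cluster: rejects a JobID it already accepted. */
    static final class Cluster {
        final Set<String> running = new HashSet<>();

        void submit(String jobId) {
            if (!running.add(jobId)) {
                // Stand-in for Flink's DuplicateJobSubmissionException.
                throw new IllegalStateException("duplicate JobID " + jobId);
            }
        }
    }

    /** Reuse the recorded JobID if it survived; otherwise generate a fresh random one. */
    static String chooseJobId(Optional<String> recordedJobId) {
        return recordedJobId.orElseGet(() -> UUID.randomUUID().toString().replace("-", ""));
    }

    /** Simulates a submit followed by a retry; returns how many jobs end up on the cluster. */
    static int jobsAfterRetry(boolean recordedJobIdSurvives) {
        Cluster cluster = new Cluster();
        String first = chooseJobId(Optional.empty());
        cluster.submit(first); // the submit REST call succeeds on the cluster

        // The operator crashes here; the recorded JobID may or may not have been persisted.
        Optional<String> recorded =
                recordedJobIdSurvives ? Optional.of(first) : Optional.empty();

        String retry = chooseJobId(recorded);
        try {
            cluster.submit(retry);
        } catch (IllegalStateException duplicate) {
            // Same JobID: the resubmission is rejected, so no second job is started.
        }
        return cluster.running.size();
    }

    public static void main(String[] args) {
        System.out.println(jobsAfterRetry(true));  // prints 1: JobID reused, duplicate rejected
        System.out.println(jobsAfterRetry(false)); // prints 2: new JobID, old job orphaned
    }
}
```

When the recorded JobID is lost, nothing on the retry path refers to the first
job any more, which is why probing only the recorded JobID cannot find it.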
Proposed change
Add an opt-in safeguard: before submitting (only when *not* reusing an existing
JobID), list non-terminal jobs on the session cluster, cancel any that belong to
this FlinkSessionJob, wait for them to reach a terminal state, then submit. Gate
this behind a new dynamic config option,
kubernetes.operator.session-job.cancel-orphaned-on-submit (default false).
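The proposed flow could look roughly like the sketch below. Everything in it
is an assumption made for illustration: the SessionCluster interface, the
in-memory implementation, the "owner" tag used to decide that a job belongs to
a FlinkSessionJob, and the maxPolls bound are all hypothetical, and the issue
does not specify how ownership is determined or how the wait is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrphanCleanupSketch {
    enum State {
        RUNNING, CANCELLING, CANCELED, FINISHED, FAILED;

        boolean isTerminal() {
            return this == CANCELED || this == FINISHED || this == FAILED;
        }
    }

    /** Snapshot of one job; "owner" is a hypothetical tag naming the FlinkSessionJob. */
    static final class Job {
        final String jobId;
        final String owner;
        final State state;

        Job(String jobId, String owner, State state) {
            this.jobId = jobId;
            this.owner = owner;
            this.state = state;
        }
    }

    /** Hypothetical minimal view of the session cluster; not the operator's real client API. */
    interface SessionCluster {
        List<Job> listJobs();
        void cancel(String jobId);
        void submit(String jobId, String owner);
    }

    static final class InMemoryCluster implements SessionCluster {
        private final Map<String, Job> jobs = new LinkedHashMap<>();

        public List<Job> listJobs() {
            // Simulate asynchronous cancellation: CANCELLING becomes CANCELED on the next listing.
            jobs.replaceAll((id, j) ->
                    j.state == State.CANCELLING ? new Job(id, j.owner, State.CANCELED) : j);
            return new ArrayList<>(jobs.values());
        }

        public void cancel(String jobId) {
            Job j = jobs.get(jobId);
            if (j != null && !j.state.isTerminal()) {
                jobs.put(jobId, new Job(jobId, j.owner, State.CANCELLING));
            }
        }

        public void submit(String jobId, String owner) {
            if (jobs.containsKey(jobId)) {
                throw new IllegalStateException("duplicate JobID " + jobId);
            }
            jobs.put(jobId, new Job(jobId, owner, State.RUNNING));
        }
    }

    /** Counts the non-terminal jobs on the cluster that carry the given owner tag. */
    static long nonTerminalFor(SessionCluster cluster, String owner) {
        return cluster.listJobs().stream()
                .filter(j -> owner.equals(j.owner) && !j.state.isTerminal())
                .count();
    }

    /** Cancels orphans (if enabled and no JobID is being reused), waits, then submits. */
    static List<String> submitWithCleanup(SessionCluster cluster, String owner,
            String existingJobId, String newJobId, boolean cancelOrphanedOnSubmit, int maxPolls) {
        List<String> cancelled = new ArrayList<>();
        if (cancelOrphanedOnSubmit && existingJobId == null) {
            for (Job j : cluster.listJobs()) {
                if (owner.equals(j.owner) && !j.state.isTerminal()) {
                    cluster.cancel(j.jobId);
                    cancelled.add(j.jobId);
                }
            }
            int polls = 0;
            while (nonTerminalFor(cluster, owner) > 0) {
                // A real implementation would back off or requeue between polls.
                if (++polls > maxPolls) {
                    throw new IllegalStateException("orphaned jobs did not reach a terminal state");
                }
            }
        }
        cluster.submit(existingJobId != null ? existingJobId : newJobId, owner);
        return cancelled;
    }

    public static void main(String[] args) {
        InMemoryCluster cluster = new InMemoryCluster();
        cluster.submit("orphan", "job-a"); // left over from a lost submission
        cluster.submit("other", "job-b");  // belongs to a different FlinkSessionJob
        System.out.println(submitWithCleanup(cluster, "job-a", null, "fresh", true, 10));
        // prints [orphan]
    }
}
```

Skipping the cleanup when an existing JobID is reused keeps the FLINK-38858
path unchanged: in that case the duplicate-submission check already covers the
retry, so the extra listing and cancellation are only paid for on the path
where a new JobID is generated.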
was:
Background
FlinkSessionJob submission generates a random JobID, records it in status as
RECONCILING, then submits to the session cluster.
FLINK-38858 hardened this against the common crash window: on a retry the
operator reuses the recorded JobID while state is RECONCILING, and Flink
rejects a same JobID resubmit with DuplicateJobSubmissionException. This
prevents duplicates when the recorded JobID survives.
Problem
A residual window remains where the operator generates a new JobID while a
job from a previous submission is still running on the session cluster, leaving
the old job orphaned. Two jobs then run concurrently against the same
sources/sinks, breaking exactly-once semantics and double-committing to
external systems. This
happens when the recorded JobID is not reused, e.g.: the submit REST call
succeeds on the cluster but the status patch persisting the JobID was lost
(operator crash) before it was durably recorded -> on retry existingJobId ==
null -> new JobID generated.
The existing checkIfAlreadyUpgraded only probes the ** currently recorded JobID.
Proposed change
Add an opt-in safeguard: before submitting (only when *not* reusing an existing
JobID), list non-terminal jobs on the session cluster, cancel any that belong to
this FlinkSessionJob, wait for them to reach a terminal state, then submit. Gate
behind a new dynamic config option, default false
(kubernetes.operator.session-job.cancel-orphaned-on-submit).
> Cancel orphaned session jobs on the session cluster before (re)submitting a
> FlinkSessionJob
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-39891
> URL: https://issues.apache.org/jira/browse/FLINK-39891
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Affects Versions: 1.12.0, 1.14.0, 1.15.0, 1.13
> Reporter: lrsb
> Priority: Minor
> Labels: pull-request-available