[ https://issues.apache.org/jira/browse/FLINK-39043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-39043:
-----------------------------------
    Description: 
I have a batch FlinkDeployment with the following status:


{code:yaml}
  status:
    clusterInfo:
      flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
      flink-version: 0.0119-88-cp
      state-size: "1940"
      total-cpu: "1.0"
      total-memory: "2147483648"
    error: '{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest
      checkpoint not externally addressable, Manual restore required.","additionalMetadata":{},"throwableList":[]}'
    jobManagerDeploymentStatus: READY
    jobStatus:
      checkpointInfo:
        lastPeriodicCheckpointTimestamp: 0
      jobId: d7711dd6e599d917195e0b62feb51635
      jobName: select-2
      savepointInfo:
        lastPeriodicSavepointTimestamp: 0
        savepointHistory: []
      startTime: "1770322034225"
      state: FINISHED
      updateTime: "1770322169548"
    lifecycleState: STABLE
    observedGeneration: 1
    reconciliationStatus:
      lastReconciledSpec: '..'
      reconciliationTimestamp: 1770321989373
      state: DEPLOYED
    taskManager:
      labelSelector: component=taskmanager,app=select-2
      replicas: 1
{code}

-------

Note: the text below was generated by Claude Opus 4.6. It has been checked and seems plausible:

h2. Description

When a Flink job completes (reaches the FINISHED state) and its latest checkpoint is not externally addressable (e.g., the job used in-memory or non-persistent checkpoint storage), the operator enters an infinite error loop that prevents any spec change from being reconciled, including {{state: suspended}}.

As a result, once a bounded job finishes with non-persistent checkpoints, the FlinkDeployment becomes effectively unmanageable by the operator. The {{observedGeneration}} never advances to match {{generation}}, and spec updates (suspend, delete, config changes) are never applied.

h2. Root Cause

The issue is in {{FlinkDeploymentController.reconcile()}}. The observer runs before the reconciler in the same try block:

{code:java}
// FlinkDeploymentController.java, lines ~146-166
try {
    observerFactory.getOrCreate(flinkApp).observe(ctx);       // (1) throws
    StatusRecorder.updateStatusImmediately(...);
    if (!validateDeployment(ctx)) { ... }
    reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);   // (2) never reached
} catch (UpgradeFailureException ufe) {
    ReconciliationUtils.updateForReconciliationError(ctx, ufe);
    triggerErrorEvent(ctx, ufe, ufe.getReason());
}
{code}

The call chain that throws:

# {{AbstractFlinkDeploymentObserver.observeInternal()}} → {{observeFlinkCluster()}}
# {{ApplicationObserver.observeFlinkCluster()}} → {{savepointObserver.observeSavepointStatus()}}
# {{SnapshotObserver.observeSavepointStatus()}}: for terminal jobs, calls {{observeLatestCheckpoint()}}
# {{observeLatestCheckpoint()}} → {{flinkService.getLastCheckpoint()}}
# {{AbstractFlinkService.getLastCheckpoint()}}: throws {{UpgradeFailureException("Latest checkpoint not externally addressable, Manual restore required.", "CheckpointNotFound")}} when the checkpoint's external pointer equals {{NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER}}

Because the exception is thrown by the observer call marked (1), the reconciler call marked (2) never executes. This repeats every reconciliation cycle (~15 seconds), indefinitely. The operator emits a {{CHECKPOINTNOTFOUND}} warning event each cycle but never processes the pending spec change.
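
The control flow above can be reproduced in isolation. The following is a minimal sketch, not operator code: {{Deployment}}, {{observe}}, {{reconcile}} and the local {{UpgradeFailureException}} are simplified stand-ins invented for illustration, and it assumes the reconciler is what advances {{observedGeneration}}.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverBlocksReconciler {

    /** Simplified local stand-in for the operator's exception type (assumption). */
    static class UpgradeFailureException extends RuntimeException {
        final String reason;

        UpgradeFailureException(String message, String reason) {
            super(message);
            this.reason = reason;
        }
    }

    /** Toy model of a FlinkDeployment: only the fields needed to show the loop. */
    static class Deployment {
        long generation = 1;
        long observedGeneration = 1;
        boolean latestCheckpointAddressable = false;
        final List<String> events = new ArrayList<>();
    }

    /** Models step (1): throws whenever the latest checkpoint is not addressable. */
    static void observe(Deployment d) {
        if (!d.latestCheckpointAddressable) {
            throw new UpgradeFailureException(
                    "Latest checkpoint not externally addressable, Manual restore required.",
                    "CheckpointNotFound");
        }
    }

    /** Models step (2): processing the spec advances observedGeneration. */
    static void reconcile(Deployment d) {
        d.observedGeneration = d.generation;
    }

    /** One reconciliation cycle with observer and reconciler in the same try block. */
    static void reconcileCycle(Deployment d) {
        try {
            observe(d);   // (1) throws
            reconcile(d); // (2) never reached
        } catch (UpgradeFailureException ufe) {
            d.events.add(ufe.reason); // only an error event is recorded
        }
    }

    /** Applies a spec change (generation 1 -> 2) and runs the given number of cycles. */
    static Deployment simulate(int cycles) {
        Deployment d = new Deployment();
        d.generation = 2;
        for (int i = 0; i < cycles; i++) {
            reconcileCycle(d);
        }
        return d;
    }

    public static void main(String[] args) {
        Deployment d = simulate(3);
        System.out.println("observedGeneration=" + d.observedGeneration
                + " generation=" + d.generation
                + " events=" + d.events.size());
    }
}
```

However many cycles run, the model records one event per cycle while {{observedGeneration}} stays behind {{generation}}, which matches the behavior described in this issue.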

h2. Why Machine Size Affects Reproducibility

This bug is deterministic once the conditions are met, but it may appear intermittent depending on hardware resources.

The only way the reconciler can bypass the broken observer path is when {{isJmDeploymentReady()}} returns false, i.e., the JM pod is not ready. In application mode, the JM process shuts down after the job finishes. The key factor is how quickly the JM process completes its shutdown:

- On larger machines: The JM shutdown completes faster, the container exits, the pod becomes NotReady, and the operator's next reconciliation cycle skips the observer and processes the suspend. The bug is masked by lucky timing.
- On smaller/resource-constrained machines: CPU contention slows the JM shutdown. The JM process remains alive for minutes (observed: container still Running with 0 restarts well after the job reported FINISHED). The pod stays Ready, so the observer runs every cycle and keeps throwing. The bug is triggered reliably.
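
The timing dependence can be illustrated with a toy model. This is only a sketch under stated assumptions: cycles are taken to run at fixed multiples of the reconcile interval starting at the moment the job finishes, and the pod is taken to be Ready exactly until the JM exits. The method name and parameters are hypothetical and do not exist in the operator.

```java
public class JmShutdownTimingSketch {

    /**
     * Returns the index of the first reconciliation cycle that finds the JM pod
     * NotReady (and can therefore skip the failing observer path), or -1 if every
     * cycle up to maxCycles still sees a Ready pod.
     */
    static int firstUnblockedCycle(long jmExitSeconds, long intervalSeconds, int maxCycles) {
        for (int cycle = 0; cycle < maxCycles; cycle++) {
            long now = cycle * intervalSeconds;       // time of this cycle
            boolean jmReady = now < jmExitSeconds;    // pod Ready until the JM exits
            if (!jmReady) {
                return cycle;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Fast shutdown: only the very first cycle is blocked.
        System.out.println(firstUnblockedCycle(5, 15, 10));
        // Slow shutdown: all ten observed cycles are blocked.
        System.out.println(firstUnblockedCycle(180, 15, 10));
    }
}
```

With a 15-second interval, a JM that exits after 5 seconds blocks a single cycle, while one that lingers for 3 minutes blocks every cycle in a 10-cycle observation window.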

h2. Steps to Reproduce

# Create a FlinkDeployment with {{upgradeMode: stateless}} and no persistent checkpoint storage configured (or using the default in-memory checkpoint storage)
# Submit a bounded Flink job (e.g., a SELECT query on a bounded source that produces a finite number of rows)
# Wait for the job to reach the {{FINISHED}} state
# Update the FlinkDeployment spec to set {{state: suspended}} (or make any other spec change)
# Observe that:
#* {{generation}} increments but {{observedGeneration}} stays at its previous value
#* The operator logs show {{CHECKPOINTNOTFOUND}} warnings repeating every ~15 seconds
#* The FlinkDeployment pods are never cleaned up
#* The spec change is never reconciled

This reproduces most reliably on resource-constrained machines where the JM shutdown is slow. On larger machines, the JM may exit quickly enough that the operator accidentally bypasses the bug.

h2. Expected Behavior

When a job has FINISHED and a spec change (especially {{state: suspended}}) is requested, the operator should process the spec change regardless of the checkpoint state. A FINISHED job with {{upgradeMode: stateless}} does not need an externally addressable checkpoint to be suspended or cleaned up.
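
One way to picture the expected behavior is a cycle that tolerates the checkpoint lookup failure when the checkpoint is not needed. The sketch below is purely illustrative and is not a proposed patch: the {{UpgradeMode}} enum, {{Deployment}} class and {{checkpointNotNeeded}} helper are local stand-ins invented here, and the actual fix in the operator may look quite different.

```java
public class TolerantObserveSketch {

    /** Local stand-in for the upgrade modes; not the operator's own enum. */
    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    /** Simplified local stand-in for the operator's exception type (assumption). */
    static class UpgradeFailureException extends RuntimeException {
        UpgradeFailureException(String message) {
            super(message);
        }
    }

    /** Toy model of the resource state relevant to this issue. */
    static class Deployment {
        long generation = 2;
        long observedGeneration = 1;
        boolean jobFinished = true;
        boolean latestCheckpointAddressable = false;
        UpgradeMode upgradeMode = UpgradeMode.STATELESS;
    }

    static void observe(Deployment d) {
        if (!d.latestCheckpointAddressable) {
            throw new UpgradeFailureException(
                    "Latest checkpoint not externally addressable, Manual restore required.");
        }
    }

    /** Hypothetical rule: a FINISHED job upgraded statelessly needs no checkpoint. */
    static boolean checkpointNotNeeded(Deployment d) {
        return d.jobFinished && d.upgradeMode == UpgradeMode.STATELESS;
    }

    /** Returns true if the spec change was processed in this cycle. */
    static boolean reconcileCycle(Deployment d) {
        try {
            observe(d);
        } catch (UpgradeFailureException ufe) {
            if (!checkpointNotNeeded(d)) {
                return false; // still blocked, as in the current behavior
            }
            // Otherwise fall through and let the reconciler run.
        }
        d.observedGeneration = d.generation;
        return true;
    }

    public static void main(String[] args) {
        Deployment d = new Deployment();
        System.out.println(reconcileCycle(d) + " " + d.observedGeneration);
    }
}
```

In this model the stateless FINISHED deployment is reconciled on the first cycle, while a deployment whose upgrade mode depends on the checkpoint remains blocked.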

h2. Observed Behavior

The operator enters an infinite loop of {{UpgradeFailureException}} warnings and never reconciles the spec change. The FlinkDeployment becomes stuck with {{observedGeneration}} < {{generation}} permanently.
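
The stuck condition can be expressed as a simple predicate over the values visible on the resource. This is a sketch for spotting affected deployments, assuming the caller has already read {{metadata.generation}}, {{status.observedGeneration}}, {{status.jobStatus.state}} and {{status.error}} by some other means. The class and method names are hypothetical.

```java
public class StuckDeploymentCheck {

    /**
     * Returns true when a spec change is pending, the job is FINISHED, and the
     * recorded error is the checkpoint-related UpgradeFailureException.
     */
    static boolean isStuck(long generation, long observedGeneration, String jobState, String error) {
        boolean specPending = observedGeneration < generation;
        boolean finished = "FINISHED".equals(jobState);
        boolean checkpointError = error != null
                && error.contains("UpgradeFailureException")
                && error.contains("not externally addressable");
        return specPending && finished && checkpointError;
    }

    public static void main(String[] args) {
        String error = "{\"type\":\"org.apache.flink.kubernetes.operator.exception."
                + "UpgradeFailureException\",\"message\":\"Latest checkpoint not "
                + "externally addressable, Manual restore required.\"}";
        System.out.println(isStuck(2, 1, "FINISHED", error));
    }
}
```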

h2. Impact

- FlinkDeployments for bounded/batch jobs with non-persistent checkpoints become unmanageable after job completion
- Resources (pods, services) are never cleaned up







> UpgradeFailureException from SnapshotObserver blocks reconciliation of spec 
> changes for FINISHED batch jobs
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39043
>                 URL: https://issues.apache.org/jira/browse/FLINK-39043
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Robert Metzger
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
