[ 
https://issues.apache.org/jira/browse/FLINK-39043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-39043:
-----------------------------------
    Description: 
I have a batch FlinkDeployment with the following status:


{code:java}
  status:
    clusterInfo:
      flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
      flink-version: 0.0119-88-cp
      state-size: "1940"
      total-cpu: "1.0"
      total-memory: "2147483648"
    error: 
'{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest
      checkpoint not externally addressable, Manual restore 
required.","additionalMetadata":{},"throwableList":[]}'
    jobManagerDeploymentStatus: READY
    jobStatus:
      checkpointInfo:
        lastPeriodicCheckpointTimestamp: 0
      jobId: d7711dd6e599d917195e0b62feb51635
      jobName: select-2
      savepointInfo:
        lastPeriodicSavepointTimestamp: 0
        savepointHistory: []
      startTime: "1770322034225"
      state: FINISHED
      updateTime: "1770322169548"
    lifecycleState: STABLE
    observedGeneration: 1
    reconciliationStatus:
      lastReconciledSpec: '..'
      reconciliationTimestamp: 1770321989373
      state: DEPLOYED
    taskManager:
      labelSelector: component=taskmanager,app=select-2
      replicas: 1
{code}

-------

Beware, below text is Claude Opus 4.6 generated, but has been checked and seems 
plausible:

 When a Flink job completes (reaches FINISHED state) and its latest checkpoint 
is not externally addressable (e.g., the job used
  in-memory or non-persistent checkpoint storage), the operator enters an 
infinite error loop that prevents any spec changes from being
  reconciled — including state: suspended.

  This means that once a bounded job finishes with non-persistent checkpoints, 
the FlinkDeployment becomes effectively unmanageable by the
   operator. The observedGeneration never advances to match generation, and any 
spec updates (suspend, delete, config changes) are
  silently ignored.

  Root Cause

  The issue is in FlinkDeploymentController.reconcile(). The observer runs 
before the reconciler in the same try block:

  // FlinkDeploymentController.java
  try {
      observerFactory.getOrCreate(flinkApp).observe(ctx);       // (1) throws
      StatusRecorder.updateStatusImmediately(...);
      if (!validateDeployment(ctx)) { ... }
      reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);   // (2) never 
reached
  } catch (UpgradeFailureException ufe) {
      ReconciliationUtils.updateForReconciliationError(ctx, ufe);
      triggerErrorEvent(ctx, ufe, ufe.getReason());
  }

  The call chain that throws:

  1. AbstractFlinkDeploymentObserver.observeInternal() → observeFlinkCluster()
  2. ApplicationObserver.observeFlinkCluster() → 
savepointObserver.observeSavepointStatus()
  3. SnapshotObserver.observeSavepointStatus() — for terminal jobs, calls 
observeLatestCheckpoint()
  4. observeLatestCheckpoint() → flinkService.getLastCheckpoint()
  5. AbstractFlinkService.getLastCheckpoint() — throws 
UpgradeFailureException("Latest checkpoint not externally addressable, Manual
  restore required.", "CheckpointNotFound") when the checkpoint's external 
pointer equals
  NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER

  Because the exception is thrown in step (1), the reconciler at step (2) never 
executes. This repeats every reconciliation cycle (~15
  seconds), indefinitely. The operator emits a CHECKPOINTNOTFOUND warning event 
each cycle but never processes the pending spec change.

  This bug is deterministic once the conditions are met, but it may appear 
intermittent depending on hardware resources.

  The only way the reconciler can bypass the broken observer path is when 
isJmDeploymentReady() returns false — i.e., the JM pod is not
  ready. In application mode, the JM process shuts down after the job finishes. 
The key factor is how quickly the JM process completes its
   shutdown:

  - On larger machines: The JM shutdown completes faster, the container exits, 
the pod becomes NotReady, and the operator's next
  reconciliation cycle skips the observer and processes the suspend. The bug is 
masked by lucky timing.
  - On smaller/resource-constrained machines: CPU contention slows the JM 
shutdown. The JM process remains alive for minutes (observed:
  container still Running with 0 restarts well after the job reported 
FINISHED). The pod stays Ready, so the observer runs every cycle and
   keeps throwing. The bug is triggered reliably.

  Steps to Reproduce

  1. Create a FlinkDeployment with upgradeMode: stateless and no persistent 
checkpoint storage configured (or using default in-memory
  checkpoint storage)
  2. Submit a bounded Flink job (e.g., a SELECT query on a bounded source that 
produces a finite number of rows)
  3. Wait for the job to reach FINISHED state
  4. Update the FlinkDeployment spec to set state: suspended (or any other spec 
change)
  5. Observe that:
    - generation increments but observedGeneration stays at its previous value
    - The operator logs show CHECKPOINTNOTFOUND warnings repeating every ~15 
seconds
    - The FlinkDeployment pods are never cleaned up
    - The spec change is never reconciled

  This reproduces most reliably on resource-constrained machines where the JM 
shutdown is slow. On larger machines, the JM may exit
  quickly enough that the operator accidentally bypasses the bug.

  Expected Behavior

  When a job has FINISHED and a spec change (especially state: suspended) is 
requested, the operator should process the spec change
  regardless of the checkpoint state. A FINISHED job with upgradeMode: 
stateless does not need an externally addressable checkpoint to be
  suspended or cleaned up.

  Observed Behavior

  The operator enters an infinite loop of UpgradeFailureException warnings and 
never reconciles the spec change. The FlinkDeployment
  becomes stuck with observedGeneration < generation permanently.

  Impact

  - FlinkDeployments for bounded/batch jobs with non-persistent checkpoints 
become unmanageable after job completion
  - Resources (pods, services) are never cleaned up



  was:
I have a batch with the following status:


{code:java}
  status:
    clusterInfo:
      flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
      flink-version: 0.0119-88-cp
      state-size: "1940"
      total-cpu: "1.0"
      total-memory: "2147483648"
    error: 
'{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest
      checkpoint not externally addressable, Manual restore 
required.","additionalMetadata":{},"throwableList":[]}'
    jobManagerDeploymentStatus: READY
    jobStatus:
      checkpointInfo:
        lastPeriodicCheckpointTimestamp: 0
      jobId: d7711dd6e599d917195e0b62feb51635
      jobName: select-2
      savepointInfo:
        lastPeriodicSavepointTimestamp: 0
        savepointHistory: []
      startTime: "1770322034225"
      state: FINISHED
      updateTime: "1770322169548"
    lifecycleState: STABLE
    observedGeneration: 1
    reconciliationStatus:
      lastReconciledSpec: '..'
      reconciliationTimestamp: 1770321989373
      state: DEPLOYED
    taskManager:
      labelSelector: component=taskmanager,app=select-2
      replicas: 1
{code}

Beware, below text is Claude Opus 4.6 generated, but has been checked and seems 
plausible.

 When a Flink job completes (reaches FINISHED state) and its latest checkpoint 
is not externally addressable (e.g., the job used
  in-memory or non-persistent checkpoint storage), the operator enters an 
infinite error loop that prevents any spec changes from being
  reconciled — including state: suspended.

  This means that once a bounded job finishes with non-persistent checkpoints, 
the FlinkDeployment becomes effectively unmanageable by the
   operator. The observedGeneration never advances to match generation, and any 
spec updates (suspend, delete, config changes) are
  silently ignored.

  Root Cause

  The issue is in FlinkDeploymentController.reconcile(). The observer runs 
before the reconciler in the same try block:

  // FlinkDeploymentController.java
  try {
      observerFactory.getOrCreate(flinkApp).observe(ctx);       // (1) throws
      StatusRecorder.updateStatusImmediately(...);
      if (!validateDeployment(ctx)) { ... }
      reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);   // (2) never 
reached
  } catch (UpgradeFailureException ufe) {
      ReconciliationUtils.updateForReconciliationError(ctx, ufe);
      triggerErrorEvent(ctx, ufe, ufe.getReason());
  }

  The call chain that throws:

  1. AbstractFlinkDeploymentObserver.observeInternal() → observeFlinkCluster()
  2. ApplicationObserver.observeFlinkCluster() → 
savepointObserver.observeSavepointStatus()
  3. SnapshotObserver.observeSavepointStatus() — for terminal jobs, calls 
observeLatestCheckpoint()
  4. observeLatestCheckpoint() → flinkService.getLastCheckpoint()
  5. AbstractFlinkService.getLastCheckpoint() — throws 
UpgradeFailureException("Latest checkpoint not externally addressable, Manual
  restore required.", "CheckpointNotFound") when the checkpoint's external 
pointer equals
  NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER

  Because the exception is thrown in step (1), the reconciler at step (2) never 
executes. This repeats every reconciliation cycle (~15
  seconds), indefinitely. The operator emits a CHECKPOINTNOTFOUND warning event 
each cycle but never processes the pending spec change.

  This bug is deterministic once the conditions are met, but it may appear 
intermittent depending on hardware resources.

  The only way the reconciler can bypass the broken observer path is when 
isJmDeploymentReady() returns false — i.e., the JM pod is not
  ready. In application mode, the JM process shuts down after the job finishes. 
The key factor is how quickly the JM process completes its
   shutdown:

  - On larger machines: The JM shutdown completes faster, the container exits, 
the pod becomes NotReady, and the operator's next
  reconciliation cycle skips the observer and processes the suspend. The bug is 
masked by lucky timing.
  - On smaller/resource-constrained machines: CPU contention slows the JM 
shutdown. The JM process remains alive for minutes (observed:
  container still Running with 0 restarts well after the job reported 
FINISHED). The pod stays Ready, so the observer runs every cycle and
   keeps throwing. The bug is triggered reliably.

  Steps to Reproduce

  1. Create a FlinkDeployment with upgradeMode: stateless and no persistent 
checkpoint storage configured (or using default in-memory
  checkpoint storage)
  2. Submit a bounded Flink job (e.g., a SELECT query on a bounded source that 
produces a finite number of rows)
  3. Wait for the job to reach FINISHED state
  4. Update the FlinkDeployment spec to set state: suspended (or any other spec 
change)
  5. Observe that:
    - generation increments but observedGeneration stays at its previous value
    - The operator logs show CHECKPOINTNOTFOUND warnings repeating every ~15 
seconds
    - The FlinkDeployment pods are never cleaned up
    - The spec change is never reconciled

  This reproduces most reliably on resource-constrained machines where the JM 
shutdown is slow. On larger machines, the JM may exit
  quickly enough that the operator accidentally bypasses the bug.

  Expected Behavior

  When a job has FINISHED and a spec change (especially state: suspended) is 
requested, the operator should process the spec change
  regardless of the checkpoint state. A FINISHED job with upgradeMode: 
stateless does not need an externally addressable checkpoint to be
  suspended or cleaned up.

  Observed Behavior

  The operator enters an infinite loop of UpgradeFailureException warnings and 
never reconciles the spec change. The FlinkDeployment
  becomes stuck with observedGeneration < generation permanently.

  Impact

  - FlinkDeployments for bounded/batch jobs with non-persistent checkpoints 
become unmanageable after job completion
  - Resources (pods, services) are never cleaned up




> UpgradeFailureException from SnapshotObserver blocks reconciliation of spec 
> changes for FINISHED batch jobs
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39043
>                 URL: https://issues.apache.org/jira/browse/FLINK-39043
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>            Reporter: Robert Metzger
>            Priority: Major
>
> I have a batch FlinkDeployment with the following status:
> {code:java}
>   status:
>     clusterInfo:
>       flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
>       flink-version: 0.0119-88-cp
>       state-size: "1940"
>       total-cpu: "1.0"
>       total-memory: "2147483648"
>     error: 
> '{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest
>       checkpoint not externally addressable, Manual restore 
> required.","additionalMetadata":{},"throwableList":[]}'
>     jobManagerDeploymentStatus: READY
>     jobStatus:
>       checkpointInfo:
>         lastPeriodicCheckpointTimestamp: 0
>       jobId: d7711dd6e599d917195e0b62feb51635
>       jobName: select-2
>       savepointInfo:
>         lastPeriodicSavepointTimestamp: 0
>         savepointHistory: []
>       startTime: "1770322034225"
>       state: FINISHED
>       updateTime: "1770322169548"
>     lifecycleState: STABLE
>     observedGeneration: 1
>     reconciliationStatus:
>       lastReconciledSpec: '..'
>       reconciliationTimestamp: 1770321989373
>       state: DEPLOYED
>     taskManager:
>       labelSelector: component=taskmanager,app=select-2
>       replicas: 1
> {code}
> -------
> Beware, below text is Claude Opus 4.6 generated, but has been checked and 
> seems plausible:
>  When a Flink job completes (reaches FINISHED state) and its latest 
> checkpoint is not externally addressable (e.g., the job used
>   in-memory or non-persistent checkpoint storage), the operator enters an 
> infinite error loop that prevents any spec changes from being
>   reconciled — including state: suspended.
>   This means that once a bounded job finishes with non-persistent 
> checkpoints, the FlinkDeployment becomes effectively unmanageable by the
>    operator. The observedGeneration never advances to match generation, and 
> any spec updates (suspend, delete, config changes) are
>   silently ignored.
>   Root Cause
>   The issue is in FlinkDeploymentController.reconcile(). The observer runs 
> before the reconciler in the same try block:
>   // FlinkDeploymentController.java
>   try {
>       observerFactory.getOrCreate(flinkApp).observe(ctx);       // (1) throws
>       StatusRecorder.updateStatusImmediately(...);
>       if (!validateDeployment(ctx)) { ... }
>       reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);   // (2) never 
> reached
>   } catch (UpgradeFailureException ufe) {
>       ReconciliationUtils.updateForReconciliationError(ctx, ufe);
>       triggerErrorEvent(ctx, ufe, ufe.getReason());
>   }
>   The call chain that throws:
>   1. AbstractFlinkDeploymentObserver.observeInternal() → observeFlinkCluster()
>   2. ApplicationObserver.observeFlinkCluster() → 
> savepointObserver.observeSavepointStatus()
>   3. SnapshotObserver.observeSavepointStatus() — for terminal jobs, calls 
> observeLatestCheckpoint()
>   4. observeLatestCheckpoint() → flinkService.getLastCheckpoint()
>   5. AbstractFlinkService.getLastCheckpoint() — throws 
> UpgradeFailureException("Latest checkpoint not externally addressable, Manual
>   restore required.", "CheckpointNotFound") when the checkpoint's external 
> pointer equals
>   NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER
>   Because the exception is thrown in step (1), the reconciler at step (2) 
> never executes. This repeats every reconciliation cycle (~15
>   seconds), indefinitely. The operator emits a CHECKPOINTNOTFOUND warning 
> event each cycle but never processes the pending spec change.
>   This bug is deterministic once the conditions are met, but it may appear 
> intermittent depending on hardware resources.
>   The only way the reconciler can bypass the broken observer path is when 
> isJmDeploymentReady() returns false — i.e., the JM pod is not
>   ready. In application mode, the JM process shuts down after the job 
> finishes. The key factor is how quickly the JM process completes its
>    shutdown:
>   - On larger machines: The JM shutdown completes faster, the container 
> exits, the pod becomes NotReady, and the operator's next
>   reconciliation cycle skips the observer and processes the suspend. The bug 
> is masked by lucky timing.
>   - On smaller/resource-constrained machines: CPU contention slows the JM 
> shutdown. The JM process remains alive for minutes (observed:
>   container still Running with 0 restarts well after the job reported 
> FINISHED). The pod stays Ready, so the observer runs every cycle and
>    keeps throwing. The bug is triggered reliably.
>   Steps to Reproduce
>   1. Create a FlinkDeployment with upgradeMode: stateless and no persistent 
> checkpoint storage configured (or using default in-memory
>   checkpoint storage)
>   2. Submit a bounded Flink job (e.g., a SELECT query on a bounded source 
> that produces a finite number of rows)
>   3. Wait for the job to reach FINISHED state
>   4. Update the FlinkDeployment spec to set state: suspended (or any other 
> spec change)
>   5. Observe that:
>     - generation increments but observedGeneration stays at its previous value
>     - The operator logs show CHECKPOINTNOTFOUND warnings repeating every ~15 
> seconds
>     - The FlinkDeployment pods are never cleaned up
>     - The spec change is never reconciled
>   This reproduces most reliably on resource-constrained machines where the JM 
> shutdown is slow. On larger machines, the JM may exit
>   quickly enough that the operator accidentally bypasses the bug.
>   Expected Behavior
>   When a job has FINISHED and a spec change (especially state: suspended) is 
> requested, the operator should process the spec change
>   regardless of the checkpoint state. A FINISHED job with upgradeMode: 
> stateless does not need an externally addressable checkpoint to be
>   suspended or cleaned up.
>   Observed Behavior
>   The operator enters an infinite loop of UpgradeFailureException warnings 
> and never reconciles the spec change. The FlinkDeployment
>   becomes stuck with observedGeneration < generation permanently.
>   Impact
>   - FlinkDeployments for bounded/batch jobs with non-persistent checkpoints 
> become unmanageable after job completion
>   - Resources (pods, services) are never cleaned up



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to