[ 
https://issues.apache.org/jira/browse/FLINK-39043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-39043:
-----------------------------------
    Affects Version/s: kubernetes-operator-1.13.0

> UpgradeFailureException from SnapshotObserver blocks reconciliation of spec 
> changes for FINISHED batch jobs
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39043
>                 URL: https://issues.apache.org/jira/browse/FLINK-39043
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.13.0
>            Reporter: Robert Metzger
>            Priority: Major
>
> I have a batch FlinkDeployment with the following status:
> {code:yaml}
>   status:
>     clusterInfo:
>       flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
>       flink-version: 0.0119-88-cp
>       state-size: "1940"
>       total-cpu: "1.0"
>       total-memory: "2147483648"
>     error: '{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest checkpoint not externally addressable, Manual restore required.","additionalMetadata":{},"throwableList":[]}'
>     jobManagerDeploymentStatus: READY
>     jobStatus:
>       checkpointInfo:
>         lastPeriodicCheckpointTimestamp: 0
>       jobId: d7711dd6e599d917195e0b62feb51635
>       jobName: select-2
>       savepointInfo:
>         lastPeriodicSavepointTimestamp: 0
>         savepointHistory: []
>       startTime: "1770322034225"
>       state: FINISHED
>       updateTime: "1770322169548"
>     lifecycleState: STABLE
>     observedGeneration: 1
>     reconciliationStatus:
>       lastReconciledSpec: '..'
>       reconciliationTimestamp: 1770321989373
>       state: DEPLOYED
>     taskManager:
>       labelSelector: component=taskmanager,app=select-2
>       replicas: 1
> {code}
> -------
> Beware: the text below was generated by Claude Opus 4.6, but it has been
> checked and seems plausible:
> h2. Description
> When a Flink job completes (reaches the FINISHED state) and its latest
> checkpoint is not externally addressable (e.g., the job used in-memory or
> non-persistent checkpoint storage), the operator enters an infinite error
> loop that prevents any spec change from being reconciled, including
> {{state: suspended}}.
> This means that once a bounded job finishes with non-persistent checkpoints,
> the FlinkDeployment becomes effectively unmanageable by the operator. The
> {{observedGeneration}} never advances to match {{generation}}, and any spec
> updates (suspend, delete, config changes) are silently ignored.
> h2. Root Cause
> The issue is in {{FlinkDeploymentController.reconcile()}}. The observer runs 
> before the reconciler in the same try block:
> {code:java}
> // FlinkDeploymentController.java, lines ~146-166
> try {
>     observerFactory.getOrCreate(flinkApp).observe(ctx);       // (1) throws
>     StatusRecorder.updateStatusImmediately(...);
>     if (!validateDeployment(ctx)) { ... }
>     reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);   // (2) never reached
> } catch (UpgradeFailureException ufe) {
>     ReconciliationUtils.updateForReconciliationError(ctx, ufe);
>     triggerErrorEvent(ctx, ufe, ufe.getReason());
> }
> {code}
> The call chain that throws:
> # {{AbstractFlinkDeploymentObserver.observeInternal()}} → {{observeFlinkCluster()}}
> # {{ApplicationObserver.observeFlinkCluster()}} → {{savepointObserver.observeSavepointStatus()}}
> # {{SnapshotObserver.observeSavepointStatus()}}: for terminal jobs, calls {{observeLatestCheckpoint()}}
> # {{observeLatestCheckpoint()}} → {{flinkService.getLastCheckpoint()}}
> # {{AbstractFlinkService.getLastCheckpoint()}}: throws {{UpgradeFailureException("Latest checkpoint not externally addressable, Manual restore required.", "CheckpointNotFound")}} when the checkpoint's external pointer equals {{NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER}}
> Because the exception is thrown in step 1, the reconciler at step 2 never 
> executes. This repeats every reconciliation cycle (~15
> seconds), indefinitely. The operator emits a {{CHECKPOINTNOTFOUND}} warning 
> event each cycle but never processes the pending spec
> change.
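
The control flow described above can be reproduced in isolation. The following is a minimal sketch with simplified stand-ins, not the operator's code: the {{UpgradeFailureException}} class here is a local stub, and {{ControllerFlowSketch}} and {{runCycle}} are hypothetical names used only for illustration. It shows why an exception from the observer step means the reconciler step never runs within the same try block.

```java
import java.util.ArrayList;
import java.util.List;

// Local stub for illustration only; not the operator's real exception class.
class UpgradeFailureException extends RuntimeException {
    UpgradeFailureException(String message) {
        super(message);
    }
}

// Hypothetical model of one reconciliation cycle (observer first, then reconciler).
class ControllerFlowSketch {

    /** Runs one simulated cycle and returns the names of the steps that executed. */
    static List<String> runCycle(boolean checkpointExternallyAddressable) {
        List<String> steps = new ArrayList<>();
        try {
            // (1) Observer step: assumed to throw for a FINISHED job whose
            // latest checkpoint is not externally addressable.
            steps.add("observe");
            if (!checkpointExternallyAddressable) {
                throw new UpgradeFailureException(
                        "Latest checkpoint not externally addressable, Manual restore required.");
            }
            // (2) Reconciler step: only reached if the observer returned normally.
            steps.add("reconcile");
        } catch (UpgradeFailureException ufe) {
            // The error is recorded and the cycle ends without reconciling.
            steps.add("recordError");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(runCycle(true));   // [observe, reconcile]
        System.out.println(runCycle(false));  // [observe, recordError]
    }
}
```

Because the input to the observer step does not change between cycles for a FINISHED job, every later cycle in this model takes the same {{recordError}} path.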
> h2. Why Machine Size Affects Reproducibility
> This bug is deterministic once the conditions are met, but it may appear 
> intermittent depending on hardware resources.
> The only way the reconciler can bypass the broken observer path is when
> {{isJmDeploymentReady()}} returns false, i.e., the JM pod is not ready. In
> application mode, the JM process shuts down after the job finishes. The key
> factor is how quickly the JM process completes its shutdown:
> - On larger machines: The JM shutdown completes faster, the container exits,
> the pod becomes NotReady, and the operator's next reconciliation cycle skips
> the observer and processes the suspend. The bug is masked by lucky timing.
> - On smaller/resource-constrained machines: CPU contention slows the JM
> shutdown. The JM process remains alive for minutes (observed: container still
> Running with 0 restarts well after the job reported FINISHED). The pod stays
> Ready, so the observer runs every cycle and keeps throwing. The bug is
> triggered reliably.
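
The timing dependence can be modelled with a small simulation. This is a sketch under stated assumptions, not operator code: {{JmShutdownTimingSketch}}, {{cycleOfReconciliation}}, and the parameter {{jmReadyCycles}} (how many cycles the JM pod still reports Ready after the job finished) are hypothetical names, and the model assumes, as described above, that the reconciler is reached only once the JM deployment is no longer ready.

```java
// Hypothetical model: how long a pending spec change stays unprocessed
// depends on how many cycles the JM pod remains Ready after FINISHED.
class JmShutdownTimingSketch {

    /**
     * Returns the 1-based cycle in which the pending spec change is reconciled,
     * or -1 if it is not reconciled within maxCycles.
     */
    static int cycleOfReconciliation(int jmReadyCycles, int maxCycles) {
        for (int cycle = 1; cycle <= maxCycles; cycle++) {
            boolean jmReady = cycle <= jmReadyCycles;
            if (jmReady) {
                // Observer runs, throws, and the reconciler is skipped this cycle.
                continue;
            }
            // JM not ready: the failing observer path is bypassed,
            // so the reconciler processes the spec change.
            return cycle;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Fast shutdown: the JM is already NotReady in the first cycle.
        System.out.println(cycleOfReconciliation(0, 20));   // 1
        // Slow shutdown: the JM stays Ready longer than the observed window.
        System.out.println(cycleOfReconciliation(40, 20));  // -1
    }
}
```

With a reconciliation interval of roughly 15 seconds, 20 cycles in this model correspond to about five minutes of the FlinkDeployment ignoring the spec change.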
> h2. Steps to Reproduce
> # Create a FlinkDeployment with {{upgradeMode: stateless}} and no persistent checkpoint storage configured (or using default in-memory checkpoint storage)
> # Submit a bounded Flink job (e.g., a SELECT query on a bounded source that produces a finite number of rows)
> # Wait for the job to reach {{FINISHED}} state
> # Update the FlinkDeployment spec to set {{state: suspended}} (or any other spec change)
> # Observe that:
> #* {{generation}} increments but {{observedGeneration}} stays at its previous value
> #* The operator logs show {{CHECKPOINTNOTFOUND}} warnings repeating every ~15 seconds
> #* The FlinkDeployment pods are never cleaned up
> #* The spec change is never reconciled
> This reproduces most reliably on resource-constrained machines where the JM 
> shutdown is slow. On larger machines, the JM may exit
> quickly enough that the operator accidentally bypasses the bug.
> h2. Expected Behavior
> When a job has FINISHED and a spec change (especially {{state: suspended}}) 
> is requested, the operator should process the spec change
> regardless of the checkpoint state. A FINISHED job with {{upgradeMode: 
> stateless}} does not need an externally addressable checkpoint to
>  be suspended or cleaned up.
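
One way to state the expected behavior as a rule is sketched below. This is not a proposed patch and does not reflect how the operator is structured: {{TerminalJobGuardSketch}} and {{checkpointFailureBlocksReconcile}} are hypothetical names, and the enum is a local stand-in mirroring the three upgrade modes. The sketch only encodes the statement above that a FINISHED job with {{upgradeMode: stateless}} should not have reconciliation blocked by a missing externally addressable checkpoint.

```java
// Hypothetical policy sketch; names and structure are illustrative only.
class TerminalJobGuardSketch {

    // Local stand-in for the operator's upgrade modes.
    enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

    /**
     * Returns true if a failed latest-checkpoint lookup should stop the cycle
     * before reconciliation, false if reconciliation should still proceed.
     */
    static boolean checkpointFailureBlocksReconcile(String jobState, UpgradeMode mode) {
        boolean finished = "FINISHED".equals(jobState);
        // A FINISHED stateless job needs no checkpoint to be suspended or cleaned up.
        return !(finished && mode == UpgradeMode.STATELESS);
    }

    public static void main(String[] args) {
        System.out.println(checkpointFailureBlocksReconcile("FINISHED", UpgradeMode.STATELESS)); // false
        System.out.println(checkpointFailureBlocksReconcile("FINISHED", UpgradeMode.SAVEPOINT)); // true
        System.out.println(checkpointFailureBlocksReconcile("RUNNING", UpgradeMode.STATELESS));  // true
    }
}
```

Whether other upgrade modes or other terminal states should be treated the same way is left open here; the sketch deliberately covers only the case described in this report.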
> h2. Observed Behavior
> The operator enters an infinite loop of {{UpgradeFailureException}} warnings 
> and never reconciles the spec change. The FlinkDeployment
> becomes stuck with {{observedGeneration}} < {{generation}} permanently.
> h2. Impact
> - FlinkDeployments for bounded/batch jobs with non-persistent checkpoints 
> become unmanageable after job completion
> - Resources (pods, services) are never cleaned up


