[ https://issues.apache.org/jira/browse/FLINK-39043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-39043:
-----------------------------------
Affects Version/s: kubernetes-operator-1.13.0
> UpgradeFailureException from SnapshotObserver blocks reconciliation of spec changes for FINISHED batch jobs
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39043
> URL: https://issues.apache.org/jira/browse/FLINK-39043
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.13.0
> Reporter: Robert Metzger
> Priority: Major
>
> I have a batch FlinkDeployment with the following status:
> {code:java}
> status:
>   clusterInfo:
>     flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
>     flink-version: 0.0119-88-cp
>     state-size: "1940"
>     total-cpu: "1.0"
>     total-memory: "2147483648"
>   error: '{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest checkpoint not externally addressable, Manual restore required.","additionalMetadata":{},"throwableList":[]}'
>   jobManagerDeploymentStatus: READY
>   jobStatus:
>     checkpointInfo:
>       lastPeriodicCheckpointTimestamp: 0
>     jobId: d7711dd6e599d917195e0b62feb51635
>     jobName: select-2
>     savepointInfo:
>       lastPeriodicSavepointTimestamp: 0
>       savepointHistory: []
>     startTime: "1770322034225"
>     state: FINISHED
>     updateTime: "1770322169548"
>   lifecycleState: STABLE
>   observedGeneration: 1
>   reconciliationStatus:
>     lastReconciledSpec: '..'
>     reconciliationTimestamp: 1770321989373
>     state: DEPLOYED
>   taskManager:
>     labelSelector: component=taskmanager,app=select-2
>     replicas: 1
> {code}
> -------
> Beware: the text below was generated by Claude Opus 4.6, but it has been checked and seems plausible:
> h2. Description
> When a Flink job completes (reaches the FINISHED state) and its latest checkpoint is not externally addressable (e.g., the job used in-memory or non-persistent checkpoint storage), the operator enters an infinite error loop. This loop prevents any spec change from being reconciled, including {{state: suspended}}.
> This means that once a bounded job finishes with non-persistent checkpoints, the operator can effectively no longer manage the FlinkDeployment. The {{observedGeneration}} never advances to match {{generation}}. Any spec updates (suspend, delete, config changes) are never applied, and the only visible signal is a recurring warning event.
> h2. Root Cause
> The issue is in {{FlinkDeploymentController.reconcile()}}. The observer runs
> before the reconciler in the same try block:
> {code:java}
> // FlinkDeploymentController.java, lines ~146-166
> try {
>     observerFactory.getOrCreate(flinkApp).observe(ctx);        // (1) throws
>     StatusRecorder.updateStatusImmediately(...);
>     if (!validateDeployment(ctx)) { ... }
>     reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);    // (2) never reached
> } catch (UpgradeFailureException ufe) {
>     ReconciliationUtils.updateForReconciliationError(ctx, ufe);
>     triggerErrorEvent(ctx, ufe, ufe.getReason());
> }
> {code}
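> A self-contained simulation of the try block above makes the ordering problem concrete. The Observer/Reconciler interfaces and SimulatedUpgradeFailure are hypothetical stand-ins, not the operator's classes. The sketch assumes only what the snippet shows: observe and reconcile share one try block with a single catch. Once observe() throws, every cycle ends in the catch branch and reconcile() is never called.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulation of the observe-then-reconcile ordering in a single try block.
 * All types here are hypothetical stand-ins, not the operator's real classes.
 */
class ControllerLoopSketch {

    /** Stand-in for UpgradeFailureException: a message plus a reason string. */
    static class SimulatedUpgradeFailure extends RuntimeException {
        private final String reason;

        SimulatedUpgradeFailure(String message, String reason) {
            super(message);
            this.reason = reason;
        }

        String getReason() {
            return reason;
        }
    }

    interface Observer {
        void observe();
    }

    interface Reconciler {
        void reconcile();
    }

    /** Runs one reconciliation cycle and returns the steps that actually completed. */
    static List<String> runCycle(Observer observer, Reconciler reconciler) {
        List<String> executed = new ArrayList<>();
        try {
            observer.observe();          // (1) may throw
            executed.add("observe");
            reconciler.reconcile();      // (2) unreachable once (1) throws
            executed.add("reconcile");
        } catch (SimulatedUpgradeFailure ufe) {
            // Corresponds to recording the error and emitting a warning event.
            executed.add("error:" + ufe.getReason());
        }
        return executed;
    }

    public static void main(String[] args) {
        Observer failingObserver = () -> {
            throw new SimulatedUpgradeFailure(
                    "Latest checkpoint not externally addressable, Manual restore required.",
                    "CheckpointNotFound");
        };
        Reconciler reconciler = () -> System.out.println("reconciling spec change");
        // The same failure repeats on every cycle; "reconciling spec change" is never printed.
        for (int cycle = 1; cycle <= 3; cycle++) {
            System.out.println("cycle " + cycle + ": " + runCycle(failingObserver, reconciler));
        }
    }
}
```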
> The call chain that throws:
> # {{AbstractFlinkDeploymentObserver.observeInternal()}} → {{observeFlinkCluster()}}
> # {{ApplicationObserver.observeFlinkCluster()}} → {{savepointObserver.observeSavepointStatus()}}
> # {{SnapshotObserver.observeSavepointStatus()}}: for terminal jobs, calls {{observeLatestCheckpoint()}}
> # {{observeLatestCheckpoint()}} → {{flinkService.getLastCheckpoint()}}
> # {{AbstractFlinkService.getLastCheckpoint()}} throws {{UpgradeFailureException("Latest checkpoint not externally addressable, Manual restore required.", "CheckpointNotFound")}} when the checkpoint's external pointer equals {{NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER}}
> Because the exception is thrown at step (1) of the controller snippet, the reconciler call at step (2) never executes. This repeats on every reconciliation cycle (roughly every 15 seconds), indefinitely. The operator emits a {{CHECKPOINTNOTFOUND}} warning event each cycle but never processes the pending spec change.
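> The last step of the call chain can be sketched as below. This is an illustrative re-implementation, not the code of {{AbstractFlinkService}}. The sentinel value is assumed to match {{NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER}}. The method name {{lastCheckpoint}} and its null-means-no-checkpoint convention are hypothetical. The sketch shows why the failure is permanent rather than transient: in this model, non-persistent storage keeps reporting the same sentinel, so the check fails identically on every cycle.

```java
import java.util.Optional;

/**
 * Illustrative re-implementation of the last step of the call chain: a terminal job's
 * latest checkpoint is looked up and rejected if its pointer is the non-persistent sentinel.
 */
class LastCheckpointSketch {

    /** Assumed to mirror NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER. */
    static final String NON_PERSISTENT_POINTER = "<checkpoint-not-externally-addressable>";

    /** Hypothetical stand-in for UpgradeFailureException with reason "CheckpointNotFound". */
    static class CheckpointNotAddressable extends RuntimeException {
        final String reason;

        CheckpointNotAddressable(String message, String reason) {
            super(message);
            this.reason = reason;
        }
    }

    /**
     * @param externalPointer external pointer of the latest completed checkpoint,
     *                        or null if no checkpoint exists (an assumption of this sketch)
     */
    static Optional<String> lastCheckpoint(String externalPointer) {
        if (externalPointer == null) {
            return Optional.empty();
        }
        if (NON_PERSISTENT_POINTER.equals(externalPointer)) {
            // In this model, non-persistent storage always reports the sentinel,
            // so the lookup fails on every cycle, not just once.
            throw new CheckpointNotAddressable(
                    "Latest checkpoint not externally addressable, Manual restore required.",
                    "CheckpointNotFound");
        }
        return Optional.of(externalPointer);
    }

    public static void main(String[] args) {
        System.out.println(lastCheckpoint("s3://example-bucket/checkpoints/chk-7"));
        try {
            lastCheckpoint(NON_PERSISTENT_POINTER);
        } catch (CheckpointNotAddressable e) {
            System.out.println(e.reason + ": " + e.getMessage());
        }
    }
}
```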
> h2. Why Machine Size Affects Reproducibility
> This bug is deterministic once the conditions are met, but it may appear
> intermittent depending on hardware resources.
> The only way a reconciliation cycle can get past the broken observer path is when {{isJmDeploymentReady()}} returns false, i.e., the JM pod is not ready. In application mode, the JM process shuts down after the job finishes, so the key factor is how quickly the JM process completes its shutdown:
> - On larger machines: the JM shutdown completes faster, the container exits, the pod becomes NotReady, and the operator's next reconciliation cycle skips the observer and processes the suspend. The bug is masked by lucky timing.
> - On smaller/resource-constrained machines: CPU contention slows the JM shutdown. The JM process remains alive for minutes (observed: the container was still Running with 0 restarts well after the job reported FINISHED). The pod stays Ready, so the observer runs every cycle and keeps throwing. The bug is triggered reliably.
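> The timing dependence can be modeled as follows. This is a toy model built only from the description above. It assumes that the checkpoint lookup runs only while the JM deployment is Ready, that cycles run at a fixed ~15 second interval, and that the pod turns NotReady exactly when the JM shutdown completes. The names {{cycle}} and {{failedCycles}} are hypothetical.

```java
/**
 * Toy model of why the bug looks intermittent: the checkpoint lookup only runs while
 * the JM deployment is ready, so the outcome depends on how long JM shutdown takes.
 */
class ReadinessGateSketch {

    enum CycleOutcome { OBSERVER_THROWS, OBSERVER_SKIPPED_SPEC_RECONCILED }

    /** One cycle for a FINISHED job whose latest checkpoint is not externally addressable. */
    static CycleOutcome cycle(boolean jmDeploymentReady) {
        return jmDeploymentReady
                ? CycleOutcome.OBSERVER_THROWS
                : CycleOutcome.OBSERVER_SKIPPED_SPEC_RECONCILED;
    }

    /**
     * Number of failing cycles before the spec change goes through, assuming cycles run at
     * t = 0, interval, 2 * interval, ... and the pod stays Ready while t < jmShutdownMillis.
     */
    static long failedCycles(long jmShutdownMillis, long cycleIntervalMillis) {
        if (jmShutdownMillis <= 0) {
            return 0;
        }
        // Ceiling division: how many cycle start times fall strictly before the shutdown time.
        return (jmShutdownMillis + cycleIntervalMillis - 1) / cycleIntervalMillis;
    }

    public static void main(String[] args) {
        long interval = 15_000; // ~15 s reconciliation interval from the report
        System.out.println("fast JM exit (5 s): " + failedCycles(5_000, interval) + " failed cycle(s)");
        System.out.println("slow JM exit (3 min): " + failedCycles(180_000, interval) + " failed cycle(s)");
        System.out.println(cycle(true) + " / " + cycle(false));
    }
}
```

> Under these assumptions, a 5 second shutdown costs a single failed cycle, which is easy to miss. A 3 minute shutdown produces 12 consecutive failures before the suspend is processed.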
> h2. Steps to Reproduce
> # Create a FlinkDeployment with {{upgradeMode: stateless}} and no persistent checkpoint storage configured (or with the default in-memory checkpoint storage)
> # Submit a bounded Flink job (e.g., a SELECT query on a bounded source that produces a finite number of rows)
> # Wait for the job to reach the {{FINISHED}} state
> # Update the FlinkDeployment spec to set {{state: suspended}} (or make any other spec change)
> # Observe that:
> #* {{generation}} increments but {{observedGeneration}} stays at its previous value
> #* The operator logs show {{CHECKPOINTNOTFOUND}} warnings repeating every ~15 seconds
> #* The FlinkDeployment pods are never cleaned up
> #* The spec change is never reconciled
> This reproduces most reliably on resource-constrained machines where the JM shutdown is slow. On larger machines, the JM may exit quickly enough that the operator accidentally bypasses the bug.
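> The symptoms in the last step can be checked from the resource's generation and status fields. This is a minimal sketch that assumes the values have already been read from the cluster (for example from {{kubectl get flinkdeployment <name> -o yaml}}). {{DeploymentView}} is a hypothetical record (Java 16+), not a Kubernetes client or operator type. Matching on the error message text is also an assumption of this sketch.

```java
/**
 * Checks whether a FlinkDeployment shows the symptoms described in this issue.
 * DeploymentView is a hypothetical, hand-filled view of metadata.generation and a few
 * status fields; it is not a Kubernetes client or operator type.
 */
class StuckDeploymentCheck {

    record DeploymentView(long generation, long observedGeneration, String jobState, String error) {}

    static boolean looksStuck(DeploymentView d) {
        boolean specChangePending = d.observedGeneration() < d.generation();
        boolean jobFinished = "FINISHED".equals(d.jobState());
        boolean checkpointError = d.error() != null
                && d.error().contains("Latest checkpoint not externally addressable");
        return specChangePending && jobFinished && checkpointError;
    }

    public static void main(String[] args) {
        String error = "{\"type\":\"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException\","
                + "\"message\":\"Latest checkpoint not externally addressable, Manual restore required.\"}";
        // observedGeneration 1 matches the status above; generation 2 assumes one pending spec change.
        DeploymentView afterSuspend = new DeploymentView(2, 1, "FINISHED", error);
        DeploymentView noPendingChange = new DeploymentView(1, 1, "FINISHED", error);
        System.out.println(looksStuck(afterSuspend));
        System.out.println(looksStuck(noPendingChange));
    }
}
```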
> h2. Expected Behavior
> When a job has FINISHED and a spec change (especially {{state: suspended}}) is requested, the operator should process the spec change regardless of the checkpoint state. A FINISHED job with {{upgradeMode: stateless}} does not need an externally addressable checkpoint to be suspended or cleaned up.
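> One possible shape for this behavior is a guard that tolerates the failed checkpoint lookup when the job is FINISHED and the upgrade mode is stateless, so the cycle can continue to the reconciler. This is only a sketch of the idea, not the operator's code or an agreed fix. The enums mirror the {{upgradeMode}} values, and {{observeCheckpoint}} and {{CheckpointLookupFailure}} are hypothetical. Other job states and upgrade modes keep today's behavior of rethrowing.

```java
/**
 * Sketch of the expected behavior: a failed checkpoint lookup is tolerated for a FINISHED
 * stateless job so the cycle can continue to the reconciler. Hypothetical types only;
 * this is not the operator's code or an agreed fix.
 */
class FinishedJobGuardSketch {

    enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

    enum CheckpointOutcome { OBSERVED, SKIPPED_FOR_FINISHED_STATELESS_JOB }

    /** Stand-in for the "Latest checkpoint not externally addressable" failure. */
    static class CheckpointLookupFailure extends RuntimeException {
        CheckpointLookupFailure(String message) {
            super(message);
        }
    }

    static CheckpointOutcome observeCheckpoint(String jobState, UpgradeMode mode, Runnable lookup) {
        try {
            lookup.run();
            return CheckpointOutcome.OBSERVED;
        } catch (CheckpointLookupFailure e) {
            boolean checkpointIrrelevant = "FINISHED".equals(jobState) && mode == UpgradeMode.STATELESS;
            if (!checkpointIrrelevant) {
                throw e; // keep failing loudly when the checkpoint may be needed for a restore
            }
            // Returning normally lets the caller go on to reconcile (e.g. apply state: suspended).
            return CheckpointOutcome.SKIPPED_FOR_FINISHED_STATELESS_JOB;
        }
    }

    public static void main(String[] args) {
        Runnable failingLookup = () -> {
            throw new CheckpointLookupFailure("Latest checkpoint not externally addressable");
        };
        System.out.println(observeCheckpoint("FINISHED", UpgradeMode.STATELESS, failingLookup));
        try {
            observeCheckpoint("FINISHED", UpgradeMode.LAST_STATE, failingLookup);
        } catch (CheckpointLookupFailure e) {
            System.out.println("still failing for last-state: " + e.getMessage());
        }
    }
}
```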
> h2. Observed Behavior
> The operator enters an infinite loop of {{UpgradeFailureException}} warnings
> and never reconciles the spec change. The FlinkDeployment
> becomes stuck with {{observedGeneration}} < {{generation}} permanently.
> h2. Impact
> - FlinkDeployments for bounded/batch jobs with non-persistent checkpoints become unmanageable after job completion
> - Resources (pods, services) are never cleaned up
--
This message was sent by Atlassian Jira
(v8.20.10#820010)