[
https://issues.apache.org/jira/browse/FLINK-39043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Metzger updated FLINK-39043:
-----------------------------------
Description:
I have a batch FlinkDeployment with the following status:
{code:yaml}
status:
  clusterInfo:
    flink-revision: 38f2688 @ 2026-01-15T15:50:15+01:00
    flink-version: 0.0119-88-cp
    state-size: "1940"
    total-cpu: "1.0"
    total-memory: "2147483648"
  error: '{"type":"org.apache.flink.kubernetes.operator.exception.UpgradeFailureException","message":"Latest
    checkpoint not externally addressable, Manual restore required.","additionalMetadata":{},"throwableList":[]}'
  jobManagerDeploymentStatus: READY
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: d7711dd6e599d917195e0b62feb51635
    jobName: select-2
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    startTime: "1770322034225"
    state: FINISHED
    updateTime: "1770322169548"
  lifecycleState: STABLE
  observedGeneration: 1
  reconciliationStatus:
    lastReconciledSpec: '..'
    reconciliationTimestamp: 1770321989373
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=select-2
    replicas: 1
{code}
-------
Beware: the text below was generated by Claude Opus 4.6, but it has been checked and seems plausible:
When a Flink job completes (reaches FINISHED state) and its latest checkpoint
is not externally addressable (e.g., the job used
in-memory or non-persistent checkpoint storage), the operator enters an
infinite error loop that prevents any spec changes from being
reconciled — including state: suspended.
This means that once a bounded job finishes with non-persistent checkpoints,
the FlinkDeployment becomes effectively unmanageable by the
operator. The observedGeneration never advances to match generation, and any
spec updates (suspend, delete, config changes) are
silently ignored.
Root Cause
The issue is in FlinkDeploymentController.reconcile(). The observer runs
before the reconciler in the same try block:
{code:java}
// FlinkDeploymentController.java
try {
    observerFactory.getOrCreate(flinkApp).observe(ctx);       // (1) throws
    StatusRecorder.updateStatusImmediately(...);
    if (!validateDeployment(ctx)) { ... }
    reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);   // (2) never reached
} catch (UpgradeFailureException ufe) {
    ReconciliationUtils.updateForReconciliationError(ctx, ufe);
    triggerErrorEvent(ctx, ufe, ufe.getReason());
}
{code}
The call chain that throws:
1. AbstractFlinkDeploymentObserver.observeInternal() → observeFlinkCluster()
2. ApplicationObserver.observeFlinkCluster() → savepointObserver.observeSavepointStatus()
3. SnapshotObserver.observeSavepointStatus() — for terminal jobs, calls observeLatestCheckpoint()
4. observeLatestCheckpoint() → flinkService.getLastCheckpoint()
5. AbstractFlinkService.getLastCheckpoint() — throws UpgradeFailureException("Latest checkpoint not externally addressable, Manual restore required.", "CheckpointNotFound") when the checkpoint's external pointer equals NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER
Because the exception is thrown in step (1), the reconciler at step (2) never
executes. This repeats every reconciliation cycle (~15
seconds), indefinitely. The operator emits a CHECKPOINTNOTFOUND warning event
each cycle but never processes the pending spec change.
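For illustration, here is a minimal sketch of the check described in step 5 (the method name and its parameter are placeholders, not the verbatim operator code; the constant, exception type, and message are taken from the error recorded in the status above):
{code:java}
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;

class LastCheckpointCheckSketch {
    // 'externalPointer' stands in for the external pointer of the job's latest
    // checkpoint as reported by the JobManager REST API (placeholder parameter).
    static void requireExternallyAddressable(String externalPointer)
            throws UpgradeFailureException {
        if (NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER
                .equals(externalPointer)) {
            // EXTERNAL_POINTER is "<checkpoint-not-externally-addressable>": the
            // checkpoint metadata only ever lived in JM memory, so it cannot be
            // used to restore the job after an upgrade.
            throw new UpgradeFailureException(
                    "Latest checkpoint not externally addressable, Manual restore required.",
                    "CheckpointNotFound");
        }
    }
}
{code}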
This bug is deterministic once the conditions are met, but it may appear
intermittent depending on hardware resources.
The only way the reconciler can bypass the broken observer path is when
isJmDeploymentReady() returns false — i.e., the JM pod is not
ready. In application mode, the JM process shuts down after the job finishes.
The key factor is how quickly the JM process completes its
shutdown:
- On larger machines: The JM shutdown completes faster, the container exits,
the pod becomes NotReady, and the operator's next
reconciliation cycle skips the observer and processes the suspend. The bug is
masked by lucky timing.
- On smaller/resource-constrained machines: CPU contention slows the JM
shutdown. The JM process remains alive for minutes (observed:
container still Running with 0 restarts well after the job reported
FINISHED). The pod stays Ready, so the observer runs every cycle and
keeps throwing. The bug is triggered reliably.
Steps to Reproduce
1. Create a FlinkDeployment with upgradeMode: stateless and no persistent
checkpoint storage configured (or using default in-memory
checkpoint storage)
2. Submit a bounded Flink job (e.g., a SELECT query on a bounded source that produces a finite number of rows; a minimal sketch follows this list)
3. Wait for the job to reach FINISHED state
4. Update the FlinkDeployment spec to set state: suspended (or any other spec
change)
5. Observe that:
- generation increments but observedGeneration stays at its previous value
- The operator logs show CHECKPOINTNOTFOUND warnings repeating every ~15
seconds
- The FlinkDeployment pods are never cleaned up
- The spec change is never reconciled
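As a concrete example for step 2, a minimal bounded DataStream job (assumed for illustration, not the exact select-2 job from the status above; any job that reaches FINISHED with only in-memory checkpoint metadata should behave the same):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedReproJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromSequence is a bounded source: the job drains the 100 records,
        // prints them, and transitions to FINISHED.
        env.fromSequence(1L, 100L).print();
        env.execute("bounded-repro");
    }
}
{code}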
This reproduces most reliably on resource-constrained machines where the JM
shutdown is slow. On larger machines, the JM may exit
quickly enough that the operator accidentally bypasses the bug.
Expected Behavior
When a job has FINISHED and a spec change (especially state: suspended) is
requested, the operator should process the spec change
regardless of the checkpoint state. A FINISHED job with upgradeMode:
stateless does not need an externally addressable checkpoint to be
suspended or cleaned up.
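One possible shape for this expectation, sketched with hypothetical helper names rather than the operator's actual observer API: tolerate the missing externally addressable checkpoint for terminal stateless jobs instead of failing the whole observe step.
{code:java}
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;

// All helpers below are hypothetical placeholders for information the observer
// already has at this point; they are not actual operator APIs.
abstract class TerminalCheckpointGuardSketch {
    abstract void observeLatestCheckpoint() throws UpgradeFailureException; // steps 3-4 of the chain above
    abstract boolean jobIsGloballyTerminal();                               // e.g. FINISHED
    abstract boolean upgradeModeIsStateless();                              // upgradeMode: stateless
    abstract void recordMissingCheckpoint(UpgradeFailureException cause);   // e.g. emit a warning event

    void observeLastCheckpointTolerantly() throws UpgradeFailureException {
        try {
            observeLatestCheckpoint();
        } catch (UpgradeFailureException e) {
            if (jobIsGloballyTerminal() && upgradeModeIsStateless()) {
                // A FINISHED stateless job can be suspended or cleaned up without
                // a restorable checkpoint, so don't abort the observe step here.
                recordMissingCheckpoint(e);
                return;
            }
            throw e; // stateful deployments still require a manual restore
        }
    }
}
{code}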
Observed Behavior
The operator enters an infinite loop of UpgradeFailureException warnings and
never reconciles the spec change. The FlinkDeployment
becomes stuck with observedGeneration < generation permanently.
Impact
- FlinkDeployments for bounded/batch jobs with non-persistent checkpoints
become unmanageable after job completion
- Resources (pods, services) are never cleaned up
> UpgradeFailureException from SnapshotObserver blocks reconciliation of spec
> changes for FINISHED batch jobs
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39043
> URL: https://issues.apache.org/jira/browse/FLINK-39043
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Reporter: Robert Metzger
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)