[ 
https://issues.apache.org/jira/browse/FLINK-39586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39586:
-----------------------------------
    Labels: pull-request-available  (was: )

> CombinedWatermarkStatus.updateCombinedWatermark() fails to mark idle when 
> partialWatermarks is empty
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39586
>                 URL: https://issues.apache.org/jira/browse/FLINK-39586
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Chen Zhang
>            Priority: Minor
>              Labels: pull-request-available
>
> h2. Summary
> {{CombinedWatermarkStatus.updateCombinedWatermark()}} short-circuits with 
> {{return false}} when {{partialWatermarks}} is empty, without setting 
> {{this.idle = true}}. If any per-split output previously emitted a watermark 
> (setting {{idle = false}}), the idle flag becomes permanently stuck at 
> {{false}} after all per-split outputs are unregistered. This prevents 
> downstream watermark idle propagation.
> h2. Affected Versions
> Flink 1.x (all current 1.x releases). FLINK-38454 (fixed in 2.2.0) addresses 
> a related but *different* case — where outputs remain registered but go idle. 
> It does *not* fix the case where all outputs are *unregistered* (empty 
> {{partialWatermarks}}).
> h2. Steps to Reproduce
> This occurs with {{HybridSource}} (bounded → unbounded) using per-split 
> watermark generation:
> # Start a {{HybridSource}} with sourceIndex=0 (bounded, e.g. Iceberg) and 
> sourceIndex=1 (unbounded, e.g. Kafka), with a {{WatermarkStrategy}} that uses 
> per-split outputs.
> # Take a checkpoint while some subtasks still have active bounded-source 
> splits. During processing, {{WatermarkGenerator.onEvent()}} calls 
> {{output.emitWatermark()}}, which sets {{CombinedWatermarkStatus.idle = 
> false}} on those subtasks' {{splitLocalOutput}}.
> # Restore from that checkpoint. Subtasks re-process remaining bounded splits 
> and then finish the bounded phase.
> # Bounded splits complete. {{SourceReaderBase.releaseOutputForSplit()}} 
> removes all {{PartialWatermark}} entries from the 
> {{WatermarkOutputMultiplexer}}, making {{partialWatermarks}} empty.
> # Unbounded phase starts. Some subtasks (those with no partition assigned) 
> have no splits and should become idle. But they never do.
> h2. Root Cause
> The bug is in {{CombinedWatermarkStatus.updateCombinedWatermark()}}:
> {code:java}
> if (partialWatermarks.isEmpty()) {
>     return false;  // BUG: returns without setting this.idle = true
> }
> {code}
> When all per-split outputs are released (unregistered), {{partialWatermarks}} 
> becomes empty. The method returns {{false}} immediately, leaving the {{idle}} 
> field at whatever value it was previously set to. If any watermark was 
> emitted during the bounded phase, {{idle}} was set to {{false}} and remains 
> {{false}} permanently.
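>
> The early return can be illustrated with a simplified, hypothetical model of the combining logic. The names {{CombinedStatusModel}}, {{Partial}}, {{register}}, {{unregister}} and {{idleAfterBoundedPhase}} are assumptions made for this sketch and are not Flink's API. The model only encodes what is described above: an active partial output clears {{idle}}, and an empty {{partialWatermarks}} list returns before {{idle}} is recomputed. The initial {{idle = true}} is taken from the issue description.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical, simplified model of CombinedWatermarkStatus (not Flink's class).
> class CombinedStatusModel {
>     // Stand-in for one per-split PartialWatermark.
>     static final class Partial {
>         long watermark = Long.MIN_VALUE;
>         boolean idle = false;
>     }
>
>     private final List<Partial> partialWatermarks = new ArrayList<>();
>     private long combinedWatermark = Long.MIN_VALUE;
>     private boolean idle = true; // initial value, per the issue description
>
>     Partial register() {
>         Partial p = new Partial();
>         partialWatermarks.add(p);
>         return p;
>     }
>
>     void unregister(Partial p) {
>         partialWatermarks.remove(p);
>     }
>
>     boolean isIdle() {
>         return idle;
>     }
>
>     // Mirrors the buggy early return described in this issue.
>     boolean updateCombinedWatermark() {
>         if (partialWatermarks.isEmpty()) {
>             return false; // idle keeps whatever value it had before
>         }
>         long min = Long.MAX_VALUE;
>         boolean allIdle = true;
>         for (Partial p : partialWatermarks) {
>             if (!p.idle) {
>                 min = Math.min(min, p.watermark);
>                 allIdle = false;
>             }
>         }
>         idle = allIdle;
>         if (!allIdle && min > combinedWatermark) {
>             combinedWatermark = min;
>             return true;
>         }
>         return false;
>     }
>
>     // Bounded phase: one split emits a watermark, then every split is released.
>     static boolean idleAfterBoundedPhase() {
>         CombinedStatusModel status = new CombinedStatusModel();
>         Partial split = status.register();
>         split.watermark = 1_000L; // emitWatermark() on the split output
>         split.idle = false;
>         status.updateCombinedWatermark(); // idle becomes false
>         status.unregister(split); // like releaseOutputForSplit()
>         status.updateCombinedWatermark(); // early return, idle not reset
>         return status.isIdle();
>     }
>
>     public static void main(String[] args) {
>         System.out.println("idle after all splits released: " + idleAfterBoundedPhase());
>     }
> }
> {code}
> Running {{main}} prints {{idle after all splits released: false}}, which is the stuck flag described in the steps to reproduce.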
> h3. Consequence Chain
> # {{CombinedWatermarkStatus.isIdle()}} returns *false* (stuck)
> # {{WatermarkOutputMultiplexer}} never calls {{splitLocalOutput.markIdle()}}
> # {{splitLocalOutput.isIdle}} stays *false* (was set to false by 
> {{emitWatermark()}} during the bounded phase)
> # {{ProgressiveTimestampsAndWatermarks.IdlenessManager.maybeMarkUnderlyingOutputAsIdle()}} requires *both* {{splitLocalOutput.isIdle}} *and* {{mainOutput.isIdle}} to be true, so the check fails
> # {{WatermarkToDataOutput.markIdle()}} is never called, so no 
> {{WatermarkStatus.IDLE}} is sent downstream
> # Downstream {{StatusWatermarkValve}} waits indefinitely for watermark 
> advancement from those subtasks
> # *Global watermark stalls* at the last bounded-source-era watermark value
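>
> The chain hinges on a conjunction: IDLE only reaches downstream when both the split-local output and the main output are idle. The following is a hypothetical model of that condition, not Flink's {{ProgressiveTimestampsAndWatermarks}} code. {{IdlePropagationModel}}, {{TrackedOutput}}, {{onCombinedStatus}} and {{idleReachesDownstream}} are illustrative names, and the sketch assumes the main output is already idle so that only the split-local flag decides the outcome.
> {code:java}
> // Hypothetical sketch of the idle-propagation condition (not Flink's classes).
> class IdlePropagationModel {
>     // Stand-in for an output that tracks an idle flag.
>     static final class TrackedOutput {
>         boolean isIdle = true; // initial value, per the issue description
>
>         void emitWatermark() {
>             isIdle = false;
>         }
>
>         void markIdle() {
>             isIdle = true;
>         }
>     }
>
>     final TrackedOutput splitLocalOutput = new TrackedOutput();
>     final TrackedOutput mainOutput = new TrackedOutput(); // assumed already idle
>     boolean idleSentDownstream = false;
>
>     // The multiplexer only marks the split-local output idle if the combined status is idle.
>     void onCombinedStatus(boolean combinedIdle) {
>         if (combinedIdle) {
>             splitLocalOutput.markIdle();
>         }
>         maybeMarkUnderlyingOutputAsIdle();
>     }
>
>     // Both outputs must be idle before IDLE is forwarded downstream.
>     void maybeMarkUnderlyingOutputAsIdle() {
>         if (splitLocalOutput.isIdle && mainOutput.isIdle) {
>             idleSentDownstream = true; // stands in for sending WatermarkStatus.IDLE
>         }
>     }
>
>     static boolean idleReachesDownstream(boolean combinedIdle) {
>         IdlePropagationModel m = new IdlePropagationModel();
>         m.splitLocalOutput.emitWatermark(); // bounded phase left isIdle = false
>         m.onCombinedStatus(combinedIdle);
>         return m.idleSentDownstream;
>     }
>
>     public static void main(String[] args) {
>         System.out.println("stuck combined status: " + idleReachesDownstream(false));
>         System.out.println("idle combined status:  " + idleReachesDownstream(true));
>     }
> }
> {code}
> With a stuck combined status, {{idleReachesDownstream(false)}} returns {{false}}; once the combined status reports idle, {{idleReachesDownstream(true)}} returns {{true}}.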
> h2. Evidence
> * *Observed behavior*: After task-level recovery from a checkpoint taken 
> during the bounded phase, watermark progression stalls permanently once all 
> subtasks switch to the unbounded source. Watermark stays stuck at 
> approximately {{switchPoint - maxOutOfOrderness}}.
> * *JM restart "fixes" it*: A JobManager restart loads a later checkpoint 
> (taken after the source switch). Subtasks restore with empty reader state, so 
> no bounded-source per-split outputs are created. {{emitWatermark()}} therefore 
> never sets {{idle}} to {{false}} in {{CombinedWatermarkStatus}}, 
> {{splitLocalOutput.isIdle}} stays at its initial {{true}}, and idle 
> propagation works correctly. This confirms that the root cause is stale 
> {{idle = false}} state carried over from the bounded phase.
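>
> The stall value quoted in the first bullet follows from how a bounded-out-of-orderness generator derives watermarks. Flink's {{BoundedOutOfOrdernessWatermarks}} emits {{maxTimestamp - outOfOrdernessMillis - 1}}, so a subtask that ends up without splits keeps a watermark just below {{switchPoint - maxOutOfOrderness}}. This sketch assumes the job uses a bounded-out-of-orderness strategy; {{StalledWatermarkExample}} is an illustrative name and the timestamps are hypothetical, not measured values.
> {code:java}
> // Hypothetical arithmetic sketch; timestamps are illustrative only.
> class StalledWatermarkExample {
>     // Same formula as a bounded-out-of-orderness generator: maxTimestamp - outOfOrderness - 1.
>     static long boundedOutOfOrdernessWatermark(long maxTimestamp, long maxOutOfOrdernessMillis) {
>         return maxTimestamp - maxOutOfOrdernessMillis - 1;
>     }
>
>     public static void main(String[] args) {
>         long switchPoint = 1_700_000_000_000L; // hypothetical last bounded-phase event time
>         long maxOutOfOrderness = 5_000L; // hypothetical 5 s bound
>         long stalled = boundedOutOfOrdernessWatermark(switchPoint, maxOutOfOrderness);
>         System.out.println("stalled watermark: " + stalled);
>     }
> }
> {code}
> Running {{main}} prints {{stalled watermark: 1699999994999}}, one millisecond below {{switchPoint - maxOutOfOrderness}}.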
> h2. Distinction from Related Issues
> ||Scenario||{{partialWatermarks}}||{{idle}} before||Bug triggered?||
> |FLINK-38477 (never had splits)|empty|{{true}} (initial)|No ({{idle}} is already correct)|
> |FLINK-38454 (splits go idle but stay registered)|non-empty, all idle|{{false}}|Yes (fixed in 2.2.0)|
> |*This bug* (splits unregistered after being active)|*empty* (was non-empty)|*{{false}}* (set by {{emitWatermark}})|*Yes (not fixed by either)*|
> h2. Expected Behavior
> When {{partialWatermarks}} becomes empty (all per-split outputs 
> unregistered), {{CombinedWatermarkStatus.updateCombinedWatermark()}} should 
> set {{this.idle = true}} before returning. An empty set of outputs is 
> semantically equivalent to "all outputs are idle."
> h2. Proposed Fix
> {code:java}
> if (partialWatermarks.isEmpty()) {
>     this.idle = true;  // No outputs registered - treat as idle
>     return false;
> }
> {code}
> This is consistent with the semantics: if there are no partial watermarks to 
> track, the combined status should be idle.
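>
> To check the proposed fix against the scenarios in the table above, the following hypothetical model wraps the early return in an {{applyFix}} switch. {{FixedStatusModel}}, {{applyFix}}, {{idleAfterRelease}} and {{idleWithoutSplits}} are illustrative names, not Flink's API, and the loop over a non-empty {{partialWatermarks}} is a simplified reading of the combining step.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical model comparing the current early return with the proposed fix.
> class FixedStatusModel {
>     static final class Partial {
>         long watermark;
>         boolean idle;
>
>         Partial(long watermark, boolean idle) {
>             this.watermark = watermark;
>             this.idle = idle;
>         }
>     }
>
>     final List<Partial> partialWatermarks = new ArrayList<>();
>     long combinedWatermark = Long.MIN_VALUE;
>     boolean idle = true; // initial value, per the issue description
>     final boolean applyFix;
>
>     FixedStatusModel(boolean applyFix) {
>         this.applyFix = applyFix;
>     }
>
>     boolean updateCombinedWatermark() {
>         if (partialWatermarks.isEmpty()) {
>             if (applyFix) {
>                 idle = true; // proposed fix: no registered outputs means idle
>             }
>             return false;
>         }
>         long min = Long.MAX_VALUE;
>         boolean allIdle = true;
>         for (Partial p : partialWatermarks) {
>             if (!p.idle) {
>                 min = Math.min(min, p.watermark);
>                 allIdle = false;
>             }
>         }
>         idle = allIdle;
>         if (!allIdle && min > combinedWatermark) {
>             combinedWatermark = min;
>             return true;
>         }
>         return false;
>     }
>
>     // "Splits unregistered after being active" row.
>     static boolean idleAfterRelease(boolean applyFix) {
>         FixedStatusModel s = new FixedStatusModel(applyFix);
>         Partial split = new Partial(1_000L, false);
>         s.partialWatermarks.add(split);
>         s.updateCombinedWatermark(); // active split: idle becomes false
>         s.partialWatermarks.remove(split); // all outputs released
>         s.updateCombinedWatermark();
>         return s.idle;
>     }
>
>     // "Never had splits" row: idle keeps its initial value in both variants.
>     static boolean idleWithoutSplits(boolean applyFix) {
>         FixedStatusModel s = new FixedStatusModel(applyFix);
>         s.updateCombinedWatermark();
>         return s.idle;
>     }
>
>     public static void main(String[] args) {
>         System.out.println("released, without fix: " + idleAfterRelease(false));
>         System.out.println("released, with fix:    " + idleAfterRelease(true));
>         System.out.println("never had splits:      " + idleWithoutSplits(false));
>     }
> }
> {code}
> Without the fix, {{idleAfterRelease(false)}} returns {{false}}; with it, {{idleAfterRelease(true)}} returns {{true}}. {{idleWithoutSplits}} returns {{true}} in both variants, matching the FLINK-38477 row where {{idle}} is already correct.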



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
