[
https://issues.apache.org/jira/browse/FLINK-38476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Weiqing Yang closed FLINK-38476.
--------------------------------
Resolution: Duplicate
> Add FINISHED watermark status to support proper watermark aggregation
> ---------------------------------------------------------------------
>
> Key: FLINK-38476
> URL: https://issues.apache.org/jira/browse/FLINK-38476
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.16.3, 2.1.0, 1.20.3
> Reporter: Weiqing Yang
> Priority: Major
>
> *Environment / Preconditions*
> * Source: Kafka connector with dynamic partition discovery disabled (no
> split rescan after startup).
> * Input topic(s): total partition count = *N*.
> * Job parallelism = *P*, where *P > N* (some source subtasks start with
> no assigned partitions).
> * Idle detection configured (e.g., {{table.exec.source.idle-timeout =
> 10s}}).
> * Downstream uses event-time semantics (e.g., {{IntervalJoin}},
> time-bounded aggregations, windows).
> *Problem Summary*
> * When *P > N* and Kafka partition discovery is disabled, some source
> subtasks start with no splits and *finish immediately*. Finished subtasks
> emit a watermark of {{Long.MAX_VALUE}} but are *not excluded* from watermark
> aggregation. If the remaining active subtasks later go *IDLE* (e.g., when
> the idle timeout expires during a lull or under backpressure), the only
> “non-idle” watermark seen downstream is {{Long.MAX_VALUE}}, which advances
> operator watermarks to infinity. As a result, *all subsequent records are
> treated as late*, cleanup timers fire and clear state, and time-aware
> operators produce effectively *no output*.
> *Detailed Behavior*
> * Event-time progress for an operator is the *minimum* watermark across its
> *non-idle* inputs.
> * Finished subtasks today:
> ** set watermark to {{Long.MAX_VALUE}},
> ** but are *not* marked idle / excluded from aggregation.
> * Failure condition:
> ## Some subtasks finish at startup (no splits).
> ## Later, active subtasks go *IDLE* due to
> {{table.exec.source.idle-timeout}}.
> ## Aggregation sees only the finished subtasks as “non-idle” → min watermark
> = {{Long.MAX_VALUE}}.
> ## Downstream (e.g., {{IntervalJoin}}) advances its operator watermark
> to {{Long.MAX_VALUE}}.
> ## All incoming records are “late,” time-bounded join/aggregate can’t match,
> cleanup timers fire → *zero output*.
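
To make the failure condition concrete, here is a minimal standalone sketch of the aggregation rule described above. It is a simplified model, not Flink's actual implementation: the class FinishedWatermarkSketch, the InputState holder, and the aggregate method are hypothetical names, and the only rule modeled is "minimum watermark across non-idle inputs". Returning Long.MIN_VALUE when every input is idle is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

public class FinishedWatermarkSketch {

    /** Hypothetical per-input state; not a Flink class. */
    static final class InputState {
        final long watermark;
        final boolean idle;

        InputState(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Minimum watermark across non-idle inputs. Returns Long.MIN_VALUE when
     * every input is idle (an assumption of this sketch, meaning "no progress").
     */
    static long aggregate(List<InputState> inputs) {
        boolean anyNonIdle = false;
        long min = Long.MAX_VALUE;
        for (InputState input : inputs) {
            if (!input.idle) {
                anyNonIdle = true;
                min = Math.min(min, input.watermark);
            }
        }
        return anyNonIdle ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // P = 3, N = 2: the third subtask got no split and finished at startup.
        // It reports Long.MAX_VALUE and is not marked idle.
        InputState finished = new InputState(Long.MAX_VALUE, false);

        // Both active subtasks are producing: the finished input is harmless.
        long whileActive = aggregate(Arrays.asList(
                new InputState(1_000L, false), new InputState(1_200L, false), finished));
        System.out.println("active subtasks producing: " + whileActive);

        // Both active subtasks hit the idle timeout: only the finished input counts.
        long afterIdle = aggregate(Arrays.asList(
                new InputState(1_000L, true), new InputState(1_200L, true), finished));
        System.out.println("active subtasks idle: " + afterIdle);
    }
}
```

In this model the first call yields 1000 and the second yields Long.MAX_VALUE, which corresponds to the jump described in the failure condition.
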
> *Why it reproduces with P > N*
> * With *P > N*, at least *P − N* source subtasks receive no splits and
> finish immediately.
> * With *P == N*, each subtask has a split, so the “finished watermark
> dominates” condition does not arise at startup.
> *Steps to Reproduce (minimal)*
> # Create a Kafka topic with *N* partitions. Disable connector’s dynamic
> partition discovery.
> # Launch a Flink job with *P > N* source parallelism (e.g., a simple
> pipeline: KafkaSource → {{IntervalJoin}} or time-windowed op → sink).
> # Configure idle detection (e.g., {{table.exec.source.idle-timeout =
> 10s}}).
> # Start with some traffic, then pause/slow it enough that active subtasks
> trip idle timeout.
> # Observe downstream operator watermark jump to {{Long.MAX_VALUE}},
> records subsequently dropped as late, no output emitted.
> *Expected Result*
> Finished inputs should be *excluded* from aggregated watermark progression
> (behave like “non-contributing” channels) until *all* inputs are finished.
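
The expected result could be modeled roughly as follows. This is only a sketch under stated assumptions, not the fix that was or will be implemented: the InputStatus enum with its FINISHED value, the InputState holder, and the aggregate method are hypothetical names introduced for illustration. An empty result here stands for "no watermark update".

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class FinishedStatusSketch {

    /** Hypothetical three-state status; FINISHED is the state this issue asks for. */
    enum InputStatus { ACTIVE, IDLE, FINISHED }

    /** Hypothetical per-input state; not a Flink class. */
    static final class InputState {
        final long watermark;
        final InputStatus status;

        InputState(long watermark, InputStatus status) {
            this.watermark = watermark;
            this.status = status;
        }
    }

    /**
     * Minimum watermark across ACTIVE inputs only. FINISHED inputs are excluded
     * until all inputs are finished, at which point Long.MAX_VALUE is emitted.
     * Empty means there is nothing to advance to (assumption of this sketch).
     */
    static OptionalLong aggregate(List<InputState> inputs) {
        boolean allFinished = !inputs.isEmpty();
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (InputState input : inputs) {
            if (input.status != InputStatus.FINISHED) {
                allFinished = false;
            }
            if (input.status == InputStatus.ACTIVE) {
                anyActive = true;
                min = Math.min(min, input.watermark);
            }
        }
        if (allFinished) {
            return OptionalLong.of(Long.MAX_VALUE);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        InputState finished = new InputState(Long.MAX_VALUE, InputStatus.FINISHED);

        // Active subtasks went idle: the finished input no longer dominates.
        System.out.println(aggregate(Arrays.asList(
                new InputState(1_000L, InputStatus.IDLE),
                new InputState(1_200L, InputStatus.IDLE),
                finished)));

        // One subtask resumes: progress follows the active input only.
        System.out.println(aggregate(Arrays.asList(
                new InputState(1_500L, InputStatus.ACTIVE),
                new InputState(1_200L, InputStatus.IDLE),
                finished)));

        // Every input finished: only now is Long.MAX_VALUE propagated.
        System.out.println(aggregate(Arrays.asList(finished, finished, finished)));
    }
}
```

With this rule, the scenario from the problem summary (finished subtasks plus idle active subtasks) produces no watermark update instead of a jump to Long.MAX_VALUE.
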
--
This message was sent by Atlassian Jira
(v8.20.10#820010)