weiqingy opened a new pull request, #27083: URL: https://github.com/apache/flink/pull/27083
## What is the purpose of the change

This PR fixes a watermark aggregation bug that can cause zero output from time-based operators (e.g., IntervalJoin) when some upstream subtasks finish while others later become temporarily idle. Details of the issue are described in [FLINK-38477](https://issues.apache.org/jira/browse/FLINK-38477).

**Root cause**: Flink currently allows finished subtasks to dominate watermark aggregation by emitting `Long.MAX_VALUE` instead of being excluded like idle inputs. This prematurely advances downstream watermarks to infinity (`Long.MAX_VALUE`), halting event-time progress and eliminating output.

## Brief change log

**Goal:** Ensure a finished input is **excluded** from operator watermark aggregation, just like an idle input. This prevents `Long.MAX_VALUE` from dominating aggregation when some channels are finished and others are idle.

### Implementation

- Introduced a **new watermark status: `FINISHED`**.
- When an operator finishes, it emits:
  - `status = FINISHED`
  - `watermark = Long.MAX_VALUE` (atomically, in sync with the status).
- Adjusted aggregation rules:
  - If there are **active channels** → `status = ACTIVE`, `watermark = min(all active channel watermarks)`.
  - Else if there are **idle channels** → `status = IDLE`, `watermark = max(all idle channel watermarks)`.
  - Else (**all channels finished**) → `status = FINISHED`, `watermark = Long.MAX_VALUE`.

This ensures finished inputs are ignored during aggregation until every input has completed, at which point the operator watermark correctly advances to `Long.MAX_VALUE`.

### Core issues identified

- **Current state model:** `ACTIVE` ←→ `IDLE`
  - Cannot distinguish between:
    - `IDLE` = “temporarily no data, may resume”
    - `FINISHED` = “permanently done, never resuming”
- **Rejected approach:** treating finished as idle.
- **Issue 1: IDLE status gets overwritten**
  - `StreamTask.endData()` emits `WatermarkStatus.IDLE` when a source completes.
  - Then `operatorChain.finishOperators()` calls `WatermarkAssignerOperator.processWatermark()`.
  - When `WatermarkAssignerOperator` receives `Long.MAX_VALUE`, it forcibly emits `WatermarkStatus.ACTIVE`.
  - Result: completed channels are incorrectly marked as ACTIVE, so the finished tasks still participate in watermark aggregation.
- **Issue 2: Incorrect aggregation when all channels are IDLE**
  - Even if Issue 1 is fixed, when all channels become IDLE (including finished tasks whose watermark is `Long.MAX_VALUE`):
    - `StatusWatermarkValve` calls `findAndOutputMaxWatermarkAcrossAllChannels()`.
    - This still emits a `Long.MAX_VALUE` watermark.
    - It cannot distinguish between “all temporarily idle” vs. “all permanently finished.”

**Conclusion:**

- Introducing the explicit `FINISHED` status is necessary to create a **three-state system (ACTIVE / IDLE / FINISHED)** that properly distinguishes temporary idleness from permanent completion.

### Long-term refactoring (follow-up)

This PR fixes the immediate bug, but the broader problem is **fragmented watermark lifecycle management**.

#### Current problems

- **Multiple, conflicting emission points:** For example:
  ```
  StreamTask.endData():
  ├─ advanceToEndOfEventTime()
  │  └─ emits Long.MAX_VALUE watermark
  │
  └─ operatorChain.finishOperators():
     ├─ ContinuousFileReaderOperator.finish()
     │  └─ emits Long.MAX_VALUE (deprecated?)
     │
     ├─ WatermarkAssignerOperator.finish()
     │  ├─ emits Long.MAX_VALUE watermark
     │  └─ switches IDLE→ACTIVE if idleness enabled
     └─ ...
  ```
- Wrapper operators interfere with completion semantics.
- Status and watermark emitted separately → race conditions.
- Sources don’t fully control their own lifecycle.

#### Proposed long-term direction

- **Source-owned completion:** sources declare when they are active, idle, or finished, emitting status + watermark atomically. Wrapper operators should not override this.
- **Embed watermark strategy in source, not wrapper.**
  - *Current Architecture:*
    ```
    Source → WatermarkAssignerOperator (wrapper) → Downstream
                  ↑ Adds watermark strategy
                  ↑ Can conflict with source completion
    ```
  - *Benefits:*
    - No wrapper operator to conflict with source status.
    - Source and watermark strategy have consistent lifecycle.
    - Table API can configure watermark strategy at source creation, not via wrapper.

#### Key principles

1. **Single responsibility** - sources own completion, strategies own generation, runtime owns aggregation.
2. **Lifecycle ownership** - no overrides by wrappers.

This refactoring would prevent the entire class of bugs we've been fixing and make watermark completion semantics clear and maintainable.

## Verifying this change

### Unit tests (new)

- Mixed scenario: some channels `FINISHED`, others `IDLE` → no premature `Long.MAX_VALUE`.
- `FINISHED` channels emit only `Long.MAX_VALUE`; any non-MAX ignored.
- Aggregation rule selection verified:
  - Active → min(active watermarks).
  - Idle (with no active) → idle status; finished excluded.
  - All finished → operator watermark = `Long.MAX_VALUE`.
- Emission ordering tested: status emitted **before** watermark.

### E2e scenario (manual/e2e)

1. Set job parallelism greater than Kafka partitions; disable dynamic partition discovery.
2. Some subtasks finish immediately, others keep running.
3. Stall the running subtasks until they become idle (`table.exec.source.idle-timeout=10s`).
4. **Before fix:** operator watermark = `Long.MAX_VALUE`, all records dropped → no output.
   **After fix:** watermark does not jump; resumed records are processed correctly.
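
### Illustrative sketch of the aggregation rules

The three-state aggregation rules from the change log, which the unit tests above exercise, can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual `StatusWatermarkValve` code: `WatermarkAggregationSketch`, its `Status` enum, `StatusAndWatermark`, and `aggregate()` are hypothetical names introduced only for this example, and treating an empty channel list as "all finished" is an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the ACTIVE / IDLE / FINISHED aggregation rules; not Flink code. */
public class WatermarkAggregationSketch {

    enum Status { ACTIVE, IDLE, FINISHED }

    /** A (status, watermark) pair, used both per channel and for the aggregate. */
    static final class StatusAndWatermark {
        final Status status;
        final long watermark;

        StatusAndWatermark(Status status, long watermark) {
            this.status = status;
            this.watermark = watermark;
        }

        @Override
        public String toString() {
            return status + "@" + watermark;
        }
    }

    static StatusAndWatermark aggregate(List<StatusAndWatermark> channels) {
        boolean anyActive = false;
        boolean anyIdle = false;
        long minActive = Long.MAX_VALUE;
        long maxIdle = Long.MIN_VALUE;

        for (StatusAndWatermark c : channels) {
            if (c.status == Status.ACTIVE) {
                anyActive = true;
                minActive = Math.min(minActive, c.watermark);
            } else if (c.status == Status.IDLE) {
                anyIdle = true;
                maxIdle = Math.max(maxIdle, c.watermark);
            }
            // FINISHED channels are skipped, so their Long.MAX_VALUE cannot dominate.
        }

        if (anyActive) {
            return new StatusAndWatermark(Status.ACTIVE, minActive);
        }
        if (anyIdle) {
            return new StatusAndWatermark(Status.IDLE, maxIdle);
        }
        // Assumption: no active and no idle channels means every channel is finished.
        return new StatusAndWatermark(Status.FINISHED, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        StatusAndWatermark finished = new StatusAndWatermark(Status.FINISHED, Long.MAX_VALUE);

        // Mixed FINISHED + IDLE: the watermark must not jump to Long.MAX_VALUE.
        System.out.println(aggregate(Arrays.asList(
                finished, new StatusAndWatermark(Status.IDLE, 100L)))); // IDLE@100

        // ACTIVE channels win: minimum over active watermarks only.
        System.out.println(aggregate(Arrays.asList(
                new StatusAndWatermark(Status.ACTIVE, 50L),
                new StatusAndWatermark(Status.ACTIVE, 70L),
                finished))); // ACTIVE@50

        // All FINISHED: only now does the aggregate reach Long.MAX_VALUE.
        System.out.println(aggregate(Arrays.asList(finished, finished)));
    }
}
```

The first call mirrors the mixed `FINISHED`/`IDLE` scenario from the e2e steps: because the finished channel is skipped, the aggregate stays `IDLE` at the idle channel's watermark instead of advancing to `Long.MAX_VALUE`.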
## Does this pull request potentially affect one of the following parts:

- Dependencies: **no**
- Public API (`@Public(Evolving)`): **no**
- Serializers: **no**
- Runtime per-record code paths: **yes** (watermark aggregation; minimal guarded changes)
- Deployment/recovery (JM, checkpointing, K8s/Yarn, ZooKeeper): **no**
- S3 file system connector: **no**

## Documentation

- New feature: **no** (bug fix only; introduces internal `FINISHED` watermark status)
- Documentation: *not applicable*
