weiqingy opened a new pull request, #27083:
URL: https://github.com/apache/flink/pull/27083

   ## What is the purpose of the change
   
   This PR fixes a watermark aggregation bug that can cause zero output from 
time-based operators (e.g., IntervalJoin) when some upstream subtasks finish 
while others later become temporarily idle.
   
   Details of the issue are described in 
[FLINK-38477](https://issues.apache.org/jira/browse/FLINK-38477).
   
   **Root cause**:
   Flink currently allows finished subtasks to dominate watermark aggregation 
by emitting Long.MAX_VALUE instead of being excluded like idle inputs. This 
prematurely advances downstream watermarks to infinity (Long.MAX_VALUE), 
halting event-time progress and eliminating output.
   
   ## Brief change log
   **Goal:**  Ensure a finished input is **excluded** from operator watermark 
aggregation, just like an idle input. This prevents `Long.MAX_VALUE` from 
dominating aggregation when some channels are finished and others are idle.
   
   ### Implementation
   
   - Introduced a **new watermark status: `FINISHED`**.  
   - When an operator finishes, it emits:
     - `status = FINISHED`  
     - `watermark = Long.MAX_VALUE` (atomically, in sync with the status).  
   - Adjusted aggregation rules:
     - If there are **active channels** → `status = ACTIVE`, `watermark = 
min(all active channel watermarks)`.
     - Else if there are **idle channels** → `status = IDLE`, `watermark = 
max(all idle channel watermarks)`.
     - Else (**all channels finished**) → `status = FINISHED`, `watermark = 
Long.MAX_VALUE`.
   
   This ensures finished inputs are ignored during aggregation until every 
input has completed, at which point the operator watermark correctly advances 
to `Long.MAX_VALUE`.
   
   ### Core issues identified
   
   - **Current state model:** `ACTIVE` ←→ `IDLE`  
     - Cannot distinguish between:
       - `IDLE` = “temporarily no data, may resume”  
       - `FINISHED` = “permanently done, never resuming”  
   
   - **Rejected approach:** treating finished as idle.  
     - **Issue 1: IDLE status gets overwritten**  
       - `StreamTask.endData()` emits `WatermarkStatus.IDLE` when a source 
completes.  
       - Then `operatorChain.finishOperators()` calls 
`WatermarkAssignerOperator.processWatermark()`.  
       - When `WatermarkAssignerOperator` receives `Long.MAX_VALUE`, it 
forcibly emits `WatermarkStatus.ACTIVE`.  
       - Result: completed channels are incorrectly marked as ACTIVE, so the 
finished tasks still participate in watermark aggregation.  
   
     - **Issue 2: Incorrect aggregation when all channels are IDLE**  
       - Even if Issue 1 is fixed, when all channels become IDLE (including 
finished tasks whose watermark is `Long.MAX_VALUE`):  
       - `StatusWatermarkValve` calls 
`findAndOutputMaxWatermarkAcrossAllChannels()`.  
       - This still emits a `Long.MAX_VALUE` watermark.  
       - It cannot distinguish between “all temporarily idle” vs. “all 
permanently finished.”  
   
   **Conclusion:**  
   - Introducing the explicit `FINISHED` status is necessary to create a 
**three-state system (ACTIVE / IDLE / FINISHED)** that properly distinguishes 
temporary idleness from permanent completion.
   
   ### Long-term refactoring (follow-up)
   
   This PR fixes the immediate bug, but the broader problem is **fragmented 
watermark lifecycle management**.
   
   #### Current problems
   - **Multiple, conflicting emission points:** For example:
   ```
   StreamTask.endData():
     ├─ advanceToEndOfEventTime()
     │   └─ emits Long.MAX_VALUE watermark
     │
     └─ operatorChain.finishOperators():
         ├─ ContinuousFileReaderOperator.finish()
         │   └─ emits Long.MAX_VALUE (deprecated?)
         │
         ├─ WatermarkAssignerOperator.finish()
         │    ├─ emits Long.MAX_VALUE watermark
         │    └─ switches IDLE→ACTIVE if idleness enabled 
         └─ ...
   ```
   - Wrapper operators interfere with completion semantics.  
   - Status and watermark emitted separately → race conditions.  
   - Sources don’t fully control their own lifecycle.
   
   #### Proposed long-term direction
   - **Source-owned completion:** sources declare when they are active, idle, 
or finished, emitting status + watermark atomically. Wrapper operators should 
not override this.  
   
   - **Embed watermark strategy in source, not wrapper.**  
     - *Current Architecture:*  
       ```
       Source → WatermarkAssignerOperator (wrapper) → Downstream
                ↑ Adds watermark strategy
                ↑ Can conflict with source completion
       ```
     - *Benefits:*  
       - No wrapper operator to conflict with source status.  
       - Source and watermark strategy have consistent lifecycle.  
       - Table API can configure watermark strategy at source creation, not via 
wrapper. 
   
   #### Key principles
   1. **Single responsibility** - sources own completion, strategies own 
generation, runtime owns aggregation.  
   2. **Lifecycle ownership** -  no overrides by wrappers.  
   This refactoring would prevent the entire class of bugs we've been fixing 
and make watermark completion semantics clear and maintainable.
   
   ## Verifying this change
   
   ### Unit tests (new)
   - Mixed scenario: some channels `FINISHED`, others `IDLE` → no premature 
`Long.MAX_VALUE`.  
   - `FINISHED` channels emit only `Long.MAX_VALUE`; any non-MAX ignored.  
   - Aggregation rule selection verified:
     - Active → min(active watermarks).  
     - Idle (with no active) → idle status; finished excluded.  
     - All finished → operator watermark = `Long.MAX_VALUE`.  
   - Emission ordering tested: status emitted **before** watermark.
   
   ### E2e scenario (manual/e2e)
   1. Set job parallelism greater than Kafka partitions; disable dynamic 
partition discovery.  
   2. Some subtasks finish immediately, others keep running.  
   3. Stall the running subtasks until they become idle 
(`table.exec.source.idle-timeout=10s`).  
   4. **Before fix:** operator watermark = `Long.MAX_VALUE`, all records 
dropped → no output.  
      **After fix:** watermark does not jump; resumed records are processed 
correctly.
   
   ## Does this pull request potentially affect one of the following parts:
   - Dependencies: **no**  
   - Public API (`@Public(Evolving)`): **no**  
   - Serializers: **no**  
   - Runtime per-record code paths: **yes** (watermark aggregation; minimal 
guarded changes)  
   - Deployment/recovery (JM, checkpointing, K8s/Yarn, ZooKeeper): **no**  
   - S3 file system connector: **no**
   
   ## Documentation
   - New feature: **no** (bug fix only; introduces internal `FINISHED` 
watermark status)  
   - Documentation: *not applicable*
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to