loserwang1024 opened a new issue, #3304:
URL: https://github.com/apache/fluss/issues/3304

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   0.9.0 (latest release)
   
   ### Please describe the bug 🐞
   
   When a Flink source subscribes to multiple partitions of a partitioned table
   (or to multiple buckets) whose consumption progress differs significantly,
   the reported `currentFetchEventTimeLag` metric is much smaller than the
   `currentEmitEventTimeLag` reported for the same source. This makes the fetch
   lag metric misleading for monitoring and alerting, because:
   
   - `currentFetchEventTimeLag` stays near 0, suggesting the source has caught 
up.
   - `currentEmitEventTimeLag` reports a large lag (e.g. 2 hours), suggesting 
records are piling up.
   - Yet the Flink job has **no backpressure**, so the gap cannot be explained 
by downstream slowness.
   
   ### Root Cause
   
   In `FlinkSourceSplitReader#forLogRecords`
   (`fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java`,
   lines 426-494 in the reporter's local checkout):
   
   ```java
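   // Running maximum of the last record timestamp over all buckets in this fetch.
   maxConsumerRecordTimestampInFetch =
           Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp());
   // ... (intervening code omitted)
   // The lag is reported once per fetch from that maximum, i.e. from the freshest bucket.
   flinkSourceReaderMetrics.reportRecordEventTime(
           fetchTimestamp - maxConsumerRecordTimestampInFetch);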
   ```
   
   The aggregated timestamp is the **MAX** across all buckets in the fetch, so 
the reported lag is effectively the **MIN** lag across buckets.
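
   The max/min relationship can be written out explicitly. The symbols are
   introduced here for illustration only: $T$ stands for `fetchTimestamp` and
   $t_b$ for the last record timestamp of bucket $b$ in the fetch.

   ```latex
   T - \max_b t_b \;=\; \min_b \,(T - t_b)
   \qquad\text{and}\qquad
   T - \min_b t_b \;=\; \max_b \,(T - t_b)
   ```

   Subtracting the largest timestamp therefore yields the smallest per-bucket
   lag, and subtracting the smallest timestamp yields the largest.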
   
   ### Example scenario
   
   - Partition A is 2 hours behind → its last record timestamp is 2 hours old.
   - Partition B is reading the latest data → its last record timestamp ≈ `now`.
   
   Every `logScanner.poll(POLL_TIMEOUT)` tends to return partition B's fresh 
data (since it is always locally ready as a `CompletedFetch`). The `Math.max` 
across buckets then picks partition B's near-`now` timestamp, so:
   
   ```
   fetchTimestamp - maxTimestamp ≈ 0   → currentFetchEventTimeLag reports ~0
   ```
   
   Meanwhile, emit lag is computed **per record** and aggregated as **max**, so 
partition A's 2-hour-old records correctly inflate `currentEmitEventTimeLag`. 
This is why the two metrics diverge dramatically even though there is no 
backpressure.
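
   The arithmetic of this scenario can be reproduced with a small standalone
   sketch. The class and method names (`FetchLagDemo`, `lagFromMaxTimestamp`,
   `lagFromMinTimestamp`) are hypothetical and not part of Fluss; only the
   `Math.max` aggregation mirrors the snippet from `forLogRecords`, and the
   timestamps are made-up values.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class FetchLagDemo {

       /** Fetch time minus the MAX last-record timestamp (assumes a non-empty map). */
       public static long lagFromMaxTimestamp(long fetchTimestamp, Map<String, Long> lastTimestamps) {
           long maxTs = Long.MIN_VALUE;
           for (long ts : lastTimestamps.values()) {
               maxTs = Math.max(maxTs, ts);
           }
           return fetchTimestamp - maxTs;
       }

       /** Fetch time minus the MIN last-record timestamp (assumes a non-empty map). */
       public static long lagFromMinTimestamp(long fetchTimestamp, Map<String, Long> lastTimestamps) {
           long minTs = Long.MAX_VALUE;
           for (long ts : lastTimestamps.values()) {
               minTs = Math.min(minTs, ts);
           }
           return fetchTimestamp - minTs;
       }

       public static void main(String[] args) {
           long now = 1_700_000_000_000L;            // fixed "fetch time" for reproducibility
           long twoHoursMs = 2L * 60 * 60 * 1000;    // 7_200_000 ms

           Map<String, Long> lastTimestamps = new LinkedHashMap<>();
           lastTimestamps.put("partition-A", now - twoHoursMs); // 2 hours behind
           lastTimestamps.put("partition-B", now - 50);         // nearly caught up

           // Aggregating by max timestamp hides partition A: prints 50
           System.out.println(lagFromMaxTimestamp(now, lastTimestamps));
           // Aggregating by min timestamp surfaces partition A: prints 7200000
           System.out.println(lagFromMinTimestamp(now, lastTimestamps));
       }
   }
   ```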
   
   
   
   ### Solution
   
   1. **Align fetch lag semantics with emit lag (max-lag across buckets):**
      Change the aggregation in `forLogRecords` from `Math.max` to `Math.min` 
on record timestamp (equivalent to `max` on lag), so `currentFetchEventTimeLag` 
reports the worst-case lag across buckets in the fetch.
   
   2. **Expose per-bucket / per-split fetch lag:**
      Add a new gauge `currentFetchEventTimeLag` under the existing per-bucket 
metric group (next to `currentOffset`):
      - Non-partitioned: 
`fluss.reader.bucket.{bucket_id}.currentFetchEventTimeLag`
      - Partitioned: 
`fluss.reader.partition.{partition_id}.bucket.{bucket_id}.currentFetchEventTimeLag`
   
      This allows users to observe exactly which partition/bucket is lagging, 
making diagnosis like the scenario above straightforward.
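
   A minimal sketch of how the two proposals could fit together, under stated
   assumptions: `PerBucketFetchLag`, its methods, and the string bucket keys are
   hypothetical stand-ins, not existing Fluss or Flink classes, and the wiring
   into the real per-bucket metric group is left out. It tracks a lag value per
   bucket and derives the fetch-level value from the minimum timestamp, starting
   the running minimum at `Long.MAX_VALUE` instead of a minimum sentinel.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class PerBucketFetchLag {
       /** Value returned before any record has been seen (an assumption of this sketch). */
       public static final long UNINITIALIZED = -1L;

       private final Map<String, Long> lagByBucket = new ConcurrentHashMap<>();
       private volatile long worstLagInLastFetch = UNINITIALIZED;

       /** Records one fetch: bucket key mapped to the timestamp of its last record. */
       public void onFetch(long fetchTimestamp, Map<String, Long> lastTimestampByBucket) {
           long minTs = Long.MAX_VALUE; // running MIN timestamp == MAX lag
           for (Map.Entry<String, Long> e : lastTimestampByBucket.entrySet()) {
               lagByBucket.put(e.getKey(), fetchTimestamp - e.getValue());
               minTs = Math.min(minTs, e.getValue());
           }
           if (minTs != Long.MAX_VALUE) { // skip empty fetches
               worstLagInLastFetch = fetchTimestamp - minTs;
           }
       }

       /** Value a per-bucket gauge could expose. */
       public long bucketLag(String bucketKey) {
           return lagByBucket.getOrDefault(bucketKey, UNINITIALIZED);
       }

       /** Value the fetch-level gauge could expose. */
       public long worstLagInLastFetch() {
           return worstLagInLastFetch;
       }
   }
   ```

   One caveat with the fetch-level value: it only covers buckets present in a
   given fetch, so a fetch that contains only up-to-date buckets would still
   yield a small number. The per-bucket values keep the last observed lag of
   each bucket and do not have that limitation.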
   
   
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

