xishuaidelin opened a new issue, #2509:
URL: https://github.com/apache/fluss/issues/2509

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Motivation
   
   In 
[FLIP-309](https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog#FLIP309:Supportusinglargercheckpointingintervalwhensourceisprocessingbacklog-ProposedChanges),
 Flink introduced the concept of **ProcessingBacklog** to indicate whether a 
record should be processed in a **low-latency** or **high-throughput** manner. 
**ProcessingBacklog** can be used to dynamically adjust a job’s checkpoint 
interval at runtime.
   
   The **Fluss Source** can support **ProcessingBacklog** by marking, for each bucket, the offset at the starting position as the backlog boundary: records before that boundary are **backlog** data. The source reports this signal to Flink, enabling Flink to apply the corresponding optimizations (such as a larger checkpoint interval).
   
   ### Solution
   
   #### Proposed Changes
   
   - **Add backlog boundary tracking in `FlinkSourceEnumerator`**
     - Introduce `backlogOffsets: Map<TableBucket, Long>` to store the backlog 
boundary offset per bucket.
     - Add/extend `resetBacklog(...)` to initialize splits and persist the 
backlog boundary offsets.
     - Update `initPartitionedSplits()` to pass `initializeAtBeginning` / 
backlog boundary offset information into the created splits.
     - Update `handleSourceEvent(...)` to handle a new `BacklogFinishEvent` and 
update the enumerator’s backlog tracking state.
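
   A minimal sketch of the enumerator side, assuming a `getTableBucket()` accessor on the proposed event; `context` is the enumerator's `SplitEnumeratorContext`, whose `setIsProcessingBacklog(...)` comes from FLIP-309 (`resetBacklog(...)` would correspondingly call `setIsProcessingBacklog(true)` when it records the boundaries):

   ```java
   // Inside FlinkSourceEnumerator (sketch; existing fields and logic elided).
   private final Map<TableBucket, Long> backlogOffsets = new HashMap<>();

   @Override
   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
       if (sourceEvent instanceof BacklogFinishEvent) {
           TableBucket bucket = ((BacklogFinishEvent) sourceEvent).getTableBucket();
           // The bucket has reached its backlog boundary; stop tracking it.
           backlogOffsets.remove(bucket);
           if (backlogOffsets.isEmpty()) {
               // All buckets have caught up: report that the source has left
               // the backlog phase so Flink can shorten the checkpoint interval.
               context.setIsProcessingBacklog(false);
           }
       }
   }
   ```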
   
   - **Propagate backlog boundary offset via split metadata**
     - Extend split types to carry backlog boundary offset:
       - `LogSplit`: add `backlogOffset`
       - `HybridSnapshotLogSplit`: add `backlogOffset`
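
   A sketch of the split-side change, with the existing `LogSplit` members and the matching split-serializer update elided; the sentinel constant is illustrative, and `HybridSnapshotLogSplit` would mirror it:

   ```java
   // Sketch only: existing LogSplit fields (bucket, starting offset, ...)
   // and the serializer change are omitted.
   public class LogSplit /* extends SourceSplitBase */ {
       /** Illustrative sentinel meaning "no backlog boundary to track". */
       public static final long NO_BACKLOG = -1L;

       private final long backlogOffset;

       public LogSplit(long backlogOffset) {
           this.backlogOffset = backlogOffset;
       }

       /** Offset up to which this bucket's records count as backlog. */
       public long getBacklogOffset() {
           return backlogOffset;
       }
   }
   ```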
   
   - **Introduce a new source event for backlog completion**
     - Add a `BacklogFinishEvent` implementing Flink's `SourceEvent` interface to signal that a specific `TableBucket` has finished consuming backlog (i.e., the reader has reached or passed the backlog boundary offset).
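
   The event itself can stay minimal, since Flink's `SourceEvent` is a marker interface; a sketch with serialization details left out:

   ```java
   import org.apache.flink.api.connector.source.SourceEvent;

   /** Signals that the given bucket's reader has passed its backlog boundary. */
   public class BacklogFinishEvent implements SourceEvent {
       private final TableBucket tableBucket;

       public BacklogFinishEvent(TableBucket tableBucket) {
           this.tableBucket = tableBucket;
       }

       public TableBucket getTableBucket() {
           return tableBucket;
       }
   }
   ```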
   
   - **Report backlog completion from `FlinkSourceSplitReader`**
     - Add new fields:
       - `backlogMarkedOffsets: Map<TableBucket, Long>`: backlog boundary 
offsets per bucket (from assigned splits).
       - `onlySnapshotBuckets: Set<TableBucket>`: buckets with snapshot-only 
data (no log backlog to track).
       - `backlogEventSentTbls: Set<TableBucket>`: a dedup set ensuring `BacklogFinishEvent` is sent at most once per bucket.
       - `context: SourceReaderContext`: used to send `BacklogFinishEvent` to 
the coordinator.
     - Update reader logic to detect backlog completion and emit events:
       - `fetch(...)`: hook in backlog-completion reporting when a bucket finishes its backlog.
       - `subscribeLog(...)` and `forLogRecords(...)`: check current log 
offsets against `backlogMarkedOffsets`; once the boundary is reached, send 
`BacklogFinishEvent` (deduplicated).
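
   A sketch of the completion check those call sites could share; the helper name is hypothetical, and `context` is the reader's `SourceReaderContext`:

   ```java
   // Inside FlinkSourceSplitReader (sketch). Called from subscribeLog(...) /
   // forLogRecords(...) with the bucket's current consumed offset.
   private void maybeSendBacklogFinishEvent(TableBucket bucket, long currentOffset) {
       Long boundary = backlogMarkedOffsets.get(bucket);
       if (boundary != null
               && currentOffset >= boundary
               // Set#add returns false when the event was already sent for
               // this bucket, which gives us the deduplication.
               && backlogEventSentTbls.add(bucket)) {
           context.sendSourceEventToCoordinator(new BacklogFinishEvent(bucket));
       }
   }
   ```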
   
   ### Anything else?
   
   _No response_
   
   ### Willingness to contribute
   
   - [x] I'm willing to submit a PR!

