xishuaidelin opened a new issue, #2509: URL: https://github.com/apache/fluss/issues/2509
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Motivation

In [FLIP-309](https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog#FLIP309:Supportusinglargercheckpointingintervalwhensourceisprocessingbacklog-ProposedChanges), Flink introduced the concept of **ProcessingBacklog** to indicate whether a record should be processed in a **low-latency** or **high-throughput** manner. **ProcessingBacklog** can be used to dynamically adjust a job's checkpoint interval at runtime. The Fluss source can set **ProcessingBacklog** by marking offsets at the starting position to identify **backlog** data and reporting this signal to Flink, enabling Flink to perform the relevant optimizations based on it.

### Solution

## Proposed Changes

- **Add backlog boundary tracking in `FlinkSourceEnumerator`**
  - Introduce `backlogOffsets: Map<TableBucket, Long>` to store the backlog boundary offset per bucket.
  - Add/extend `resetBacklog(...)` to initialize splits and persist the backlog boundary offsets.
  - Update `initPartitionedSplits()` to pass the `initializeAtBeginning` flag / backlog boundary offset information into the created splits.
  - Update `handleSourceEvent(...)` to handle the new `BacklogFinishEvent` and update the enumerator's backlog tracking state.
- **Propagate the backlog boundary offset via split metadata**
  - Extend the split types to carry the backlog boundary offset:
    - `LogSplit`: add `backlogOffset`
    - `HybridSnapshotLogSplit`: add `backlogOffset`
- **Introduce a new source event for backlog completion**
  - Add `BacklogFinishEvent extends SourceEvent` to signal that a specific `TableBucket` has finished consuming its backlog (i.e., the reader has reached or passed the backlog boundary offset).
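For illustration, the event and the enumerator-side bookkeeping could look roughly like the sketch below. Flink/Fluss types (`SourceEvent`, `TableBucket`) are replaced with local stand-ins, and `BacklogTracker` is a hypothetical helper, not an existing class; the real logic would live inside `FlinkSourceEnumerator`.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Local stand-ins for the Flink/Fluss types, for illustration only.
interface SourceEvent extends Serializable {}

record TableBucket(long tableId, int bucket) implements Serializable {}

// Hypothetical event: a reader reports that a bucket has caught up
// to its backlog boundary offset.
class BacklogFinishEvent implements SourceEvent {
    private final TableBucket tableBucket;

    BacklogFinishEvent(TableBucket tableBucket) {
        this.tableBucket = tableBucket;
    }

    TableBucket tableBucket() {
        return tableBucket;
    }
}

// Sketch of the enumerator-side bookkeeping: once every tracked bucket
// has finished its backlog, the job as a whole is no longer in backlog.
class BacklogTracker {
    private final Map<TableBucket, Long> backlogOffsets = new HashMap<>();

    // Called when splits are initialized with their backlog boundary offsets.
    void resetBacklog(TableBucket bucket, long boundaryOffset) {
        backlogOffsets.put(bucket, boundaryOffset);
    }

    // Called from handleSourceEvent(...) when a BacklogFinishEvent arrives.
    // Returns true when no bucket has remaining backlog, i.e. the enumerator
    // can report isProcessingBacklog=false to Flink.
    boolean handleBacklogFinish(BacklogFinishEvent event) {
        backlogOffsets.remove(event.tableBucket());
        return backlogOffsets.isEmpty();
    }
}
```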
- **Report backlog completion from `FlinkSourceSplitReader`**
  - Add new fields:
    - `backlogMarkedOffsets: Map<TableBucket, Long>`: backlog boundary offsets per bucket (from the assigned splits).
    - `onlySnapshotBuckets: Set<TableBucket>`: buckets with snapshot-only data (no log backlog to track).
    - `backlogEventSentTbls: Set<TableBucket>`: deduplication set to ensure `BacklogFinishEvent` is sent once per bucket.
    - `context: SourceReaderContext`: used to send `BacklogFinishEvent` to the coordinator.
  - Update the reader logic to detect backlog completion and emit events:
    - `fetch(...)`: hook in backlog completion reporting when a bucket completes its backlog.
    - `subscribeLog(...)` and `forLogRecords(...)`: check the current log offsets against `backlogMarkedOffsets`; once the boundary is reached, send a `BacklogFinishEvent` (deduplicated).

### Anything else?

_No response_

### Willingness to contribute

- [x] I'm willing to submit a PR!
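As a minimal sketch of the reader-side check described above: the helper below models the offset comparison and the once-per-bucket deduplication. `Bucket` and `BacklogCompletionReporter` are illustrative stand-ins (the real fields would sit directly in `FlinkSourceSplitReader`, and the `Consumer` stands in for sending the event via `SourceReaderContext`).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative stand-in for Fluss's TableBucket.
record Bucket(long tableId, int bucketId) {}

// Sketch of the check run from forLogRecords(...): once the current offset
// reaches the boundary recorded for a bucket, emit a BacklogFinishEvent
// exactly once.
class BacklogCompletionReporter {
    private final Map<Bucket, Long> backlogMarkedOffsets = new HashMap<>();
    private final Set<Bucket> backlogEventSentTbls = new HashSet<>();
    // Stand-in for context.sendSourceEventToCoordinator(new BacklogFinishEvent(...)).
    private final Consumer<Bucket> sendBacklogFinishEvent;

    BacklogCompletionReporter(Consumer<Bucket> sendBacklogFinishEvent) {
        this.sendBacklogFinishEvent = sendBacklogFinishEvent;
    }

    // Called when a split carrying a backlog boundary offset is assigned.
    void trackBacklog(Bucket bucket, long boundaryOffset) {
        backlogMarkedOffsets.put(bucket, boundaryOffset);
    }

    // Returns true iff an event was emitted for this call.
    boolean maybeReportBacklogFinished(Bucket bucket, long currentOffset) {
        Long boundary = backlogMarkedOffsets.get(bucket);
        if (boundary == null || currentOffset < boundary) {
            return false; // not tracked, or still behind the boundary
        }
        if (backlogEventSentTbls.add(bucket)) { // dedup: send once per bucket
            sendBacklogFinishEvent.accept(bucket);
            return true;
        }
        return false;
    }
}
```

The `Set#add` return value gives the deduplication for free: only the first caller to cross the boundary for a bucket actually sends the event.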
