Hello, Zexian. This is great work for job stability during failover or when catching up on lag. I just have some questions:
1. For some source tasks, the splits they consume may be far more numerous
than others'. A simple QPS calculation based on parallelism may therefore
cause imbalance. Can we support a calculation based on the split count as a
common approach?

2. In our company, we support configuring each operator's QPS via a
ConfigOption<Map>. This is convenient when the operatorId can be obtained
easily before submitting, but in open source that may be hard to do. I
wonder whether a similar mechanism could exist, so that we do not need to
implement this for each connector.

Best,
Jiangang Liu

Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Aug 4, 2025, 14:35:

> Hi Shengkai,
>
> Thank you for your thoughtful feedback and excellent questions. These are
> all great points. Here are my thoughts:
>
> Regarding the requestSize parameter and its behavior: requestSize is not a
> request for permission but a post-facto count of records that have already
> been emitted. If this count (e.g., 3) exceeds the available rate credit
> (e.g., credit for 2 records), RateLimiter#acquire will return a
> CompletionStage that completes in the future. This creates a non-blocking
> pause in the SourceReader, allowing it to pay back the "time debt" and
> ensuring the average rate is maintained.
>
> Regarding notifyCheckpointAborted: You are absolutely right; this is a
> critical point for preventing deadlocks. The specific plan is to add a
> notifyCheckpointAborted method to the RateLimiter interface. Then, within
> the SourceReader implementation, I will handle the checkpoint abort
> callback and invoke this new method to reset the rate limiter's internal
> state.
>
> Regarding configuration and unsupported connectors: You've made an
> excellent point about this being a connector-specific feature.
>
> I agree that scan.rate-limit.record-per-second is a great name, and I will
> propose it as a best practice. I plan to use it in our initial reference
> implementation, likely for the Kafka connector, and it can serve as a
> reference for other connectors in the future.
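The "post-facto count plus time debt" semantics described above can be sketched as a credit-based limiter. This is only an illustrative model, not FLIP-535's actual implementation; the class name, the maxCredit parameter, and the token-bucket refill strategy are all assumptions made for the sketch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: acquire() is called AFTER requestSize records
// were emitted. It always succeeds, but when the emitted count exceeds the
// accumulated credit, the returned stage completes only once the resulting
// "time debt" has been paid back, keeping the average rate intact.
class DebtBasedRateLimiter {
    private final double recordsPerSecond;
    private final double maxCredit;   // burst allowance (an assumption here)
    private double credit;            // may go negative: that is the debt
    private long lastRefillNanos;

    DebtBasedRateLimiter(double recordsPerSecond, double maxCredit) {
        this.recordsPerSecond = recordsPerSecond;
        this.maxCredit = maxCredit;
        this.credit = maxCredit;
        this.lastRefillNanos = System.nanoTime();
    }

    synchronized CompletionStage<Void> acquire(int requestSize) {
        refill();
        credit -= requestSize;        // charge after the fact; may go negative
        if (credit >= 0) {
            return CompletableFuture.completedFuture(null);  // within credit
        }
        // Complete in the future instead of blocking the caller's thread.
        long delayNanos = (long) (-credit / recordsPerSecond * 1_000_000_000L);
        CompletableFuture<Void> stage = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(delayNanos, TimeUnit.NANOSECONDS)
                .execute(() -> stage.complete(null));
        return stage;
    }

    private void refill() {
        long now = System.nanoTime();
        credit = Math.min(maxCredit,
                credit + (now - lastRefillNanos) / 1e9 * recordsPerSecond);
        lastRefillNanos = now;
    }
}
```

With 10 records/s and credit for 2 records, an acquire(1) completes immediately, while a subsequent acquire(5) returns a stage that only completes after the debt is repaid, matching the 3-records-vs-credit-for-2 scenario discussed above.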
> You are correct that the decision to support this feature and how to name
> the option lies with each connector. The framework only provides the
> underlying mechanism (the RateLimiter integration). The mechanism for
> providing clear error feedback works perfectly with this model: a connector
> that implements rate limiting will declare the option in its
> DynamicTableFactory; connectors that don't support it will not declare the
> option. Consequently, Flink's built-in validation will automatically throw
> a ValidationException for an unknown option, providing the desired user
> feedback.
>
> Best,
> Zexian
>
> Shengkai Fang <fskm...@gmail.com> wrote on Mon, Jul 28, 2025, 14:57:
>
> > Hi, Zexian.
> >
> > Thanks for your FLIP. I think rate limiting is a very important feature
> > for our users. But I have some questions about the FLIP:
> >
> > 1. How do I determine the input parameter `requestSize` for
> > `RateLimiter#acquire`? If the rate limiter indicates there are 2
> > remaining requests that can be emitted, but the `requestSize` is 3, what
> > is the behavior here?
> > 2. CheckpointListener also has a method named notifyCheckpointAborted; I
> > think RateLimiter also needs this. If the checkpoint aborts, please
> > clear the state of the rate limiter.
> > 3. I think `scan.rate-limit.record-per-second` is better than
> > `scan.rate.limit.record-per-second`. It seems only the FLIP-27 source
> > supports rate limiting, so it would be better to throw an exception to
> > notify users if the source doesn't support this feature.
> >
> > Best,
> > Shengkai
> >
> > Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Jul 28, 2025, 11:52:
> >
> > > Hi Leonard,
> > >
> > > Thanks a lot for your support and positive feedback! I'm glad to hear
> > > you think the design meets the needs of most scenarios.
> > >
> > > Indeed, rate limiting is a fundamental and important feature, and I'm
> > > excited to help fill this gap.
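Taken together, the discussion above suggests a limiter interface shaped roughly like the following. This is a sketch of what is being proposed, not the final FLIP-535 API; the default-method bodies and javadoc wording are assumptions:

```java
import java.util.concurrent.CompletionStage;

// Illustrative shape only. The checkpoint callbacks mirror Flink's
// CheckpointListener, with notifyCheckpointAborted being the addition
// proposed in this thread to prevent a limiter deadlock.
interface RateLimiter {

    /**
     * Accounts for requestSize already-emitted records. The returned stage
     * completes once those records are covered by the configured rate.
     */
    CompletionStage<Void> acquire(int requestSize);

    /** E.g. a per-checkpoint limiter may release credit for the next period. */
    default void notifyCheckpointComplete(long checkpointId) {}

    /**
     * Proposed addition: reset internal state so that an aborted checkpoint
     * cannot leave the limiter waiting forever.
     */
    default void notifyCheckpointAborted(long checkpointId) {}
}
```

Because acquire is the single abstract method, a trivial implementation can be supplied as a lambda for testing, while checkpoint-aware limiters override the two notification hooks.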
> > > I'm also looking forward to more ideas and potential improvements from
> > > other community members.
> > >
> > > Thanks again!
> > >
> > > Best,
> > > Zexian
> > >
> > > On 2025/07/24 12:19:39 Leonard Xu wrote:
> > > > Thanks Zexian for driving this work.
> > > >
> > > > Rate limiting is a common requirement; TBH, we should have supported
> > > > it at an earlier stage. The proposed design, which integrates it into
> > > > the source operator lifecycle, is already able to meet the vast
> > > > majority of scenarios, so it looks good from my side.
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > > > On Jul 18, 2025, at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > I would like to start a discussion on a new Flink Improvement
> > > > > Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > > > The full proposal can be found on the wiki:
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > > > >
> > > > > Motivation
> > > > >
> > > > > In many production environments, Flink sources read from shared
> > > > > external systems (like Kafka, Pulsar, or databases) where resources
> > > > > are limited. An uncontrolled data ingestion rate can lead to
> > > > > critical issues:
> > > > > Resource Contention: A high-throughput Flink job can saturate a
> > > > > shared Kafka cluster's bandwidth, starving other essential services.
> > > > > System Instability: Aggressive polling from a database (e.g., MySQL)
> > > > > can overwhelm its IOPS, degrading performance for transactional
> > > > > queries.
> > > > > This proposal aims to provide a built-in, precise, and easy-to-use
> > > > > rate-limiting mechanism for Flink Sources to ensure system stability
> > > > > and fair resource sharing.
> > > > > Proposed Solution
> > > > >
> > > > > The core idea is to integrate a flexible, record-based rate-limiting
> > > > > mechanism directly into SourceReaderBase, making it available to all
> > > > > connectors built on the new source interface.
> > > > >
> > > > > Key Changes:
> > > > >
> > > > > Seamless Integration via SourceReaderBase:
> > > > > New constructors accepting a RateLimiterStrategy will be added
> > > > > directly to SourceReaderBase. This allows any connector built on the
> > > > > modern source interface (like Kafka or Pulsar) to enable rate
> > > > > limiting with minimal code changes.
> > > > >
> > > > > Accurate, Post-Emission Throttling:
> > > > > To support sources with unpredictable batch sizes (e.g., Kafka),
> > > > > rate limiting is applied after records are emitted. The reader
> > > > > counts the records after each recordEmitter.emitRecord call and only
> > > > > then consults the RateLimiter. This ensures throttling is based on
> > > > > the precise number of records processed.
> > > > >
> > > > > Fully Non-Blocking Operation:
> > > > > The entire mechanism is asynchronous and non-blocking. If a rate
> > > > > limit is hit, the reader pauses by returning
> > > > > InputStatus.MORE_AVAILABLE. This yields control to the Flink task's
> > > > > event loop without blocking the processing thread, ensuring that
> > > > > critical operations like checkpointing are never delayed.
> > > > >
> > > > > Unified Configuration via SQL/Table API:
> > > > > Users can configure rate limits consistently across different
> > > > > sources using a simple SQL table option, such as
> > > > > WITH ('scan.rate.limit' = '1000'). This provides a unified and
> > > > > user-friendly experience for pipeline tuning.
> > > > >
> > > > > The design is backward-compatible, and existing custom sources will
> > > > > continue to work without any modification.
> > > > > I believe this feature will be a valuable addition for Flink users
> > > > > operating in complex, multi-tenant environments.
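The post-emission, non-blocking throttling loop described above can be modeled in a few lines. Everything here is a stand-in: the class, the Limiter interface, the simplified InputStatus enum, and the pairing of pollNext with an availability future are assumptions sketching the mechanism, not Flink's actual SourceReaderBase code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Simplified model: emit a batch first, count it, then charge the limiter.
// A pending "time debt" pauses reading via the availability future, so no
// processing thread is ever blocked.
class ThrottledReaderSketch {
    enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

    interface Limiter {
        CompletionStage<Void> acquire(int emittedRecords);
    }

    private final Limiter limiter;
    private CompletionStage<Void> availability =
            CompletableFuture.completedFuture(null);

    ThrottledReaderSketch(Limiter limiter) {
        this.limiter = limiter;
    }

    /** One poll step: records go out first, then the limiter is consulted. */
    InputStatus pollNext(int recordsInBatch) {
        emit(recordsInBatch);                           // emit the whole batch
        availability = limiter.acquire(recordsInBatch); // post-facto accounting
        return InputStatus.MORE_AVAILABLE;
    }

    /**
     * The task's event loop polls again only once this completes, so an
     * unfinished acquire() pauses the reader without blocking any thread.
     */
    CompletionStage<Void> isAvailable() {
        return availability;
    }

    private void emit(int n) {
        // stand-in for recordEmitter.emitRecord(...) per record
    }
}
```

With a permissive limiter the reader stays continuously available; with a limiter whose stage never completes, the availability future stays pending and the event loop stops polling, which is the non-blocking pause the proposal describes.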
> > > > > I'm looking forward to your feedback, suggestions, and any
> > > > > potential concerns you might have.
> > > > >
> > > > > Thanks,
> > > > > Zexian Wu