Hi, Zexian. Thanks for your FLIP. I think rate limiting is a very important feature for our users, but I have some questions about the FLIP:

1. How do I determine the input parameter `requestSize` for `RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining requests that can be emitted but the `requestSize` is 3, what is the behavior here?
2. CheckpointListener also has a method named notifyCheckpointAborted, and I think RateLimiter also needs it. If the checkpoint is aborted, please clear the state of the rate limiter.
3. I think `scan.rate-limit.record-per-second` is better than `scan.rate.limit.record-per-second`. It seems only FLIP-27 sources support rate limiting, so it would be better to throw an exception to notify users if the source doesn't support this feature.

Best,
Shengkai

Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Jul 28, 2025 at 11:52:
> Hi Leonard,
>
> Thanks a lot for your support and positive feedback! I'm glad to hear you
> think the design meets the needs of most scenarios.
>
> Indeed, rate limiting is a fundamental and important feature, and I'm
> excited to help fill this gap.
>
> I'm also looking forward to more ideas and potential improvements from
> other community members.
>
> Thanks again!
>
> Best,
> Zexian
>
> On 2025/07/24 12:19:39 Leonard Xu wrote:
> > Thanks Zexian for driving this work.
> >
> > Rate limiting is a common requirement; TBH, we should have supported it
> > at an earlier stage. The proposed design, which integrates it into the
> > source operator lifecycle, is already able to meet the vast majority of
> > scenarios and looks good from my side.
> >
> > Best,
> > Leonard
> >
> > > On Jul 18, 2025, at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > > I would like to start a discussion on a new Flink Improvement Proposal
> > > (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > >
> > > The full proposal can be found on the wiki:
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > >
> > > Motivation
> > >
> > > In many production environments, Flink sources read from shared
> > > external systems (like Kafka, Pulsar, or databases) where resources are
> > > limited. An uncontrolled data ingestion rate can lead to critical issues:
> > >
> > > Resource Contention: A high-throughput Flink job can saturate a shared
> > > Kafka cluster's bandwidth, starving other essential services.
> > > System Instability: Aggressive polling from a database (e.g., MySQL)
> > > can overwhelm its IOPS, degrading performance for transactional queries.
> > >
> > > This proposal aims to provide a built-in, precise, and easy-to-use
> > > rate-limiting mechanism for Flink sources to ensure system stability and
> > > fair resource sharing.
> > >
> > > Proposed Solution
> > >
> > > The core idea is to integrate a flexible, record-based rate-limiting
> > > mechanism directly into SourceReaderBase, making it available to all
> > > connectors built on the new source interface.
> > >
> > > Key Changes:
> > >
> > > Seamless Integration via SourceReaderBase:
> > > New constructors accepting a RateLimiterStrategy will be added
> > > directly to SourceReaderBase. This allows any connector built on the
> > > modern source interface (like Kafka or Pulsar) to enable rate limiting
> > > with minimal code changes.
> > >
> > > Accurate, Post-Emission Throttling:
> > > To support sources with unpredictable batch sizes (e.g., Kafka), rate
> > > limiting is applied after records are emitted. After each
> > > recordEmitter.emitRecord call, the reader counts the emitted records and
> > > only then consults the RateLimiter. This ensures throttling is based on
> > > the precise number of records processed.
> > >
> > > Fully Non-Blocking Operation:
> > > The entire mechanism is asynchronous and non-blocking. If a rate limit
> > > is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This
> > > yields control to the Flink task's event loop without blocking the
> > > processing thread, ensuring that critical operations like checkpointing
> > > are never delayed.
> > >
> > > Unified Configuration via SQL/Table API:
> > > Users can configure rate limits consistently across different sources
> > > using a simple SQL table option, such as WITH ('scan.rate.limit' = '1000').
> > > This provides a unified and user-friendly experience for pipeline tuning.
> > >
> > > The design is backward-compatible, and existing custom sources will
> > > continue to work without any modification.
> > >
> > > I believe this feature will be a valuable addition for Flink users
> > > operating in complex, multi-tenant environments. I'm looking forward to
> > > your feedback, suggestions, and any potential concerns you might have.
> > >
> > > Thanks,
> > > Zexian Wu