Hi Shengkai,

Thank you for your thoughtful feedback and excellent questions. These are all great points. Here are my thoughts:

Regarding the requestSize parameter and its behavior: requestSize is not a request for permission but a post-facto count of records that have already been emitted. If this count (e.g., 3) exceeds the available rate credit (e.g., credit left for only 2 records), RateLimiter#acquire returns a CompletionStage that completes in the future. This creates a non-blocking pause in the SourceReader, allowing it to pay back the "time debt" and ensuring the average rate is maintained.
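To make these semantics concrete, here is a minimal, self-contained sketch (illustrative only, not the FLIP's actual implementation; the class name is made up and the acquire(int) signature follows the discussion above). The limiter charges the already-emitted records against its credit and, once the credit is exhausted, hands back a CompletionStage that completes only after the resulting time debt has elapsed, so the caller can pause without blocking any thread:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only. acquire(requestSize) is called AFTER requestSize
// records have already been emitted; it never rejects the request. If the
// emitted count exceeds the remaining credit, the returned stage completes
// only once the corresponding time debt has elapsed.
public class PostEmissionRateLimiterSketch {

    private final double recordsPerSecond;
    private long nextFreeNanos = System.nanoTime();

    public PostEmissionRateLimiterSketch(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    public synchronized CompletionStage<Void> acquire(int requestSize) {
        long costNanos = (long) (requestSize * 1_000_000_000L / recordsPerSecond);
        long now = System.nanoTime();
        long waitNanos = Math.max(0L, nextFreeNanos - now);
        // Charge the records that were already emitted against future credit.
        nextFreeNanos = Math.max(now, nextFreeNanos) + costNanos;
        if (waitNanos == 0L) {
            // Enough credit was left; no pause is needed.
            return CompletableFuture.completedFuture(null);
        }
        // Credit exceeded (e.g., 3 records emitted but credit for only 2):
        // complete the stage only after the debt is paid back, which restores
        // the configured average rate.
        CompletableFuture<Void> paused = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(waitNanos, TimeUnit.NANOSECONDS)
                .execute(() -> paused.complete(null));
        return paused;
    }
}

In the reader, the idea is to chain availability on the returned stage rather than block on it, which is why checkpointing and other task-thread work are never delayed.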
Regarding notifyCheckpointAborted: You are absolutely right; this is a critical point for preventing deadlocks. The specific plan is to add a notifyCheckpointAborted method to the RateLimiter interface. Then, within the SourceReader implementation, I will handle the checkpoint-abort callback and invoke this new method to reset the rate limiter's internal state.
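As a rough sketch (names and signatures are illustrative, not the final FLIP-535 API), the extended interface could mirror CheckpointListener's notifyCheckpointComplete/notifyCheckpointAborted pair, with a no-op default so existing implementations keep compiling; the complete-callback is shown only for symmetry and is not something this change touches:

import java.util.concurrent.CompletionStage;

// Illustrative sketch only; not the final FLIP-535 API.
public interface RateLimiter {

    // Called after requestSize records have already been emitted; the
    // returned stage completes once the configured rate allows the reader
    // to continue.
    CompletionStage<Void> acquire(int requestSize);

    // Invoked when a checkpoint completes (useful for checkpoint-aligned
    // limiters); shown here only for symmetry with CheckpointListener.
    default void notifyCheckpointComplete(long checkpointId) {}

    // Proposed addition: invoked when a checkpoint is aborted, so the
    // limiter can reset any state tied to that checkpoint and not stall
    // the reader.
    default void notifyCheckpointAborted(long checkpointId) {}
}

The SourceReader would simply forward its own checkpoint-abort callback to this method, so a limiter waiting on an aborted checkpoint cannot deadlock the reader.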
Regarding configuration and unsupported connectors: You've made an excellent point about this being a connector-specific feature. I agree that scan.rate-limit.record-per-second is a great name, and I will propose it as a best practice. I plan to use it in our initial reference implementation, likely for the Kafka connector, and it can serve as a reference for other connectors in the future.

You are correct that the decision to support this feature, and how to name the option, lies with each connector; the framework only provides the underlying mechanism (the RateLimiter integration). Clear error feedback works naturally with this model: a connector that implements rate limiting declares the option in its DynamicTableFactory, while connectors that don't support it simply do not declare it. Consequently, Flink's built-in validation will automatically throw a ValidationException for the unknown option, providing the desired user feedback.

Best,
Zexian

On Mon, Jul 28, 2025 at 14:57, Shengkai Fang <fskm...@gmail.com> wrote:

> Hi, Zexian.
>
> Thanks for your FLIP. I think rate limiting is a very important feature for
> our users. But I have some questions about the FLIP:
>
> 1. How do I determine the input parameter `requestSize` for
> `RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining
> requests that can be emitted, but the `requestSize` is 3, what is the
> behavior here?
>
> 2. CheckpointListener also has a method named notifyCheckpointAborted; I
> think RateLimiter also needs this. If the checkpoint aborts, please clear
> the status of the rate limiter.
>
> 3. I think `scan.rate-limit.record-per-second` is better than
> `scan.rate.limit.record-per-second`. It also seems that only FLIP-27 sources
> support rate limiting, so it would be better to throw an exception to notify
> users if the source doesn't support this feature.
>
> Best,
> Shengkai
>
> On Mon, Jul 28, 2025 at 11:52, Zexian WU <wzx3122351...@gmail.com> wrote:
>
> > Hi Leonard,
> >
> > Thanks a lot for your support and positive feedback! I'm glad to hear you
> > think the design meets the needs of most scenarios.
> >
> > Indeed, rate limiting is a fundamental and important feature, and I'm
> > excited to help fill this gap.
> >
> > I'm also looking forward to more ideas and potential improvements from
> > other community members.
> >
> > Thanks again!
> >
> > Best,
> > Zexian
> >
> > On 2025/07/24 12:19:39 Leonard Xu wrote:
> > > Thanks Zexian for driving this work.
> > >
> > > Rate limiting is a common requirement; TBH, we should have supported it
> > > at an earlier stage. The proposed design, which integrates it into the
> > > source operator lifecycle, is already able to meet the vast majority of
> > > scenarios, so it looks good from my side.
> > >
> > > Best,
> > > Leonard
> > >
> > > On Jul 18, 2025 at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > I would like to start a discussion on a new Flink Improvement
> > > > Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > >
> > > > The full proposal can be found on the wiki:
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > > >
> > > > Motivation
> > > >
> > > > In many production environments, Flink sources read from shared
> > > > external systems (like Kafka, Pulsar, or databases) where resources
> > > > are limited. An uncontrolled data ingestion rate can lead to critical
> > > > issues:
> > > >
> > > > Resource Contention: A high-throughput Flink job can saturate a
> > > > shared Kafka cluster's bandwidth, starving other essential services.
> > > >
> > > > System Instability: Aggressive polling from a database (e.g., MySQL)
> > > > can overwhelm its IOPS, degrading performance for transactional
> > > > queries.
> > > >
> > > > This proposal aims to provide a built-in, precise, and easy-to-use
> > > > rate-limiting mechanism for Flink Sources to ensure system stability
> > > > and fair resource sharing.
> > > >
> > > > Proposed Solution
> > > >
> > > > The core idea is to integrate a flexible, record-based rate-limiting
> > > > mechanism directly into the SourceReaderBase, making it available to
> > > > all connectors built on the new source interface.
> > > >
> > > > Key Changes:
> > > >
> > > > Seamless Integration via SourceReaderBase:
> > > > New constructors accepting a RateLimiterStrategy will be added
> > > > directly to SourceReaderBase. This allows any connector built on the
> > > > modern source interface (like Kafka or Pulsar) to enable rate
> > > > limiting with minimal code changes.
> > > >
> > > > Accurate, Post-Emission Throttling:
> > > > To support sources with unpredictable batch sizes (e.g., Kafka), rate
> > > > limiting is applied after records are emitted. The reader counts the
> > > > records after each recordEmitter.emitRecord call and only then
> > > > consults the RateLimiter. This ensures throttling is based on the
> > > > precise number of records processed.
> > > >
> > > > Fully Non-Blocking Operation:
> > > > The entire mechanism is asynchronous and non-blocking. If a rate
> > > > limit is hit, the reader pauses by returning
> > > > InputStatus.MORE_AVAILABLE. This yields control to the Flink task's
> > > > event loop without blocking the processing thread, ensuring that
> > > > critical operations like checkpointing are never delayed.
> > > >
> > > > Unified Configuration via SQL/Table API:
> > > > Users can configure rate limits consistently across different
> > > > sources using a simple SQL table option, such as
> > > > WITH ('scan.rate.limit' = '1000'). This provides a unified and
> > > > user-friendly experience for pipeline tuning.
> > > >
> > > > The design is backward-compatible, and existing custom sources will
> > > > continue to work without any modification.
> > > >
> > > > I believe this feature will be a valuable addition for Flink users
> > > > operating in complex, multi-tenant environments. I'm looking forward
> > > > to your feedback, suggestions, and any potential concerns you might
> > > > have.
> > > >
> > > > Thanks,
> > > > Zexian Wu