Hi Jiangang,

Thank you very much for your valuable feedback and insightful questions. The two scenarios you've distilled from real-world production experience are extremely helpful for improving this feature.
1. Regarding the load imbalance issue between source tasks:

Your point that load imbalance between source tasks can lead to a suboptimal rate-limiting effect is absolutely correct. Ideally, a mechanism that dynamically allocates the rate based on the number of splits assigned to each subtask would be the superior solution. The current design already introduces the notifyAddingSplit interface, which makes some degree of local dynamic adjustment possible. However, a globally unified, on-demand dynamic allocation would require a new communication channel between the SourceReader and the existing Coordinator, which would add implementation complexity and potential performance overhead.

A more direct optimization is to have the Enumerator in the Coordinator distribute splits as evenly as possible. The community is already working on this in the ongoing FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split assignment [1]. Improving the split distribution logic seems like a simpler and more direct solution to the root problem than introducing a complex coordination mechanism just for rate limiting. Therefore, at this stage, we have opted for a simpler, self-contained approach as a robust first step.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

2. Regarding the generic, operator-level configuration:

The generic configuration method you mentioned (using a ConfigOption<Map> to configure QPS for any operator) is indeed a very flexible and powerful design. I'd like to confirm one thing with you: is the main purpose of your proposed solution also to enable dynamic rate limiting more conveniently, for example, adjusting the rate at runtime via external signals?
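Coming back to the first point, here is a minimal sketch of what I mean by "local dynamic adjustment": each subtask could scale its own rate budget with the number of splits it has been assigned, using a notifyAddingSplit-style hook. This is plain Java with simplified stand-in types, not the actual Flink RateLimiter API; the per-split budget, the period-based refill, and all names here are hypothetical illustrations only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
 * Hypothetical sketch (NOT the actual Flink API): a per-subtask rate
 * limiter whose local records-per-period budget grows with the number of
 * splits currently assigned to this reader. A notifyAddingSplit-style
 * callback is the only signal it needs, so no extra channel between the
 * SourceReader and the Coordinator is required.
 */
final class SplitAwareRateLimiter {
    private final double ratePerSplit; // hypothetical budget granted per assigned split
    private int assignedSplits;
    private double availableCredit;    // records that may still be emitted this period

    SplitAwareRateLimiter(double ratePerSplit) {
        this.ratePerSplit = ratePerSplit;
    }

    /** Called when the enumerator assigns another split to this subtask. */
    void notifyAddingSplit() {
        assignedSplits++;
    }

    /** Refill the credit once per period; the local rate follows the split count. */
    void startNewPeriod() {
        availableCredit = ratePerSplit * assignedSplits;
    }

    double currentRate() {
        return ratePerSplit * assignedSplits;
    }

    /**
     * Post-facto accounting, in the spirit of the FLIP discussion:
     * requestSize records were already emitted. If that overdraws the
     * credit, return an incomplete future; the caller would complete it
     * when the next period starts, creating a non-blocking pause.
     */
    CompletionStage<Void> acquire(int requestSize) {
        availableCredit -= requestSize;
        return availableCredit >= 0
                ? CompletableFuture.completedFuture(null) // still within budget
                : new CompletableFuture<>();              // pause until refilled
    }
}
```

With 2 assigned splits and a budget of 10 records per split, the local rate becomes 20; an acquire(15) after emission completes immediately, while a further acquire(10) overdraws the credit and yields a pending future. A subtask holding more splits would thus naturally receive a larger share of the overall budget without any global coordination.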
This FLIP currently focuses primarily on the source side, as it addresses the most immediate pain point. Your idea is very insightful, and if our goals are aligned, it will serve as an excellent reference for the future evolution of this feature.

Thanks again for your insightful contributions; they are crucial for refining and evolving this feature.

Best,
Zexian

On Mon, Aug 4, 2025 at 23:13, Jiangang Liu <liujiangangp...@gmail.com> wrote:

> Hello, Zexian. This is a great work for job's stability when failover or
> catching the lag. I just have some questions:
>
> 1. For some source tasks, their consuming splits may be more than
> others. The simple calculation of QPS based on parallelism may cause
> imbalance. Can we support a calculation based on the split number as a
> common way?
> 2. In our company, we support configuring each operator's QPS by
> ConfigOption<Map>. This is convenient if we can get the operatorId
> easily before submitting. But in open source, it may be hard to do so.
> I wonder whether there exists a similar way so that we do not need to
> implement it in each connector.
>
> Best,
> Jiangang Liu
>
> On Mon, Aug 4, 2025 at 14:35, Zexian WU <wzx3122351...@gmail.com> wrote:
>
> > Hi Shengkai,
> >
> > Thank you for your thoughtful feedback and excellent questions. These
> > are all great points. Here are my thoughts:
> >
> > Regarding the requestSize parameter and its behavior: requestSize is
> > not a request for permission but a post-facto count of records that
> > have already been emitted. If this count (e.g., 3) exceeds the
> > available rate credit (e.g., for 2 records), RateLimiter#acquire will
> > return a CompletionStage that completes in the future. This creates a
> > non-blocking pause in the SourceReader, allowing it to pay back the
> > "time debt" and ensuring the average rate is maintained.
> >
> > Regarding notifyCheckpointAborted: You are absolutely right; this is a
> > critical point for preventing deadlocks. The specific plan is to add a
> > notifyCheckpointAborted method to the RateLimiter interface. Then,
> > within the SourceReader implementation, I will handle the checkpoint
> > abort callback and invoke this new method to reset the rate limiter's
> > internal state.
> >
> > Regarding configuration and unsupported connectors: You've made an
> > excellent point about this being a connector-specific feature.
> >
> > I agree that scan.rate-limit.record-per-second is a great name, and I
> > will propose it as a best practice. I plan to use it in our initial
> > reference implementation, likely for the Kafka connector, and it can
> > serve as a reference for other connectors in the future.
> >
> > You are correct that the decision to support this feature and how to
> > name the option lies with each connector. The framework only provides
> > the underlying mechanism (the RateLimiter integration). The mechanism
> > for providing clear error feedback works perfectly with this model: a
> > connector that implements rate limiting will declare the option in its
> > DynamicTableFactory. Connectors that don't support it will not declare
> > the option. Consequently, Flink's built-in validation will
> > automatically throw a ValidationException for an unknown option,
> > providing the desired user feedback.
> >
> > Best,
> > Zexian
> >
> > On Mon, Jul 28, 2025 at 14:57, Shengkai Fang <fskm...@gmail.com> wrote:
> >
> > > Hi, Zexian.
> > >
> > > Thanks for your FLIP. I think rate limiting is a very important
> > > feature for our users. But I have some questions about the FLIP:
> > >
> > > 1. How do I determine the input parameter `requestSize` for
> > > `RateLimiter#acquire`? If the rate limiter indicates there are 2
> > > remaining requests that can be emitted, but the `requestSize` is 3,
> > > what is the behavior here?
> > > 2. CheckpointListener also has a method named
> > > notifyCheckpointAborted; I think RateLimiter also needs this. If the
> > > checkpoint aborts, please clear the status of the rate limiter.
> > > 3. I think `scan.rate-limit.record-per-second` is better than
> > > `scan.rate.limit.record-per-second`. It seems only the FLIP-27
> > > source supports rate limiting; it's better if we can throw an
> > > exception to notify users when the source doesn't support this
> > > feature.
> > >
> > > Best,
> > > Shengkai
> > >
> > > On Mon, Jul 28, 2025 at 11:52, Zexian WU <wzx3122351...@gmail.com> wrote:
> > >
> > > > Hi Leonard,
> > > >
> > > > Thanks a lot for your support and positive feedback! I'm glad to
> > > > hear you think the design meets the needs of most scenarios.
> > > >
> > > > Indeed, rate limiting is a fundamental and important feature, and
> > > > I'm excited to help fill this gap.
> > > >
> > > > I'm also looking forward to more ideas and potential improvements
> > > > from other community members.
> > > >
> > > > Thanks again!
> > > >
> > > > Best,
> > > > Zexian
> > > >
> > > > On 2025/07/24 12:19:39 Leonard Xu wrote:
> > > > > Thanks Zexian for driving this work.
> > > > >
> > > > > Rate limiting is a common requirement. TBH, we should have
> > > > > supported it at an earlier stage. The proposed design integrates
> > > > > it into the source operator lifecycle and is already able to
> > > > > meet the vast majority of scenarios; looks good from my side.
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > On Jul 18, 2025, at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I would like to start a discussion on a new Flink Improvement
> > > > > > Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > > > > The full proposal can be found on the wiki:
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > > > > >
> > > > > > Motivation
> > > > > >
> > > > > > In many production environments, Flink sources read from shared
> > > > > > external systems (like Kafka, Pulsar, or databases) where
> > > > > > resources are limited. An uncontrolled data ingestion rate can
> > > > > > lead to critical issues:
> > > > > >
> > > > > > Resource Contention: A high-throughput Flink job can saturate a
> > > > > > shared Kafka cluster's bandwidth, starving other essential
> > > > > > services.
> > > > > >
> > > > > > System Instability: Aggressive polling from a database (e.g.,
> > > > > > MySQL) can overwhelm its IOPS, degrading performance for
> > > > > > transactional queries.
> > > > > >
> > > > > > This proposal aims to provide a built-in, precise, and
> > > > > > easy-to-use rate-limiting mechanism for Flink Sources to ensure
> > > > > > system stability and fair resource sharing.
> > > > > >
> > > > > > Proposed Solution
> > > > > >
> > > > > > The core idea is to integrate a flexible, record-based
> > > > > > rate-limiting mechanism directly into the SourceReaderBase,
> > > > > > making it available to all connectors built on the new source
> > > > > > interface.
> > > > > >
> > > > > > Key Changes:
> > > > > >
> > > > > > Seamless Integration via SourceReaderBase:
> > > > > > New constructors accepting a RateLimiterStrategy will be added
> > > > > > directly to SourceReaderBase. This allows any connector built
> > > > > > on the modern source interface (like Kafka or Pulsar) to enable
> > > > > > rate limiting with minimal code changes.
> > > > > >
> > > > > > Accurate, Post-Emission Throttling:
> > > > > > To support sources with unpredictable batch sizes (e.g., Kafka),
> > > > > > rate limiting is applied after records are emitted. The reader
> > > > > > counts the records after each recordEmitter.emitRecord call and
> > > > > > only then consults the RateLimiter. This ensures throttling is
> > > > > > based on the precise number of records processed.
> > > > > >
> > > > > > Fully Non-Blocking Operation:
> > > > > > The entire mechanism is asynchronous and non-blocking. If a
> > > > > > rate limit is hit, the reader pauses by returning
> > > > > > InputStatus.MORE_AVAILABLE. This yields control to the Flink
> > > > > > task's event loop without blocking the processing thread,
> > > > > > ensuring that critical operations like checkpointing are never
> > > > > > delayed.
> > > > > >
> > > > > > Unified Configuration via SQL/Table API:
> > > > > > Users can configure rate limits consistently across different
> > > > > > sources using a simple SQL table option, such as WITH
> > > > > > ('scan.rate.limit' = '1000'). This provides a unified and
> > > > > > user-friendly experience for pipeline tuning.
> > > > > >
> > > > > > The design is backward-compatible, and existing custom sources
> > > > > > will continue to work without any modification.
> > > > > >
> > > > > > I believe this feature will be a valuable addition for Flink
> > > > > > users operating in complex, multi-tenant environments. I'm
> > > > > > looking forward to your feedback, suggestions, and any
> > > > > > potential concerns you might have.
> > > > > >
> > > > > > Thanks,
> > > > > > Zexian Wu