Hi Zexian,

the general idea and approach LGTM. A couple of questions:

* If we don't want to provide a RateLimiter at the operator level (which seems to be out of scope for this FLIP), can we still make the RateLimiter a property of the Source, similar to the WatermarkStrategy, and pass it through to the implementation? I'm guessing we would then only get very coarse-grained control, because it would essentially be implemented in the SourceOperator and could therefore only limit whole batches. The upside is that we wouldn't need to touch each and every connector and would get something "for free" that probably works well enough for many cases.
* We could additionally introduce some kind of mixin on the SourceReader interface that pushes the RateLimiter down into the connector. That would enable the fine-grained control you proposed (a rough sketch of what such a mixin could look like follows below).
* Note that each and every change to the public APIs breaks connectors in some way (your migration section is a bit slim on that part). In this case, if you add a new overload to the constructor of SourceReaderBase, connectors that use it will no longer run on Flink 2.1 and earlier. Unfortunately, mixins have the same issue. I have yet to find a good solution for supporting features optionally, such that a source runs fine on older versions (without rate limiting) as well as on newer ones. I guess the only way would be some reflection magic, and I'd rather have less of that in Flink than more.
* I'm not sold on the idea that we need to pass notifyCheckpointAborted to the RateLimiter for sources. Can you expand on why this is needed? The checkpoint barrier is injected into sources differently than into other operators, and notifyCheckpointAborted is not guaranteed to be called, so if there is a risk of deadlock we should find other options.

Best,
Arvid
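To make the mixin idea above concrete, here is a minimal sketch of what such an opt-in interface could look like. It is purely illustrative: neither SupportsRateLimiting nor this RateLimiter placeholder exists in Flink today, and the acquire(int) signature simply mirrors what is being discussed for the FLIP.

import java.util.concurrent.CompletionStage;

/** Placeholder mirroring the RateLimiter discussed in FLIP-535; not Flink's current API. */
interface RateLimiter {
    /** Accounts for records that were already emitted; completes once they fit the budget. */
    CompletionStage<Void> acquire(int requestSize);
}

/**
 * Hypothetical opt-in mixin for SourceReader implementations. A reader that implements it
 * signals that the framework may push the rate limiter down into the connector for
 * fine-grained control; readers that don't implement it would only get the coarse-grained,
 * batch-level limiting applied at the SourceOperator level.
 */
public interface SupportsRateLimiting {
    void setRateLimiter(RateLimiter rateLimiter);
}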
On Tue, Aug 5, 2025 at 8:48 AM Jiangang Liu <liujiangangp...@gmail.com> wrote:

Thanks for your explanation.
1. It makes sense to me to address the imbalance in separate issues.
2. In fact, we support adjusting the source's QPS dynamically in our internal Flink. We can support this feature later.

Best,
Jiangang Liu

Zexian WU <wzx3122351...@gmail.com> wrote on Tue, Aug 5, 2025 at 14:35:

Hi Jiangang,

Thank you very much for your valuable feedback and insightful questions. The two scenarios you've distilled from real-world production experience are extremely helpful for improving this feature.

1. Regarding the load imbalance between source tasks:

Your point that load imbalance between source tasks can lead to a suboptimal rate-limiting effect is absolutely correct. Ideally, a mechanism that dynamically allocates the rate based on the number of splits assigned to each subtask would be the superior solution.

In the current design we have introduced the notifyAddingSplit interface, which allows some degree of local dynamic adjustment. However, implementing a globally unified, on-demand dynamic allocation would require a new communication channel between the SourceReader and the existing Coordinator, which adds implementation complexity and potential performance overhead.

A more direct optimization is to have the Enumerator in the Coordinator distribute splits as evenly as possible. The community is already working on this in the ongoing FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split assignment. Improving the split distribution logic seems like a simpler and more direct solution to the root problem than introducing a complex coordination mechanism just for rate limiting. Therefore, at this stage we have opted for a simpler, self-contained approach as a robust first step.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment

2. Regarding the generic, operator-level configuration:

The generic configuration method you mentioned, using a ConfigOption<Map> to configure QPS for any operator, is indeed a very flexible and powerful design.

I'd like to confirm: is the main purpose of your proposal also to enable dynamic rate limiting more conveniently (for example, adjusting the rate at runtime via external signals)? This FLIP currently focuses on the source side, as it addresses the most immediate pain point. Your idea is very insightful, and if our goals are aligned, it is an excellent reference for the future evolution of this feature.

Thanks again for your insightful contributions; they are crucial for refining and evolving this feature.

Best,
Zexian

Jiangang Liu <liujiangangp...@gmail.com> wrote on Mon, Aug 4, 2025 at 23:13:

Hello, Zexian. This is great work for job stability during failover or when catching up on lag. I just have some questions:

1. Some source tasks may consume more splits than others, so a simple QPS calculation based on parallelism may cause imbalance. Can we support a calculation based on the number of splits as a common approach?
2. In our company, we configure each operator's QPS via a ConfigOption<Map>. This is convenient if we can get the operatorId easily before submitting, but in open source that may be hard to do. I wonder whether there is a similar mechanism so that we do not need to implement this in every connector (a rough sketch of this idea follows below).

Best,
Jiangang Liu
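For illustration, the kind of map-typed option described above might look roughly like the sketch below. The option key, value format, and helper method are assumptions about such an internal setup; this is not an existing Flink option.

import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class OperatorRateLimitOptions {

    /** Hypothetical map from operator id (or name) to its records-per-second budget. */
    public static final ConfigOption<Map<String, String>> OPERATOR_QPS =
            ConfigOptions.key("pipeline.operator.rate-limits")
                    .mapType()
                    .defaultValue(Map.of())
                    .withDescription(
                            "Per-operator records-per-second limits, e.g. 'source-1:1000,agg-2:500'."
                                    + " Illustrative only; not an existing Flink option.");

    /** Looks up the limit for a single operator, falling back to -1 (unlimited). */
    public static long limitFor(Configuration conf, String operatorId) {
        String limit = conf.get(OPERATOR_QPS).get(operatorId);
        return limit == null ? -1L : Long.parseLong(limit);
    }
}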
Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Aug 4, 2025 at 14:35:

Hi Shengkai,

Thank you for your thoughtful feedback and excellent questions. These are all great points. Here are my thoughts:

Regarding the requestSize parameter and its behavior: requestSize is not a request for permission but a post-facto count of records that have already been emitted. If this count (e.g., 3) exceeds the available rate credit (e.g., credit for only 2 records), RateLimiter#acquire returns a CompletionStage that completes at some point in the future. This creates a non-blocking pause in the SourceReader, allowing it to pay back the "time debt" and ensuring that the average rate is maintained.

Regarding notifyCheckpointAborted: You are absolutely right; this is a critical point for preventing deadlocks. The specific plan is to add a notifyCheckpointAborted method to the RateLimiter interface. Within the SourceReader implementation, I will then handle the checkpoint-abort callback and invoke this new method to reset the rate limiter's internal state.

Regarding configuration and unsupported connectors: You've made an excellent point about this being a connector-specific feature.

I agree that scan.rate-limit.record-per-second is a great name, and I will propose it as a best practice. I plan to use it in our initial reference implementation, likely for the Kafka connector, and it can serve as a reference for other connectors in the future.

You are correct that the decision to support this feature, and how to name the option, lies with each connector. The framework only provides the underlying mechanism (the RateLimiter integration). The mechanism for providing clear error feedback works naturally with this model: a connector that implements rate limiting declares the option in its DynamicTableFactory, while connectors that don't support it simply do not declare it. Consequently, Flink's built-in validation automatically throws a ValidationException for the unknown option, providing the desired user feedback.

Best,
Zexian
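As a toy illustration of the post-facto, "pay back the time debt" semantics described in the reply above (this is not the FLIP's implementation; the class and its details are made up), a limiter can track when its budget is next free and hand out either an already-completed or a delayed future:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Toy "time debt" limiter: acquire() is called AFTER a batch has already been emitted. If the
 * batch overdraws the per-second budget, the returned future completes only once the
 * accumulated debt is paid back, so the average rate converges to the configured limit.
 */
public class TimeDebtRateLimiter {

    private final double recordsPerSecond;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private long nextFreeTimeNanos = System.nanoTime();

    public TimeDebtRateLimiter(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    /** requestSize is the number of records that were already emitted downstream. */
    public synchronized CompletionStage<Void> acquire(int requestSize) {
        long now = System.nanoTime();
        // How much time this batch "costs" at the configured rate.
        long costNanos = (long) (requestSize * 1_000_000_000L / recordsPerSecond);
        // Wait until the debt from previously emitted records is paid off ...
        long waitNanos = Math.max(0L, nextFreeTimeNanos - now);
        // ... and record the debt incurred by the batch that was just emitted.
        nextFreeTimeNanos = Math.max(nextFreeTimeNanos, now) + costNanos;
        if (waitNanos == 0L) {
            // Still within budget: no pause, the reader keeps polling.
            return CompletableFuture.completedFuture(null);
        }
        // Over budget: return a future that completes later. The reader would chain its
        // availability on this future instead of blocking the processing thread.
        CompletableFuture<Void> resume = new CompletableFuture<>();
        scheduler.schedule(() -> { resume.complete(null); }, waitNanos, TimeUnit.NANOSECONDS);
        return resume;
    }
}

(Scheduler shutdown and overflow handling are omitted; the point is only that the pause is expressed as a future the reader can wait on asynchronously, which is what keeps checkpointing unaffected, as described in the FLIP.)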
Shengkai Fang <fskm...@gmail.com> wrote on Mon, Jul 28, 2025 at 14:57:

Hi, Zexian.

Thanks for your FLIP. I think rate limiting is a very important feature for our users, but I have some questions about the FLIP:

1. How do I determine the input parameter `requestSize` for `RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining requests that can be emitted, but the `requestSize` is 3, what is the behavior?
2. CheckpointListener also has a method named notifyCheckpointAborted; I think RateLimiter needs this as well. If the checkpoint aborts, please clear the state of the rate limiter.
3. I think `scan.rate-limit.record-per-second` is better than `scan.rate.limit.record-per-second`. Since it seems only FLIP-27 sources support rate limiting, it would also be better to throw an exception to notify users when a source doesn't support this feature.

Best,
Shengkai
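To make the option-declaration and validation point from this exchange concrete, a connector that opts in might declare the option in its table factory roughly as below. The class name and factory identifier are made up; the relevant part is that only connectors declaring the option accept it, so Flink's existing factory validation rejects the key everywhere else with a ValidationException.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Illustrative skeleton of a table connector factory that opts into rate limiting. */
public class RateLimitedSourceFactory implements DynamicTableSourceFactory {

    /** Option named as suggested in this thread; declared only by connectors that support it. */
    public static final ConfigOption<Long> SCAN_RATE_LIMIT =
            ConfigOptions.key("scan.rate-limit.record-per-second")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Maximum number of records emitted per second by the scan.");

    @Override
    public String factoryIdentifier() {
        return "rate-limited-demo"; // hypothetical connector identifier
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        // Declaring the option here is what makes it "known". A connector that does not
        // support rate limiting simply omits it, and the validation below then fails with
        // a ValidationException for the unsupported key.
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(SCAN_RATE_LIMIT);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate(); // throws ValidationException for unknown options
        // A real factory would now read SCAN_RATE_LIMIT from helper.getOptions() and pass it
        // (e.g. as a RateLimiterStrategy) into the source it builds; omitted in this sketch.
        throw new UnsupportedOperationException("Sketch only; no actual source is built here.");
    }
}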
Zexian WU <wzx3122351...@gmail.com> wrote on Mon, Jul 28, 2025 at 11:52:

Hi Leonard,

Thanks a lot for your support and positive feedback! I'm glad to hear you think the design meets the needs of most scenarios.

Indeed, rate limiting is a fundamental and important feature, and I'm excited to help fill this gap. I'm also looking forward to more ideas and potential improvements from other community members.

Thanks again!

Best,
Zexian

On 2025/07/24 12:19:39 Leonard Xu wrote:

Thanks Zexian for driving this work.

Rate limiting is a common requirement; TBH, we should have supported it at an earlier stage. The proposed design integrates it into the source operator lifecycle and is already able to meet the vast majority of scenarios, so it looks good from my side.

Best,
Leonard

Zexian WU <wz...@gmail.com> wrote on Jul 18, 2025 at 12:01:

Hi everyone,

I would like to start a discussion on a new Flink Improvement Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source. The full proposal can be found on the wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source

Motivation

In many production environments, Flink sources read from shared external systems (like Kafka, Pulsar, or databases) where resources are limited. An uncontrolled data ingestion rate can lead to critical issues:

* Resource Contention: A high-throughput Flink job can saturate a shared Kafka cluster's bandwidth, starving other essential services.
* System Instability: Aggressive polling from a database (e.g., MySQL) can overwhelm its IOPS, degrading performance for transactional queries.

This proposal aims to provide a built-in, precise, and easy-to-use rate-limiting mechanism for Flink Sources to ensure system stability and fair resource sharing.

Proposed Solution

The core idea is to integrate a flexible, record-based rate-limiting mechanism directly into SourceReaderBase, making it available to all connectors built on the new source interface.

Key Changes:

* Seamless Integration via SourceReaderBase: New constructors accepting a RateLimiterStrategy will be added directly to SourceReaderBase. This allows any connector built on the modern source interface (like Kafka or Pulsar) to enable rate limiting with minimal code changes.
* Accurate, Post-Emission Throttling: To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting is applied after records are emitted. The reader counts the records after each recordEmitter.emitRecord call and only then consults the RateLimiter. This ensures throttling is based on the precise number of records processed.
* Fully Non-Blocking Operation: The entire mechanism is asynchronous and non-blocking. If a rate limit is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control to the Flink task's event loop without blocking the processing thread, ensuring that critical operations like checkpointing are never delayed.
* Unified Configuration via SQL/Table API: Users can configure rate limits consistently across different sources using a simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This provides a unified and user-friendly experience for pipeline tuning (see the example below).

The design is backward-compatible, and existing custom sources will continue to work without any modification.

I believe this feature will be a valuable addition for Flink users operating in complex, multi-tenant environments. I'm looking forward to your feedback, suggestions, and any potential concerns you might have.

Thanks,
Zexian Wu
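For illustration, enabling the proposed limit from the Table API could look like the snippet below. It assumes a Kafka table whose connector has adopted the option; the schema and the other connector options are placeholders, and the final option key (e.g. 'scan.rate-limit.record-per-second', as discussed earlier in this thread) is still open.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Illustrative only: a table whose source would be throttled via the proposed option. */
public class RateLimitedTableExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        // The FLIP's proposed option: cap the scan at 1000 records per second.
                        + "  'scan.rate.limit' = '1000'\n"
                        + ")");

        // Any query over 'orders' would then read from Kafka at the configured rate.
        tEnv.executeSql("SELECT * FROM orders").print();
    }
}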