Thanks Zexian for driving this work, +1 from my side to start a vote if there is no more feedback this week.
Best,
Leonard

> On Aug 12, 2025 at 11:07, Zexian WU <wzx3122351...@gmail.com> wrote:
>
> Hi Arvid,
>
> Hope you are having a great week.
>
> I'm writing to gently follow up on the discussion for FLIP-535. Thank you again for your insightful feedback earlier; it was very helpful for refining the proposal.
>
> I wanted to check in and see if you have any further thoughts or questions. Your input is highly valued.
>
> If there are no other major concerns from the community, I'm hoping to move the proposal towards a formal vote soon.
>
> Best regards,
> Zexian Wu
>
> On Tue, Aug 5, 2025 at 17:49, Arvid Heise <ahe...@confluent.io.invalid> wrote:
>>
>> Hi Zexian,
>>
>> The general idea and approach LGTM. A couple of questions:
>>
>> * If we don't want to provide a RateLimiter on the operator level (that seems to be out of scope for this FLIP), can we still make the RateLimiter a property of the Source, similar to the WatermarkStrategy, and pass it through the implementation? I'm guessing that we would then only have very coarse-grained control, because it's basically implemented on the SourceOperator and hence can only limit batches. The upside is that we don't need to touch each and every connector and get something "for free" that probably works well enough for many cases.
>> * We could additionally introduce some kind of mixin to the SourceReader interface that would be able to push down the RateLimiter into the connector. That would enable the fine-grained control you proposed (a rough sketch of what I have in mind follows below this list).
>> * Note that each and every change to the public APIs will break connectors in some way (your migration section is a bit slim on that part). In this case, if you add a new overload to the constructor of SourceReaderBase, implementing connectors will no longer run on Flink 2.1 or earlier. Unfortunately, mixins have the same issue. I have yet to find a good solution for supporting features optionally, such that the source runs fine on older versions (without rate limiting) as well as on newer versions. I guess the only way would be some reflection magic, and I'd rather have less of that in Flink than more.
>> * I'm not sold on the idea that we need to pass notifyCheckpointAborted to the RateLimiter for the sources. Can you expand on why this is needed? The checkpoint barrier is injected differently into sources than into other operators. notifyCheckpointAborted is also not guaranteed to be called, so if there is a risk of deadlock, we should find other options.
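>>
>> To illustrate the mixin idea, something along these lines is what I have in mind (the interface name and method are made up for this sketch, not an existing API):
>>
>> // Assuming the existing RateLimiter interface; adjust the import if the FLIP relocates it.
>> import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
>>
>> /** Hypothetical opt-in mixin for readers that want fine-grained rate limiting. */
>> public interface RateLimitedSourceReader {
>>
>>     /** Called by the runtime to push the configured limiter into the reader. */
>>     void setRateLimiter(RateLimiter rateLimiter);
>> }
>>
>> The SourceOperator could check `reader instanceof RateLimitedSourceReader` and push the limiter down; readers that don't implement the mixin would fall back to the coarse-grained, batch-level limiting from my first point.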
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Aug 5, 2025 at 8:48 AM Jiangang Liu <liujiangangp...@gmail.com> wrote:
>>>
>>> Thanks for your explanation.
>>> 1. It makes sense to me to solve the imbalance in separate issues.
>>> 2. In fact, we support adjusting the source's QPS dynamically in our internal Flink. We can support this feature later.
>>>
>>> Best,
>>> Jiangang Liu
>>>
>>> On Tue, Aug 5, 2025 at 14:35, Zexian WU <wzx3122351...@gmail.com> wrote:
>>>>
>>>> Hi Jiangang,
>>>>
>>>> Thank you very much for your valuable feedback and insightful questions. The two scenarios you've distilled from real-world production experience are extremely helpful for improving this feature.
>>>>
>>>> 1. Regarding the load imbalance issue between source tasks:
>>>>
>>>> Your point that load imbalance between source tasks can lead to a suboptimal rate-limiting effect is absolutely correct. Ideally, a mechanism that dynamically allocates the rate based on the number of splits assigned to each subtask would be a superior solution.
>>>>
>>>> In fact, the current design introduces the notifyAddingSplit interface, which allows for some level of local dynamic adjustment. However, implementing a globally unified, on-demand dynamic allocation would require a new communication channel between the SourceReader and the existing Coordinator, which would add implementation complexity and potential performance overhead.
>>>>
>>>> Furthermore, a more direct optimization is to have the Enumerator in the Coordinator distribute splits as evenly as possible. I've noticed that the community is already working on this with the ongoing FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split assignment. Improving the split distribution logic seems like a simpler and more direct solution to the root problem than introducing a complex coordination mechanism just for rate limiting. Therefore, at this stage, we have opted for a simpler, self-contained approach as a robust first step.
>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>
>>>> 2. Regarding the generic, operator-level configuration:
>>>>
>>>> The generic configuration method you mentioned, using a ConfigOption<Map> to configure the QPS of any operator, is indeed a very flexible and powerful design.
>>>>
>>>> I'd like to confirm with you: is the main purpose of your proposed solution also to enable dynamic rate limiting more conveniently (for example, adjusting the rate at runtime via external signals)? This FLIP currently focuses primarily on the source side, as it addresses the most immediate pain point. Your idea is very insightful, and if our goals are aligned, it serves as an excellent reference for the future evolution of this feature.
>>>>
>>>> Thanks again for your insightful contributions; they are crucial for refining and evolving this feature.
>>>>
>>>> Best,
>>>> Zexian
>>>>
>>>> On Mon, Aug 4, 2025 at 23:13, Jiangang Liu <liujiangangp...@gmail.com> wrote:
>>>>>
>>>>> Hello, Zexian. This is great work for job stability during failover or when catching up on lag. I just have some questions:
>>>>>
>>>>> 1. Some source tasks may consume more splits than others, so a simple QPS calculation based on parallelism may cause imbalance. Can we support a calculation based on the split count as a common approach?
>>>>> 2. In our company, we support configuring each operator's QPS via a ConfigOption<Map>. This is convenient when we can get the operatorId easily before submitting, but in open source that may be hard to do. I wonder whether a similar approach exists so that we do not need to implement this for each connector.
>>>>>
>>>>> Best,
>>>>> Jiangang Liu
>>>>>
>>>>> On Mon, Aug 4, 2025 at 14:35, Zexian WU <wzx3122351...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Shengkai,
>>>>>>
>>>>>> Thank you for your thoughtful feedback and excellent questions. These are all great points. Here are my thoughts:
>>>>>>
>>>>>> Regarding the requestSize parameter and its behavior: requestSize is not a request for permission but a post-facto count of records that have already been emitted. If this count (e.g., 3) exceeds the available rate credit (e.g., enough for 2 records), RateLimiter#acquire will return a CompletionStage that completes in the future. This creates a non-blocking pause in the SourceReader, allowing it to pay back the "time debt" and ensuring the average rate is maintained.
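>>>>>>
>>>>>> To make those semantics concrete, here is a rough sketch of the reader-side flow (the placeholder types below only exist to make the sketch self-contained; the acquire(int requestSize) signature is the one this FLIP proposes, and the real code in SourceReaderBase will look different):
>>>>>>
>>>>>> import java.util.List;
>>>>>> import java.util.concurrent.CompletionStage;
>>>>>>
>>>>>> final class PostEmissionThrottlingSketch<E> {
>>>>>>
>>>>>>     // Placeholder abstractions for the sketch only.
>>>>>>     interface RecordSink<T> { void emit(T record); }
>>>>>>     interface RateLimiter { CompletionStage<Void> acquire(int requestSize); }
>>>>>>
>>>>>>     CompletionStage<Void> emitAndAccount(List<E> batch, RecordSink<E> sink, RateLimiter limiter) {
>>>>>>         int emitted = 0;
>>>>>>         for (E record : batch) {
>>>>>>             sink.emit(record); // records are emitted first ...
>>>>>>             emitted++;
>>>>>>         }
>>>>>>         // ... and only afterwards do we report how many went out. If 3 records
>>>>>>         // were emitted but credit was only available for 2, the returned stage
>>>>>>         // completes later, which pauses the reader without blocking the thread
>>>>>>         // until the "time debt" has been paid back.
>>>>>>         return limiter.acquire(emitted);
>>>>>>     }
>>>>>> }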
>>>>>>
>>>>>> Regarding notifyCheckpointAborted: You are absolutely right; this is a critical point for preventing deadlocks. The specific plan is to add a notifyCheckpointAborted method to the RateLimiter interface. Then, within the SourceReader implementation, I will handle the checkpoint-abort callback and invoke this new method to reset the rate limiter's internal state.
>>>>>>
>>>>>> Regarding configuration and unsupported connectors: You've made an excellent point about this being a connector-specific feature.
>>>>>>
>>>>>> I agree that scan.rate-limit.record-per-second is a great name, and I will propose it as a best practice. I plan to use it in our initial reference implementation, likely for the Kafka connector, and it can serve as a reference for other connectors in the future.
>>>>>>
>>>>>> You are correct that the decision to support this feature, and how to name the option, lies with each connector. The framework only provides the underlying mechanism (the RateLimiter integration). The mechanism for providing clear error feedback works perfectly with this model: a connector that implements rate limiting will declare the option in its DynamicTableFactory, while connectors that don't support it will not declare the option. Consequently, Flink's built-in validation will automatically throw a ValidationException for an unknown option, providing the desired user feedback.
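>>>>>>
>>>>>> For illustration, a connector that opts in would declare the option roughly like this (the class name is only a placeholder; in practice the option would sit next to the connector's DynamicTableSourceFactory):
>>>>>>
>>>>>> import java.util.HashSet;
>>>>>> import java.util.Set;
>>>>>>
>>>>>> import org.apache.flink.configuration.ConfigOption;
>>>>>> import org.apache.flink.configuration.ConfigOptions;
>>>>>>
>>>>>> public final class RateLimitOptionsSketch {
>>>>>>
>>>>>>     // Declared only by connectors that actually support rate limiting.
>>>>>>     public static final ConfigOption<Long> SCAN_RATE_LIMIT_RECORDS_PER_SECOND =
>>>>>>             ConfigOptions.key("scan.rate-limit.record-per-second")
>>>>>>                     .longType()
>>>>>>                     .noDefaultValue()
>>>>>>                     .withDescription("Maximum number of records emitted per second.");
>>>>>>
>>>>>>     // Returned from the factory's optionalOptions(); factories that do not
>>>>>>     // declare the key reject it automatically during validation.
>>>>>>     public static Set<ConfigOption<?>> optionalOptions() {
>>>>>>         Set<ConfigOption<?>> options = new HashSet<>();
>>>>>>         options.add(SCAN_RATE_LIMIT_RECORDS_PER_SECOND);
>>>>>>         return options;
>>>>>>     }
>>>>>> }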
>>>>>>
>>>>>> Best,
>>>>>> Zexian
>>>>>>
>>>>>> On Mon, Jul 28, 2025 at 14:57, Shengkai Fang <fskm...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi, Zexian.
>>>>>>>
>>>>>>> Thanks for your FLIP. I think rate limiting is a very important feature for our users, but I have some questions about the FLIP:
>>>>>>>
>>>>>>> 1. How do I determine the input parameter `requestSize` for `RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining requests that can be emitted, but the `requestSize` is 3, what is the behavior here?
>>>>>>> 2. CheckpointListener also has a method named notifyCheckpointAborted; I think RateLimiter needs this as well. If the checkpoint aborts, please clear the state of the rate limiter.
>>>>>>> 3. I think `scan.rate-limit.record-per-second` is better than `scan.rate.limit.record-per-second`. Also, since it seems that only FLIP-27 sources support rate limiting, it would be better to throw an exception to notify users if a source doesn't support this feature.
>>>>>>>
>>>>>>> Best,
>>>>>>> Shengkai
>>>>>>>
>>>>>>> On Mon, Jul 28, 2025 at 11:52, Zexian WU <wzx3122351...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Leonard,
>>>>>>>>
>>>>>>>> Thanks a lot for your support and positive feedback! I'm glad to hear you think the design meets the needs of most scenarios.
>>>>>>>>
>>>>>>>> Indeed, rate limiting is a fundamental and important feature, and I'm excited to help fill this gap.
>>>>>>>>
>>>>>>>> I'm also looking forward to more ideas and potential improvements from other community members.
>>>>>>>>
>>>>>>>> Thanks again!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Zexian
>>>>>>>>
>>>>>>>> On 2025/07/24 12:19:39 Leonard Xu wrote:
>>>>>>>>> Thanks Zexian for driving this work.
>>>>>>>>>
>>>>>>>>> Rate limiting is a common requirement; TBH, we should have supported it at an earlier stage. The proposed design integrates it into the source operator lifecycle and is already able to meet the vast majority of scenarios, so it looks good from my side.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>>
>>>>>>>>>> On Jul 18, 2025 at 12:01, Zexian WU <wz...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> I would like to start a discussion on a new Flink Improvement Proposal (FLIP), FLIP-535: Introduce RateLimiter to Source. The full proposal can be found on the wiki:
>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
>>>>>>>>>>
>>>>>>>>>> Motivation
>>>>>>>>>>
>>>>>>>>>> In many production environments, Flink sources read from shared external systems (like Kafka, Pulsar, or databases) where resources are limited. An uncontrolled data ingestion rate can lead to critical issues:
>>>>>>>>>>
>>>>>>>>>> Resource Contention: A high-throughput Flink job can saturate a shared Kafka cluster's bandwidth, starving other essential services.
>>>>>>>>>> System Instability: Aggressive polling from a database (e.g., MySQL) can overwhelm its IOPS, degrading performance for transactional queries.
>>>>>>>>>>
>>>>>>>>>> This proposal aims to provide a built-in, precise, and easy-to-use rate-limiting mechanism for Flink Sources to ensure system stability and fair resource sharing.
>>>>>>>>>>
>>>>>>>>>> Proposed Solution
>>>>>>>>>>
>>>>>>>>>> The core idea is to integrate a flexible, record-based rate-limiting mechanism directly into SourceReaderBase, making it available to all connectors built on the new source interface.
>>>>>>>>>>
>>>>>>>>>> Key Changes:
>>>>>>>>>>
>>>>>>>>>> Seamless Integration via SourceReaderBase: New constructors accepting a RateLimiterStrategy will be added directly to SourceReaderBase. This allows any connector built on the modern source interface (like Kafka or Pulsar) to enable rate limiting with minimal code changes.
>>>>>>>>>>
>>>>>>>>>> Accurate, Post-Emission Throttling: To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting is applied after records are emitted. The reader counts the records after each recordEmitter.emitRecord call and only then consults the RateLimiter. This ensures throttling is based on the precise number of records actually processed.
>>>>>>>>>>
>>>>>>>>>> Fully Non-Blocking Operation: The entire mechanism is asynchronous and non-blocking. If a rate limit is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control to the Flink task's event loop without blocking the processing thread, ensuring that critical operations like checkpointing are never delayed.
>>>>>>>>>>
>>>>>>>>>> Unified Configuration via SQL/Table API: Users can configure rate limits consistently across different sources using a simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This provides a unified and user-friendly experience for pipeline tuning (a short example follows below).
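>>>>>>>>>>
>>>>>>>>>> For example, a rate-limited Kafka table could be declared like this (a sketch only: the Kafka options are abbreviated, and 'scan.rate.limit' is the option proposed here, not something that exists today):
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>>>>>>
>>>>>>>>>> public class RateLimitedSourceExample {
>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>         TableEnvironment tEnv =
>>>>>>>>>>                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
>>>>>>>>>>
>>>>>>>>>>         // Cap the source at 1000 records per second via the proposed table option.
>>>>>>>>>>         tEnv.executeSql(
>>>>>>>>>>                 "CREATE TABLE orders (order_id STRING, amount DOUBLE) WITH (\n"
>>>>>>>>>>                         + "  'connector' = 'kafka',\n"
>>>>>>>>>>                         + "  'topic' = 'orders',\n"
>>>>>>>>>>                         + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
>>>>>>>>>>                         + "  'format' = 'json',\n"
>>>>>>>>>>                         + "  'scan.rate.limit' = '1000'\n"
>>>>>>>>>>                         + ")");
>>>>>>>>>>     }
>>>>>>>>>> }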
>>>>>>>>>>
>>>>>>>>>> The design is backward-compatible, and existing custom sources will continue to work without any modification.
>>>>>>>>>>
>>>>>>>>>> I believe this feature will be a valuable addition for Flink users operating in complex, multi-tenant environments. I'm looking forward to your feedback, suggestions, and any potential concerns you might have.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Zexian Wu