Thanks zexian for driving this work, +1 from my side to start a vote if no more 
feedback in this week. 

Best,
Leonard

> 2025 8月 12 11:07,Zexian WU <wzx3122351...@gmail.com> 写道:
> 
> Hi Arvid,
> 
> Hope you are having a great week.
> 
> I'm writing to gently follow up on the discussion for FLIP-535. Thank you
> again for your insightful feedback earlier; it was very helpful for
> refining the proposal.
> 
> I wanted to check in and see if you have any further thoughts or questions.
> Your input is highly valued.
> 
> If there are no other major concerns from the community, I'm hoping to move
> the proposal towards a formal vote soon.
> 
> Best regards,
> Zexian Wu
> 
> Arvid Heise <ahe...@confluent.io.invalid> 于2025年8月5日周二 17:49写道:
> 
>> Hi Zexian,
>> 
>> the general idea and approach LGTM. A couple of questions:
>> * If we don't want to provide RateLimiter on operator level (seems to be
>> out of scope for this FLIP), can we still make the RateLimiter a property
>> of the Source similar to Watermark strategy and pass it through the
>> implementation? I'm guessing that we will then only have very coarse-grain
>> control because it's basically implemented on SourceOperator and hence can
>> limit batches only. The upside is that we don't need to touch each and
>> every connector and get something "for free" that probably works good
>> enough for many cases.
>> * We could additionally introduce some kind of mixin to the SourceReader
>> interface that would be able to push down the RateLimiter into the
>> connector. That would enable fine-grain control as you proposed.
>> * Note that each and every change to the public APIs will break connectors
>> in a way (your migration section is a bit slim on that part). In this case,
>> if you add a new overload to the ctor of SourceReaderBase, implementing
>> connectors will not run on Flink 2.1- anymore. Unfortunately, mixins have
>> the same issue. I have yet to find a good solution to support features
>> optionally, such that the source runs fine on older versions (without rate
>> limiting) and on newer versions. I guess the only way would be some
>> reflection magic and I'd rather have less of that in Flink than more.
>> * I'm not sold on the idea that we need to pass notifyCheckpointAborted to
>> the RateLimiter for the sources. Can you expand on why this is needed?
>> Checkpoint barrier is injected differently into the sources than other
>> operators. notifyCheckpointAborted is also not guaranteed to be called, so
>> if there is a risk of deadlock, we should find other options.
>> 
>> Best,
>> 
>> Arvid
>> 
>> On Tue, Aug 5, 2025 at 8:48 AM Jiangang Liu <liujiangangp...@gmail.com>
>> wrote:
>> 
>>> Thanks for your explanation.
>>> 1. It makes sense to me to solve the imbalance in other issues.
>>> 2. In fact, we support to adjust the source's qps dynamically in our
>> inner
>>> flink. We can support this feature later.
>>> 
>>> Best,
>>> Jiangang Liu
>>> 
>>> Zexian WU <wzx3122351...@gmail.com> 于2025年8月5日周二 14:35写道:
>>> 
>>>> Hi Jiangang,
>>>> 
>>>> Thank you very much for your valuable feedback and insightful
>> questions.
>>>> The two scenarios you've distilled from real-world production
>> experience
>>>> are extremely helpful for improving this feature.
>>>> 
>>>> 1. Regarding the load imbalance issue between source tasks:
>>>> 
>>>> Your point about load imbalance between source tasks potentially
>> leading
>>> to
>>>> a suboptimal rate-limiting effect is absolutely correct. Ideally, a
>>>> mechanism that can dynamically allocate the rate based on the number of
>>>> splits assigned to each subtask would certainly be a superior solution.
>>>> 
>>>> In fact, in the current design, we've introduced the notifyAddingSplit
>>>> interface, which provides the possibility for some level of local
>> dynamic
>>>> adjustment. However, to implement a globally unified, on-demand dynamic
>>>> allocation, it would indeed require introducing a new communication
>>> channel
>>>> between the SourceReader and the existing Coordinator, which would add
>>>> implementation complexity and potential performance overhead.
>>>> 
>>>> Furthermore, a more direct optimization is to have the Enumerator in
>> the
>>>> Coordinator distribute splits as evenly as possible. I've noticed that
>>> the
>>>> community is already working on this with the ongoing FLIP-537:
>>> Enumerator
>>>> with Global Split Assignment Distribution for Balanced Split
>> assignment.
>>>> Improving the split distribution logic seems like a simpler and more
>>> direct
>>>> solution to the root problem than introducing a complex coordination
>>>> mechanism just for rate limiting. Therefore, at this stage, we have
>> opted
>>>> for a simpler, self-contained approach as a robust first step.
>>>> 
>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> 
>>>> 2. Regarding the generic, operator-level configuration:
>>>> 
>>>> Regarding the generic configuration method you mentioned—using a
>>>> ConfigOption<Map> to configure QPS for any operator—that is indeed a
>> very
>>>> flexible and powerful design.
>>>> 
>>>> I'd like to confirm with you: is the main purpose of your proposed
>>> solution
>>>> also to enable dynamic rate limiting more conveniently (for example,
>>>> adjusting the rate at runtime via external signals)? This FLIP
>> currently
>>>> focuses primarily on the source side, as it addresses the most
>> immediate
>>>> pain point. Your idea is very insightful, and if our goals are aligned,
>>> it
>>>> serves as an excellent reference for the future evolution of this
>>> feature.
>>>> 
>>>> Thanks again for your insightful contributions; they are crucial for
>>>> refining and evolving this feature.
>>>> 
>>>> Best,
>>>> Zexian
>>>> 
>>>> Jiangang Liu <liujiangangp...@gmail.com> 于2025年8月4日周一 23:13写道:
>>>> 
>>>>> Hello, Zexian. This is a great work for job's stability when failover
>>> or
>>>>> catching the lag. I just have some questions:
>>>>> 
>>>>>   1. For some source tasks, their consuming splits may be more than
>>>>>   others. The simple calculation QPS based on parallelism may cause
>>>>>   imbalance. Can we support a calculation based on the split number
>>> as a
>>>>>   common way?
>>>>>   2. In our company, we support to config each operator's qps by
>>>>>   ConfigOption<Map>. This is convenient if we can get the operatorId
>>>>> easily
>>>>>   before submitting. But in open source, it may be hard to do so. I
>>>> wonder
>>>>>   whether there exist similar way so that we do not need to
>> implement
>>>> each
>>>>>   connector.
>>>>> 
>>>>> Best,
>>>>> Jiangang Liu
>>>>> 
>>>>> Zexian WU <wzx3122351...@gmail.com> 于2025年8月4日周一 14:35写道:
>>>>> 
>>>>>> Hi Shengkai,
>>>>>> 
>>>>>> Thank you for your thoughtful feedback and excellent questions.
>> These
>>>> are
>>>>>> all great points. Here are my thoughts:
>>>>>> 
>>>>>> Regarding the requestSize parameter and its behavior: requestSize
>> is
>>>> not
>>>>> a
>>>>>> request for permission but a post-facto count of records that have
>>>>> already
>>>>>> been emitted. If this count (e.g., 3) exceeds the available rate
>>> credit
>>>>>> (e.g., for 2 records), RateLimiter#acquire will return a
>>>> CompletionStage
>>>>>> that completes in the future. This creates a non-blocking pause in
>>> the
>>>>>> SourceReader, allowing it to pay back the "time debt" and ensuring
>>> the
>>>>>> average rate is maintained.
>>>>>> 
>>>>>> Regarding notifyCheckpointAborted: You are absolutely right; this
>> is
>>> a
>>>>>> critical point for preventing deadlocks. The specific plan is to
>> add
>>> a
>>>>>> notifyCheckpointAborted method to the RateLimiter interface. Then,
>>>> within
>>>>>> the SourceReader implementation, I will handle the checkpoint abort
>>>>>> callback and invoke this new method to reset the rate limiter's
>>>> internal
>>>>>> state.
>>>>>> 
>>>>>> Regarding configuration and unsupported connectors: You've made an
>>>>>> excellent point about this being a connector-specific feature.
>>>>>> 
>>>>>> I agree that scan.rate-limit.record-per-second is a great name,
>> and I
>>>>> will
>>>>>> propose it as a best practice. I plan to use it in our initial
>>>> reference
>>>>>> implementation, likely for the Kafka connector, and it can serve
>> as a
>>>>>> reference for other connectors in the future.
>>>>>> You are correct that the decision to support this feature and how
>> to
>>>> name
>>>>>> the option lies with each connector. The framework only provides
>> the
>>>>>> underlying mechanism (the RateLimiter integration). The mechanism
>> for
>>>>>> providing clear error feedback works perfectly with this model: a
>>>>> connector
>>>>>> that implements rate limiting will declare the option in its
>>>>>> DynamicTableFactory. Connectors that don't support it will not
>>> declare
>>>>> the
>>>>>> option. Consequently, Flink’s built-in validation will
>> automatically
>>>>> throw
>>>>>> a ValidationException for an unknown option, providing the desired
>>> user
>>>>>> feedback.
>>>>>> 
>>>>>> Best,
>>>>>> Zexian
>>>>>> 
>>>>>> Shengkai Fang <fskm...@gmail.com> 于2025年7月28日周一 14:57写道:
>>>>>> 
>>>>>>> Hi, Zexian.
>>>>>>> 
>>>>>>> Thanks for your FLIP. I think rate litmit is very important
>> feature
>>>> for
>>>>>> our
>>>>>>> users. But I have some questions about the FLIP:
>>>>>>> 
>>>>>>> 1. How do I determine the input parameter `requestSize` for
>>>>>>> `RateLimiter#acquire`? If the rate limiter indicates there are 2
>>>>>> remaining
>>>>>>> requests that can be emitted, but the `requestSize` is 3, what is
>>> the
>>>>>>> behavior here?
>>>>>>> 2. CheckpointListener also has a method named
>>>> notifyCheckpointAborted,
>>>>> I
>>>>>>> think RateLimiter also needs this. If the checkpoint aborts,
>> please
>>>>> clear
>>>>>>> the status of the rate limiter.
>>>>>>> 3. I think `scan.rate-limit.record-per-second` is better than
>>>>>>> `scan.rate.limit.record-per-second`. It seems only FLIP-27 source
>>>>>> supports
>>>>>>> rate-limit, it's better we can throw an exception to notify users
>>> if
>>>>> the
>>>>>>> source doesn't support this feature.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Shengkai
>>>>>>> 
>>>>>>> Zexian WU <wzx3122351...@gmail.com> 于2025年7月28日周一 11:52写道:
>>>>>>> 
>>>>>>>> Hi Leonard,
>>>>>>>> 
>>>>>>>> Thanks a lot for your support and positive feedback! I'm glad
>> to
>>>> hear
>>>>>> you
>>>>>>>> think the design meets the needs of most scenarios.
>>>>>>>> 
>>>>>>>> Indeed, rate limiting is a fundamental and important feature,
>> and
>>>> I'm
>>>>>>>> excited to help fill this gap.
>>>>>>>> 
>>>>>>>> I'm also looking forward to more ideas and potential
>> improvements
>>>>> from
>>>>>>>> other community members.
>>>>>>>> 
>>>>>>>> Thanks again!
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Zexian
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 2025/07/24 12:19:39 Leonard Xu wrote:
>>>>>>>>> Thanks Zexian for driving this work.
>>>>>>>>> 
>>>>>>>>> Rate limiting is a common requirement, TBH, we should have
>>>>> supported
>>>>>> it
>>>>>>>> in earlier stage, and the proposed design integrating it into
>> the
>>>>>> source
>>>>>>>> operator lifecycle, it is already able to meet the vast
>> majority
>>> of
>>>>>>>> scenarios, looks good from my side.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 2025 7月 18 12:01,Zexian WU <wz...@gmail.com> 写道:
>>>>>>>>>> 
>>>>>>>>>> Hi everyone,
>>>>>>>>>> I would like to start a discussion on a new Flink
>> Improvement
>>>>>>> Proposal
>>>>>>>> (FLIP), FLIP-535: Introduce RateLimiter to Source.
>>>>>>>>>> The full proposal can be found on the wiki:
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
>>>>>>>>>> Motivation
>>>>>>>>>> 
>>>>>>>>>> In many production environments, Flink sources read from
>>> shared
>>>>>>>> external systems (like Kafka, Pulsar, or databases) where
>>> resources
>>>>> are
>>>>>>>> limited. An uncontrolled data ingestion rate can lead to
>> critical
>>>>>> issues:
>>>>>>>>>> Resource Contention: A high-throughput Flink job can
>>> saturate a
>>>>>>> shared
>>>>>>>> Kafka cluster's bandwidth, starving other essential services.
>>>>>>>>>> System Instability: Aggressive polling from a database
>> (e.g.,
>>>>>> MySQL)
>>>>>>>> can overwhelm its IOPS, degrading performance for transactional
>>>>>> queries.
>>>>>>>>>> This proposal aims to provide a built-in, precise, and
>>>>> easy-to-use
>>>>>>>> rate-limiting mechanism for Flink Sources to ensure system
>>>> stability
>>>>>> and
>>>>>>>> fair resource sharing.
>>>>>>>>>> Proposed Solution
>>>>>>>>>> 
>>>>>>>>>> The core idea is to integrate a flexible, record-based
>>>>>> rate-limiting
>>>>>>>> mechanism directly into the SourceReaderBase, making it
>> available
>>>> to
>>>>>> all
>>>>>>>> connectors built on the new source interface.
>>>>>>>>>> Key Changes:
>>>>>>>>>> Seamless Integration via SourceReaderBase:
>>>>>>>>>> New constructors accepting a RateLimiterStrategy will be
>>> added
>>>>>>>> directly to SourceReaderBase. This allows any connector built
>> on
>>>> the
>>>>>>> modern
>>>>>>>> source interface (like Kafka or Pulsar) to enable rate limiting
>>>> with
>>>>>>>> minimal code changes.
>>>>>>>>>> Accurate, Post-Emission Throttling:
>>>>>>>>>> To support sources with unpredictable batch sizes (e.g.,
>>>> Kafka),
>>>>>> rate
>>>>>>>> limiting is applied after records are emitted. The reader
>> counts
>>>> the
>>>>>>>> records after each recordEmitter.emitRecord method call, counts
>>> the
>>>>>>>> records, and only then consults the RateLimiter. This ensures
>>>>>> throttling
>>>>>>> is
>>>>>>>> based on the precise number of records processed.
>>>>>>>>>> Fully Non-Blocking Operation:
>>>>>>>>>> The entire mechanism is asynchronous and non-blocking. If a
>>>> rate
>>>>>>> limit
>>>>>>>> is hit, the reader pauses by returning
>>> InputStatus.MORE_AVAILABLE.
>>>>> This
>>>>>>>> yields control to the Flink task's event loop without blocking
>>> the
>>>>>>>> processing thread, ensuring that critical operations like
>>>>> checkpointing
>>>>>>> are
>>>>>>>> never delayed.
>>>>>>>>>> Unified Configuration via SQL/Table API:
>>>>>>>>>> Users can configure rate limits consistently across
>> different
>>>>>> sources
>>>>>>>> using a simple SQL table option, such as WITH
>> ('scan.rate.limit'
>>> =
>>>>>>> '1000').
>>>>>>>> This provides a unified and user-friendly experience for
>> pipeline
>>>>>> tuning.
>>>>>>>>>> The design is backward-compatible, and existing custom
>>> sources
>>>>> will
>>>>>>>> continue to work without any modification.
>>>>>>>>>> I believe this feature will be a valuable addition for
>> Flink
>>>>> users
>>>>>>>> operating in complex, multi-tenant environments. I'm looking
>>>> forward
>>>>> to
>>>>>>>> your feedback, suggestions, and any potential concerns you
>> might
>>>>> have.
>>>>>>>>>> Thanks,
>>>>>>>>>> Zexian Wu
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Reply via email to