Hello, Zexian. This is a great work for job's stability when failover or
catching the lag. I just have some questions:

   1. For some source tasks, their consuming splits may be more than
   others. The simple calculation QPS based on parallelism may cause
   imbalance. Can we support a calculation based on the split number as a
   common way?
   2. In our company, we support to config each operator's qps by
   ConfigOption<Map>. This is convenient if we can get the operatorId easily
   before submitting. But in open source, it may be hard to do so. I wonder
   whether there exist similar way so that we do not need to implement each
   connector.

Best,
Jiangang Liu

Zexian WU <wzx3122351...@gmail.com> 于2025年8月4日周一 14:35写道:

> Hi Shengkai,
>
> Thank you for your thoughtful feedback and excellent questions. These are
> all great points. Here are my thoughts:
>
> Regarding the requestSize parameter and its behavior: requestSize is not a
> request for permission but a post-facto count of records that have already
> been emitted. If this count (e.g., 3) exceeds the available rate credit
> (e.g., for 2 records), RateLimiter#acquire will return a CompletionStage
> that completes in the future. This creates a non-blocking pause in the
> SourceReader, allowing it to pay back the "time debt" and ensuring the
> average rate is maintained.
>
> Regarding notifyCheckpointAborted: You are absolutely right; this is a
> critical point for preventing deadlocks. The specific plan is to add a
> notifyCheckpointAborted method to the RateLimiter interface. Then, within
> the SourceReader implementation, I will handle the checkpoint abort
> callback and invoke this new method to reset the rate limiter's internal
> state.
>
> Regarding configuration and unsupported connectors: You've made an
> excellent point about this being a connector-specific feature.
>
> I agree that scan.rate-limit.record-per-second is a great name, and I will
> propose it as a best practice. I plan to use it in our initial reference
> implementation, likely for the Kafka connector, and it can serve as a
> reference for other connectors in the future.
> You are correct that the decision to support this feature and how to name
> the option lies with each connector. The framework only provides the
> underlying mechanism (the RateLimiter integration). The mechanism for
> providing clear error feedback works perfectly with this model: a connector
> that implements rate limiting will declare the option in its
> DynamicTableFactory. Connectors that don't support it will not declare the
> option. Consequently, Flink’s built-in validation will automatically throw
> a ValidationException for an unknown option, providing the desired user
> feedback.
>
> Best,
> Zexian
>
> Shengkai Fang <fskm...@gmail.com> 于2025年7月28日周一 14:57写道:
>
> > Hi, Zexian.
> >
> > Thanks for your FLIP. I think rate litmit is very important feature for
> our
> > users. But I have some questions about the FLIP:
> >
> > 1. How do I determine the input parameter `requestSize` for
> > `RateLimiter#acquire`? If the rate limiter indicates there are 2
> remaining
> > requests that can be emitted, but the `requestSize` is 3, what is the
> > behavior here?
> > 2. CheckpointListener also has a method named notifyCheckpointAborted, I
> > think RateLimiter also needs this. If the checkpoint aborts, please clear
> > the status of the rate limiter.
> > 3. I think `scan.rate-limit.record-per-second` is better than
> > `scan.rate.limit.record-per-second`. It seems only FLIP-27 source
> supports
> > rate-limit, it's better we can throw an exception to notify users if the
> > source doesn't support this feature.
> >
> > Best,
> > Shengkai
> >
> > Zexian WU <wzx3122351...@gmail.com> 于2025年7月28日周一 11:52写道:
> >
> > > Hi Leonard,
> > >
> > > Thanks a lot for your support and positive feedback! I'm glad to hear
> you
> > > think the design meets the needs of most scenarios.
> > >
> > > Indeed, rate limiting is a fundamental and important feature, and I'm
> > > excited to help fill this gap.
> > >
> > > I'm also looking forward to more ideas and potential improvements from
> > > other community members.
> > >
> > > Thanks again!
> > >
> > > Best,
> > > Zexian
> > >
> > >
> > > On 2025/07/24 12:19:39 Leonard Xu wrote:
> > > > Thanks Zexian for driving this work.
> > > >
> > > > Rate limiting is a common requirement, TBH, we should have supported
> it
> > > in earlier stage, and the proposed design integrating it into the
> source
> > > operator lifecycle, it is already able to meet the vast majority of
> > > scenarios, looks good from my side.
> > > >
> > > > Best,
> > > > Leonard
> > > >
> > > >
> > > > > 2025 7月 18 12:01,Zexian WU <wz...@gmail.com> 写道:
> > > > >
> > > > > Hi everyone,
> > > > > I would like to start a discussion on a new Flink Improvement
> > Proposal
> > > (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > > > The full proposal can be found on the wiki:
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > > > > Motivation
> > > > >
> > > > > In many production environments, Flink sources read from shared
> > > external systems (like Kafka, Pulsar, or databases) where resources are
> > > limited. An uncontrolled data ingestion rate can lead to critical
> issues:
> > > > > Resource Contention: A high-throughput Flink job can saturate a
> > shared
> > > Kafka cluster's bandwidth, starving other essential services.
> > > > > System Instability: Aggressive polling from a database (e.g.,
> MySQL)
> > > can overwhelm its IOPS, degrading performance for transactional
> queries.
> > > > > This proposal aims to provide a built-in, precise, and easy-to-use
> > > rate-limiting mechanism for Flink Sources to ensure system stability
> and
> > > fair resource sharing.
> > > > > Proposed Solution
> > > > >
> > > > > The core idea is to integrate a flexible, record-based
> rate-limiting
> > > mechanism directly into the SourceReaderBase, making it available to
> all
> > > connectors built on the new source interface.
> > > > > Key Changes:
> > > > > Seamless Integration via SourceReaderBase:
> > > > > New constructors accepting a RateLimiterStrategy will be added
> > > directly to SourceReaderBase. This allows any connector built on the
> > modern
> > > source interface (like Kafka or Pulsar) to enable rate limiting with
> > > minimal code changes.
> > > > > Accurate, Post-Emission Throttling:
> > > > > To support sources with unpredictable batch sizes (e.g., Kafka),
> rate
> > > limiting is applied after records are emitted. The reader counts the
> > > records after each recordEmitter.emitRecord method call, counts the
> > > records, and only then consults the RateLimiter. This ensures
> throttling
> > is
> > > based on the precise number of records processed.
> > > > > Fully Non-Blocking Operation:
> > > > > The entire mechanism is asynchronous and non-blocking. If a rate
> > limit
> > > is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This
> > > yields control to the Flink task's event loop without blocking the
> > > processing thread, ensuring that critical operations like checkpointing
> > are
> > > never delayed.
> > > > > Unified Configuration via SQL/Table API:
> > > > > Users can configure rate limits consistently across different
> sources
> > > using a simple SQL table option, such as WITH ('scan.rate.limit' =
> > '1000').
> > > This provides a unified and user-friendly experience for pipeline
> tuning.
> > > > > The design is backward-compatible, and existing custom sources will
> > > continue to work without any modification.
> > > > > I believe this feature will be a valuable addition for Flink users
> > > operating in complex, multi-tenant environments. I'm looking forward to
> > > your feedback, suggestions, and any potential concerns you might have.
> > > > > Thanks,
> > > > > Zexian Wu
> > > >
> > > >
> >
>

Reply via email to