Hi, Zexian.

Thanks for your FLIP. I think rate limiting is a very important feature for
our users, but I have some questions about the FLIP:

1. How do I determine the input parameter `requestSize` for
`RateLimiter#acquire`? If the rate limiter indicates there are 2 remaining
requests that can be emitted, but the `requestSize` is 3, what is the
behavior here?
2. CheckpointListener also has a method named notifyCheckpointAborted, and I
think RateLimiter needs it as well. If the checkpoint is aborted, the rate
limiter should clear its status.
3. I think `scan.rate-limit.record-per-second` is better than
`scan.rate.limit.record-per-second`. Also, it seems only FLIP-27 sources
support rate limiting, so it would be better to throw an exception to notify
users if the source doesn't support this feature.
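
To make question 1 concrete, here is a minimal sketch of two behaviors I can
imagine when `requestSize` exceeds the remaining budget. The class and method
names below are hypothetical and are not taken from the FLIP or from Flink's
existing RateLimiter interface; the fixed-window accounting is also only an
assumption for illustration.

```java
/** Hypothetical fixed-window limiter; NOT the interface proposed in the FLIP. */
class WindowedLimiterSketch {
    private final int permitsPerWindow;
    private int remaining;

    WindowedLimiterSketch(int permitsPerWindow) {
        this.permitsPerWindow = permitsPerWindow;
        this.remaining = permitsPerWindow;
    }

    int remaining() {
        return remaining;
    }

    /** Option A: reject a request larger than the remaining budget. */
    boolean tryAcquireStrict(int requestSize) {
        if (requestSize > remaining) {
            return false; // budget untouched, caller has to wait for the next window
        }
        remaining -= requestSize;
        return true;
    }

    /** Option B: admit the request while any budget is left and carry the deficit over. */
    boolean tryAcquireWithDebt(int requestSize) {
        if (remaining <= 0) {
            return false; // still paying back an earlier overshoot
        }
        remaining -= requestSize; // may go negative
        return true;
    }

    /** Starts a new window: refill the budget, paying back any debt first. */
    void startNewWindow() {
        remaining = Math.min(permitsPerWindow, remaining + permitsPerWindow);
    }
}
```

With 2 permits left and `requestSize` = 3, option A refuses the request and
keeps 2 permits, while option B admits it and leaves the budget at -1, so the
next window only gets `permitsPerWindow - 1`. Since the FLIP applies the limit
after records are emitted, option B looks closer to me, but it would be good
to state the intended behavior explicitly.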

Best,
Shengkai

On Mon, Jul 28, 2025 at 11:52, Zexian WU <wzx3122351...@gmail.com> wrote:

> Hi Leonard,
>
> Thanks a lot for your support and positive feedback! I'm glad to hear you
> think the design meets the needs of most scenarios.
>
> Indeed, rate limiting is a fundamental and important feature, and I'm
> excited to help fill this gap.
>
> I'm also looking forward to more ideas and potential improvements from
> other community members.
>
> Thanks again!
>
> Best,
> Zexian
>
>
> On 2025/07/24 12:19:39 Leonard Xu wrote:
> > Thanks Zexian for driving this work.
> >
> > Rate limiting is a common requirement; TBH, we should have supported it
> > at an earlier stage. The proposed design integrates it into the source
> > operator lifecycle and can already meet the vast majority of scenarios.
> > Looks good from my side.
> >
> > Best,
> > Leonard
> >
> >
> > > On Jul 18, 2025, at 12:01, Zexian WU <wz...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > > I would like to start a discussion on a new Flink Improvement Proposal
> > > (FLIP), FLIP-535: Introduce RateLimiter to Source.
> > > The full proposal can be found on the wiki:
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
> > > Motivation
> > >
> > > In many production environments, Flink sources read from shared
> > > external systems (like Kafka, Pulsar, or databases) where resources are
> > > limited. An uncontrolled data ingestion rate can lead to critical issues:
> > > - Resource Contention: A high-throughput Flink job can saturate a shared
> > >   Kafka cluster's bandwidth, starving other essential services.
> > > - System Instability: Aggressive polling from a database (e.g., MySQL)
> > >   can overwhelm its IOPS, degrading performance for transactional queries.
> > > This proposal aims to provide a built-in, precise, and easy-to-use
> > > rate-limiting mechanism for Flink Sources to ensure system stability and
> > > fair resource sharing.
> > > Proposed Solution
> > >
> > > The core idea is to integrate a flexible, record-based rate-limiting
> > > mechanism directly into the SourceReaderBase, making it available to all
> > > connectors built on the new source interface.
> > > Key Changes:
> > > Seamless Integration via SourceReaderBase:
> > > New constructors accepting a RateLimiterStrategy will be added
> > > directly to SourceReaderBase. This allows any connector built on the
> > > modern source interface (like Kafka or Pulsar) to enable rate limiting
> > > with minimal code changes.
> > > Accurate, Post-Emission Throttling:
> > > To support sources with unpredictable batch sizes (e.g., Kafka), rate
> > > limiting is applied after records are emitted. The reader counts the
> > > records after each recordEmitter.emitRecord method call and only then
> > > consults the RateLimiter. This ensures throttling is based on the
> > > precise number of records processed.
> > > Fully Non-Blocking Operation:
> > > The entire mechanism is asynchronous and non-blocking. If a rate limit
> > > is hit, the reader pauses by returning InputStatus.MORE_AVAILABLE. This
> > > yields control to the Flink task's event loop without blocking the
> > > processing thread, ensuring that critical operations like checkpointing
> > > are never delayed.
> > > Unified Configuration via SQL/Table API:
> > > Users can configure rate limits consistently across different sources
> > > using a simple SQL table option, such as
> > > WITH ('scan.rate.limit' = '1000'). This provides a unified and
> > > user-friendly experience for pipeline tuning.
> > > The design is backward-compatible, and existing custom sources will
> > > continue to work without any modification.
> > > I believe this feature will be a valuable addition for Flink users
> > > operating in complex, multi-tenant environments. I'm looking forward to
> > > your feedback, suggestions, and any potential concerns you might have.
> > > Thanks,
> > > Zexian Wu
> >
> >
