Hi everyone,
I would like to start a discussion on a new Flink Improvement Proposal (FLIP), 
FLIP-535: Introduce RateLimiter to Source.
The full proposal can be found on the wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
Motivation

In many production environments, Flink sources read from shared external 
systems (like Kafka, Pulsar, or databases) where resources are limited. An 
uncontrolled data ingestion rate can lead to critical issues:
Resource Contention: A high-throughput Flink job can saturate a shared Kafka 
cluster's bandwidth, starving other essential services.
System Instability: Aggressive polling of a database (e.g., MySQL) can exhaust 
its IOPS capacity, degrading performance for transactional queries.
This proposal aims to provide a built-in, precise, and easy-to-use 
rate-limiting mechanism for Flink Sources to ensure system stability and fair 
resource sharing.
Proposed Solution

The core idea is to integrate a flexible, record-based rate-limiting mechanism 
directly into the SourceReaderBase, making it available to all connectors built 
on the new source interface.
Key Changes:
Seamless Integration via SourceReaderBase:
New constructors accepting a RateLimiterStrategy will be added directly to 
SourceReaderBase. This allows any connector built on the modern source 
interface (like Kafka or Pulsar) to enable rate limiting with minimal code 
changes.
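Here is a minimal Java sketch of how an optional strategy argument could sit next to the existing constructor. SimpleRateLimiter, SimpleRateLimiterStrategy, SketchReaderBase and the two connector readers are hypothetical, simplified stand-ins. They are not Flink's actual classes or the exact signatures the FLIP proposes. The sketch assumes that the strategy creates one limiter per parallel reader and that an unconfigured reader falls back to a no-op limiter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical, simplified stand-ins for the proposal's types. The real
// Flink interfaces may use different names and signatures.
interface SimpleRateLimiter {
    /** Completes when the reader may continue after emitting numRecords records. */
    CompletionStage<Void> acquire(int numRecords);
}

@FunctionalInterface
interface SimpleRateLimiterStrategy {
    /** Called once per reader; parallelism lets a strategy split a global budget. */
    SimpleRateLimiter createRateLimiter(int parallelism);

    /** Never throttles: the fallback when a connector does not opt in. */
    static SimpleRateLimiterStrategy noOp() {
        return parallelism -> numRecords -> CompletableFuture.completedFuture(null);
    }
}

/** Sketch of a reader base class that gains an optional constructor argument. */
abstract class SketchReaderBase {
    protected final SimpleRateLimiter rateLimiter;

    // Existing-style constructor: unchanged behavior, no throttling.
    protected SketchReaderBase(int parallelism) {
        this(parallelism, SimpleRateLimiterStrategy.noOp());
    }

    // New-style constructor: a connector opts in by passing a strategy.
    protected SketchReaderBase(int parallelism, SimpleRateLimiterStrategy strategy) {
        this.rateLimiter = strategy.createRateLimiter(parallelism);
    }
}

// A connector that does not care about rate limiting needs no changes.
final class LegacyConnectorReader extends SketchReaderBase {
    LegacyConnectorReader(int parallelism) {
        super(parallelism);
    }
}

// A connector that opts in only forwards the strategy to the new constructor.
final class RateLimitedConnectorReader extends SketchReaderBase {
    RateLimitedConnectorReader(int parallelism, SimpleRateLimiterStrategy strategy) {
        super(parallelism, strategy);
    }
}
```

The old constructor delegates to the new one with a no-op strategy. As a result, a connector that never touches rate limiting compiles and behaves as before.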
Accurate, Post-Emission Throttling:
To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting 
is applied after records are emitted. After each recordEmitter.emitRecord call, 
the reader counts the records that were actually emitted and only then consults 
the RateLimiter. This ensures throttling is based on the precise number of 
records processed.
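A minimal sketch of the emit, count, then throttle ordering. It assumes that a single emitRecord call can produce zero, one, or many output records, for example when a deserializer filters or splits its input. SketchRecordEmitter, CountingOutput, PostEmissionLoop and PostEmissionDemo are hypothetical names. The IntConsumer only stands in for whatever call the real reader makes on its RateLimiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

// Hypothetical stand-in for a record emitter: one fetched element may turn
// into zero, one, or many output records (e.g. filtering or splitting).
interface SketchRecordEmitter<E, T> {
    void emitRecord(E element, Consumer<T> output);
}

/** Wraps the downstream output and counts what actually passes through. */
final class CountingOutput<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private int count;

    CountingOutput(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T record) {
        count++;
        delegate.accept(record);
    }

    int getAndReset() {
        int emitted = count;
        count = 0;
        return emitted;
    }
}

/** Emit first, count second, consult the limiter last. */
final class PostEmissionLoop<E, T> {
    private final SketchRecordEmitter<E, T> emitter;
    private final CountingOutput<T> output;
    private final IntConsumer rateLimiter; // hypothetical stand-in for the limiter call

    PostEmissionLoop(SketchRecordEmitter<E, T> emitter, Consumer<T> downstream, IntConsumer rateLimiter) {
        this.emitter = emitter;
        this.output = new CountingOutput<>(downstream);
        this.rateLimiter = rateLimiter;
    }

    void process(E element) {
        emitter.emitRecord(element, output); // 1. emit
        int emitted = output.getAndReset();  // 2. count exactly what was emitted
        if (emitted > 0) {
            rateLimiter.accept(emitted);     // 3. only then charge the limiter
        }
    }
}

final class PostEmissionDemo {
    public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        List<Integer> charges = new ArrayList<>();
        // Each element is a comma-separated line; empty tokens are dropped.
        SketchRecordEmitter<String, String> splitter = (line, out) -> {
            for (String token : line.split(",")) {
                if (!token.isEmpty()) {
                    out.accept(token);
                }
            }
        };
        PostEmissionLoop<String, String> loop =
                new PostEmissionLoop<>(splitter, downstream::add, charges::add);
        for (String line : List.of("a,b,c", "", "x")) {
            loop.process(line);
        }
        System.out.println(downstream + " " + charges); // prints [a, b, c, x] [3, 1]
    }
}
```

The count comes from the wrapped output rather than from the size of the fetched batch. The limiter is therefore charged only for records that actually reached downstream, and an element that produced nothing consumes no budget.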
Fully Non-Blocking Operation:
The entire mechanism is asynchronous and non-blocking. If a rate limit is hit, 
the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control 
to the Flink task's event loop without blocking the processing thread, ensuring 
that critical operations like checkpointing are never delayed.
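The following sketch illustrates the non-blocking idea under stated assumptions. acquire() never sleeps on the calling thread. Instead, it returns a CompletableFuture that a timer thread completes once enough time has passed for the charged records. NonBlockingRateLimiter and ThrottledReaderSketch are hypothetical. The smooth pacing rule (one record every 1/rate seconds, no burst allowance) is also an assumption for illustration. How the FLIP actually wires the future into the reader's availability and InputStatus handling may differ. Flink's SourceReader does expose availability as a CompletableFuture<Void> through isAvailable(), which is a natural hook for this.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical non-blocking limiter: acquire() never sleeps on the caller's
 * thread. It returns a future that is either already complete or completed
 * later by a timer thread.
 */
final class NonBlockingRateLimiter {
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread t = new Thread(runnable, "rate-limiter-timer");
                t.setDaemon(true); // never keeps the JVM alive
                return t;
            });

    private final double nanosPerRecord;
    private long nextFreeNanos = System.nanoTime();

    NonBlockingRateLimiter(double recordsPerSecond) {
        if (!(recordsPerSecond > 0)) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        this.nanosPerRecord = 1_000_000_000.0 / recordsPerSecond;
    }

    /** Charges records that were already emitted; completes when emitting may resume. */
    synchronized CompletableFuture<Void> acquire(int numRecords) {
        long now = System.nanoTime();
        nextFreeNanos = Math.max(now, nextFreeNanos) + (long) (numRecords * nanosPerRecord);
        long delayNanos = nextFreeNanos - now;
        CompletableFuture<Void> ready = new CompletableFuture<>();
        if (delayNanos <= 0) {
            ready.complete(null);
        } else {
            TIMER.schedule(() -> { ready.complete(null); }, delayNanos, TimeUnit.NANOSECONDS);
        }
        return ready;
    }
}

/** Reader-side wiring: expose the throttle as the availability future. */
final class ThrottledReaderSketch {
    private final NonBlockingRateLimiter limiter;
    private CompletableFuture<Void> throttle = CompletableFuture.completedFuture(null);

    ThrottledReaderSketch(NonBlockingRateLimiter limiter) {
        this.limiter = limiter;
    }

    /** Called after each emission with the exact number of emitted records. */
    void onRecordsEmitted(int emitted) {
        throttle = limiter.acquire(emitted);
    }

    /** The event loop waits on this future instead of blocking the thread. */
    CompletableFuture<Void> isAvailable() {
        return throttle;
    }
}
```

For example, with new NonBlockingRateLimiter(100.0), a reader that reports 50 emitted records gets back a future that completes roughly 500 ms later. The call itself returns immediately, so the thread stays free for checkpoint barriers and other work in the meantime.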
Unified Configuration via SQL/Table API:
Users can configure rate limits consistently across different sources using a 
simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This 
provides a unified and user-friendly experience for pipeline tuning.
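This hypothetical sketch shows how a connector's table factory might turn the option into a per-reader limit. It assumes that the value of 'scan.rate.limit' means records per second for the whole source and that this budget is split evenly across parallel readers. This message does not spell out the exact semantics, so both the unit and the splitting rule are assumptions. The ScanRateLimitOption helper itself is also hypothetical.

```java
import java.util.Map;
import java.util.OptionalDouble;

/**
 * Hypothetical translation of the table option into a per-reader rate.
 * Assumes the value is records per second for the whole source, split
 * evenly across parallel readers.
 */
final class ScanRateLimitOption {
    static final String KEY = "scan.rate.limit";

    /** An empty result means the option is absent, i.e. no throttling. */
    static OptionalDouble perReaderRate(Map<String, String> tableOptions, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive, got " + parallelism);
        }
        String raw = tableOptions.get(KEY);
        if (raw == null) {
            return OptionalDouble.empty();
        }
        double total;
        try {
            total = Double.parseDouble(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("'" + KEY + "' must be a number, got '" + raw + "'", e);
        }
        // Rejects zero, negatives, NaN and infinity.
        if (!(total > 0) || Double.isInfinite(total)) {
            throw new IllegalArgumentException(
                    "'" + KEY + "' must be a positive finite number, got '" + raw + "'");
        }
        return OptionalDouble.of(total / parallelism);
    }

    public static void main(String[] args) {
        // Options as they might arrive from WITH ('scan.rate.limit' = '1000').
        Map<String, String> options = Map.of(KEY, "1000");
        System.out.println(perReaderRate(options, 4)); // prints OptionalDouble[250.0]
    }
}
```

Under this assumption, WITH ('scan.rate.limit' = '1000') on a source running with parallelism 4 would give each reader a budget of 250 records per second. Omitting the option leaves the source unthrottled.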
The design is backward-compatible, and existing custom sources will continue to 
work without any modification.
I believe this feature will be a valuable addition for Flink users operating in 
complex, multi-tenant environments. I'm looking forward to your feedback, 
suggestions, and any potential concerns you might have.
Thanks,
Zexian Wu
