Hi everyone,
I would like to start a discussion on a new Flink Improvement Proposal (FLIP),
FLIP-535: Introduce RateLimiter to Source.
The full proposal can be found on the wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source
Motivation
In many production environments, Flink sources read from shared external
systems (like Kafka, Pulsar, or databases) where resources are limited. An
uncontrolled data ingestion rate can lead to critical issues:
Resource Contention: A high-throughput Flink job can saturate a shared Kafka
cluster's bandwidth, starving other essential services.
System Instability: Aggressive polling of a database (e.g., MySQL) can exhaust
its IOPS, degrading performance for transactional queries.
This proposal aims to provide a built-in, precise, and easy-to-use
rate-limiting mechanism for Flink Sources to ensure system stability and fair
resource sharing.
Proposed Solution
The core idea is to integrate a flexible, record-based rate-limiting mechanism
directly into the SourceReaderBase, making it available to all connectors built
on the new source interface.
Key Changes:
Seamless Integration via SourceReaderBase:
New constructors accepting a RateLimiterStrategy will be added directly to
SourceReaderBase. This allows any connector built on the modern source
interface (like Kafka or Pulsar) to enable rate limiting with minimal code
changes.
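This standalone Java sketch shows how adding constructors can stay backward-compatible. It does not use Flink. ReaderBase stands in for SourceReaderBase. RateLimiter, RateLimiterStrategy, noOp() and the connector reader classes are hypothetical names with simplified signatures, not the proposal's actual API. The sketch assumes the existing constructor delegates to the new one with a strategy that never throttles.

```java
import java.util.Objects;

class SourceReaderBaseSketch {

    /** Hypothetical limiter: true if `permits` more records may be emitted. */
    interface RateLimiter {
        boolean tryAcquire(int permits);
    }

    /** Hypothetical strategy: builds one limiter per parallel reader. */
    interface RateLimiterStrategy {
        RateLimiter createRateLimiter(int parallelism);

        /** Never throttles; lets the old constructor keep its old behavior. */
        static RateLimiterStrategy noOp() {
            return parallelism -> permits -> true;
        }
    }

    /** Stand-in for SourceReaderBase: the old constructor survives, a new one is added. */
    static class ReaderBase {
        protected final RateLimiter rateLimiter;

        // Existing signature, unchanged: delegates with a no-op strategy,
        // so subclasses that call it keep working without modification.
        ReaderBase() {
            this(RateLimiterStrategy.noOp(), 1);
        }

        // New overload: connectors opt in by passing a strategy.
        ReaderBase(RateLimiterStrategy strategy, int parallelism) {
            this.rateLimiter = Objects.requireNonNull(strategy).createRateLimiter(parallelism);
        }
    }

    /** A connector written before the change: untouched. */
    static class LegacyConnectorReader extends ReaderBase {
        LegacyConnectorReader() {
            super();
        }
    }

    /** A connector that opts in by changing only its super(...) call. */
    static class RateLimitedConnectorReader extends ReaderBase {
        RateLimitedConnectorReader(RateLimiterStrategy strategy, int parallelism) {
            super(strategy, parallelism);
        }
    }

    public static void main(String[] args) {
        // Toy strategy that rejects everything, only to show the wiring.
        RateLimiterStrategy denyAll = parallelism -> permits -> false;
        System.out.println(new LegacyConnectorReader().rateLimiter.tryAcquire(1_000)); // true
        System.out.println(new RateLimitedConnectorReader(denyAll, 4).rateLimiter.tryAcquire(1)); // false
    }
}
```

A legacy connector compiles and behaves as before. A connector that opts in only changes the arguments it passes to the base class.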
Accurate, Post-Emission Throttling:
To support sources with unpredictable batch sizes (e.g., Kafka), rate limiting
is applied after records are emitted. After each recordEmitter.emitRecord call,
the reader counts the records actually emitted and only then consults the
RateLimiter. This ensures throttling is based on the precise number of records
processed.
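The following plain-Java sketch makes the "count after emitRecord, then consult the limiter" step concrete. Output, CountingOutput, emitRecord and PostEmissionLimiter are hypothetical stand-ins, not Flink's SourceOutput, RecordEmitter or RateLimiter. The "pay after emitting" limiter is a simple reservation scheme chosen for illustration and is not necessarily what the FLIP implements. The toy emitter splits one fetched element into zero or more records, which is why the count is only known after emission.

```java
import java.util.ArrayList;
import java.util.List;

class PostEmissionThrottleSketch {

    /** Minimal stand-in for a downstream output. */
    interface Output<T> {
        void collect(T record);
    }

    /** Wraps the real output and counts what was actually emitted. */
    static final class CountingOutput<T> implements Output<T> {
        private final Output<T> delegate;
        private int count;

        CountingOutput(Output<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(T record) {
            delegate.collect(record);
            count++;
        }

        int count() {
            return count;
        }
    }

    /** Hypothetical emitter: one fetched element may yield zero or more records. */
    static void emitRecord(String fetched, Output<String> out) {
        for (String part : fetched.split(",")) {
            if (!part.isEmpty()) {
                out.collect(part);
            }
        }
    }

    /** Hypothetical limiter: charges records after the fact and returns the wait in nanos. */
    static final class PostEmissionLimiter {
        private final double recordsPerSecond;
        private long nextFreeNanos;

        PostEmissionLimiter(double recordsPerSecond, long startNanos) {
            this.recordsPerSecond = recordsPerSecond;
            this.nextFreeNanos = startNanos;
        }

        long charge(int records, long nowNanos) {
            long cost = (long) (records * 1e9 / recordsPerSecond);
            nextFreeNanos = Math.max(nowNanos, nextFreeNanos) + cost;
            return nextFreeNanos - nowNanos; // never negative
        }
    }

    /** Emits a whole fetched batch, then consults the limiter with the exact count. */
    static long emitBatchAndThrottle(
            List<String> batch, Output<String> downstream, PostEmissionLimiter limiter, long nowNanos) {
        CountingOutput<String> counting = new CountingOutput<>(downstream);
        for (String fetched : batch) {
            emitRecord(fetched, counting);
        }
        return limiter.charge(counting.count(), nowNanos);
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        PostEmissionLimiter limiter = new PostEmissionLimiter(1000.0, 0L); // 1 record costs 1 ms
        long wait1 = emitBatchAndThrottle(List.of("a,b", "", "c"), sink::add, limiter, 0L);
        long wait2 = emitBatchAndThrottle(List.of("d"), sink::add, limiter, 1_000_000L);
        System.out.println(sink + " " + wait1 + " " + wait2); // [a, b, c, d] 3000000 3000000
    }
}
```

The empty element emits nothing, so the first batch is charged for 3 records, not 3 fetched elements. That difference is what post-emission counting is meant to capture.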
Fully Non-Blocking Operation:
The entire mechanism is asynchronous and non-blocking. If a rate limit is hit,
the reader pauses by returning InputStatus.MORE_AVAILABLE. This yields control
to the Flink task's event loop without blocking the processing thread, ensuring
that critical operations like checkpointing are never delayed.
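The non-blocking idea can be sketched in plain Java. The sketch assumes a limiter whose acquire method returns a CompletableFuture that a timer completes later. Status, AsyncRateLimiter and ThrottledReader are hypothetical. The sketch does not model Flink's InputStatus values or the task mailbox. It only shows that the polling thread checks a future instead of sleeping, so other work can run in between.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingThrottleSketch {

    /** Hypothetical status for this sketch only; it is not Flink's InputStatus. */
    enum Status { EMITTED, THROTTLED }

    /** Hypothetical limiter whose acquire() returns a future instead of sleeping. */
    static final class AsyncRateLimiter {
        private final double recordsPerSecond;
        private long nextFreeNanos = System.nanoTime(); // only touched by the polling thread

        AsyncRateLimiter(double recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
        }

        /** Charges `records` already emitted; the future completes once they are paid for. */
        CompletableFuture<Void> acquire(int records) {
            long now = System.nanoTime();
            nextFreeNanos = Math.max(now, nextFreeNanos) + (long) (records * 1e9 / recordsPerSecond);
            // Completed later by a timer thread; the caller is never put to sleep.
            return CompletableFuture.runAsync(
                    () -> {}, CompletableFuture.delayedExecutor(nextFreeNanos - now, TimeUnit.NANOSECONDS));
        }
    }

    /** Toy reader: emits a fixed batch per poll and throttles afterwards. */
    static final class ThrottledReader {
        private static final int BATCH = 10; // pretend each fetch yields 10 records
        private final AsyncRateLimiter limiter;
        private CompletableFuture<Void> permits = CompletableFuture.completedFuture(null);
        int emitted;

        ThrottledReader(AsyncRateLimiter limiter) {
            this.limiter = limiter;
        }

        /** Never blocks: if permits are still pending, report THROTTLED and return at once. */
        Status poll() {
            if (!permits.isDone()) {
                return Status.THROTTLED;
            }
            emitted += BATCH;                 // emit the batch first...
            permits = limiter.acquire(BATCH); // ...then charge the limiter (post-emission)
            return Status.EMITTED;
        }

        /** The future a task loop could wait on instead of sleeping inside poll(). */
        CompletableFuture<Void> availability() {
            return permits;
        }
    }

    public static void main(String[] args) {
        ThrottledReader reader = new ThrottledReader(new AsyncRateLimiter(10.0)); // 10 records cost ~1 s
        System.out.println(reader.poll()); // EMITTED
        System.out.println(reader.poll()); // THROTTLED: returns immediately, permits still pending
        // Other task work (e.g. a checkpoint) could run here while the limiter catches up.
        reader.availability().join();
        System.out.println(reader.poll()); // EMITTED
    }
}
```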
Unified Configuration via SQL/Table API:
Users can configure rate limits consistently across different sources using a
simple SQL table option, such as WITH ('scan.rate.limit' = '1000'). This
provides a unified and user-friendly experience for pipeline tuning.
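This Java sketch shows one way a table factory might turn the option into a limiter setting. Only the key 'scan.rate.limit' comes from the proposal. Everything else is assumed for illustration:
- the value is read as records per second,
- the total is split evenly across parallel readers,
- the validation rules and helper names are hypothetical.

```java
import java.util.Map;
import java.util.OptionalDouble;

class ScanRateLimitOptionSketch {

    /** Option key from the proposal; treating the value as records per second is an assumption. */
    static final String SCAN_RATE_LIMIT = "scan.rate.limit";

    /** Returns the configured limit, or empty when the option is absent (no throttling). */
    static OptionalDouble parseRateLimit(Map<String, String> tableOptions) {
        String raw = tableOptions.get(SCAN_RATE_LIMIT);
        if (raw == null) {
            return OptionalDouble.empty();
        }
        double value;
        try {
            value = Double.parseDouble(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(SCAN_RATE_LIMIT + " must be numeric: " + raw, e);
        }
        // Rejects zero, negatives, NaN and infinity.
        if (!(value > 0) || Double.isInfinite(value)) {
            throw new IllegalArgumentException(SCAN_RATE_LIMIT + " must be positive and finite: " + raw);
        }
        return OptionalDouble.of(value);
    }

    /** Assumption: the limit is a total for the source, split evenly across parallel readers. */
    static double perReaderRate(double totalRate, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
        }
        return totalRate / parallelism;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of("connector", "kafka", SCAN_RATE_LIMIT, "1000");
        double total = parseRateLimit(options).orElseThrow();
        System.out.println(total + " " + perReaderRate(total, 4)); // 1000.0 250.0
    }
}
```

Whether the limit should be global or per subtask is a design choice. The even split above is only one option.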
The design is backward-compatible, and existing custom sources will continue to
work without any modification.
I believe this feature will be a valuable addition for Flink users operating in
complex, multi-tenant environments. I'm looking forward to your feedback,
suggestions, and any potential concerns you might have.
Thanks,
Zexian Wu