> We have a Flink job reading from Kafka (specifically it uses
> FlinkKafkaConsumer011). There are instances when the job is processing a
> backlog and it ends up reading at a significantly high throughput and
> degrades the underlying Kafka cluster. If there was a way to rate limit the
> calls to Kafka (by controlling how often the *consumer*.poll() is called),
> it would be a useful feature for our use case.
>
> Has anyone run into a similar issue? Are there any efforts/thoughts
> on implementing a rate-limiting feature in the Flink Kafka connector?
We had a similar problem and ended up putting a Guava rate limiter inside the Kafka consumer to limit the consumption rate. Since we use POJOs, this was easily done by putting the rate limiter inside the POJO deserializer, which runs in the Kafka source. This has the benefit of not slowing down checkpoints, because the source doesn't have to do alignment.

If you don't care about checkpoint alignment, you can also add a map function with a Guava rate limiter immediately after the Kafka source. When it throttles, back pressure should eventually cause the Kafka source to slow down its consumption.

Ning
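For illustration, here is a minimal, self-contained sketch of the deserializer approach. The class names (`SimpleRateLimiter`, `ThrottledDeserializer`) and the 100-permits/second rate are made up for this example, and the hand-rolled limiter is only a stand-in so the sketch runs without dependencies; the real job would use Guava's `com.google.common.util.concurrent.RateLimiter` (same `create`/`acquire` shape) inside a class implementing Flink's `DeserializationSchema` for your POJO type.

```java
import java.util.concurrent.TimeUnit;

// Stand-in for Guava's RateLimiter, with the same create()/acquire() shape,
// so this sketch compiles without external dependencies. A real job would
// use com.google.common.util.concurrent.RateLimiter directly.
final class SimpleRateLimiter {
    private final long intervalNanos; // minimum spacing between permits
    private long nextFreeNanos;       // earliest time the next permit is free

    static SimpleRateLimiter create(double permitsPerSecond) {
        return new SimpleRateLimiter(permitsPerSecond);
    }

    private SimpleRateLimiter(double permitsPerSecond) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    // Blocks until a permit is available, pacing callers to the target rate.
    synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = nextFreeNanos - now;
        nextFreeNanos = Math.max(now, nextFreeNanos) + intervalNanos;
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}

// Hypothetical deserializer showing where the limiter sits. In the real
// setup this would implement Flink's DeserializationSchema<MyPojo> and be
// passed to FlinkKafkaConsumer011; here it just echoes the raw bytes.
final class ThrottledDeserializer {
    // One limiter per source subtask, so the effective cluster-wide rate is
    // roughly permitsPerSecond * source parallelism.
    private final SimpleRateLimiter limiter = SimpleRateLimiter.create(100.0);

    byte[] deserialize(byte[] message) throws InterruptedException {
        limiter.acquire();  // blocks here -> caps records/sec per subtask
        return message;     // real code would decode the POJO instead
    }
}
```

One practical note: Guava's `RateLimiter` is not `Serializable`, and Flink serializes the deserialization schema when distributing the job, so in practice the limiter is typically held in a `transient` field and created lazily on the task manager rather than in the driver.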