I had the same reaction initially as some of the others on this thread -- which is "use Kafka quotas". I agree that, in general, a service should protect itself with its own rate limiting rather than having it built into clients like the FlinkKafkaConsumer.
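For context, on a plain Apache Kafka installation a broker-side consumer quota can be set with the kafka-configs tool, roughly like this (the ZooKeeper address, byte rate, and client id below are placeholders):

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --add-config 'consumer_byte_rate=10485760' \
      --entity-type clients --entity-name flink-consumer-client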
However, there are a few reasons we need to do this in our company currently:

- We can't use Kafka quotas right now because the Kafka vendor we're using doesn't support them.
- Flink jobs that also make calls to RPC services frequently DDoS those services, and we simply need to slow the jobs down when processing a backlog to protect the external services. You could argue those services should protect themselves, and I agree, but for various technical reasons that's not possible at the moment.
- If you are going to artificially rate limit a Flink job, the best place to do it is definitely in the source -- otherwise you end up with issues with backpressure and checkpointing.

So, that said, I suspect other users have the same issue, and I think it's a good general feature to add to the Kafka consumer. It already exists in the Kinesis consumer as well.

In terms of code bloat -- well, the code is dead simple. It's just adding a Guava RateLimiter to the poll() loop, and it's opt-in. The code has already been implemented for this.

@Lakshmi Gururaja Rao <l...@lyft.com> Can you put up an apache/flink PR for this, since it's already finished internally?

Anyway, I'm not opposed to making the KafkaConsumer a little more customizable via some hooks if that's what others prefer -- however, let's also make the rate-limited KafkaConsumer available in the Flink project at large rather than keeping it internal at Lyft. I think it's generally useful.

-Jamie

On Tue, Jan 29, 2019 at 8:57 PM Thomas Weise <t...@apache.org> wrote:

> It is preferred for the service to rate limit. The problem is that not
> all Kafka setups have that control enabled / support for it.
>
> Even when rate limiting is enabled, it may still be *nice* for the
> client to gracefully handle it.
>
> There was discussion in the past that we should not bloat the Kafka
> consumer further, and I agree with that.
>
> On the other hand, it would be good if the consumer could be augmented
> a bit to provide hooks for customization (we had done that for the
> Kinesis consumer also).
>
> Thanks,
> Thomas
>
>
> On Mon, Jan 28, 2019 at 3:14 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Lakshmi,
> >
> > As Nagajun mentioned, you might want to configure a quota on the
> > Kafka broker side for your Flink connector client.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Jan 26, 2019 at 10:44 AM Ning Shi <nings...@gmail.com> wrote:
> >
> > > > We have a Flink job reading from Kafka (specifically it uses
> > > > FlinkKafkaConsumer011). There are instances when the job is
> > > > processing a backlog and it ends up reading at a significantly
> > > > high throughput and degrades the underlying Kafka cluster. If
> > > > there was a way to rate limit the calls to Kafka (by controlling
> > > > how often the *consumer*.poll() is called), it would be a useful
> > > > feature for our use case.
> > > >
> > > > Has anyone run into a similar issue? Are there any
> > > > efforts/thoughts on implementing a rate-limiting feature in the
> > > > Flink Kafka connector?
> > >
> > > We had a similar problem and ended up putting a Guava rate limiter
> > > inside the Kafka consumer to limit the consumption rate. Since we
> > > use POJOs, this is easily done by putting the rate limiter inside
> > > the POJO deserializer, which runs in the Kafka source.
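A minimal sketch of the deserializer approach Ning describes might look like the following (class and parameter names here are illustrative, not the actual Lyft code; the Guava RateLimiter is created lazily because it is not serializable, and the configured rate applies per consumer subtask):

    import java.io.IOException;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    import com.google.common.util.concurrent.RateLimiter;

    // Hypothetical wrapper that throttles an existing schema.
    public class RateLimitedDeserializationSchema<T>
            implements DeserializationSchema<T> {

        private final DeserializationSchema<T> inner;
        private final double recordsPerSecond; // per consumer subtask

        // Guava's RateLimiter is not serializable, so create it lazily
        // on the task rather than in the constructor.
        private transient RateLimiter limiter;

        public RateLimitedDeserializationSchema(DeserializationSchema<T> inner,
                                                double recordsPerSecond) {
            this.inner = inner;
            this.recordsPerSecond = recordsPerSecond;
        }

        @Override
        public T deserialize(byte[] message) throws IOException {
            if (limiter == null) {
                limiter = RateLimiter.create(recordsPerSecond);
            }
            limiter.acquire(); // blocks until a permit is available
            return inner.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return inner.isEndOfStream(nextElement);
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return inner.getProducedType();
        }
    }

A wrapper like this could be passed to the FlinkKafkaConsumer011 constructor in place of the plain DeserializationSchema.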
> > > This has the benefit of not slowing down checkpoints, because the
> > > source doesn't have to do alignment.
> > >
> > > If you don't care about checkpoint alignment, you can also add a
> > > map function with a Guava rate limiter immediately after the Kafka
> > > source. When it throttles, back pressure should eventually cause
> > > the Kafka source to slow down consumption.
> > >
> > > Ning
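For the map-function variant Ning mentions, a minimal sketch might look like this (hypothetical class name; again the rate is per subtask, and the blocking acquire() is what produces the back pressure toward the source):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    import com.google.common.util.concurrent.RateLimiter;

    // Hypothetical pass-through map that only throttles.
    public class ThrottlingMap<T> extends RichMapFunction<T, T> {

        private final double recordsPerSecond; // per subtask

        private transient RateLimiter limiter;

        public ThrottlingMap(double recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
        }

        @Override
        public void open(Configuration parameters) {
            // Created here rather than in the constructor because
            // RateLimiter is not serializable.
            limiter = RateLimiter.create(recordsPerSecond);
        }

        @Override
        public T map(T value) {
            limiter.acquire(); // blocks until a permit is available
            return value;
        }
    }

It would be applied as something like stream.map(new ThrottlingMap<>(1000.0)) directly after addSource(...), so that the throttling chains to the source.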