I had the same initial reaction as some of the others on this thread --
which is "use Kafka quotas".  I agree that in general a service should
protect itself with its own rate limiting rather than building it into
clients like the FlinkKafkaConsumer.

However, there are a few reasons we need to do this in our company
currently:
  - We can't use Kafka quotas right now because the Kafka vendor we're
using doesn't support them
  - External RPC services called from our Flink jobs are frequently DDoS'd
by those jobs, and we simply need to slow the jobs down when processing a
backlog to protect the external services.  You could argue those services
should protect themselves, and I agree, but for various technical reasons
that's not possible at the moment.
  - If you are going to artificially rate limit a Flink job, the best
place to do it is definitely in the source -- otherwise you end up with
issues with backpressure and checkpointing.

That said, I suspect other users have the same issue, so I think it's a
good general feature to add to the Kafka consumer.  It already exists in
the Kinesis consumer as well.

In terms of code bloat -- the code is dead simple.  It's just adding a
Guava RateLimiter to the poll() loop, and it's opt-in.  The code has
already been implemented for this.
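
To make the idea concrete, here is a minimal sketch of what the opt-in
limiter could look like (this is not the actual internal implementation;
the class name, topic, and rate are all placeholders):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import com.google.common.util.concurrent.RateLimiter;

    public class RateLimitedPollLoop {

        public static void run(Properties props, double recordsPerSecond) {
            // One limiter per consumer instance, i.e. per source subtask.
            RateLimiter limiter = RateLimiter.create(recordsPerSecond);
            try (KafkaConsumer<byte[], byte[]> consumer =
                    new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // ... emit the record downstream as usual ...
                    }
                    // Block before the next poll once we exceed the configured
                    // rate; permits here are records, but bytes work equally
                    // well if you sum serialized record sizes instead.
                    if (!records.isEmpty()) {
                        limiter.acquire(records.count());
                    }
                }
            }
        }
    }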

@Lakshmi Gururaja Rao <l...@lyft.com>  Can you put up an apache/flink PR
for this since it's already finished internally?

Anyway, I'm not opposed to making the KafkaConsumer a little more
customizable by adding some hooks if that's what others prefer -- however,
let's also make the rate-limited KafkaConsumer available in the Flink
project at large rather than keeping it internal at Lyft.  I think it's
generally useful.

-Jamie


On Tue, Jan 29, 2019 at 8:57 PM Thomas Weise <t...@apache.org> wrote:

> It is preferred for the service to rate limit. The problem is that not all
> Kafka setups have that control enabled / support for it.
>
> Even when rate limiting is enabled, it may still be *nice* for the client
> to handle it gracefully.
>
> There was discussion in the past that we should not bloat the Kafka
> consumer further and I agree with that.
>
> On the other hand it would be good if the consumer can be augmented a bit
> to provide hooks for customization (we had done that for the Kinesis
> consumer also).
>
> Thanks,
> Thomas
>
>
> On Mon, Jan 28, 2019 at 3:14 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Lakshmi,
> >
> > As Nagajun mentioned, you might want to configure a quota on the Kafka
> > broker side for your Flink connector client.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Jan 26, 2019 at 10:44 AM Ning Shi <nings...@gmail.com> wrote:
> >
> > > > We have a Flink job reading from Kafka (specifically it uses
> > > > FlinkKafkaConsumer011). There are instances when the job is processing
> > > > a backlog and it ends up reading at a significantly high throughput
> > > > and degrades the underlying Kafka cluster. If there was a way to rate
> > > > limit the calls to Kafka (by controlling how often the
> > > > *consumer*.poll() is called), it would be a useful feature for our use
> > > > case.
> > > >
> > > > Has anyone run into a similar issue? Are there any efforts/thoughts
> > > > on implementing a rate-limiting feature in the Flink Kafka connector?
> > >
> > > We had a similar problem and ended up putting a Guava rate limiter
> > > inside the Kafka consumer to limit the consumption rate. Since we use
> > > POJOs, this is easily done by putting the rate limiter inside the POJO
> > > deserializer, which runs in the Kafka source.
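> > >
> > > A minimal sketch of that approach, assuming Flink's
> > > AbstractDeserializationSchema (the event type, its parser, and the
> > > rate are made up):
> > >
> > >     import com.google.common.util.concurrent.RateLimiter;
> > >     import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> > >
> > >     public class ThrottlingDeserializer
> > >             extends AbstractDeserializationSchema<MyEvent> {
> > >
> > >         // Records per second, per consumer subtask.
> > >         private final double recordsPerSecond;
> > >         // Guava's RateLimiter isn't Serializable, so create it lazily
> > >         // on the task manager rather than shipping it from the client.
> > >         private transient RateLimiter limiter;
> > >
> > >         public ThrottlingDeserializer(double recordsPerSecond) {
> > >             this.recordsPerSecond = recordsPerSecond;
> > >         }
> > >
> > >         @Override
> > >         public MyEvent deserialize(byte[] message) {
> > >             if (limiter == null) {
> > >                 limiter = RateLimiter.create(recordsPerSecond);
> > >             }
> > >             limiter.acquire(); // blocks inside the Kafka source thread
> > >             return MyEvent.fromBytes(message); // hypothetical parser
> > >         }
> > >     }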
> > >
> > > This has the benefit of not slowing down checkpoints because the
> > > source doesn't have to do alignment. If you don't care about
> > > checkpoint alignment, you can also add a map function with a Guava
> > > rate limiter immediately after the Kafka source. When it throttles,
> > > backpressure should eventually cause the Kafka source to slow down
> > > consumption.
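> > >
> > > For the map-function variant, a sketch (the rate and names are again
> > > placeholders):
> > >
> > >     import com.google.common.util.concurrent.RateLimiter;
> > >     import org.apache.flink.api.common.functions.RichMapFunction;
> > >     import org.apache.flink.configuration.Configuration;
> > >
> > >     public class ThrottleMap<T> extends RichMapFunction<T, T> {
> > >
> > >         private final double permitsPerSecond;
> > >         private transient RateLimiter limiter;
> > >
> > >         public ThrottleMap(double permitsPerSecond) {
> > >             this.permitsPerSecond = permitsPerSecond;
> > >         }
> > >
> > >         @Override
> > >         public void open(Configuration parameters) {
> > >             // One limiter per parallel subtask.
> > >             limiter = RateLimiter.create(permitsPerSecond);
> > >         }
> > >
> > >         @Override
> > >         public T map(T value) {
> > >             limiter.acquire(); // throttle; backpressure reaches the source
> > >             return value;
> > >         }
> > >     }
> > >
> > >     // usage (rate is illustrative):
> > >     // stream.map(new ThrottleMap<>(1000.0)).name("throttle");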
> > >
> > > Ning
> > >
> >
>
