Hey Tom, thanks for taking a look.

> It's a bit weird that there's a separate start(Time) method

Good call, I think we can use a second constructor instead.

> No metrics for batch rates?

Good call. TBH I assumed there would already be put/poll rates, but looking
again I don't see them. Will add to the KIP.

> I think it might be nicer to have a consistent configuration mechanism

I had previously implemented this as you propose (same as SMTs), but found
it to be a little heavy for the common use-cases. I didn't like how users
needed to specify the classnames in order to use the built-in rate limiters.

But thinking again about this, if we include default values for
rate.limiters, rate.limiter.record.type, and rate.limiter.batch.type, we'd
get the same effect. Namely, most users would just need to
specify rate.limiter.record.limit or rate.limiter.batch.limit.

So I think you're right -- the common use-cases don't necessarily suffer,
and custom rate limiters would definitely benefit. I'll fix.

> hard.rate.limiters [..vs..] rate.limiters

I think the difference may be immaterial. As implemented currently,
RecordRateLimiter and RecordBatchRateLimiter are very "soft" in that they
don't define a window of time in which a max number of records or batches
can be processed. Instead, they just tap the breaks when the instantaneous
rate is observed to be too high. But a "hard" rate limiter could be
implemented with the same interface, e.g. by sleeping until the end of the
current window.

Ryanne

On Fri, May 21, 2021 at 7:10 AM Tom Bentley <tbent...@redhat.com> wrote:

> Hi Ryanne,
>
> Thanks for the KIP. I can see this would be useful.
>
> 1. Can you elaborate on the life cycle of the RateLimiter interface (in the
> Javadoc)? In particular it's not clear to me how calls to accumulate() and
> throttleTime() can be interleaved (I assume arbitrarily).
>
> 2. It's a bit weird that there's a separate start(Time) method in addition
> to the configure() inherited from Configurable. Perhaps passing the Time to
> accumulate() would be simpler than needing a two stage configuration step,
> even if it would be the same instance on every call. If start() really is
> needed you should document that it's called after configure().
>
> 3. Maybe including the unit in the method name, i.e. throttleTimeMs(), to
> avoid any ambiguity about how the result is interpreted?
>
> 4. The metrics: Are they windowed over some time period, if so, what?
>
> 5. No metrics for batch rates?
>
> 6. It doesn't seem to be stated, but I assume the throttle time used is the
> maximum of the throttleTime() returned by all the limiters.
>
> 7. The configuration uses a different mechanism than for SMTs and also
> requires to add three common configs (with a risk of collision with any
> connector which already defines configs with these names). I think it might
> be nicer to have a consistent configuration mechanism, so for example
>   rate.limiters=record,batch
>   rate.limiter.record.type=RecordRateLimiter
>   rate.limiter.record.limit=123
>   rate.limiter.batch.type=RecordBatchRateLimiter
>   rate.limiter.batch.limit=456
> This means there's only a single new common config, as the others depend on
> the aliases used, so further collisions can be avoided.
>
> 8. A cluster where every connector has a quota could end up being
> underutilised, yet a subset of connectors could be running at their limit.
> While this makes sense for the firehose problem it seems to be problematic
> for the noisy neighbour case, where the spare capacity could be shared
> between all the throttled tasks on the worker. While I'm not suggesting you
> need to implement this as part of the KIP, maybe the API could accommodate
> it being added later. Perhaps this could be as simple as using
> hard.rate.limiters rather than just rate.limiters, so that
> soft.rate.limiters could be added later, though maybe there are use cases
> where a single limiter needs to supply both soft and hard limits.
>
> Thanks again,
>
> Tom
>
> On Fri, May 14, 2021 at 6:26 PM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
> > Hey y'all, I've expanded the scope of this KIP slightly to include a
> > pluggable interface, RateLimiter.
> >
> > After implementing this a few different ways, it's clear that the
> > configuration story is actually simpler with a pluggable model.
> > Out-of-the-box, we have just two configuration properties to tweak:
> > record.rate.limit and record.batch.rate.limit (subj to change ofc). These
> > are provided by built-in RecordRateLimiter and RecordBatchRateLimiter
> > impls.
> >
> > From there, additional custom RateLimiters can be enabled with whatever
> > configuration they need. This is essentially the same pattern taken with
> > MetricsReporters and others.
> >
> > I had originally envisioned that the set of built-in limits would expand
> > over time, eg individual put/poll/commit/flush limits. However, these can
> > all be throttled adequately with the proposed API by limiting overall
> > record and batch thruput.
> >
> > Please let me know what you think. The voting thread is open.
> >
> > Ryanne
> >
> > On Fri, Apr 9, 2021, 1:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> >
> > > Hey y'all, I'd like to draw you attention to a new KIP:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-731%3A+Record+Rate+Limiting+for+Kafka+Connect
> > >
> > > Lemme know what you think. Thanks!
> > >
> > > Ryanne
> > >
> >
>

Reply via email to