Hi, Sumant,

Thanks for the KIP. Nice documentation on all current issues with the

You also brought up a good use case for timing out a message. For
applications that collect and send sensor data to Kafka, if the data can't
be sent to Kafka for some reason, the application may prefer to buffer the
more recent data in the accumulator. Without a timeout, the accumulator
will be filled with old records and new records can't be added.

Your proposal makes sense for a developer who is familiar with how the
producer works. I am not sure it is very intuitive to users, though, since
it may not be easy for them to figure out how to configure the new knob to
bound the amount of time until a message is completed.

From the users' perspective, Apurva's suggestion of
max.message.delivery.wait.ms (which bounds the time from when a message
enters the accumulator to when the callback is called) seems more
intuitive. You listed this in the rejected
section since it requires additional logic to rebatch when a produce
request expires. However, this may not be too bad. The following are the
things that we have to do.

1. The clock starts when a batch is created.
2. If the batch can't be drained within max.message.delivery.wait.ms, all
messages in the batch will fail and the callback will be called.
3. When sending a produce request, we calculate an expireTime for the
request that equals the remaining expiration time of the oldest batch
in the request.
4. We set the minimum of the expireTime of all inflight requests as the
timeout in the selector poll call (so that the selector can wake up before
the expiration time).
5. If the produce response can't be received within expireTime, we expire
all batches in the produce request whose expiration time has been reached.
For the rest of the batches, we resend them in a new produce request.
6. If the produce response has a retriable error, we just back off a bit
and then retry the produce request as today. The number of retries doesn't
really matter now. We just keep retrying until the expiration time is
reached. It's possible that a produce request is never retried due to
expiration. However, this seems like the right thing to do since the users
want to time out the message at this point.
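
To make steps 1, 3, 4 and 5 a bit more concrete, here is a rough sketch of
the time bookkeeping. It is only an illustration under my own assumptions:
DeliveryTimeoutSketch, Batch and all of the method names are made up for
this email and do not exist in the producer code, and a request is modeled
simply as a list of batches.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: these types are hypothetical and are not part of
// the Kafka producer. All times are in milliseconds.
final class DeliveryTimeoutSketch {

    // Step 1: the clock starts when the batch is created.
    static final class Batch {
        final String name;
        final long createdMs;

        Batch(String name, long createdMs) {
            this.name = name;
            this.createdMs = createdMs;
        }

        // Absolute time at which the batch must be failed.
        long deadlineMs(long maxDeliveryWaitMs) {
            return createdMs + maxDeliveryWaitMs;
        }
    }

    // Step 3: expireTime of a request is the remaining time of its oldest batch.
    static long requestExpireTimeMs(List<Batch> request, long maxDeliveryWaitMs, long nowMs) {
        long earliestDeadline = Long.MAX_VALUE;
        for (Batch batch : request) {
            earliestDeadline = Math.min(earliestDeadline, batch.deadlineMs(maxDeliveryWaitMs));
        }
        return Math.max(0L, earliestDeadline - nowMs);
    }

    // Step 4: the poll timeout is capped by the smallest expireTime in flight.
    static long pollTimeoutMs(List<List<Batch>> inFlight, long maxDeliveryWaitMs,
                              long nowMs, long defaultTimeoutMs) {
        long timeout = defaultTimeoutMs;
        for (List<Batch> request : inFlight) {
            timeout = Math.min(timeout, requestExpireTimeMs(request, maxDeliveryWaitMs, nowMs));
        }
        return timeout;
    }

    // Step 5 (first half): batches whose expiration time has been reached.
    static List<Batch> expiredBatches(List<Batch> request, long maxDeliveryWaitMs, long nowMs) {
        List<Batch> expired = new ArrayList<>();
        for (Batch batch : request) {
            if (batch.deadlineMs(maxDeliveryWaitMs) <= nowMs) {
                expired.add(batch);
            }
        }
        return expired;
    }

    // Step 5 (second half): the remaining batches go into a new produce request.
    static List<Batch> batchesToResend(List<Batch> request, long maxDeliveryWaitMs, long nowMs) {
        List<Batch> remaining = new ArrayList<>();
        for (Batch batch : request) {
            if (batch.deadlineMs(maxDeliveryWaitMs) > nowMs) {
                remaining.add(batch);
            }
        }
        return remaining;
    }
}
```

For example, with max.message.delivery.wait.ms = 1000 and a request holding
batches created at t=0 and t=400, the expireTime computed at t=600 is 400 ms.
If no response has arrived by t=1000, the first batch is expired and the
second one is resent in a new request.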

Implementation-wise, there will be a bit more complexity in steps 3 and 4,
but probably not too much. The benefit is that this is more intuitive to the
end user.

Does that sound reasonable to you?



On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:

> On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> > > > There seems to be no relationship with cluster metadata availability
> or
> > > > staleness. Expiry is just based on the time since the batch has been
> > > ready.
> > > > Please correct me if I am wrong.
> > > >
> > >
> > > I was not very specific about where we do expiration. I glossed over
> some
> > > details because (again) we've other mechanisms to detect non progress.
> > The
> > > condition (!muted.contains(tp) && (isMetadataStale ||
> > > > cluster.leaderFor(tp) == null)) is used in
> > > > RecordAccumulator.expiredBatches:
> > > https://github.com/apache/kafka/blob/trunk/clients/src/
> > > main/java/org/apache/kafka/clients/producer/internals/
> > > RecordAccumulator.java#L443
> > >
> > >
> > > Effectively, we expire in all the following cases
> > > 1) producer is partitioned from the brokers. When metadata age grows
> > beyond
> > > > 3x its max value. It's safe to say that we're not talking to the
> brokers
> > > at all. Report.
> > > 2) fresh metadata && leader for a partition is not known && a batch is
> > > sitting there for longer than request.timeout.ms. This is one case we
> > > would
> > > like to improve and use batch.expiry.ms because request.timeout.ms is
> > too
> > > small.
> > > 3) fresh metadata && leader for a partition is known && batch is
> sitting
> > > there for longer than batch.expiry.ms. This is a new case that is
> > > different
> > > from #2. This is the catch-up mode case. Things are moving too slowly.
> > > Pipeline SLAs are broken. Report and shutdown kmm.
> > >
> > > The second and the third cases are useful to a real-time app for a
> > > completely different reason. Report, forget about the batch, and just
> > move
> > > on (without shutting down).
> > >
> > >
> > If I understand correctly, you are talking about a fork of apache kafka
> > which has these additional conditions? Because that check doesn't exist
> on
> > trunk today.
> Right. It is our internal release in LinkedIn.
> Or are you proposing to change the behavior of expiry to
> > account for stale metadata and partitioned producers as part of this KIP?
> No. It's our temporary solution in the absence of kip-91. Note that we don't
> like increasing request.timeout.ms. Without our extra conditions our
> batches expire too soon--a problem in kmm catchup mode.
> If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire
> will use the config instead of r.t.ms. The extra conditions will be
> unnecessary. All three cases shall be covered via the batch.expiry timeout.
> >
> >
