In general, max.message.delivery.wait.ms is the cleaner approach because it
makes the guarantee clearer. That said, there seem to be subtleties in some
scenarios:

1. I agree with Sumant that it is a little weird that a message could be
expired immediately if it happens to enter a batch that is about to be
expired. But as Jun said, as long as we have multiple messages in a batch,
there isn't a cheap way to achieve a precise timeout. So the question
actually becomes whether it is more user-friendly to expire early (based on
the batch creation time) or expire late (based on the batch close time). I
think both are acceptable. Personally, I think most users do not really care
about expiring a little late as long as the batch eventually expires. So I
would use batch close time as long as there is a bound on that. But it looks
like we do not really have a bound on when we will close a batch. So
expiration based on batch creation time may be the only option if we don't
want to introduce complexity.

2. If we time out a batch in a request while it is still in flight, the end
result of that batch is unclear to the users. It would be weird for users to
receive an exception saying those messages have expired when they have
actually been sent successfully. Also, if idempotence is set to true, what
would the next sequence ID be after the expired batch? Reusing the same
sequence ID may result in data loss, and incrementing the sequence ID may
cause OutOfOrderSequenceException. Besides, extracting an expired batch from
a request also introduces some complexity. Again, personally I think it is
fine to expire a little bit late. So maybe we don't need to expire a batch
that is already in flight. In the worst case we will expire it with a delay
of request.timeout.ms.

Thanks,

Jiangjie (Becket) Qin



On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi all,
>
> The discussion has been going on for a while. Would it help to have a call
> to discuss this? I'd like to start a vote soonish so that we can include
> this in 1.0.0. I personally prefer max.message.delivery.wait.ms. It seems
> like Jun, Apurva and Jason also prefer that. Sumant, it seems like you
> still prefer a batch.expiry.ms, is that right? What are your thoughts
> Joel and Becket?
>
> Ismael
>
> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Sumant,
>>
>> The semantics of linger.ms is a bit subtle. The reasoning for the current
>> implementation is the following. Let's say one sets linger.ms to 0 (our
>> current default value). Creating a batch for every message will be bad for
>> throughput. Instead, the current implementation only forms a batch when
>> the batch is sendable (i.e., broker is available, inflight request limit
>> is not exceeded, etc). That way, the producer has more chance for
>> batching. The implication is that the closing of a batch could be delayed
>> longer than linger.ms.
>>
>> Now, on your concern about not having a precise way to control delay in
>> the accumulator. It seems the batch.expiry.ms approach will have the same
>> issue. If you start the clock when a batch is initialized, you may expire
>> some messages in the same batch earlier than batch.expiry.ms. If you start
>> the clock when the batch is closed, the expiration time could be unbounded
>> because of the linger.ms implementation described above. Starting the
>> expiration clock on batch initialization will at least guarantee the time
>> to expire the first message is precise, which is probably good enough.
>>
>> Thanks,
>>
>> Jun
>>
>>
>>
>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:
>>
>> > Question about "the closing of a batch can be delayed longer than
>> > linger.ms":
>> > Is it possible to cause an indefinite delay? At some point bytes limit
>> > might kick in. Also, why is closing of a batch coupled with
>> > availability of its destination? In this approach a batch chosen for
>> > eviction due to delay needs to "close" anyway, right (without regard to
>> > destination availability)?
>> >
>> > I'm not too worried about notifying at super-exact time specified in the
>> > configs. But expiring before the full wait-span has elapsed sounds a
>> > little weird. So expiration time has a +/- spread. It works more like a
>> > hint than max. So why not message.delivery.wait.hint.ms?
>> >
>> > Yeah, cancellable future will be similar in complexity.
>> >
>> > I'm unsure if max.message.delivery.wait.ms will be the final nail for
>> > producer timeouts. We still won't have a precise way to control delay in
>> > just the accumulator segment. batch.expiry.ms does not try to abstract.
>> > It's very specific.
>> >
>> > My biggest concern at the moment is implementation complexity.
>> >
>> > At this stage, I would like to encourage other independent opinions.
>> >
>> > Regards,
>> > Sumant
>> >
>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Sumant,
>> > >
>> > > 1. Yes, it's probably reasonable to require
>> > > max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we
>> > > can set the default retries to infinite or just ignore it. Then the
>> > > latency will be bounded by max.message.delivery.wait.ms.
>> > > request.timeout.ms is the max time the request will be spending on the
>> > > server. The client can expire an inflight request early if needed.
>> > >
>> > > 2. Well, since max.message.delivery.wait.ms specifies the max, calling
>> > > the callback a bit early may be ok? Note that
>> > > max.message.delivery.wait.ms only comes into play in the rare error
>> > > case. So, I am not sure if we need to be very precise. The issue with
>> > > starting the clock on closing a batch is that currently if the leader
>> > > is not available, the closing of a batch can be delayed longer than
>> > > linger.ms.
>> > >
>> > > 4. As you said, future.get(timeout) itself doesn't solve the problem
>> > > since you still need a way to expire the record in the sender. The
>> > > amount of work to implement a cancellable future is probably the same?
>> > >
>> > > Overall, my concern with patch work is that we have iterated on the
>> > > produce request timeout multiple times and new issues keep coming
>> > > back. Ideally, this time, we want to have a solution that covers all
>> > > cases, even though that requires a bit more work.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > Thanks for looking into it.
>> > > >
>> > > > Yes, we did consider this message-level timeout approach and
>> > > > expiring batches selectively in a request but rejected it because of
>> > > > the added complexity without a strong benefit to counterweigh that.
>> > > > Your proposal is a slight variation so I'll mention some issues here.
>> > > >
>> > > > 1. It sounds like max.message.delivery.wait.ms will overlap with
>> > > > "time segments" of both linger.ms and retries * (request.timeout.ms +
>> > > > retry.backoff.ms). In that case, which config set takes precedence?
>> > > > It would not make sense to configure configs from both sets.
>> > > > Especially, we discussed exhaustively internally that retries and
>> > > > max.message.delivery.wait.ms can't / shouldn't be configured
>> > > > together. Retries become moot as you already mention. I think that's
>> > > > going to be surprising to anyone wanting to use
>> > > > max.message.delivery.wait.ms. We probably need
>> > > > max.message.delivery.wait.ms > linger.ms or something like that.
>> > > >
>> > > > 2. If the clock starts when a batch is created and the batch expires
>> > > > when max.message.delivery.wait.ms is over in the accumulator, the
>> > > > last few messages in the expiring batch may not have lived long
>> > > > enough. As the config seems to suggest a per-message timeout, it's
>> > > > incorrect to expire messages prematurely. On the other hand, if the
>> > > > clock starts after the batch is closed (which also implies that
>> > > > linger.ms is not covered by the max.message.delivery.wait.ms
>> > > > config), no message would be expired too soon. Yeah, expiration may
>> > > > be a little bit too late but hey, this ain't a real-time service.
>> > > >
>> > > > 3. I agree that steps #3, #4, (and #5) are complex to implement. On
>> > > > the other hand, batch.expiry.ms is next to trivial to implement. We
>> > > > just pass the config all the way down to ProducerBatch.maybeExpire
>> > > > and be done with it.
>> > > >
>> > > > 4. Do you think the effect of max.message.delivery.wait.ms can be
>> > > > simulated with the future.get(timeout) method? Copying an excerpt
>> > > > from kip-91: An end-to-end timeout may be partially emulated using
>> > > > future.get(timeout). The timeout must be greater than
>> > > > (batch.expiry.ms + nRetries * (request.timeout.ms +
>> > > > retry.backoff.ms)). Note that when the future times out, Sender may
>> > > > continue to send the records in the background. To avoid that,
>> > > > implementing a cancellable future is a possibility.
>> > > >
>> > > > For simplicity, we could just implement a trivial method in the
>> > > > producer, ProducerConfigs.maxMessageDeliveryWaitMs(), and return a
>> > > > number based on this formula? Users of future.get can use this
>> > > > timeout value.
>> > > >
>> > > > Thoughts?
>> > > >
>> > > > Regards,
>> > > > Sumant
>> > > >
>> > > >
>> > > >
>> > > > On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:
>> > > >
>> > > > >
>> > > > >> Thanks for the KIP. Nice documentation on all current issues with
>> > > > >> the timeout.
>> > > > >
>> > > > > For the KIP writeup, all credit goes to Joel Koshy.
>> > > > >
>> > > > > I'll follow up on your comments a little later.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> You also brought up a good use case for timing out a message. For
>> > > > >> applications that collect and send sensor data to Kafka, if the
>> > > > >> data can't be sent to Kafka for some reason, the application may
>> > > > >> prefer to buffer the more recent data in the accumulator. Without
>> > > > >> a timeout, the accumulator will be filled with old records and new
>> > > > >> records can't be added.
>> > > > >>
>> > > > >> Your proposal makes sense for a developer who is familiar with how
>> > > > >> the producer works. I am not sure if this is very intuitive to the
>> > > > >> users since it may not be very easy for them to figure out how to
>> > > > >> configure the new knob to bound the amount of time until a
>> > > > >> message is completed.
>> > > > >>
>> > > > >> From users' perspective, Apurva's suggestion of
>> > > > >> max.message.delivery.wait.ms (which bounds the time from when a
>> > > > >> message is in the accumulator to the time when the callback is
>> > > > >> called) seems more intuitive. You listed this in the rejected
>> > > > >> section since it requires additional logic to rebatch when a
>> > > > >> produce request expires. However, this may not be too bad. The
>> > > > >> following are the things that we have to do.
>> > > > >>
>> > > > >> 1. The clock starts when a batch is created.
>> > > > >> 2. If the batch can't be drained within
>> > > > >> max.message.delivery.wait.ms, all messages in the batch will fail
>> > > > >> and the callback will be called.
>> > > > >> 3. When sending a produce request, we calculate an expireTime for
>> > > > >> the request that equals the remaining expiration time for the
>> > > > >> oldest batch in the request.
>> > > > >> 4. We set the minimum of the expireTime of all inflight requests as
>> > > > >> the timeout in the selector poll call (so that the selector can
>> > > > >> wake up before the expiration time).
>> > > > >> 5. If the produce response can't be received within expireTime, we
>> > > > >> expire all batches in the produce request whose expiration time
>> > > > >> has been reached. For the rest of the batches, we resend them in a
>> > > > >> new produce request.
>> > > > >> 6. If the produce response has a retriable error, we just back off
>> > > > >> a bit and then retry the produce request as today. The number of
>> > > > >> retries doesn't really matter now. We just keep retrying until the
>> > > > >> expiration time is reached. It's possible that a produce request
>> > > > >> is never retried due to expiration. However, this seems the right
>> > > > >> thing to do since the users want to time out the message at this
>> > > > >> time.
>> > > > >>
>> > > > >> Implementation wise, there will be a bit more complexity in steps
>> > > > >> 3 and 4, but probably not too bad. The benefit is that this is
>> > > > >> more intuitive to the end user.
>> > > > >>
>> > > > >> Does that sound reasonable to you?
>> > > > >>
>> > > > >> Thanks,
>> > > > >>
>> > > > >> Jun
>> > > > >>
>> > > > >>
>> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com>
>> > > > >> wrote:
>> > > > >>
>> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <
>> apu...@confluent.io>
>> > > > >> wrote:
>> > > > >> >
>> > > > >> > > > > There seems to be no relationship with cluster metadata
>> > > > >> availability
>> > > > >> > or
>> > > > >> > > > > staleness. Expiry is just based on the time since the
>> batch
>> > > has
>> > > > >> been
>> > > > >> > > > ready.
>> > > > >> > > > > Please correct me if I am wrong.
>> > > > >> > > > >
>> > > > >> > > >
>> > > > >> > > > I was not very specific about where we do expiration. I
>> > glossed
>> > > > over
>> > > > >> > some
>> > > > >> > > > details because (again) we've other mechanisms to detect
>> non
>> > > > >> progress.
>> > > > >> > > The
>> > > > >> > > > condition (!muted.contains(tp) && (isMetadataStale ||
>> > > > >> > > > > cluster.leaderFor(tp) == null)) is used in
>> > > > >> > > > RecordAccumulator.expiredBatches:
>> > > > >> > > > https://github.com/apache/kafka/blob/trunk/clients/src/
>> > > > >> > > > main/java/org/apache/kafka/clients/producer/internals/
>> > > > >> > > > RecordAccumulator.java#L443
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > > Effectively, we expire in all the following cases
>> > > > >> > > > 1) producer is partitioned from the brokers. When metadata
>> age
>> > > > grows
>> > > > >> > > beyond
>> > > > >> > > > 3x its max value. It's safe to say that we're not talking
>> to
>> > > the
>> > > > >> > brokers
>> > > > >> > > > at all. Report.
>> > > > >> > > > 2) fresh metadata && leader for a partition is not known
>> && a
>> > > > batch
>> > > > >> is
>> > > > >> > > > sitting there for longer than request.timeout.ms. This is
>> one
>> > > > case
>> > > > >> we
>> > > > >> > > > would
>> > > > >> > > > like to improve and use batch.expiry.ms because
>> > > > request.timeout.ms
>> > > > >> is
>> > > > >> > > too
>> > > > >> > > > small.
>> > > > >> > > > 3) fresh metadata && leader for a partition is known &&
>> batch
>> > is
>> > > > >> > sitting
>> > > > >> > > > there for longer than batch.expiry.ms. This is a new case
>> > that
>> > > is
>> > > > >> > > > different
>> > > > >> > > > from #2. This is the catch-up mode case. Things are moving
>> too
>> > > > >> slowly.
>> > > > >> > > > Pipeline SLAs are broken. Report and shutdown kmm.
>> > > > >> > > >
>> > > > >> > > > The second and the third cases are useful to a real-time
>> app
>> > > for a
>> > > > >> > > > completely different reason. Report, forget about the
>> batch,
>> > and
>> > > > >> just
>> > > > >> > > move
>> > > > >> > > > on (without shutting down).
>> > > > >> > > >
>> > > > >> > > >
>> > > > >> > > If I understand correctly, you are talking about a fork of
>> > apache
>> > > > >> kafka
>> > > > >> > > which has these additional conditions? Because that check
>> > doesn't
>> > > > >> exist
>> > > > >> > on
>> > > > >> > > trunk today.
>> > > > >> >
>> > > > >> > Right. It is our internal release in LinkedIn.
>> > > > >> >
>> > > > >> > Or are you proposing to change the behavior of expiry to
>> > > > >> > > account for stale metadata and partitioned producers as part
>> of
>> > > this
>> > > > >> KIP?
>> > > > >> >
>> > > > >> >
>> > > > >> > No. It's our temporary solution in the absence of kip-91. Note
>> > that
>> > > we
>> > > > >> don't
>> > > > >> > like increasing request.timeout.ms. Without our extra
>> conditions
>> > > our
>> > > > >> > batches expire too soon--a problem in kmm catchup mode.
>> > > > >> >
>> > > > >> > If we get batch.expiry.ms, we will configure it to 20 mins.
>> > > > maybeExpire
>> > > > >> > will use the config instead of r.t.ms. The extra conditions
>> will
>> > be
>> > > > >> > unnecessary. All three cases shall be covered via the
>> batch.expiry
>> > > > >> timeout.
>> > > > >> >
>> > > > >> > >
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
