Responses inline.

> > > However, one thing which has not come out of the JIRA discussion is
> > > the actual use cases for batch expiry.
> >
> > There are two use cases I can think of for a batch expiry mechanism,
> > irrespective of how we try to bound the time (batch.expiry.ms or
> > max.message.delivery.wait.ms). Let's call it X.
> >
> > 1. A real-time app (e.g., a periodic healthcheck producer, a
> > temperature sensor producer) has a soft upper bound on both message
> > delivery and failure notification of message delivery. In both cases,
> > it wants to know. Such an app does not close the producer on the first
> > error reported (due to batch expiry) because there's data lined up
> > right behind. It's OK to lose a few samples of temperature measurement
> > (IoT scenario). So it simply drops them and moves on. Maybe when the
> > drop rate is around 70% it would close the producer. Such an app may
> > use acks=0. In this case, X will have some value in single-digit
> > minutes. But X=MAX_LONG is not suitable.
> >
>
> I guess my question is: batches would only start expiring if partition(s)
> are unavailable. What would 'moving on' mean for a real time app in this
> case? How would that help?
>
Partition unavailability is one of the cases. The other one is KMM "catch
up" mode, where the producer's outbound throughput is lower than the
consumer's inbound throughput. This can happen when KMM consumers consume
locally but the producer produces remotely. In such a case, the pipeline
latency SLA starts to break. Batches remain in the accumulator for too
long. But there's no unavailable partition. Because it's KMM, it can't
just give up on the batches. It will shut itself down and alerts will
fire.

By "moving on", I mean fire and forget. For a real-time app, in both cases
(a partition is unavailable, or simply just outbound tput is low), it's
important to deliver newer data rather than trying hard to delivery old,
stale data.

> After expiring one set of batches, the next set would also be expired
> until the cluster comes back. So what is achieved by expiring the
> batches at all?

Yes, expiring is important. Even the next series of batches should be
expired if they have crossed batch.expiry.ms. That means the data is
stale, and it's a hint to the middleware: don't bother with this stale
data; try to use the resources on newer data further back in the queue.
An example of such an app would be a healthcheck sensor thread that
publishes application-specific metrics periodically. A correct answer
that arrives too late is a wrong answer.
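
To make "report, drop, and move on" concrete, here is a minimal sketch in
Java of what such a real-time publisher could look like. It assumes the
batch.expiry.ms name proposed in this KIP (not a config in current
clients), and the 70% threshold is just the illustrative number from
above, not a recommendation:

import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class HealthcheckPublisher {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "0");        // fire and forget
        // Proposed in KIP-91; not a config in current clients:
        // props.put("batch.expiry.ms", "120000");          // single-digit minutes

        AtomicLong sent = new AtomicLong();
        AtomicLong expired = new AtomicLong();

        try (Producer<String, String> producer = new KafkaProducer<>(
                props, new StringSerializer(), new StringSerializer())) {
            // Keep publishing fresh samples; drop expired ones and only
            // give up when the drop rate becomes unreasonable (~70% here).
            while ((double) expired.get() / Math.max(1, sent.get()) < 0.7) {
                sent.incrementAndGet();
                producer.send(new ProducerRecord<>("healthcheck", "ok"),
                    (metadata, exception) -> {
                        if (exception instanceof TimeoutException) {
                            // Batch expired: the sample is stale; report
                            // it (count it) and drop it, then move on.
                            expired.incrementAndGet();
                        }
                    });
                Thread.sleep(1000);                          // periodic sample
            }
        }
        // Drop rate crossed the threshold: stop and let alerting take over.
    }
}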


>
> >
> > 2. Today we run KMM at LinkedIn as if batch.expiry==MAX_LONG. We expire
> > under the condition: (!muted.contains(tp) && (isMetadataStale ||
> > cluster.leaderFor(tp) == null)). In essence, as long as the partition
> > is making progress (even if it's a trickle), the producer keeps on
> > going. We have other internal systems to detect whether a pipeline is
> > making *sufficient* progress or not. We're not dependent on the
> > producer to tell us that it's not making progress on a certain
> > partition.
> >
> > This is less than ideal though. We would be happy to configure
> > batch.expiry.ms to 1,800,000 or so and, upon notification of expiry,
> > restart the process and what not. It can also tell us which specific
> > partitions of a specific topic are falling behind. We achieve a similar
> > effect via alternative mechanisms.
>
>
> I presume you are referring to partitions here? If some partitions are
> unavailable, what could mirror maker do with that knowledge? Presumably
> focus on the partitions which are online. But does that really help?
> Today the expiry message already contains the partition information; is
> that being used? If not, how would accurate expiry times with something
> like batch.expiry.ms change that fact?
>
You are right, KMM can't do much with the partition information. My point
is: if we assume for a moment that the producer is the only piece that
can tell us whether a KMM pipeline is making progress or not, would you
configure it to wait forever (batch.expiry=MAX_LONG) or would it be some
small value? It's the latter we prefer. However, we really don't need the
producer to tell us, because we have other infrastructure pieces to tell
us about non-progress.

>
>
> >
> > > Also, the KIP document states the
> > > following:
> > >
> > > "The per message timeout is easy to compute - linger.ms +
> > > (retries + 1) * request.timeout.ms". This is false.
> > >
> >
> > > Why is the statement false? Doesn't that provide an accurate upper
> > > bound on the timeout for a produce request today?
> > >
> > The KIP-91 write-up describes the reasons why. Just reiterating the
> > reason: "the condition that if the metadata for a partition is known
> > then we do not expire its batches even if they are ready". Do you not
> > agree with the explanation? If not, what part?
> >
> >
> Unless I have missed something, the producer doesn't seem to rely on the
> metadata being known, at least since
> https://github.com/apache/kafka/pull/503.
> The current code to decide whether a batch should be expired is here:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java#L298
>
That's right, the current implementation does not depend on metadata
freshness. But it should.
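
To make the quoted formula concrete with illustrative numbers (not values
taken from the KIP): with linger.ms=5000, retries=2, and
request.timeout.ms=30000, the claimed per-message bound would be
5000 + (2 + 1) * 30000 = 95,000 ms. The KIP's argument was that a batch
could sit in the accumulator past such a bound, which is the gap
batch.expiry.ms is meant to close.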


> There seems to be no relationship with cluster metadata availability or
> staleness. Expiry is just based on the time since the batch has been ready.
> Please correct me if I am wrong.
>

I was not very specific about where we do expiration. I glossed over some
details because (again) we have other mechanisms to detect non-progress.
The condition (!muted.contains(tp) && (isMetadataStale ||
cluster.leaderFor(tp) == null)) is used in
RecordAccumulator.expiredBatches:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443


Effectively, we expire in all of the following cases (a rough sketch of
this check follows the list):
1) The producer is partitioned from the brokers. When the metadata age
grows beyond 3x its max value, it's safe to say that we're not talking to
the brokers at all. Report.
2) Fresh metadata && the leader for a partition is not known && a batch
has been sitting there for longer than request.timeout.ms. This is one
case we would like to improve by using batch.expiry.ms, because
request.timeout.ms is too small.
3) Fresh metadata && the leader for a partition is known && a batch has
been sitting there for longer than batch.expiry.ms. This is a new case
that is different from #2. This is the catch-up mode case. Things are
moving too slowly. Pipeline SLAs are broken. Report and shut down KMM.
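
To be clear, the following is only a sketch of the check described above,
not the actual trunk code; isMetadataStale comes from our internal patch,
and names like batchExpiryMs, metadataAgeMs, and metadataMaxAgeMs are
placeholders rather than real producer fields:

// Illustrative sketch of the three expiry cases above. All parameters
// are plain values so the logic is self-contained; none of these names
// exist as-is in the upstream producer.
static boolean shouldExpire(boolean mutedPartition,
                            boolean isMetadataStale,
                            boolean leaderKnown,
                            long metadataAgeMs,
                            long metadataMaxAgeMs,
                            long batchWaitedMs,
                            long batchExpiryMs) {
    if (mutedPartition)
        return false;                 // batch is in flight; leave it alone

    // Case 1: effectively partitioned from the brokers.
    if (metadataAgeMs > 3 * metadataMaxAgeMs)
        return true;

    // Case 2: fresh metadata but no known leader; today this is bounded
    // by request.timeout.ms, which is too small for this purpose.
    if (!isMetadataStale && !leaderKnown)
        return batchWaitedMs > batchExpiryMs;

    // Case 3: fresh metadata and a known leader, but the batch is simply
    // not draining (catch-up mode) and has outlived batch.expiry.ms.
    return batchWaitedMs > batchExpiryMs;
}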

The second and the third cases are useful to a real-time app for a
completely different reason. Report, forget about the batch, and just move
on (without shutting down).

>
>
>
> > >
> > > In this spirit, it might make sense to clarify the use case that
> > motivates
> > > this additional setting. For instance, with this new configuration, how
> > > would your existing application handle a batch expired exception?
> >
> > Again, a real-time app would just move on. KMM would halt. Any
> > order-sensitive app which needs to provide durability guarantees would
> > halt.
> >
>
> I had a question about the real time app earlier. But KMM and other
> applications can already halt today once they get backed up, without
> this new proposed config. How does being slightly more precise about
> *when* to halt change anything about these semantics in a meaningful
> way?
>
Right, KMM can halt today, but that's based on request.timeout.ms, which
is not the right config for this purpose. request.timeout.ms is for
network-level timeouts and is meant to be a small value, a few seconds at
most. In contrast, batch.expiry.ms would be in double-digit minutes: it
would be equal to the pipeline SLA for KMM, and the same config would be
equal to the publishing period for a real-time app. It's not a subtle
difference. It's meaningful.
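
As a concrete illustration of the difference in scale (assuming the
batch.expiry.ms name proposed here, and reusing the ProducerConfig
constant from the producer client; the values are examples, not
recommendations):

Properties props = new Properties();
// Network-level timeout for a single request/response round trip:
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");   // 30 seconds
// Proposed in KIP-91: how long a batch may sit in the accumulator before
// the application is notified to give up on it. For a KMM-style pipeline
// this would track the pipeline SLA:
props.put("batch.expiry.ms", "1800000");                        // 30 minutes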

Hope that helps.

Regards,
Sumant
