Hi Sean.

Thanks for writing this KIP. Sounds like a great addition. Few comments.

1. Currently, I see that you have proposed partition-level records-latency
metrics and a global records-latency-max metric across all partitions for a
given consumer group. Some Kafka users may organize their topics such that
some topics are more important than others. *Why not have the latency
metric at the topic level as well?* Although one could imagine having
metrics aggregation outside of JMX to generate the topic-level metrics, I
suppose having topic level metrics will allow Kafka users to setup alarms
at the topic level with greater ease. IMHO, this KIP should address this
use case. Even if you believe we should not expose topic level metrics, it
would be nice to see the KIP explain why.

2. Some existing solutions already expose the consumer group lap in time.
See
https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter
for
an example. *The KIP should reference existing solutions and suggest the
benefits of using the native solution that you propose*.

3. If a message was produced a long time ago, and a new consumer group has
been created, then the latency metrics are going to be very high in value
until the consumer group catches up. This is especially true in the context
of KIP-405 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage)
which allows reading very old messages. Therefore, a consumer application
that relies on reading all messages from the past will report a high
records-latency for a while. *I think that the KIP should note down the
caveat that setting SLAs on records-latency makes sense only in steady
state, and not for bootstrapping new consumer groups.*

4. Since the community has used the term consumer-lag so often, *why not
call the metric consumer-lag-millis which makes the units clear as well*.
records-latency is a bit confusing at least for me.

Cheers.

On Wed, Dec 18, 2019 at 3:28 PM Habib Nahas <ha...@hbnet.io> wrote:

> Thanks Sean. Look forward to the updated KIP.
>
> Regards,
> Habib
>
> On Fri, Dec 13, 2019, at 6:22 AM, Sean Glover wrote:
> > Hi,
> >
> > After my last reply I had a nagging feeling something wasn't right, and I
> > remembered that epoch time is UTC. This makes the discussion about
> > timezone irrelevant, since we're always using UTC. This makes the need
> for
> > the LatencyTime interface that I proposed in the design irrelevant as
> well,
> > since I can no longer think about how that might be useful. I'll update
> > the KIP. I'll also review KIP-32 to understand message timestamps better
> > so I can explain the different types of latency results that could be
> > reported with this metric.
> >
> > Regards,
> > Sean
> >
> > On Thu, Dec 12, 2019 at 6:25 PM Sean Glover <sean.glo...@lightbend.com>
> > wrote:
> >
> > > Hi Habib,
> > >
> > > Thanks for question! If the consumer is in a different timezone than
> the
> > > timezone used to produce messages to a partition then you can use an
> > > implementation of LatencyTime to return the current time of that
> timezone.
> > > The current design assumes that messages produced to a partition must
> all
> > > be produced from the same timezone. If timezone metadata were encoded
> into
> > > each message then it would be possible to automatically determine the
> > > source timezone and calculate latency, however the current design will
> not
> > > pass individual messages into LatencyTime to retrieve message metadata.
> > > Instead, the LatencyTime.getWallClockTime method is only called once
> per
> > > fetch request response per partition and then the metric is recorded
> once
> > > the latency calculation is complete. This follows the same design as
> the
> > > current consumer lag metric which calculates offset lag based on the
> last
> > > message of the fetch request response for a partition. Since the
> metric is
> > > just an aggregate (max/mean) over some time window we only need to
> > > occasionally calculate latency, which will have negligible impact on
> the
> > > performance of consumer polling.
> > >
> > > A simple implementation of LatencyTime that returns wall clock time for
> > > the Asia/Singapore timezone for all partitions:
> > >
> > > import java.time.*;
> > >
> > > class SingaporeTime implements LatencyTime {
> > > ZoneId zoneSingapore = ZoneId.of("Asia/Singapore");
> > > Clock clockSingapore = Clock.system(zoneSingapore);
> > >
> > > @Override
> > > public long getWallClockTime(TopicPartition tp) {
> > > return clockSingapore.instant.getEpochSecond();
> > > }
> > >
> > > ...
> > > }
> > >
> > > Regards,
> > > Sean
> > >
> > >
> > > On Thu, Dec 12, 2019 at 6:18 AM Habib Nahas <ha...@hbnet.io> wrote:
> > >
> > >> Hi Sean,
> > >>
> > >> Thanks for the KIP.
> > >>
> > >> As I understand it users are free to set their own timestamp on
> > >> ProducerRecord. What is the recommendation for the proposed metric in
> a
> > >> scenario where the user sets this timestamp in timezone A and
> consumes the
> > >> record in timezone B. Its not clear to me if a custom implementation
> of
> > >> LatencyTime will help here.
> > >>
> > >> Thanks,
> > >> Habib
> > >>
> > >> On Wed, Dec 11, 2019, at 4:52 PM, Sean Glover wrote:
> > >> > Hello again,
> > >> >
> > >> > There has been some interest in this KIP recently. I'm bumping the
> > >> thread
> > >> > to encourage feedback on the design.
> > >> >
> > >> > Regards,
> > >> > Sean
> > >> >
> > >> > On Mon, Jul 15, 2019 at 9:01 AM Sean Glover <
> sean.glo...@lightbend.com>
> > >> > wrote:
> > >> >
> > >> > > To hopefully spark some discussion I've copied the motivation
> section
> > >> from
> > >> > > the KIP:
> > >> > >
> > >> > > Consumer lag is a useful metric to monitor how many records are
> > >> queued to
> > >> > > be processed. We can look at individual lag per partition or we
> may
> > >> > > aggregate metrics. For example, we may want to monitor what the
> > >> maximum lag
> > >> > > of any particular partition in our consumer subscription so we can
> > >> identify
> > >> > > hot partitions, caused by an insufficient producing partitioning
> > >> strategy.
> > >> > > We may want to monitor a sum of lag across all partitions so we
> have a
> > >> > > sense as to our total backlog of messages to consume. Lag in
> offsets
> > >> is
> > >> > > useful when you have a good understanding of your messages and
> > >> processing
> > >> > > characteristics, but it doesn’t tell us how far behind *in time*
> we
> > >> are.
> > >> > > This is known as wait time in queueing theory, or more informally
> it’s
> > >> > > referred to as latency.
> > >> > >
> > >> > > The latency of a message can be defined as the difference between
> when
> > >> > > that message was first produced to when the message is received
> by a
> > >> > > consumer. The latency of records in a partition correlates with
> lag,
> > >> but a
> > >> > > larger lag doesn’t necessarily mean a larger latency. For
> example, a
> > >> topic
> > >> > > consumed by two separate application consumer groups A and B may
> have
> > >> > > similar lag, but different latency per partition. Application A
> is a
> > >> > > consumer which performs CPU intensive business logic on each
> message
> > >> it
> > >> > > receives. It’s distributed across many consumer group members to
> > >> handle the
> > >> > > load quickly enough, but since its processing time is slower, it
> takes
> > >> > > longer to process each message per partition. Meanwhile,
> Application
> > >> B is
> > >> > > a consumer which performs a simple ETL operation to land streaming
> > >> data in
> > >> > > another system, such as HDFS. It may have similar lag to
> Application
> > >> A, but
> > >> > > because it has a faster processing time its latency per partition
> is
> > >> > > significantly less.
> > >> > >
> > >> > > If the Kafka Consumer reported a latency metric it would be
> easier to
> > >> > > build Service Level Agreements (SLAs) based on non-functional
> > >> requirements
> > >> > > of the streaming system. For example, the system must never have a
> > >> latency
> > >> > > of greater than 10 minutes. This SLA could be used in monitoring
> > >> alerts or
> > >> > > as input to automatic scaling solutions.
> > >> > >
> > >> > > On Thu, Jul 11, 2019 at 12:36 PM Sean Glover <
> > >> sean.glo...@lightbend.com>
> > >> > > wrote:
> > >> > >
> > >> > >> Hi kafka-dev,
> > >> > >>
> > >> > >> I've created KIP-489 as a proposal for adding latency metrics to
> the
> > >> > >> Kafka Consumer in a similar way as record-lag metrics are
> > >> implemented.
> > >> > >>
> > >> > >>
> > >> > >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/489%3A+Kafka+Consumer+Record+Latency+Metric
> > >> > >>
> > >> > >> Regards,
> > >> > >> Sean
> > >> > >>
> > >> > >> --
> > >> > >> Principal Engineer, Lightbend, Inc.
> > >> > >>
> > >> > >> <http://lightbend.com>
> > >> > >>
> > >> > >> @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > >> > >> <https://www.linkedin.com/in/seanaglover/>
> > >> > >>
> > >> > >
> > >> > >
> > >> > > --
> > >> > > Principal Engineer, Lightbend, Inc.
> > >> > >
> > >> > > <http://lightbend.com>
> > >> > >
> > >> > > @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > >> > > <https://www.linkedin.com/in/seanaglover/>
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
> > --
> > Sean Glover
> > Principal Engineer, Alpakka, Lightbend, Inc. <https://lightbend.com>
> > @seg1o <https://twitter.com/seg1o>, in/seanaglover
> > <https://www.linkedin.com/in/seanaglover/>
> >
>


-- 
Gokul Ramanan Subramanian
Senior SDE, Amazon AWS

Reply via email to