[https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699629#comment-17699629]

Tales Tonini commented on KAFKA-14597:
--------------------------------------

Hi [~cadonna], I went through 
[KIP-613|https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams],
 its discussion thread, the associated PRs, the trunk code, and the related 
tests. AFAIU:
 # right before starting the source node processing, the processor context is 
updated such that its system time matches the wall clock time;
 # then all the topology nodes are DFS-processed by their processors, each 
finishing with a call to forward(...) on the processor context;
 # forward(...) records the e2e latency when it reaches the topology terminal 
node, subtracting the record timestamp from the processor context system time.
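
The steps above can be sketched in miniature (all names below are my own, purely illustrative, and not the actual Streams internals):

{code:java}
public class E2eLatencySketch {
    // Step 1: the context system time is set once to the wall clock time,
    // right before the source node starts processing.
    static long contextSystemTimeMs;

    // Step 3: on the terminal node, forward(...) records
    //   e2e latency = (cached context system time) - (record timestamp).
    static long e2eLatencyMs(long recordTimestampMs) {
        return contextSystemTimeMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        contextSystemTimeMs = 1_000_000L;                     // step 1: cached once
        long recordTimestamp = contextSystemTimeMs - 3_000L;  // record is ~3s old
        System.out.println(e2eLatencyMs(recordTimestamp));    // prints 3000
    }
}
{code}

Note that the latency depends entirely on the cached value from step 1; nothing in step 3 consults the wall clock again.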

 

My observations:

*Ob1)* In the case of a {*}batch of records small enough to be processed within a 
period in which the processor context system time is NOT updated{*}, I was able to 
reproduce [~atuljainiitk]'s issue with his Streams code: I obtained less than 3s 
for the record-e2e-latency-max metric (as well as -min and -avg), under the 
stream-processor-node-metrics tag, for both the source and the sink node. The 
issue disappears once the task has a chance to update the processor context 
system time: the e2e latency metrics on both nodes then start to "pile up" 
multiples of 3s.

Q1: Is this a concern, or is it acceptable, given that the initial metrics get 
diluted as heavier volumes of records flow through?
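
A minimal sketch of the Ob1 scenario (the timestamps are made up; the caching behavior is the one described in my steps 1-3):

{code:java}
public class SmallBatchSketch {
    public static void main(String[] args) {
        long recordTimestampMs       = 10_000L; // record produced at t = 10.0s
        long systemTimeAtBatchStart  = 10_400L; // cached once, before processing

        // A processor now sleeps for 3s, but the cached system time is never
        // refreshed, so source and sink both record the same small latency:
        long e2eAtSource = systemTimeAtBatchStart - recordTimestampMs; // 400 ms
        long e2eAtSink   = systemTimeAtBatchStart - recordTimestampMs; // 400 ms,
                                                   // the 3s sleep is invisible
        System.out.println(e2eAtSource + " " + e2eAtSink); // prints "400 400"
    }
}
{code}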

 

*Ob2)* Now, irrespective of the batch size, the source and sink nodes always 
report the same values for the e2e latency -max, -min and -avg metrics, 
respectively. After the processor context system time update in step 1, the time 
is never updated again until at least the end of step 3 (by then, the system 
time is a stale cache). I noticed in the KIP discussion that some work went into 
keeping the metrics framework performant, so it is presumably undesirable to 
fetch the real system time very often.

Q2: Updating the processor context system time within forward(...) (before 
recording the e2e latency metric for the terminal node) would fix the issues in 
Ob1 and Ob2. Would that be desirable, or are we avoiding the performance hit?
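
For concreteness, the change I have in mind for Q2 would look roughly like this (the method and field names are illustrative, not the actual ProcessorContextImpl API):

{code:java}
public class ForwardFixSketch {
    static long cachedSystemTimeMs = 10_400L; // stale: set before a 3s sleep

    // Hypothetical forward(): refresh the cached system time just before the
    // terminal node records its e2e latency metric.
    static long forwardAndRecordE2e(long recordTimestampMs, long nowMs) {
        cachedSystemTimeMs = nowMs;                  // the proposed refresh
        return cachedSystemTimeMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        // Record produced at t = 10.0s; forward() reached at t = 13.4s,
        // i.e. after the 3s processing delay:
        System.out.println(forwardAndRecordE2e(10_000L, 13_400L)); // prints 3400
    }
}
{code}

With the refresh, the sink-node metric would include the processing delay instead of repeating the source-node value.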

 

*Ob3)* KIP-613 intends to measure e2e latency using the {_}system time{_}, as 
opposed to the stream time; however, the associated test 
StreamTaskTest.shouldRecordE2ELatencyOnSourceNodeAndTerminalNodes() clearly 
measures the e2e latency on the terminal node against the {_}stream time{_}. I 
noticed the test was updated about 40 days after the last update to the KIP page. 
Q3: Is the KIP out-of-date and the test correct? I suppose that, because the 
processor context system time is not updated in step 3, using system time or 
stream time makes no difference here, but it should: the code mentions system 
time, while the test asserts on stream time.
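
The distinction matters because the two clocks can diverge arbitrarily. A toy illustration (using the usual definitions: stream time is the maximum record timestamp seen so far, system time is the wall clock; all numbers are made up):

{code:java}
public class StreamVsSystemTimeSketch {
    public static void main(String[] args) {
        long[] recordTimestamps = {10_000L, 12_000L, 11_000L};

        // Stream time only advances with record timestamps:
        long streamTime = Long.MIN_VALUE;
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts);
        }

        long systemTime = 20_000L; // wall clock when the last record is forwarded

        // e2e latency of the last record (timestamp 11_000) against each clock:
        System.out.println(streamTime - 11_000L); // prints 1000
        System.out.println(systemTime - 11_000L); // prints 9000
    }
}
{code}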
 
 
*As next steps, could you please help clarify the 3 questions above? If 
appropriate and desirable, I can submit a PR.*
 
 
Other observations to consider at some point:
*Ob4)* The stream-processor-node-metrics tag presents neither the process-rate 
nor the process-total metric for the sink node.

*Ob5)* The stream-processor-node-metrics tag presents both the process-rate and 
process-total metrics for the source node, but they were always zero no matter 
how many records I sent through (I did notice process-rate being updated and 
process-total being incremented under the stream-thread-metrics tag, though).

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> ------------------------------------------------------------------
>
>                 Key: KAFKA-14597
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14597
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>            Reporter: Atul Jain
>            Assignee: Tales Tonini
>            Priority: Major
>         Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg
>
>
> I was following this KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and kafka streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end])
> . Based on these documents, *record-e2e-latency-max* should monitor the full 
> end-to-end latency, which includes both *consumption latencies* and 
> {*}processing delays{*}.
> However, based on my observations, record-e2e-latency-max seems to be 
> measuring only the consumption latencies; processing delays can be measured 
> using *process-latency-max*. I am checking all this using a simple topology 
> consisting of a source, processors and a sink (code added). I have added a 
> sleep (of 3 seconds) in one of the processors to ensure some delay in the 
> processing logic. These delays are not accounted for in 
> record-e2e-latency-max but are accounted for in process-latency-max: 
> process-latency-max was observed to be 3002 ms, which accounts for the 
> 3-second sleep, whereas record-e2e-latency-max observed in jconsole is 
> 422 ms, which does not.
>  
> Code describing my topology:
> {code:java}
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>             .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
>             .mapValues(s -> {
>                 try {
>                     System.out.println("sleeping for 3 seconds");
>                     Thread.sleep(3000);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 return s.toUpperCase();
>             })
>             .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
>             .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
