Atul Jain created KAFKA-14597:
---------------------------------

             Summary: [Streams] record-e2e-latency-max is not reporting correct metrics
                 Key: KAFKA-14597
                 URL: https://issues.apache.org/jira/browse/KAFKA-14597
             Project: Kafka
          Issue Type: Bug
            Reporter: Atul Jain
         Attachments: process-latency-max.jpg, record-e2e-latency-max.jpg

I was following the KIP-613 documentation 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
and the Kafka Streams monitoring documentation 
([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
According to both documents, *record-e2e-latency-max* should report the full 
end-to-end latency of a record, which includes both the *consumption latency* 
and the *processing delay*.

However, in my observations record-e2e-latency-max appears to measure only the 
consumption latency, while the processing delay shows up only in 
*process-latency-max*. I checked this with a simple topology consisting of a 
source, processors, and a sink (code below). One of the processors sleeps for 
3 seconds to guarantee a delay in the processing logic. That delay is reflected 
in process-latency-max, which was observed at 3002 ms and so matches the 
3-second sleep. It is not reflected in record-e2e-latency-max, which jconsole 
reports as 422 ms.
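
To make the expectation concrete, here is a minimal arithmetic sketch using the two observed values. It assumes that end-to-end latency at a node is the wall-clock time at that node minus the record timestamp, and that the 422 ms reading is the consumption latency alone. The class and method names are hypothetical and are not part of Kafka.

```java
class ExpectedE2eLatency {
    // Assumed definition: wall-clock time at the observing node minus the record timestamp.
    static long e2eLatencyMs(long recordTimestampMs, long observedAtMs) {
        return observedAtMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTimestampMs = 0L;       // hypothetical record timestamp
        long consumptionLatencyMs = 422L;  // record-e2e-latency-max seen in jconsole
        long processingLatencyMs = 3002L;  // process-latency-max seen in jconsole

        // Time at which the record reaches the source node, then the sink node.
        long atSourceMs = recordTimestampMs + consumptionLatencyMs;
        long atSinkMs = atSourceMs + processingLatencyMs;

        System.out.println("e2e at source: " + e2eLatencyMs(recordTimestampMs, atSourceMs) + " ms");
        System.out.println("e2e at sink:   " + e2eLatencyMs(recordTimestampMs, atSinkMs) + " ms");
    }
}
```

Under these assumptions the value at the sink would be about 3424 ms, so a maximum of 422 ms cannot include the 3-second sleep.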


Code describing my topology:
    static Topology buildTopology(String inputTopic, String outputTopic) {
        log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);

        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
                .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
                .mapValues(s -> {
                    // Artificial 3-second delay so the processing latency is clearly visible.
                    try {
                        System.out.println("sleeping for 3 seconds");
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the stream thread can still shut down.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                    return s.toUpperCase();
                })
                .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
                .to(outputTopic, Produced.with(stringSerde, stringSerde));
        return builder.build();
    }
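
The values reported here were read in jconsole. As a rough sketch, the same attributes could also be read over JMX from inside the JVM that runs the Streams application. The class and method names are hypothetical. The MBean patterns are assumptions based on the metric groups listed in the Kafka Streams monitoring documentation, so adjust them if your version registers the metrics differently.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class StreamsLatencyMetrics {
    // Returns the value of `attribute` for every MBean matching `pattern`,
    // keyed by the MBean's canonical name. MBeans without the attribute are skipped.
    static Map<String, Object> readAttribute(String pattern, String attribute) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName query;
        try {
            query = new ObjectName(pattern);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean pattern: " + pattern, e);
        }
        Map<String, Object> values = new TreeMap<>();
        for (ObjectName name : server.queryNames(query, null)) {
            try {
                values.put(name.getCanonicalName(), server.getAttribute(name, attribute));
            } catch (JMException e) {
                // Not every matching node exposes every attribute; ignore those that do not.
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Assumed MBean group for processor-node metrics.
        readAttribute("kafka.streams:type=stream-processor-node-metrics,*", "record-e2e-latency-max")
                .forEach((mbean, value) -> System.out.println(mbean + " record-e2e-latency-max=" + value));
        // Assumed MBean group for thread-level processing latency.
        readAttribute("kafka.streams:type=stream-thread-metrics,*", "process-latency-max")
                .forEach((mbean, value) -> System.out.println(mbean + " process-latency-max=" + value));
    }
}
```

Run in a JVM with no Streams application, both queries match nothing and the program prints no output. Printing the values per processor node makes it easier to see which node each record-e2e-latency-max reading belongs to.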
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
