This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 7318e48  Fix for topic lag metrics where topic contains period (.)
     new ab53bb3  Merge pull request #1131 from ntent-ad/bug/SAMZA-2295
7318e48 is described below

commit 7318e4840f18269b662947c8d7f303ae10fcefa4
Author: Thunder Stumpges <[email protected]>
AuthorDate: Wed Aug 7 13:17:32 2019 -0700

    Fix for topic lag metrics where topic contains period (.)
    
    The topic lag metrics (messages-behind-high-watermark and high-watermark)
    are calculated from Kafka's "records-lag" consumer metric. In version 1.1
    or so, when Kafka's consumer metrics moved from including the topic name
    in the metric name to using tags, Kafka began replacing periods with
    underscores in the topic tag. Looking the metric up with the unmodified
    topic name therefore fails for any topic that contains a period.
    See commit:

    https://github.com/apache/kafka/commit/5d81639907869ce7355c40d2bac176a655e52074#diff-b45245913eaae46aa847d2615d62cde0R1331
---
 .../main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index f85f5bf..88f8510 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -397,7 +397,9 @@ public class KafkaConsumerProxy<K, V> {
         // These are required by the KafkaConsumer to get the metrics
         HashMap<String, String> tags = new HashMap<>();
         tags.put("client-id", clientId);
-        tags.put("topic", tp.topic());
+        // kafka replaces '.' with underscore '_' in many/all of their metrics tags for topic names.
+        // see https://github.com/apache/kafka/commit/5d81639907869ce7355c40d2bac176a655e52074#diff-b45245913eaae46aa847d2615d62cde0R1331
+        tags.put("topic", tp.topic().replace('.', '_'));
         tags.put("partition", Integer.toString(tp.partition()));
 
         perPartitionMetrics.put(ssp, new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));

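For illustration only, the tag construction in the hunk above can be sketched
as a standalone helper. This is a minimal sketch, not code from the Samza
repository: the class name LagMetricTags, the method lagMetricTags, and the
sample client id and topic are hypothetical, and the Kafka MetricName type is
deliberately left out so the snippet runs with the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;

public class LagMetricTags {

    // Hypothetical helper: builds the tag map used to look up the per-partition
    // "records-lag" metric, mirroring the three tags set in the patch.
    static Map<String, String> lagMetricTags(String clientId, String topic, int partition) {
        Map<String, String> tags = new HashMap<>();
        tags.put("client-id", clientId);
        // Per the commit message, Kafka replaces '.' with '_' in the topic tag,
        // so the lookup key has to apply the same replacement.
        tags.put("topic", topic.replace('.', '_'));
        tags.put("partition", Integer.toString(partition));
        return tags;
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        Map<String, String> tags = lagMetricTags("samza-consumer-1", "page.views.v1", 3);
        System.out.println(tags.get("topic")); // prints page_views_v1
    }
}
```

A topic without periods passes through unchanged, so the replacement is
harmless for topic names that were already being matched before the fix.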