This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 7318e48 Fix for topic lag metrics where topic contains period (.)
new ab53bb3 Merge pull request #1131 from ntent-ad/bug/SAMZA-2295
7318e48 is described below
commit 7318e4840f18269b662947c8d7f303ae10fcefa4
Author: Thunder Stumpges <[email protected]>
AuthorDate: Wed Aug 7 13:17:32 2019 -0700
Fix for topic lag metrics where topic contains period (.)
The topic lag metrics (messages-behind-high-watermark and high-watermark) are
calculated from Kafka's "records-lag" consumer metric. Around version 1.1, when
Kafka's metrics moved from including the topic name in the metric name to using
tags, a replacement of period with underscore was added. See commit:
https://github.com/apache/kafka/commit/5d81639907869ce7355c40d2bac176a655e52074#diff-b45245913eaae46aa847d2615d62cde0R1331
---
.../main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index f85f5bf..88f8510 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -397,7 +397,9 @@ public class KafkaConsumerProxy<K, V> {
// These are required by the KafkaConsumer to get the metrics
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", clientId);
- tags.put("topic", tp.topic());
+ // kafka replaces '.' with underscore '_' in many/all of their metrics tags for topic names.
+ // see https://github.com/apache/kafka/commit/5d81639907869ce7355c40d2bac176a655e52074#diff-b45245913eaae46aa847d2615d62cde0R1331
+ tags.put("topic", tp.topic().replace('.', '_'));
tags.put("partition", Integer.toString(tp.partition()));
perPartitionMetrics.put(ssp, new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags));
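
The tag construction in the hunk above can be sketched in isolation. This is a minimal standalone illustration, not the Samza code itself: it uses plain java.util types instead of Kafka's MetricName, and the class and method names (LagMetricTags, build) and the sample client id and topic are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Samza or Kafka.
final class LagMetricTags {
    private LagMetricTags() {}

    // Builds the tag map used to look up a per-partition "records-lag" metric.
    // The topic tag has '.' replaced with '_', mirroring the change in this commit.
    static Map<String, String> build(String clientId, String topic, int partition) {
        Map<String, String> tags = new HashMap<>();
        tags.put("client-id", clientId);
        tags.put("topic", topic.replace('.', '_'));
        tags.put("partition", Integer.toString(partition));
        return tags;
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        Map<String, String> tags = build("samza-consumer-1", "page.views.v2", 0);
        System.out.println(tags.get("topic")); // prints "page_views_v2"
    }
}
```

One consequence of the replacement is that two topics differing only in '.' versus '_' (for example "a.b" and "a_b") produce the same topic tag, so their lag metrics cannot be told apart by tag alone.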