This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 35950287937 Kafka: Emit additional consumer metrics. (#17919)
35950287937 is described below

commit 35950287937c37d91f73771424098108f82d9c79
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 16 01:06:29 2025 -0700

    Kafka: Emit additional consumer metrics. (#17919)
    
    * Kafka: Emit additional consumer metrics.
    
    Adds more metrics to the ones originally added in #14582. These metrics
    help provide insight into the workings of the Kafka consumer.
    
    As a bonus, one of the metrics, "kafka/consumer/recordsLag", has a
    dimension "partition" which can be used to more easily determine which
    tasks are reading which partitions. It also provides a view of lag from
    the task perspective rather than the Overlord perspective, which is
    useful if the Overlord is ever unable to report metrics properly.
    
    * Two more metrics.
    
    * Fix style.
    
    * Comments from review.
---
 .../druid/indexing/kafka/KafkaConsumerMetric.java  |  80 +++++++++++
 .../druid/indexing/kafka/KafkaConsumerMonitor.java | 148 +++++++++++++++++----
 .../indexing/kafka/KafkaRecordSupplierTest.java    |  10 ++
 3 files changed, 209 insertions(+), 29 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMetric.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMetric.java
new file mode 100644
index 00000000000..ac444c5072c
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMetric.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import java.util.Set;
+
+public class KafkaConsumerMetric
+{
+  private final String kafkaMetricName;
+  private final String druidMetricName;
+  private final Set<String> dimensions;
+  private final MetricType metricType;
+
+  public KafkaConsumerMetric(
+      String kafkaMetricName,
+      String druidMetricName,
+      Set<String> dimensions,
+      MetricType metricType
+  )
+  {
+    this.kafkaMetricName = kafkaMetricName;
+    this.druidMetricName = druidMetricName;
+    this.dimensions = dimensions;
+    this.metricType = metricType;
+  }
+
+  public String getKafkaMetricName()
+  {
+    return kafkaMetricName;
+  }
+
+  public String getDruidMetricName()
+  {
+    return druidMetricName;
+  }
+
+  public Set<String> getDimensions()
+  {
+    return dimensions;
+  }
+
+  public MetricType getMetricType()
+  {
+    return metricType;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaConsumerMetric{" +
+           "kafkaMetricName='" + kafkaMetricName + '\'' +
+           ", druidMetricName='" + druidMetricName + '\'' +
+           ", dimensions=" + dimensions +
+           ", metricType=" + metricType +
+           '}';
+  }
+
+  public enum MetricType
+  {
+    GAUGE,
+    COUNTER
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
index 6a8c6c14905..80b42e2f997 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -19,9 +19,8 @@
 
 package org.apache.druid.indexing.kafka;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
@@ -33,22 +32,103 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class KafkaConsumerMonitor extends AbstractMonitor
 {
+  private static final Logger log = new Logger(KafkaConsumerMonitor.class);
+
   private volatile boolean stopAfterNext = false;
 
-  // Kafka metric name -> Druid metric name
-  private static final Map<String, String> METRICS =
-      ImmutableMap.<String, String>builder()
-                  .put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
-                  .put("records-consumed-total", "kafka/consumer/recordsConsumed")
-                  .build();
+  private static final String CLIENT_ID_TAG = "client-id";
   private static final String TOPIC_TAG = "topic";
-  private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);
+  private static final String PARTITION_TAG = "partition";
+  private static final String NODE_ID_TAG = "node-id";
+
+  /**
+   * Kafka metric name -> Kafka metric descriptor. Taken from
+   * https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
+   */
+  private static final Map<String, KafkaConsumerMetric> METRICS =
+      Stream.of(
+          new KafkaConsumerMetric(
+              "bytes-consumed-total",
+              "kafka/consumer/bytesConsumed",
+              Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+              KafkaConsumerMetric.MetricType.COUNTER
+          ),
+          new KafkaConsumerMetric(
+              "records-consumed-total",
+              "kafka/consumer/recordsConsumed",
+              Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+              KafkaConsumerMetric.MetricType.COUNTER
+          ),
+          new KafkaConsumerMetric(
+              "fetch-total",
+              "kafka/consumer/fetch",
+              Set.of(CLIENT_ID_TAG),
+              KafkaConsumerMetric.MetricType.COUNTER
+          ),
+          new KafkaConsumerMetric(
+              "fetch-rate",
+              "kafka/consumer/fetchRate",
+              Set.of(CLIENT_ID_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "fetch-latency-avg",
+              "kafka/consumer/fetchLatencyAvg",
+              Set.of(CLIENT_ID_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "fetch-latency-max",
+              "kafka/consumer/fetchLatencyMax",
+              Set.of(CLIENT_ID_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "fetch-size-avg",
+              "kafka/consumer/fetchSizeAvg",
+              Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "fetch-size-max",
+              "kafka/consumer/fetchSizeMax",
+              Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "records-lag",
+              "kafka/consumer/recordsLag",
+              Set.of(CLIENT_ID_TAG, TOPIC_TAG, PARTITION_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "records-per-request-avg",
+              "kafka/consumer/recordsPerRequestAvg",
+              Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+              KafkaConsumerMetric.MetricType.GAUGE
+          ),
+          new KafkaConsumerMetric(
+              "outgoing-byte-total",
+              "kafka/consumer/outgoingBytes",
+              Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
+              KafkaConsumerMetric.MetricType.COUNTER
+          ),
+          new KafkaConsumerMetric(
+              "incoming-byte-total",
+              "kafka/consumer/incomingBytes",
+              Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
+              KafkaConsumerMetric.MetricType.COUNTER
+          )
+      ).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName, Function.identity()));
 
   private final KafkaConsumer<?, ?> consumer;
-  private final Map<String, AtomicLong> counters = new HashMap<>();
+  private final Map<MetricName, AtomicLong> counters = new HashMap<>();
 
   public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
   {
@@ -60,18 +140,36 @@ public class KafkaConsumerMonitor extends AbstractMonitor
   {
     for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
       final MetricName metricName = entry.getKey();
+      final KafkaConsumerMetric kafkaConsumerMetric = METRICS.get(metricName.name());
 
-      if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
-        final String topic = metricName.tags().get(TOPIC_TAG);
-        final long newValue = ((Number) entry.getValue().metricValue()).longValue();
-        final long priorValue =
-            counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
-                    .getAndSet(newValue);
+      // Certain metrics are emitted both as grand totals and broken down by various tags; we want to ignore the
+      // grand total and only emit the broken-down metrics. To do this, we check that metricName.tags() equals the
+      // set of expected dimensions.
+      if (kafkaConsumerMetric != null &&
+          kafkaConsumerMetric.getDimensions().equals(metricName.tags().keySet())) {
+        final Number newValue = (Number) entry.getValue().metricValue();
+        final Number emitValue;
+
+        if (kafkaConsumerMetric.getMetricType() == KafkaConsumerMetric.MetricType.GAUGE || newValue == null) {
+          emitValue = newValue;
+        } else if (kafkaConsumerMetric.getMetricType() == KafkaConsumerMetric.MetricType.COUNTER) {
+          final long newValueAsLong = newValue.longValue();
+          final long priorValue =
+              counters.computeIfAbsent(metricName, ignored -> new AtomicLong())
+                      .getAndSet(newValueAsLong);
+          emitValue = newValueAsLong - priorValue;
+        } else {
+          throw DruidException.defensive("Unexpected metric type[%s]", kafkaConsumerMetric.getMetricType());
+        }
 
-        if (newValue != priorValue) {
-          final ServiceMetricEvent.Builder builder =
-              new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
-          emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue));
+        if (emitValue != null && !Double.isNaN(emitValue.doubleValue())) {
+          final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+          for (final String dimension : kafkaConsumerMetric.getDimensions()) {
+            if (!CLIENT_ID_TAG.equals(dimension)) {
+              builder.setDimension(dimension, metricName.tags().get(dimension));
+            }
+          }
+          emitter.emit(builder.setMetric(kafkaConsumerMetric.getDruidMetricName(), emitValue));
         }
       }
     }
@@ -83,12 +181,4 @@ public class KafkaConsumerMonitor extends AbstractMonitor
   {
     stopAfterNext = true;
   }
-
-  private static boolean isTopicMetric(final MetricName metricName)
-  {
-    // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
-    // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
-    return TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())
-           && !Strings.isNullOrEmpty(metricName.tags().get(TOPIC_TAG));
-  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index bfd81464ba2..13c2219cd6f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -437,6 +437,16 @@ public class KafkaRecordSupplierTest
     Assert.assertTrue(monitor.monitor(emitter));
     emitter.verifyEmitted("kafka/consumer/bytesConsumed", 1);
     emitter.verifyEmitted("kafka/consumer/recordsConsumed", 1);
+    emitter.verifyEmitted("kafka/consumer/fetch", 1);
+    emitter.verifyEmitted("kafka/consumer/recordsLag", 2); // per partition
+    emitter.verifyEmitted("kafka/consumer/fetchRate", 1);
+    emitter.verifyEmitted("kafka/consumer/fetchLatencyAvg", 1);
+    emitter.verifyEmitted("kafka/consumer/fetchLatencyMax", 1);
+    emitter.verifyEmitted("kafka/consumer/fetchSizeAvg", 1);
+    emitter.verifyEmitted("kafka/consumer/fetchSizeMax", 1);
+    emitter.verifyEmitted("kafka/consumer/recordsPerRequestAvg", 1);
+    emitter.verifyEmitted("kafka/consumer/incomingBytes", 2);
+    emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
 
     recordSupplier.close();
     Assert.assertFalse(monitor.monitor(emitter));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to