This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 35950287937 Kafka: Emit additional consumer metrics. (#17919)
35950287937 is described below
commit 35950287937c37d91f73771424098108f82d9c79
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 16 01:06:29 2025 -0700
Kafka: Emit additional consumer metrics. (#17919)
* Kafka: Emit additional consumer metrics.
Adds more metrics to the ones originally added in #14582. These metrics
help provide insight into the workings of the Kafka consumer.
As a bonus, one of the metrics, "kafka/consumer/recordsLag", has a
dimension "partition" which can be used to more easily determine which
tasks are reading which partitions. It also provides a view of lag from
the task perspective rather than the Overlord perspective, which is
useful if the Overlord is ever unable to report metrics properly.
* Two more metrics.
* Fix style.
* Comments from review.
---
.../druid/indexing/kafka/KafkaConsumerMetric.java | 80 +++++++++++
.../druid/indexing/kafka/KafkaConsumerMonitor.java | 148 +++++++++++++++++----
.../indexing/kafka/KafkaRecordSupplierTest.java | 10 ++
3 files changed, 209 insertions(+), 29 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMetric.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMetric.java
new file mode 100644
index 00000000000..ac444c5072c
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMetric.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import java.util.Set;
+
+public class KafkaConsumerMetric
+{
+ private final String kafkaMetricName;
+ private final String druidMetricName;
+ private final Set<String> dimensions;
+ private final MetricType metricType;
+
+ public KafkaConsumerMetric(
+ String kafkaMetricName,
+ String druidMetricName,
+ Set<String> dimensions,
+ MetricType metricType
+ )
+ {
+ this.kafkaMetricName = kafkaMetricName;
+ this.druidMetricName = druidMetricName;
+ this.dimensions = dimensions;
+ this.metricType = metricType;
+ }
+
+ public String getKafkaMetricName()
+ {
+ return kafkaMetricName;
+ }
+
+ public String getDruidMetricName()
+ {
+ return druidMetricName;
+ }
+
+ public Set<String> getDimensions()
+ {
+ return dimensions;
+ }
+
+ public MetricType getMetricType()
+ {
+ return metricType;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "KafkaConsumerMetric{" +
+ "kafkaMetricName='" + kafkaMetricName + '\'' +
+ ", druidMetricName='" + druidMetricName + '\'' +
+ ", dimensions=" + dimensions +
+ ", metricType=" + metricType +
+ '}';
+ }
+
+ public enum MetricType
+ {
+ GAUGE,
+ COUNTER
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
index 6a8c6c14905..80b42e2f997 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -19,9 +19,8 @@
package org.apache.druid.indexing.kafka;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
@@ -33,22 +32,103 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class KafkaConsumerMonitor extends AbstractMonitor
{
+ private static final Logger log = new Logger(KafkaConsumerMonitor.class);
+
private volatile boolean stopAfterNext = false;
- // Kafka metric name -> Druid metric name
- private static final Map<String, String> METRICS =
- ImmutableMap.<String, String>builder()
- .put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
- .put("records-consumed-total", "kafka/consumer/recordsConsumed")
- .build();
+ private static final String CLIENT_ID_TAG = "client-id";
private static final String TOPIC_TAG = "topic";
- private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);
+ private static final String PARTITION_TAG = "partition";
+ private static final String NODE_ID_TAG = "node-id";
+
+ /**
+ * Kafka metric name -> Kafka metric descriptor. Taken from
+ * https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
+ */
+ private static final Map<String, KafkaConsumerMetric> METRICS =
+ Stream.of(
+ new KafkaConsumerMetric(
+ "bytes-consumed-total",
+ "kafka/consumer/bytesConsumed",
+ Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+ KafkaConsumerMetric.MetricType.COUNTER
+ ),
+ new KafkaConsumerMetric(
+ "records-consumed-total",
+ "kafka/consumer/recordsConsumed",
+ Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+ KafkaConsumerMetric.MetricType.COUNTER
+ ),
+ new KafkaConsumerMetric(
+ "fetch-total",
+ "kafka/consumer/fetch",
+ Set.of(CLIENT_ID_TAG),
+ KafkaConsumerMetric.MetricType.COUNTER
+ ),
+ new KafkaConsumerMetric(
+ "fetch-rate",
+ "kafka/consumer/fetchRate",
+ Set.of(CLIENT_ID_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "fetch-latency-avg",
+ "kafka/consumer/fetchLatencyAvg",
+ Set.of(CLIENT_ID_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "fetch-latency-max",
+ "kafka/consumer/fetchLatencyMax",
+ Set.of(CLIENT_ID_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "fetch-size-avg",
+ "kafka/consumer/fetchSizeAvg",
+ Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "fetch-size-max",
+ "kafka/consumer/fetchSizeMax",
+ Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "records-lag",
+ "kafka/consumer/recordsLag",
+ Set.of(CLIENT_ID_TAG, TOPIC_TAG, PARTITION_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "records-per-request-avg",
+ "kafka/consumer/recordsPerRequestAvg",
+ Set.of(CLIENT_ID_TAG, TOPIC_TAG),
+ KafkaConsumerMetric.MetricType.GAUGE
+ ),
+ new KafkaConsumerMetric(
+ "outgoing-byte-total",
+ "kafka/consumer/outgoingBytes",
+ Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
+ KafkaConsumerMetric.MetricType.COUNTER
+ ),
+ new KafkaConsumerMetric(
+ "incoming-byte-total",
+ "kafka/consumer/incomingBytes",
+ Set.of(CLIENT_ID_TAG, NODE_ID_TAG),
+ KafkaConsumerMetric.MetricType.COUNTER
+ )
+ ).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName, Function.identity()));
private final KafkaConsumer<?, ?> consumer;
- private final Map<String, AtomicLong> counters = new HashMap<>();
+ private final Map<MetricName, AtomicLong> counters = new HashMap<>();
public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
{
@@ -60,18 +140,36 @@ public class KafkaConsumerMonitor extends AbstractMonitor
{
for (final Map.Entry<MetricName, ? extends Metric> entry :
consumer.metrics().entrySet()) {
final MetricName metricName = entry.getKey();
+ final KafkaConsumerMetric kafkaConsumerMetric = METRICS.get(metricName.name());
- if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
- final String topic = metricName.tags().get(TOPIC_TAG);
- final long newValue = ((Number) entry.getValue().metricValue()).longValue();
- final long priorValue =
- counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
- .getAndSet(newValue);
+ // Certain metrics are emitted both as grand totals and broken down by various tags; we want to ignore the
+ // grand total and only emit the broken-down metrics. To do this, we check that metricName.tags() equals the
+ // set of expected dimensions.
+ if (kafkaConsumerMetric != null &&
+ kafkaConsumerMetric.getDimensions().equals(metricName.tags().keySet())) {
+ final Number newValue = (Number) entry.getValue().metricValue();
+ final Number emitValue;
+
+ if (kafkaConsumerMetric.getMetricType() == KafkaConsumerMetric.MetricType.GAUGE || newValue == null) {
+ emitValue = newValue;
+ } else if (kafkaConsumerMetric.getMetricType() == KafkaConsumerMetric.MetricType.COUNTER) {
+ final long newValueAsLong = newValue.longValue();
+ final long priorValue =
+ counters.computeIfAbsent(metricName, ignored -> new AtomicLong())
+ .getAndSet(newValueAsLong);
+ emitValue = newValueAsLong - priorValue;
+ } else {
+ throw DruidException.defensive("Unexpected metric type[%s]", kafkaConsumerMetric.getMetricType());
+ }
- if (newValue != priorValue) {
- final ServiceMetricEvent.Builder builder =
- new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
- emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue));
+ if (emitValue != null && !Double.isNaN(emitValue.doubleValue())) {
+ final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+ for (final String dimension : kafkaConsumerMetric.getDimensions()) {
+ if (!CLIENT_ID_TAG.equals(dimension)) {
+ builder.setDimension(dimension, metricName.tags().get(dimension));
+ }
+ }
+
+ emitter.emit(builder.setMetric(kafkaConsumerMetric.getDruidMetricName(), emitValue));
}
}
}
@@ -83,12 +181,4 @@ public class KafkaConsumerMonitor extends AbstractMonitor
{
stopAfterNext = true;
}
-
- private static boolean isTopicMetric(final MetricName metricName)
- {
- // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
- // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
- return TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())
- && !Strings.isNullOrEmpty(metricName.tags().get(TOPIC_TAG));
- }
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index bfd81464ba2..13c2219cd6f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -437,6 +437,16 @@ public class KafkaRecordSupplierTest
Assert.assertTrue(monitor.monitor(emitter));
emitter.verifyEmitted("kafka/consumer/bytesConsumed", 1);
emitter.verifyEmitted("kafka/consumer/recordsConsumed", 1);
+ emitter.verifyEmitted("kafka/consumer/fetch", 1);
+ emitter.verifyEmitted("kafka/consumer/recordsLag", 2); // per partition
+ emitter.verifyEmitted("kafka/consumer/fetchRate", 1);
+ emitter.verifyEmitted("kafka/consumer/fetchLatencyAvg", 1);
+ emitter.verifyEmitted("kafka/consumer/fetchLatencyMax", 1);
+ emitter.verifyEmitted("kafka/consumer/fetchSizeAvg", 1);
+ emitter.verifyEmitted("kafka/consumer/fetchSizeMax", 1);
+ emitter.verifyEmitted("kafka/consumer/recordsPerRequestAvg", 1);
+ emitter.verifyEmitted("kafka/consumer/incomingBytes", 2);
+ emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
recordSupplier.close();
Assert.assertFalse(monitor.monitor(emitter));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]