This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b0b591822 feat: Include supervisorId in Kafka consumer metrics. 
(#19525)
11b0b591822 is described below

commit 11b0b5918220b51b7e2c0c684d1113ab038d9c15
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jun 3 07:12:59 2026 -0700

    feat: Include supervisorId in Kafka consumer metrics. (#19525)
    
    This patch updates KafkaConsumerMonitor to accept the task's
    metric builder, which includes supervisorId as well as other dimensions
    from IndexTaskUtils.setTaskDimensions.
---
 .../druid/indexing/kafka/KafkaConsumerMonitor.java | 21 ++++++---
 .../druid/indexing/kafka/KafkaIndexTask.java       |  9 +++-
 .../druid/indexing/kafka/KafkaRecordSupplier.java  | 17 ++++++--
 .../druid/indexing/kafka/KafkaSamplerSpec.java     |  8 +++-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  3 +-
 .../indexing/kafka/KafkaRecordSupplierTest.java    | 51 ++++++++++++++--------
 .../kafka/supervisor/KafkaSupervisorTest.java      |  3 +-
 7 files changed, 79 insertions(+), 33 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
index a24779ec61e..08f933a31b0 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
 
 import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
@@ -29,18 +28,18 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class KafkaConsumerMonitor extends AbstractMonitor
 {
-  private static final Logger log = new Logger(KafkaConsumerMonitor.class);
-
   private volatile boolean stopAfterNext = false;
 
   private static final String CLIENT_ID_TAG = "client-id";
@@ -137,12 +136,23 @@ public class KafkaConsumerMonitor extends AbstractMonitor
       ).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName, 
Function.identity()));
 
   private final KafkaConsumer<?, ?> consumer;
+
+  /**
+   * Supplies a new metric builder for each emitted metric.
+   */
+  @Nullable
+  private final Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier;
+
   private final Map<MetricName, AtomicLong> counters = new HashMap<>();
   private final AtomicDouble pollIdleRatioAvg = new AtomicDouble(1.0d);
 
-  public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
+  public KafkaConsumerMonitor(
+      final KafkaConsumer<?, ?> consumer,
+      @Nullable final Supplier<ServiceMetricEvent.Builder> 
metricBuilderSupplier
+  )
   {
     this.consumer = consumer;
+    this.metricBuilderSupplier = metricBuilderSupplier;
   }
 
   @Override
@@ -173,7 +183,8 @@ public class KafkaConsumerMonitor extends AbstractMonitor
         }
 
         if (emitValue != null && !Double.isNaN(emitValue.doubleValue())) {
-          final ServiceMetricEvent.Builder builder = new 
ServiceMetricEvent.Builder();
+          final ServiceMetricEvent.Builder builder =
+              metricBuilderSupplier != null ? metricBuilderSupplier.get() : 
new ServiceMetricEvent.Builder();
           for (final String dimension : kafkaConsumerMetric.getDimensions()) {
             if (!CLIENT_ID_TAG.equals(dimension)) {
               builder.setDimension(dimension, 
metricName.tags().get(dimension));
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index f16a9a35cf2..2b1bc58e190 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -116,8 +116,13 @@ public class KafkaIndexTask extends 
SeekableStreamIndexTask<KafkaTopicPartition,
       props.put("auto.offset.reset", "none");
 
       final KafkaRecordSupplier recordSupplier =
-          new KafkaRecordSupplier(props, configMapper, 
kafkaIndexTaskIOConfig.getConfigOverrides(),
-                                  kafkaIndexTaskIOConfig.isMultiTopic());
+          new KafkaRecordSupplier(
+              props,
+              configMapper,
+              kafkaIndexTaskIOConfig.getConfigOverrides(),
+              kafkaIndexTaskIOConfig.isMultiTopic(),
+              this::getMetricBuilder
+          );
 
       if (toolbox.getMonitorScheduler() != null) {
         toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index c76e5f4965f..26dce74b0ae 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.PasswordProvider;
@@ -46,6 +47,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
@@ -58,6 +60,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -90,21 +93,27 @@ public class KafkaRecordSupplier implements 
RecordSupplier<KafkaTopicPartition,
       Map<String, Object> consumerProperties,
       ObjectMapper sortingMapper,
       KafkaConfigOverrides configOverrides,
-      boolean multiTopic
+      boolean multiTopic,
+      @Nullable Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier
   )
   {
-    this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), 
multiTopic);
+    this(
+        getKafkaConsumer(sortingMapper, consumerProperties, configOverrides),
+        multiTopic,
+        metricBuilderSupplier
+    );
   }
 
   @VisibleForTesting
   public KafkaRecordSupplier(
       KafkaConsumer<byte[], byte[]> consumer,
-      boolean multiTopic
+      boolean multiTopic,
+      @Nullable Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier
   )
   {
     this.consumer = consumer;
     this.multiTopic = multiTopic;
-    this.monitor = new KafkaConsumerMonitor(consumer);
+    this.monitor = new KafkaConsumerMonitor(consumer, metricBuilderSupplier);
   }
 
   @Override
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
index d718e35d7cd..d04d97472b7 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -69,8 +69,12 @@ public class KafkaSamplerSpec extends 
SeekableStreamSamplerSpec
       props.put("request.timeout.ms", 
Integer.toString(samplerConfig.getTimeoutMs()));
       KafkaSupervisorIOConfig kafkaSupervisorIOConfig = 
(KafkaSupervisorIOConfig) ioConfig;
 
-      return new KafkaRecordSupplier(props, objectMapper, 
kafkaSupervisorIOConfig.getConfigOverrides(),
-                                     kafkaSupervisorIOConfig.isMultiTopic()
+      return new KafkaRecordSupplier(
+          props,
+          objectMapper,
+          kafkaSupervisorIOConfig.getConfigOverrides(),
+          kafkaSupervisorIOConfig.isMultiTopic(),
+          null
       );
     }
     finally {
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 5863284cc2d..0d8fa70f7f1 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -137,7 +137,8 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
         spec.getIoConfig().getConsumerProperties(),
         sortingMapper,
         spec.getIoConfig().getConfigOverrides(),
-        spec.getIoConfig().isMultiTopic()
+        spec.getIoConfig().isMultiTopic(),
+        null
     );
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index a01ae639c04..f7c65df3d19 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -31,10 +31,12 @@ import 
org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.MapStringDynamicConfigProvider;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.TestHelper;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -60,6 +62,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -230,7 +233,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
 
@@ -263,14 +266,14 @@ public class KafkaRecordSupplierTest
     properties.put("sasl.oauthbearer.token.endpoint.url", 
"http://localhost:8080/token";);
 
     MatcherAssert.assertThat(
-        assertThrows(KafkaException.class, () -> new 
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false)),
+        assertThrows(KafkaException.class, () -> new 
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false, null)),
         CoreMatchers.instanceOf(KafkaException.class)
     );
 
     properties.remove("sasl.oauthbearer.token.endpoint.url");
     properties.put("sasl.oauthbearer.jwks.endpoint.url", 
"http://localhost:8080/jwks";);
     MatcherAssert.assertThat(
-        assertThrows(KafkaException.class, () -> new 
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false)),
+        assertThrows(KafkaException.class, () -> new 
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false, null)),
         CoreMatchers.instanceOf(KafkaException.class)
     );
   }
@@ -287,7 +290,7 @@ public class KafkaRecordSupplierTest
     insertData();
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true, null);
 
     String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic);
     Set<KafkaTopicPartition> partitions = 
recordSupplier.getPartitionIds(stream);
@@ -323,7 +326,8 @@ public class KafkaRecordSupplierTest
         properties,
         OBJECT_MAPPER,
         null,
-        false
+        false,
+        null
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -351,7 +355,8 @@ public class KafkaRecordSupplierTest
             properties,
             OBJECT_MAPPER,
             null,
-            false
+            false,
+            null
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test 
recordSupplier is initiated
@@ -370,7 +375,8 @@ public class KafkaRecordSupplierTest
             properties,
             OBJECT_MAPPER,
             null,
-            false
+            false,
+            null
     );
 
     Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test 
recordSupplier is initiated
@@ -397,7 +403,8 @@ public class KafkaRecordSupplierTest
         properties,
         OBJECT_MAPPER,
         null,
-        false
+        false,
+        null
     );
 
     recordSupplier.assign(partitions);
@@ -431,11 +438,14 @@ public class KafkaRecordSupplierTest
         StreamPartition.of(TOPIC, PARTITION_1)
     );
 
+    final Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier =
+        () -> new 
ServiceMetricEvent.Builder().setDimension(DruidMetrics.SUPERVISOR_ID, 
"supervisor-1");
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
         KAFKA_SERVER.consumerProperties(),
         OBJECT_MAPPER,
         null,
-        false
+        false,
+        metricBuilderSupplier
     );
 
     final Monitor monitor = recordSupplier.monitor();
@@ -472,6 +482,11 @@ public class KafkaRecordSupplierTest
     emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
     emitter.verifyEmitted("kafka/consumer/pollIdleRatio", 1);
 
+    // All emitted metrics carry the supervisorId dimension.
+    for (final ServiceMetricEvent event : 
emitter.getMetricEvents("kafka/consumer/bytesConsumed")) {
+      Assert.assertEquals("supervisor-1", 
event.getUserDims().get(DruidMetrics.SUPERVISOR_ID));
+    }
+
     recordSupplier.close();
     Assert.assertFalse(monitor.monitor(emitter));
   }
@@ -497,7 +512,7 @@ public class KafkaRecordSupplierTest
 
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
 
@@ -567,7 +582,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -610,7 +625,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -643,7 +658,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
 
     recordSupplier.assign(partitions);
 
@@ -669,7 +684,7 @@ public class KafkaRecordSupplierTest
     );
 
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
 
     recordSupplier.assign(partitions);
     recordSupplier.seekToEarliest(partitions);
@@ -704,7 +719,7 @@ public class KafkaRecordSupplierTest
   public void 
getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
     StreamPartition<KafkaTopicPartition> streamPartition = 
StreamPartition.of(TOPIC, PARTITION_0);
     Set<StreamPartition<KafkaTopicPartition>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -716,7 +731,7 @@ public class KafkaRecordSupplierTest
   public void 
getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
     StreamPartition<KafkaTopicPartition> streamPartition = 
StreamPartition.of(TOPIC, PARTITION_0);
     Set<StreamPartition<KafkaTopicPartition>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -728,7 +743,7 @@ public class KafkaRecordSupplierTest
   public void 
getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
     StreamPartition<KafkaTopicPartition> streamPartition = 
StreamPartition.of(TOPIC, PARTITION_0);
     Set<StreamPartition<KafkaTopicPartition>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
@@ -740,7 +755,7 @@ public class KafkaRecordSupplierTest
   public void 
getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
   {
     KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
-        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+        KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
     StreamPartition<KafkaTopicPartition> streamPartition = 
StreamPartition.of(TOPIC, PARTITION_0);
     Set<StreamPartition<KafkaTopicPartition>> partitions = 
ImmutableSet.of(streamPartition);
     recordSupplier.assign(partitions);
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index d1f082af24e..65c60c38418 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -6348,7 +6348,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
       Deserializer valueDeserializerObject = new ByteArrayDeserializer();
       return new KafkaRecordSupplier(
           new KafkaConsumer<>(props, keyDeserializerObject, 
valueDeserializerObject),
-          getIoConfig().isMultiTopic()
+          getIoConfig().isMultiTopic(),
+          null
       );
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to