This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 11b0b591822 feat: Include supervisorId in Kafka consumer metrics.
(#19525)
11b0b591822 is described below
commit 11b0b5918220b51b7e2c0c684d1113ab038d9c15
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jun 3 07:12:59 2026 -0700
feat: Include supervisorId in Kafka consumer metrics. (#19525)
This patch updates KafkaConsumerMonitor to accept a supplier of the task's
metric builder, so that emitted Kafka consumer metrics include supervisorId
as well as the other dimensions from IndexTaskUtils.setTaskDimensions.
---
.../druid/indexing/kafka/KafkaConsumerMonitor.java | 21 ++++++---
.../druid/indexing/kafka/KafkaIndexTask.java | 9 +++-
.../druid/indexing/kafka/KafkaRecordSupplier.java | 17 ++++++--
.../druid/indexing/kafka/KafkaSamplerSpec.java | 8 +++-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 3 +-
.../indexing/kafka/KafkaRecordSupplierTest.java | 51 ++++++++++++++--------
.../kafka/supervisor/KafkaSupervisorTest.java | 3 +-
7 files changed, 79 insertions(+), 33 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
index a24779ec61e..08f933a31b0 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.kafka;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
@@ -29,18 +28,18 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class KafkaConsumerMonitor extends AbstractMonitor
{
- private static final Logger log = new Logger(KafkaConsumerMonitor.class);
-
private volatile boolean stopAfterNext = false;
private static final String CLIENT_ID_TAG = "client-id";
@@ -137,12 +136,23 @@ public class KafkaConsumerMonitor extends AbstractMonitor
).collect(Collectors.toMap(KafkaConsumerMetric::getKafkaMetricName,
Function.identity()));
private final KafkaConsumer<?, ?> consumer;
+
+ /**
+ * Supplies a new metric builder for each emitted metric. When null, a plain builder is created instead.
+ */
+ @Nullable
+ private final Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier;
+
private final Map<MetricName, AtomicLong> counters = new HashMap<>();
private final AtomicDouble pollIdleRatioAvg = new AtomicDouble(1.0d);
- public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
+ public KafkaConsumerMonitor(
+ final KafkaConsumer<?, ?> consumer,
+ @Nullable final Supplier<ServiceMetricEvent.Builder>
metricBuilderSupplier
+ )
{
this.consumer = consumer;
+ this.metricBuilderSupplier = metricBuilderSupplier;
}
@Override
@@ -173,7 +183,8 @@ public class KafkaConsumerMonitor extends AbstractMonitor
}
if (emitValue != null && !Double.isNaN(emitValue.doubleValue())) {
- final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder();
+ final ServiceMetricEvent.Builder builder =
+ metricBuilderSupplier != null ? metricBuilderSupplier.get() :
new ServiceMetricEvent.Builder();
for (final String dimension : kafkaConsumerMetric.getDimensions()) {
if (!CLIENT_ID_TAG.equals(dimension)) {
builder.setDimension(dimension,
metricName.tags().get(dimension));
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index f16a9a35cf2..2b1bc58e190 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -116,8 +116,13 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
props.put("auto.offset.reset", "none");
final KafkaRecordSupplier recordSupplier =
- new KafkaRecordSupplier(props, configMapper,
kafkaIndexTaskIOConfig.getConfigOverrides(),
- kafkaIndexTaskIOConfig.isMultiTopic());
+ new KafkaRecordSupplier(
+ props,
+ configMapper,
+ kafkaIndexTaskIOConfig.getConfigOverrides(),
+ kafkaIndexTaskIOConfig.isMultiTopic(),
+ this::getMetricBuilder
+ );
if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index c76e5f4965f..26dce74b0ae 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -35,6 +35,7 @@ import
org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
@@ -46,6 +47,7 @@ import
org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
@@ -58,6 +60,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -90,21 +93,27 @@ public class KafkaRecordSupplier implements
RecordSupplier<KafkaTopicPartition,
Map<String, Object> consumerProperties,
ObjectMapper sortingMapper,
KafkaConfigOverrides configOverrides,
- boolean multiTopic
+ boolean multiTopic,
+ @Nullable Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier
)
{
- this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides),
multiTopic);
+ this(
+ getKafkaConsumer(sortingMapper, consumerProperties, configOverrides),
+ multiTopic,
+ metricBuilderSupplier
+ );
}
@VisibleForTesting
public KafkaRecordSupplier(
KafkaConsumer<byte[], byte[]> consumer,
- boolean multiTopic
+ boolean multiTopic,
+ @Nullable Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier
)
{
this.consumer = consumer;
this.multiTopic = multiTopic;
- this.monitor = new KafkaConsumerMonitor(consumer);
+ this.monitor = new KafkaConsumerMonitor(consumer, metricBuilderSupplier);
}
@Override
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
index d718e35d7cd..d04d97472b7 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -69,8 +69,12 @@ public class KafkaSamplerSpec extends
SeekableStreamSamplerSpec
props.put("request.timeout.ms",
Integer.toString(samplerConfig.getTimeoutMs()));
KafkaSupervisorIOConfig kafkaSupervisorIOConfig =
(KafkaSupervisorIOConfig) ioConfig;
- return new KafkaRecordSupplier(props, objectMapper,
kafkaSupervisorIOConfig.getConfigOverrides(),
- kafkaSupervisorIOConfig.isMultiTopic()
+ return new KafkaRecordSupplier(
+ props,
+ objectMapper,
+ kafkaSupervisorIOConfig.getConfigOverrides(),
+ kafkaSupervisorIOConfig.isMultiTopic(),
+ null
);
}
finally {
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 5863284cc2d..0d8fa70f7f1 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -137,7 +137,8 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides(),
- spec.getIoConfig().isMultiTopic()
+ spec.getIoConfig().isMultiTopic(),
+ null
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index a01ae639c04..f7c65df3d19 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -31,10 +31,12 @@ import
org.apache.druid.indexing.kafka.test.EmbeddedKafkaBroker;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
+import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.TestHelper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -60,6 +62,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -230,7 +233,7 @@ public class KafkaRecordSupplierTest
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -263,14 +266,14 @@ public class KafkaRecordSupplierTest
properties.put("sasl.oauthbearer.token.endpoint.url",
"http://localhost:8080/token");
MatcherAssert.assertThat(
- assertThrows(KafkaException.class, () -> new
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false)),
+ assertThrows(KafkaException.class, () -> new
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false, null)),
CoreMatchers.instanceOf(KafkaException.class)
);
properties.remove("sasl.oauthbearer.token.endpoint.url");
properties.put("sasl.oauthbearer.jwks.endpoint.url",
"http://localhost:8080/jwks");
MatcherAssert.assertThat(
- assertThrows(KafkaException.class, () -> new
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false)),
+ assertThrows(KafkaException.class, () -> new
KafkaRecordSupplier(properties, OBJECT_MAPPER, null, false, null)),
CoreMatchers.instanceOf(KafkaException.class)
);
}
@@ -287,7 +290,7 @@ public class KafkaRecordSupplierTest
insertData();
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true, null);
String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic);
Set<KafkaTopicPartition> partitions =
recordSupplier.getPartitionIds(stream);
@@ -323,7 +326,8 @@ public class KafkaRecordSupplierTest
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ null
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
@@ -351,7 +355,8 @@ public class KafkaRecordSupplierTest
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ null
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test
recordSupplier is initiated
@@ -370,7 +375,8 @@ public class KafkaRecordSupplierTest
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ null
);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test
recordSupplier is initiated
@@ -397,7 +403,8 @@ public class KafkaRecordSupplierTest
properties,
OBJECT_MAPPER,
null,
- false
+ false,
+ null
);
recordSupplier.assign(partitions);
@@ -431,11 +438,14 @@ public class KafkaRecordSupplierTest
StreamPartition.of(TOPIC, PARTITION_1)
);
+ final Supplier<ServiceMetricEvent.Builder> metricBuilderSupplier =
+ () -> new
ServiceMetricEvent.Builder().setDimension(DruidMetrics.SUPERVISOR_ID,
"supervisor-1");
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
KAFKA_SERVER.consumerProperties(),
OBJECT_MAPPER,
null,
- false
+ false,
+ metricBuilderSupplier
);
final Monitor monitor = recordSupplier.monitor();
@@ -472,6 +482,11 @@ public class KafkaRecordSupplierTest
emitter.verifyEmitted("kafka/consumer/outgoingBytes", 2);
emitter.verifyEmitted("kafka/consumer/pollIdleRatio", 1);
+ // Emitted metrics carry the supervisorId dimension (verified here for the bytesConsumed events).
+ for (final ServiceMetricEvent event :
emitter.getMetricEvents("kafka/consumer/bytesConsumed")) {
+ Assert.assertEquals("supervisor-1",
event.getUserDims().get(DruidMetrics.SUPERVISOR_ID));
+ }
+
recordSupplier.close();
Assert.assertFalse(monitor.monitor(emitter));
}
@@ -497,7 +512,7 @@ public class KafkaRecordSupplierTest
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -567,7 +582,7 @@ public class KafkaRecordSupplierTest
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -610,7 +625,7 @@ public class KafkaRecordSupplierTest
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -643,7 +658,7 @@ public class KafkaRecordSupplierTest
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
recordSupplier.assign(partitions);
@@ -669,7 +684,7 @@ public class KafkaRecordSupplierTest
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@@ -704,7 +719,7 @@ public class KafkaRecordSupplierTest
public void
getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
StreamPartition<KafkaTopicPartition> streamPartition =
StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions =
ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -716,7 +731,7 @@ public class KafkaRecordSupplierTest
public void
getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
StreamPartition<KafkaTopicPartition> streamPartition =
StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions =
ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -728,7 +743,7 @@ public class KafkaRecordSupplierTest
public void
getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
StreamPartition<KafkaTopicPartition> streamPartition =
StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions =
ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
@@ -740,7 +755,7 @@ public class KafkaRecordSupplierTest
public void
getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
- KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false);
+ KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, null);
StreamPartition<KafkaTopicPartition> streamPartition =
StreamPartition.of(TOPIC, PARTITION_0);
Set<StreamPartition<KafkaTopicPartition>> partitions =
ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index d1f082af24e..65c60c38418 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -6348,7 +6348,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Deserializer valueDeserializerObject = new ByteArrayDeserializer();
return new KafkaRecordSupplier(
new KafkaConsumer<>(props, keyDeserializerObject,
valueDeserializerObject),
- getIoConfig().isMultiTopic()
+ getIoConfig().isMultiTopic(),
+ null
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]