This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 930f165546a KAFKA-17248: Add reporter for adding thread metrics to
telemetry pipeline and a test [2/N] (#17376)
930f165546a is described below
commit 930f165546a83bd3a67567b95249dd485df375fe
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Oct 5 18:28:31 2024 -0400
KAFKA-17248: Add reporter for adding thread metrics to telemetry pipeline
and a test [2/N] (#17376)
This PR adds a Reporter instance that will add streams thread metrics to
the telemetry pipeline.
For testing, the PR adds a unit test.
Reviewers: Matthias Sax <[email protected]>
---
.../apache/kafka/clients/consumer/Consumer.java | 10 ++
.../kafka/clients/consumer/KafkaConsumer.java | 11 ++
.../kafka/clients/consumer/MockConsumer.java | 17 +++
.../consumer/internals/AsyncKafkaConsumer.java | 11 ++
.../consumer/internals/ClassicKafkaConsumer.java | 12 ++
.../StreamsThreadMetricsDelegatingReporter.java | 86 +++++++++++++++
...StreamsThreadMetricsDelegatingReporterTest.java | 121 +++++++++++++++++++++
7 files changed, 268 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index bfef86aec54..055fcfb1b4f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.KafkaMetric;
import java.io.Closeable;
import java.time.Duration;
@@ -122,6 +123,15 @@ public interface Consumer<K, V> extends Closeable {
*/
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback);
+ /**
+ * @see KafkaConsumer#registerMetricForSubscription(KafkaMetric)
+ */
+ void registerMetricForSubscription(KafkaMetric metric);
+
+ /**
+ * @see KafkaConsumer#unregisterMetricFromSubscription(KafkaMetric)
+ */
+ void unregisterMetricFromSubscription(KafkaMetric metric);
/**
* @see KafkaConsumer#seek(TopicPartition, long)
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bd4f66904a7..6710212c566 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
@@ -1744,4 +1745,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return delegate.updateAssignmentMetadataIfNeeded(timer);
}
+
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ throw new UnsupportedOperationException("not implemented");
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 6ae28406cf5..8acdfdca4bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.LogContext;
import java.time.Duration;
@@ -76,6 +77,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private Uuid clientInstanceId;
private int injectTimeoutExceptionCounter;
+ private final List<KafkaMetric> addedMetrics = new ArrayList<>();
+
public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(new LogContext(),
offsetResetStrategy);
this.partitions = new HashMap<>();
@@ -176,6 +179,16 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscriptions.assignFromSubscribed(assignedPartitions);
}
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ addedMetrics.add(metric);
+ }
+
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ addedMetrics.remove(metric);
+ }
+
@Override
public synchronized void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
@@ -632,4 +645,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
public Duration lastPollTimeout() {
return lastPollTimeout;
}
+
+ public List<KafkaMetric> addedMetrics() {
+ return Collections.unmodifiableList(addedMetrics);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 325524e9eae..01a4d29471e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -77,6 +77,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -642,6 +643,16 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
);
}
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
/**
* poll implementation using {@link ApplicationEventHandler}.
* 1. Poll for background events. If there's a fetch response event,
process the record and return it. If it is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index b6cf39c0c78..65cff05bce3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
@@ -426,6 +427,17 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
subscribeInternal(topics, Optional.of(listener));
}
+
+ @Override
+ public void registerMetricForSubscription(KafkaMetric metric) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public void unregisterMetricFromSubscription(KafkaMetric metric) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
@Override
public void subscribe(Collection<String> topics) {
subscribeInternal(topics, Optional.empty());
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
new file mode 100644
index 00000000000..0e2a238a29a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter
{
+
+ private static final Logger log =
LoggerFactory.getLogger(StreamsThreadMetricsDelegatingReporter.class);
+ private static final String THREAD_ID_TAG = "thread-id";
+ private final Consumer<byte[], byte[]> consumer;
+ private final String threadId;
+ private final String stateUpdaterThreadId;
+
+
+ public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[],
byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+ this.consumer = Objects.requireNonNull(consumer);
+ this.threadId = Objects.requireNonNull(threadId);
+ this.stateUpdaterThreadId =
Objects.requireNonNull(stateUpdaterThreadId);
+ log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId
{}", threadId, stateUpdaterThreadId);
+ }
+
+ @Override
+ public void init(final List<KafkaMetric> metrics) {
+ metrics.forEach(this::metricChange);
+ }
+
+ @Override
+ public void metricChange(final KafkaMetric metric) {
+ if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
+ log.debug("Registering metric {}", metric.metricName());
+ consumer.registerMetricForSubscription(metric);
+ }
+ }
+
+ private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric
metric) {
+ final Map<String, String> tags = metric.metricName().tags();
+ final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) &&
(tags.get(THREAD_ID_TAG).equals(threadId) ||
tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
+ if (!shouldInclude) {
+ log.trace("Rejecting metric {}", metric.metricName());
+ }
+ return shouldInclude;
+ }
+
+ @Override
+ public void metricRemoval(final KafkaMetric metric) {
+ if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
+ log.debug("Unregistering metric {}", metric.metricName());
+ consumer.unregisterMetricFromSubscription(metric);
+ }
+ }
+
+ @Override
+ public void close() {
+ // No op
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ // No op
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
new file mode 100644
index 00000000000..03dbacb82b7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class StreamsThreadMetricsDelegatingReporterTest {
+
+ private MockConsumer<byte[], byte[]> mockConsumer;
+ private StreamsThreadMetricsDelegatingReporter
streamsThreadMetricsDelegatingReporter;
+
+ private KafkaMetric kafkaMetricOneHasThreadIdTag;
+ private KafkaMetric kafkaMetricTwoHasThreadIdTag;
+ private KafkaMetric kafkaMetricThreeHasThreadIdTag;
+ private KafkaMetric kafkaMetricWithoutThreadIdTag;
+ private final Object lock = new Object();
+ private final MetricConfig metricConfig = new MetricConfig();
+
+
+ @BeforeEach
+ public void setUp() {
+ final Map<String, String> threadIdTagMap = new HashMap<>();
+ final String threadId = "abcxyz-StreamThread-1";
+ threadIdTagMap.put("thread-id", threadId);
+
+ final Map<String, String> threadIdWithStateUpdaterTagMap = new
HashMap<>();
+ final String stateUpdaterId = "deftuv-StateUpdater-1";
+ threadIdWithStateUpdaterTagMap.put("thread-id", stateUpdaterId);
+
+ final Map<String, String> noThreadIdTagMap = new HashMap<>();
+ noThreadIdTagMap.put("client-id", "foo");
+
+ mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+ streamsThreadMetricsDelegatingReporter = new
StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId);
+
+ final MetricName metricNameOne = new MetricName("metric-one",
"test-group-one", "foo bar baz", threadIdTagMap);
+ final MetricName metricNameTwo = new MetricName("metric-two",
"test-group-two", "description two", threadIdWithStateUpdaterTagMap);
+ final MetricName metricNameThree = new MetricName("metric-three",
"test-group-three", "description three", threadIdTagMap);
+ final MetricName metricNameFour = new MetricName("metric-four",
"test-group-three", "description three", noThreadIdTagMap);
+
+ kafkaMetricOneHasThreadIdTag = new KafkaMetric(lock, metricNameOne,
(Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+ kafkaMetricTwoHasThreadIdTag = new KafkaMetric(lock, metricNameTwo,
(Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+ kafkaMetricThreeHasThreadIdTag = new KafkaMetric(lock,
metricNameThree, (Measurable) (m, now) -> 3.0, metricConfig, Time.SYSTEM);
+ kafkaMetricWithoutThreadIdTag = new KafkaMetric(lock, metricNameFour,
(Measurable) (m, now) -> 4.0, metricConfig, Time.SYSTEM);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ mockConsumer.close();
+ }
+
+
+ @Test
+ @DisplayName("Init method should register metrics it receives as
parameters")
+ public void shouldInitMetrics() {
+ final List<KafkaMetric> allMetrics =
Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag,
kafkaMetricThreeHasThreadIdTag);
+ final List<KafkaMetric> expectedMetrics =
Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag,
kafkaMetricThreeHasThreadIdTag);
+ streamsThreadMetricsDelegatingReporter.init(allMetrics);
+ assertEquals(expectedMetrics, mockConsumer.addedMetrics());
+ }
+
+ @Test
+ @DisplayName("Should register metrics with thread-id in tag map")
+ public void shouldRegisterMetrics() {
+
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
+ assertEquals(kafkaMetricOneHasThreadIdTag,
mockConsumer.addedMetrics().get(0));
+ }
+
+ @Test
+ @DisplayName("Should remove metrics")
+ public void shouldRemoveMetrics() {
+
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
+
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricTwoHasThreadIdTag);
+
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricThreeHasThreadIdTag);
+ List<KafkaMetric> expected =
Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag,
kafkaMetricThreeHasThreadIdTag);
+ assertEquals(expected, mockConsumer.addedMetrics());
+
streamsThreadMetricsDelegatingReporter.metricRemoval(kafkaMetricOneHasThreadIdTag);
+ expected = Arrays.asList(kafkaMetricTwoHasThreadIdTag,
kafkaMetricThreeHasThreadIdTag);
+ assertEquals(expected, mockConsumer.addedMetrics());
+ }
+
+ @Test
+ @DisplayName("Should not register metrics without thread-id tag")
+ public void shouldNotRegisterMetricsWithoutThreadIdTag() {
+
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricWithoutThreadIdTag);
+ assertEquals(0, mockConsumer.addedMetrics().size());
+ }
+}
\ No newline at end of file