This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 930f165546a KAFKA-17248: Add reporter for adding thread metrics to telemetry pipeline and a test [2/N] (#17376)
930f165546a is described below

commit 930f165546a83bd3a67567b95249dd485df375fe
Author: Bill Bejeck <[email protected]>
AuthorDate: Sat Oct 5 18:28:31 2024 -0400

    KAFKA-17248: Add reporter for adding thread metrics to telemetry pipeline and a test [2/N] (#17376)
    
    This PR adds a Reporter instance that will add streams thread metrics to the telemetry pipeline.
    For testing, the PR adds a unit test.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../apache/kafka/clients/consumer/Consumer.java    |  10 ++
 .../kafka/clients/consumer/KafkaConsumer.java      |  11 ++
 .../kafka/clients/consumer/MockConsumer.java       |  17 +++
 .../consumer/internals/AsyncKafkaConsumer.java     |  11 ++
 .../consumer/internals/ClassicKafkaConsumer.java   |  12 ++
 .../StreamsThreadMetricsDelegatingReporter.java    |  86 +++++++++++++++
 ...StreamsThreadMetricsDelegatingReporterTest.java | 121 +++++++++++++++++++++
 7 files changed, 268 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index bfef86aec54..055fcfb1b4f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.KafkaMetric;
 
 import java.io.Closeable;
 import java.time.Duration;
@@ -122,6 +123,15 @@ public interface Consumer<K, V> extends Closeable {
      */
     void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback);
 
+    /**
+     * @see KafkaConsumer#registerMetricForSubscription(KafkaMetric)
+     */
+    void registerMetricForSubscription(KafkaMetric metric);
+
+    /**
+     * @see KafkaConsumer#unregisterMetricFromSubscription(KafkaMetric)
+     */
+    void unregisterMetricFromSubscription(KafkaMetric metric);
     /**
      * @see KafkaConsumer#seek(TopicPartition, long)
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bd4f66904a7..6710212c566 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -1744,4 +1745,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
         return delegate.updateAssignmentMetadataIfNeeded(timer);
     }
+
+    @Override
+    public void registerMetricForSubscription(KafkaMetric metric) {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    @Override
+    public void unregisterMetricFromSubscription(KafkaMetric metric) {
+        throw new UnsupportedOperationException("not implemented");
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 6ae28406cf5..8acdfdca4bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.utils.LogContext;
 
 import java.time.Duration;
@@ -76,6 +77,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private Uuid clientInstanceId;
     private int injectTimeoutExceptionCounter;
 
+    private final List<KafkaMetric> addedMetrics = new ArrayList<>();
+
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.subscriptions = new SubscriptionState(new LogContext(), 
offsetResetStrategy);
         this.partitions = new HashMap<>();
@@ -176,6 +179,16 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         subscriptions.assignFromSubscribed(assignedPartitions);
     }
 
+    @Override
+    public void registerMetricForSubscription(KafkaMetric metric) {
+        addedMetrics.add(metric);
+    }
+
+    @Override
+    public void unregisterMetricFromSubscription(KafkaMetric metric) {
+        addedMetrics.remove(metric);
+    }
+
     @Override
     public synchronized void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
@@ -632,4 +645,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public Duration lastPollTimeout() {
         return lastPollTimeout;
     }
+
+    public List<KafkaMetric> addedMetrics() {
+        return Collections.unmodifiableList(addedMetrics);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 325524e9eae..01a4d29471e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -77,6 +77,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -642,6 +643,16 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         );
     }
 
+    @Override
+    public void registerMetricForSubscription(KafkaMetric metric) {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    @Override
+    public void unregisterMetricFromSubscription(KafkaMetric metric) {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
     /**
      * poll implementation using {@link ApplicationEventHandler}.
      *  1. Poll for background events. If there's a fetch response event, 
process the record and return it. If it is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index b6cf39c0c78..65cff05bce3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -426,6 +427,17 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         subscribeInternal(topics, Optional.of(listener));
     }
 
+
+    @Override
+    public void registerMetricForSubscription(KafkaMetric metric) {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    @Override
+    public void unregisterMetricFromSubscription(KafkaMetric metric) {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
     @Override
     public void subscribe(Collection<String> topics) {
         subscribeInternal(topics, Optional.empty());
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
new file mode 100644
index 00000000000..0e2a238a29a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter 
{
+    
+    private static final Logger log = 
LoggerFactory.getLogger(StreamsThreadMetricsDelegatingReporter.class);
+    private static final String THREAD_ID_TAG = "thread-id";
+    private final Consumer<byte[], byte[]> consumer;
+    private final String threadId;
+    private final String stateUpdaterThreadId;
+
+
+    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+        this.consumer = Objects.requireNonNull(consumer);
+        this.threadId = Objects.requireNonNull(threadId);
+        this.stateUpdaterThreadId = 
Objects.requireNonNull(stateUpdaterThreadId);
+        log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId 
{}", threadId, stateUpdaterThreadId);
+    }
+
+    @Override
+    public void init(final List<KafkaMetric> metrics) {
+        metrics.forEach(this::metricChange);
+    }
+
+    @Override
+    public void metricChange(final KafkaMetric metric) {
+        if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
+            log.debug("Registering metric {}", metric.metricName());
+            consumer.registerMetricForSubscription(metric);
+        }
+    }
+
+    private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric 
metric) {
+        final Map<String, String> tags = metric.metricName().tags();
+        final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && 
(tags.get(THREAD_ID_TAG).equals(threadId) || 
tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
+        if (!shouldInclude) {
+            log.trace("Rejecting metric {}", metric.metricName());
+        }
+        return shouldInclude;
+    }
+
+    @Override
+    public void metricRemoval(final KafkaMetric metric) {
+        if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
+            log.debug("Unregistering metric {}", metric.metricName());
+            consumer.unregisterMetricFromSubscription(metric);
+        }
+    }
+
+    @Override
+    public void close() {
+        // No op
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        // No op
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
new file mode 100644
index 00000000000..03dbacb82b7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class StreamsThreadMetricsDelegatingReporterTest {
+
+    private MockConsumer<byte[], byte[]> mockConsumer;
+    private StreamsThreadMetricsDelegatingReporter 
streamsThreadMetricsDelegatingReporter;
+
+    private KafkaMetric kafkaMetricOneHasThreadIdTag;
+    private KafkaMetric kafkaMetricTwoHasThreadIdTag;
+    private KafkaMetric kafkaMetricThreeHasThreadIdTag;
+    private KafkaMetric kafkaMetricWithoutThreadIdTag;
+    private final Object lock = new Object();
+    private final MetricConfig metricConfig = new MetricConfig();
+
+
+    @BeforeEach
+    public void setUp() {
+        final Map<String, String> threadIdTagMap = new HashMap<>();
+        final String threadId = "abcxyz-StreamThread-1";
+        threadIdTagMap.put("thread-id", threadId);
+
+        final Map<String, String> threadIdWithStateUpdaterTagMap = new 
HashMap<>();
+        final String stateUpdaterId = "deftuv-StateUpdater-1";
+        threadIdWithStateUpdaterTagMap.put("thread-id", stateUpdaterId);
+
+        final Map<String, String> noThreadIdTagMap = new HashMap<>();
+        noThreadIdTagMap.put("client-id", "foo");
+
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+        streamsThreadMetricsDelegatingReporter = new 
StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId);
+
+        final MetricName metricNameOne = new MetricName("metric-one", 
"test-group-one", "foo bar baz", threadIdTagMap);
+        final MetricName metricNameTwo = new MetricName("metric-two", 
"test-group-two", "description two", threadIdWithStateUpdaterTagMap);
+        final MetricName metricNameThree = new MetricName("metric-three", 
"test-group-three", "description three", threadIdTagMap);
+        final MetricName metricNameFour = new MetricName("metric-four", 
"test-group-three", "description three", noThreadIdTagMap);
+
+        kafkaMetricOneHasThreadIdTag = new KafkaMetric(lock, metricNameOne, 
(Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+        kafkaMetricTwoHasThreadIdTag = new KafkaMetric(lock, metricNameTwo, 
(Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+        kafkaMetricThreeHasThreadIdTag = new KafkaMetric(lock, 
metricNameThree, (Measurable) (m, now) -> 3.0, metricConfig, Time.SYSTEM);
+        kafkaMetricWithoutThreadIdTag = new KafkaMetric(lock, metricNameFour, 
(Measurable) (m, now) -> 4.0, metricConfig, Time.SYSTEM);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        mockConsumer.close();
+    }
+
+
+    @Test
+    @DisplayName("Init method should register metrics it receives as 
parameters")
+    public void shouldInitMetrics() {
+        final List<KafkaMetric> allMetrics = 
Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, 
kafkaMetricThreeHasThreadIdTag);
+        final List<KafkaMetric> expectedMetrics = 
Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, 
kafkaMetricThreeHasThreadIdTag);
+        streamsThreadMetricsDelegatingReporter.init(allMetrics);
+        assertEquals(expectedMetrics, mockConsumer.addedMetrics());
+    }
+
+    @Test
+    @DisplayName("Should register metrics with thread-id in tag map")
+    public void shouldRegisterMetrics() {
+        
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
+        assertEquals(kafkaMetricOneHasThreadIdTag, 
mockConsumer.addedMetrics().get(0));
+    }
+
+    @Test
+    @DisplayName("Should remove metrics")
+    public void shouldRemoveMetrics() {
+        
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
+        
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricTwoHasThreadIdTag);
+        
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricThreeHasThreadIdTag);
+        List<KafkaMetric> expected = 
Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, 
kafkaMetricThreeHasThreadIdTag);
+        assertEquals(expected, mockConsumer.addedMetrics());
+        
streamsThreadMetricsDelegatingReporter.metricRemoval(kafkaMetricOneHasThreadIdTag);
+        expected = Arrays.asList(kafkaMetricTwoHasThreadIdTag, 
kafkaMetricThreeHasThreadIdTag);
+        assertEquals(expected, mockConsumer.addedMetrics());
+    }
+
+    @Test
+    @DisplayName("Should not register metrics without thread-id tag")
+    public void shouldNotRegisterMetricsWithoutThreadIdTag() {
+        
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricWithoutThreadIdTag);
+        assertEquals(0, mockConsumer.addedMetrics().size());
+    }
+}
\ No newline at end of file

Reply via email to