This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c998f8ef36 KAFKA-17869: Adding tests to ensure KIP-1076 doesn't 
interfere with consumer metrics[1/3] (#17781)
1c998f8ef36 is described below

commit 1c998f8ef3620ed6bf782a01696f61811da2dea2
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Nov 21 13:41:29 2024 -0500

    KAFKA-17869: Adding tests to ensure KIP-1076 doesn't interfere with 
consumer metrics[1/3] (#17781)
    
    Adding tests to ensure the KIP-1076 methods don't interfere with existing 
metrics in clients
    
    Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax 
<[email protected]>
---
 checkstyle/import-control.xml                      |   1 +
 .../consumer/internals/AsyncKafkaConsumer.java     |  14 ++-
 .../consumer/internals/ClassicKafkaConsumer.java   |  14 ++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 123 +++++++++++++++++++++
 4 files changed, 140 insertions(+), 12 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index eb4131fbc47..bb6f48a73f0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -225,6 +225,7 @@
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.clients" exact-match="true"/>
     <allow pkg="org.apache.kafka.test" />
+    <allow class="org.apache.log4j.Level" />
 
     <subpackage name="consumer">
       <allow pkg="org.apache.kafka.clients.consumer" />
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index da3f3f2f25b..ba7eed19f11 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -662,17 +662,19 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
-        if (clientTelemetryReporter.isPresent()) {
-            ClientTelemetryReporter reporter = clientTelemetryReporter.get();
-            reporter.metricChange(metric);
+        if (!metrics().containsKey(metric.metricName())) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.metricChange(metric));
+        } else {
+            log.debug("Skipping registration for metric {}. Existing consumer 
metrics cannot be overwritten.", metric.metricName());
         }
     }
 
     @Override
     public void unregisterMetricFromSubscription(KafkaMetric metric) {
-        if (clientTelemetryReporter.isPresent()) {
-            ClientTelemetryReporter reporter = clientTelemetryReporter.get();
-            reporter.metricRemoval(metric);
+        if (!metrics().containsKey(metric.metricName())) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.metricRemoval(metric));
+        } else {
+            log.debug("Skipping unregistration for metric {}. Existing 
consumer metrics cannot be removed.", metric.metricName());
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 82a9bd2a53b..e423c261763 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -430,17 +430,19 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
-        if (clientTelemetryReporter.isPresent()) {
-            ClientTelemetryReporter reporter = clientTelemetryReporter.get();
-            reporter.metricChange(metric);
+        if (!metrics().containsKey(metric.metricName())) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.metricChange(metric));
+        } else {
+            log.debug("Skipping registration for metric {}. Existing consumer 
metrics cannot be overwritten.", metric.metricName());
         }
     }
 
     @Override
     public void unregisterMetricFromSubscription(KafkaMetric metric) {
-        if (clientTelemetryReporter.isPresent()) {
-            ClientTelemetryReporter reporter = clientTelemetryReporter.get();
-            reporter.metricRemoval(metric);
+        if (!metrics().containsKey(metric.metricName())) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.metricRemoval(metric));
+        }  else {
+            log.debug("Skipping unregistration for metric {}. Existing 
consumer metrics cannot be removed.", metric.metricName());
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c260fa48c01..7d122c2986c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
@@ -58,6 +60,9 @@ import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit
 import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -88,6 +93,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -96,11 +102,13 @@ import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 
+import org.apache.log4j.Level;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.CallsRealMethods;
 
 import java.lang.management.ManagementFactory;
@@ -145,6 +153,7 @@ import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.apache.kafka.common.utils.Utils.propsToMap;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -157,8 +166,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atMostOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -224,6 +235,107 @@ public class KafkaConsumerTest {
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void 
testSubscribingCustomMetricsDoesntAffectConsumerMetrics(GroupProtocol 
groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+
+        Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+        customMetrics.forEach((name, metric) -> 
consumer.registerMetricForSubscription(metric));
+
+        Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
+        customMetrics.forEach((name, metric) -> 
assertFalse(consumerMetrics.containsKey(name)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void 
testSubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol
 groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        Class<?> consumerClass = groupProtocol == GroupProtocol.CLASSIC ? 
ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(consumerClass, Level.DEBUG);
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+            KafkaMetric existingMetricToAdd = (KafkaMetric) 
consumer.metrics().entrySet().iterator().next().getValue();
+            consumer.registerMetricForSubscription(existingMetricToAdd);
+            final String expectedMessage = String.format("Skipping 
registration for metric %s. Existing consumer metrics cannot be overwritten.", 
existingMetricToAdd.metricName());
+            assertTrue(appender.getMessages().stream().anyMatch(m -> 
m.contains(expectedMessage)));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void 
testUnsubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol
 groupProtocol) {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        Class<?> consumerClass = groupProtocol == GroupProtocol.CLASSIC ? 
ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(consumerClass, Level.DEBUG);
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+            KafkaMetric existingMetricToRemove = (KafkaMetric) 
consumer.metrics().entrySet().iterator().next().getValue();
+            consumer.unregisterMetricFromSubscription(existingMetricToRemove);
+            final String expectedMessage = String.format("Skipping 
unregistration for metric %s. Existing consumer metrics cannot be removed.", 
existingMetricToRemove.metricName());
+            assertTrue(appender.getMessages().stream().anyMatch(m -> 
m.contains(expectedMessage)));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void 
testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingConsumerMetric(GroupProtocol
 groupProtocol) {
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = 
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+            ClientTelemetryReporter clientTelemetryReporter = 
mock(ClientTelemetryReporter.class);
+            clientTelemetryReporter.configure(any());
+            mockedCommonClientConfigs.when(() -> 
CommonClientConfigs.telemetryReporter(anyString(), 
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+            Properties props = new Properties();
+            props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
+            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+
+            KafkaMetric existingMetric = (KafkaMetric) 
consumer.metrics().entrySet().iterator().next().getValue();
+            consumer.registerMetricForSubscription(existingMetric);
+            // This test would fail without the check as the existing metric is 
registered in the consumer on startup
+            Mockito.verify(clientTelemetryReporter, 
atMostOnce()).metricChange(existingMetric);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void 
testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetric(GroupProtocol
 groupProtocol) {
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = 
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+            ClientTelemetryReporter clientTelemetryReporter = 
mock(ClientTelemetryReporter.class);
+            clientTelemetryReporter.configure(any());
+            mockedCommonClientConfigs.when(() -> 
CommonClientConfigs.telemetryReporter(anyString(), 
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+            Properties props = new Properties();
+            props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
+            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+
+            KafkaMetric existingMetric = (KafkaMetric) 
consumer.metrics().entrySet().iterator().next().getValue();
+            consumer.unregisterMetricFromSubscription(existingMetric);
+            Mockito.verify(clientTelemetryReporter, 
never()).metricRemoval(existingMetric);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void 
testUnSubscribingNonExisingMetricsDoesntCauseError(GroupProtocol groupProtocol) 
{
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+
+        Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+        // Unregistering metrics that were never registered should not cause an error
+        customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> 
consumer.unregisterMetricFromSubscription(metric)));
+    }
+
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void testMetricsReporterAutoGeneratedClientId(GroupProtocol 
groupProtocol) {
@@ -3501,6 +3613,17 @@ public void 
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
         return client.requests().stream().anyMatch(request -> 
request.requestBuilder().apiKey().equals(apiKey));
     }
 
+    private Map<MetricName, KafkaMetric> customMetrics() {
+        MetricConfig metricConfig = new MetricConfig();
+        Object lock = new Object();
+        MetricName metricNameOne = new MetricName("metricOne", 
"stream-metrics", "description for metric one", new HashMap<>());
+        MetricName metricNameTwo = new MetricName("metricTwo", 
"stream-metrics", "description for metric two", new HashMap<>());
+
+        KafkaMetric streamClientMetricOne = new KafkaMetric(lock, 
metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+        KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, 
metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+        return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, 
streamClientMetricTwo);
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements 
Deserializer<byte[]> {
         @Override

Reply via email to