This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f8a592ad19 KAFKA-17869: Adding tests to ensure KIP-1076 doesn't 
interfere producer metrics[2/3] (#17783)
7f8a592ad19 is described below

commit 7f8a592ad19dd13b13353649a559a4bebdc0f450
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Nov 25 16:24:16 2024 -0500

    KAFKA-17869: Adding tests to ensure KIP-1076 doesn't interfere producer 
metrics[2/3] (#17783)
    
    Adding producer tests to ensure the KIP-1076 methods don't interfere with 
existing metrics
    Reviewers: Matthias Sax <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  14 +--
 .../kafka/clients/producer/KafkaProducerTest.java  | 116 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f65365f274e..e5c70b46c41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1287,9 +1287,10 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      */
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
-        if (clientTelemetryReporter.isPresent()) {
-            ClientTelemetryReporter reporter = clientTelemetryReporter.get();
-            reporter.metricChange(metric);
+        if (!metrics().containsKey(metric.metricName())) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.metricChange(metric));
+        }  else {
+            log.debug("Skipping registration for metric {}. Existing producer 
metrics cannot be overwritten.", metric.metricName());
         }
     }
 
@@ -1304,9 +1305,10 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      */
     @Override
     public void unregisterMetricFromSubscription(KafkaMetric metric) {
-        if (clientTelemetryReporter.isPresent()) {
-            ClientTelemetryReporter reporter = clientTelemetryReporter.get();
-            reporter.metricRemoval(metric);
+        if (!metrics().containsKey(metric.metricName())) {
+            clientTelemetryReporter.ifPresent(reporter -> 
reporter.metricRemoval(metric));
+        } else {
+            log.debug("Skipping unregistration for metric {}. Existing 
producer metrics cannot be removed.", metric.metricName());
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index e2e8ca7fed2..da1e4bf9568 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -54,6 +54,9 @@ import org.apache.kafka.common.message.EndTxnResponseData;
 import org.apache.kafka.common.message.InitProducerIdResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -79,6 +82,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
 import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -88,12 +92,14 @@ import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.apache.kafka.test.TestUtils;
 
+import org.apache.log4j.Level;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.CallsRealMethods;
 
 import java.lang.management.ManagementFactory;
@@ -144,6 +150,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.atMostOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -2542,4 +2549,113 @@ public class KafkaProducerTest {
         assertDoesNotThrow(() -> new KafkaProducer<>(configs, new 
StringSerializer(), new StringSerializer()).close());
     }
 
+
+    @Test
+    public void testSubscribingCustomMetricsDoesntAffectProducerMetrics() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(
+            props, new StringSerializer(), new StringSerializer());
+
+        Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+        customMetrics.forEach((name, metric) -> 
producer.registerMetricForSubscription(metric));
+
+        Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
+        customMetrics.forEach((name, metric) -> 
assertFalse(producerMetrics.containsKey(name)));
+    }
+
+    @Test
+    public void testUnSubscribingNonExisingMetricsDoesntCauseError() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(
+            props, new StringSerializer(), new StringSerializer());
+
+        Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+        // Unregistering metrics that were never registered should not cause an error
+        customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> 
producer.unregisterMetricFromSubscription(metric)));
+    }
+
+    @Test
+    public void 
testSubscribingCustomMetricsWithSameNameDoesntAffectProducerMetrics() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
+            Map<String, Object> props = new HashMap<>();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                    props, new StringSerializer(), new StringSerializer());
+            KafkaMetric existingMetricToAdd = (KafkaMetric) 
producer.metrics().entrySet().iterator().next().getValue();
+            producer.registerMetricForSubscription(existingMetricToAdd);
+            final String expectedMessage = String.format("Skipping 
registration for metric %s. Existing producer metrics cannot be overwritten.", 
existingMetricToAdd.metricName());
+            assertTrue(appender.getMessages().stream().anyMatch(m -> 
m.contains(expectedMessage)));
+        }
+    }
+
+    @Test
+    public void 
testUnsubscribingCustomMetricWithSameNameAsExistingMetricDoesntAffectProducerMetric()
 {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
+            Map<String, Object> props = new HashMap<>();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                    props, new StringSerializer(), new StringSerializer());
+            KafkaMetric existingMetricToRemove = (KafkaMetric) 
producer.metrics().entrySet().iterator().next().getValue();
+            producer.unregisterMetricFromSubscription(existingMetricToRemove);
+            final String expectedMessage = String.format("Skipping 
unregistration for metric %s. Existing producer metrics cannot be removed.", 
existingMetricToRemove.metricName());
+            assertTrue(appender.getMessages().stream().anyMatch(m -> 
m.contains(expectedMessage)));
+        }
+    }
+
+    @Test
+    public void 
testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingProducerMetric() {
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = 
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+            ClientTelemetryReporter clientTelemetryReporter = 
mock(ClientTelemetryReporter.class);
+            clientTelemetryReporter.configure(any());
+            mockedCommonClientConfigs.when(() -> 
CommonClientConfigs.telemetryReporter(anyString(), 
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+            Map<String, Object> props = new HashMap<>();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                    props, new StringSerializer(), new StringSerializer());
+            KafkaMetric existingMetric = (KafkaMetric) 
producer.metrics().entrySet().iterator().next().getValue();
+            producer.registerMetricForSubscription(existingMetric);
+            // This test would fail without the check as the existing metric is 
registered in the producer on startup
+            Mockito.verify(clientTelemetryReporter, 
atMostOnce()).metricChange(existingMetric);
+        }
+    }
+
+    @Test
+    public void 
testShouldNotCallMetricReporterMetricRemovalWithExistingProducerMetric() {
+        try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs = 
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+            ClientTelemetryReporter clientTelemetryReporter = 
mock(ClientTelemetryReporter.class);
+            clientTelemetryReporter.configure(any());
+            mockedCommonClientConfigs.when(() -> 
CommonClientConfigs.telemetryReporter(anyString(), 
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+            Map<String, Object> props = new HashMap<>();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            KafkaProducer<String, String> producer = new KafkaProducer<>(
+                    props, new StringSerializer(), new StringSerializer());
+            KafkaMetric existingMetric = (KafkaMetric) 
producer.metrics().entrySet().iterator().next().getValue();
+            producer.unregisterMetricFromSubscription(existingMetric);
+            // This test would fail without the check as the existing metric is 
registered in the producer on startup
+            Mockito.verify(clientTelemetryReporter, 
never()).metricRemoval(existingMetric);
+        }
+    }
+
+
+    private Map<MetricName, KafkaMetric> customMetrics() {
+        MetricConfig metricConfig = new MetricConfig();
+        Object lock = new Object();
+        MetricName metricNameOne = new MetricName("metricOne", 
"stream-metrics", "description for metric one", new HashMap<>());
+        MetricName metricNameTwo = new MetricName("metricTwo", 
"stream-metrics", "description for metric two", new HashMap<>());
+
+        KafkaMetric streamClientMetricOne = new KafkaMetric(lock, 
metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+        KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, 
metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+        return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, 
streamClientMetricTwo);
+    }
+
+    
+
 }

Reply via email to