This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7f8a592ad19 KAFKA-17869: Adding tests to ensure KIP-1076 doesn't
interfere producer metrics[2/3] (#17783)
7f8a592ad19 is described below
commit 7f8a592ad19dd13b13353649a559a4bebdc0f450
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Nov 25 16:24:16 2024 -0500
KAFKA-17869: Adding tests to ensure KIP-1076 doesn't interfere producer
metrics[2/3] (#17783)
Adding producer tests to ensure the KIP-1076 methods don't interfere with
existing metrics
Reviewers: Matthias Sax <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 14 +--
.../kafka/clients/producer/KafkaProducerTest.java | 116 +++++++++++++++++++++
2 files changed, 124 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f65365f274e..e5c70b46c41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1287,9 +1287,10 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
*/
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricChange(metric));
+ } else {
+ log.debug("Skipping registration for metric {}. Existing producer
metrics cannot be overwritten.", metric.metricName());
}
}
@@ -1304,9 +1305,10 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
*/
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricRemoval(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricRemoval(metric));
+ } else {
+ log.debug("Skipping unregistration for metric {}. Existing
producer metrics cannot be removed.", metric.metricName());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index e2e8ca7fed2..da1e4bf9568 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -54,6 +54,9 @@ import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -79,6 +82,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -88,12 +92,14 @@ import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.TestUtils;
+import org.apache.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.lang.management.ManagementFactory;
@@ -144,6 +150,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@@ -2542,4 +2549,113 @@ public class KafkaProducerTest {
assertDoesNotThrow(() -> new KafkaProducer<>(configs, new
StringSerializer(), new StringSerializer()).close());
}
+
+ @Test
+ public void testSubscribingCustomMetricsDoesntAffectProducerMetrics() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+
+ Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+ customMetrics.forEach((name, metric) ->
producer.registerMetricForSubscription(metric));
+
+ Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
+ customMetrics.forEach((name, metric) ->
assertFalse(producerMetrics.containsKey(name)));
+ }
+
+ @Test
+ public void testUnSubscribingNonExisingMetricsDoesntCauseError() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+
+ Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+ // Unregistering metrics that were never registered should not cause an error
+ customMetrics.forEach((name, metric) -> assertDoesNotThrow(() ->
producer.unregisterMetricFromSubscription(metric)));
+ }
+
+ @Test
+ public void
testSubscribingCustomMetricsWithSameNameDoesntAffectProducerMetrics() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+ KafkaMetric existingMetricToAdd = (KafkaMetric)
producer.metrics().entrySet().iterator().next().getValue();
+ producer.registerMetricForSubscription(existingMetricToAdd);
+ final String expectedMessage = String.format("Skipping
registration for metric %s. Existing producer metrics cannot be overwritten.",
existingMetricToAdd.metricName());
+ assertTrue(appender.getMessages().stream().anyMatch(m ->
m.contains(expectedMessage)));
+ }
+ }
+
+ @Test
+ public void
testUnsubscribingCustomMetricWithSameNameAsExistingMetricDoesntAffectProducerMetric()
{
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+ KafkaMetric existingMetricToRemove = (KafkaMetric)
producer.metrics().entrySet().iterator().next().getValue();
+ producer.unregisterMetricFromSubscription(existingMetricToRemove);
+ final String expectedMessage = String.format("Skipping
unregistration for metric %s. Existing producer metrics cannot be removed.",
existingMetricToRemove.metricName());
+ assertTrue(appender.getMessages().stream().anyMatch(m ->
m.contains(expectedMessage)));
+ }
+ }
+
+ @Test
+ public void
testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingProducerMetric() {
+ try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs =
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter =
mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() ->
CommonClientConfigs.telemetryReporter(anyString(),
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+ KafkaMetric existingMetric = (KafkaMetric)
producer.metrics().entrySet().iterator().next().getValue();
+ producer.registerMetricForSubscription(existingMetric);
+ // This test would fail without the check as the existing metric is
registered in the producer on startup
+ Mockito.verify(clientTelemetryReporter,
atMostOnce()).metricChange(existingMetric);
+ }
+ }
+
+ @Test
+ public void
testShouldNotCallMetricReporterMetricRemovalWithExistingProducerMetric() {
+ try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs =
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter =
mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() ->
CommonClientConfigs.telemetryReporter(anyString(),
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+ KafkaMetric existingMetric = (KafkaMetric)
producer.metrics().entrySet().iterator().next().getValue();
+ producer.unregisterMetricFromSubscription(existingMetric);
+ // This test would fail without the check as the existing metric is
registered in the producer on startup
+ Mockito.verify(clientTelemetryReporter,
never()).metricRemoval(existingMetric);
+ }
+ }
+
+
+ private Map<MetricName, KafkaMetric> customMetrics() {
+ MetricConfig metricConfig = new MetricConfig();
+ Object lock = new Object();
+ MetricName metricNameOne = new MetricName("metricOne",
"stream-metrics", "description for metric one", new HashMap<>());
+ MetricName metricNameTwo = new MetricName("metricTwo",
"stream-metrics", "description for metric two", new HashMap<>());
+
+ KafkaMetric streamClientMetricOne = new KafkaMetric(lock,
metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+ KafkaMetric streamClientMetricTwo = new KafkaMetric(lock,
metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+ return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo,
streamClientMetricTwo);
+ }
+
+
+
}