This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c998f8ef36 KAFKA-17869: Adding tests to ensure KIP-1076 doesn't
interfere with consumer metrics[1/3] (#17781)
1c998f8ef36 is described below
commit 1c998f8ef3620ed6bf782a01696f61811da2dea2
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Nov 21 13:41:29 2024 -0500
KAFKA-17869: Adding tests to ensure KIP-1076 doesn't interfere with
consumer metrics[1/3] (#17781)
Adding tests to ensure the KIP-1076 methods don't interfere with existing
metrics in clients
Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax
<[email protected]>
---
checkstyle/import-control.xml | 1 +
.../consumer/internals/AsyncKafkaConsumer.java | 14 ++-
.../consumer/internals/ClassicKafkaConsumer.java | 14 ++-
.../kafka/clients/consumer/KafkaConsumerTest.java | 123 +++++++++++++++++++++
4 files changed, 140 insertions(+), 12 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index eb4131fbc47..bb6f48a73f0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -225,6 +225,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
<allow pkg="org.apache.kafka.test" />
+ <allow class="org.apache.log4j.Level" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.clients.consumer" />
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index da3f3f2f25b..ba7eed19f11 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -662,17 +662,19 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricChange(metric));
+ } else {
+ log.debug("Skipping registration for metric {}. Existing consumer
metrics cannot be overwritten.", metric.metricName());
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricRemoval(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricRemoval(metric));
+ } else {
+ log.debug("Skipping unregistration for metric {}. Existing
consumer metrics cannot be removed.", metric.metricName());
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 82a9bd2a53b..e423c261763 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -430,17 +430,19 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricChange(metric));
+ } else {
+ log.debug("Skipping registration for metric {}. Existing consumer
metrics cannot be overwritten.", metric.metricName());
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricRemoval(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricRemoval(metric));
+ } else {
+ log.debug("Skipping unregistration for metric {}. Existing
consumer metrics cannot be removed.", metric.metricName());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index c260fa48c01..7d122c2986c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
@@ -58,6 +60,9 @@ import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartit
import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -88,6 +93,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -96,11 +102,13 @@ import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
+import org.apache.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.lang.management.ManagementFactory;
@@ -145,6 +153,7 @@ import static java.util.Collections.singletonMap;
import static
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
import static
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.apache.kafka.common.utils.Utils.propsToMap;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -157,8 +166,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -224,6 +235,107 @@ public class KafkaConsumerTest {
}
}
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void
testSubscribingCustomMetricsDoesntAffectConsumerMetrics(GroupProtocol
groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+
+ Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+ customMetrics.forEach((name, metric) ->
consumer.registerMetricForSubscription(metric));
+
+ Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
+ customMetrics.forEach((name, metric) ->
assertFalse(consumerMetrics.containsKey(name)));
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void
testSubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol
groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ Class<?> consumerClass = groupProtocol == GroupProtocol.CLASSIC ?
ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(consumerClass, Level.DEBUG);
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+ KafkaMetric existingMetricToAdd = (KafkaMetric)
consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.registerMetricForSubscription(existingMetricToAdd);
+ final String expectedMessage = String.format("Skipping
registration for metric %s. Existing consumer metrics cannot be overwritten.",
existingMetricToAdd.metricName());
+ assertTrue(appender.getMessages().stream().anyMatch(m ->
m.contains(expectedMessage)));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void
testUnsubscribingCustomMetricsWithSameNameDoesntAffectConsumerMetrics(GroupProtocol
groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ Class<?> consumerClass = groupProtocol == GroupProtocol.CLASSIC ?
ClassicKafkaConsumer.class : AsyncKafkaConsumer.class;
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(consumerClass, Level.DEBUG);
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+ KafkaMetric existingMetricToRemove = (KafkaMetric)
consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.unregisterMetricFromSubscription(existingMetricToRemove);
+ final String expectedMessage = String.format("Skipping
unregistration for metric %s. Existing consumer metrics cannot be removed.",
existingMetricToRemove.metricName());
+ assertTrue(appender.getMessages().stream().anyMatch(m ->
m.contains(expectedMessage)));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void
testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingConsumerMetric(GroupProtocol
groupProtocol) {
+ try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs =
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter =
mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() ->
CommonClientConfigs.telemetryReporter(anyString(),
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+
+ KafkaMetric existingMetric = (KafkaMetric)
consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.registerMetricForSubscription(existingMetric);
+ // This test would fail without the check as the existing metric is
registered in the consumer on startup
+ Mockito.verify(clientTelemetryReporter,
atMostOnce()).metricChange(existingMetric);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void
testShouldNotCallMetricReporterMetricRemovalWithExistingConsumerMetric(GroupProtocol
groupProtocol) {
+ try (MockedStatic<CommonClientConfigs> mockedCommonClientConfigs =
mockStatic(CommonClientConfigs.class, new CallsRealMethods())) {
+ ClientTelemetryReporter clientTelemetryReporter =
mock(ClientTelemetryReporter.class);
+ clientTelemetryReporter.configure(any());
+ mockedCommonClientConfigs.when(() ->
CommonClientConfigs.telemetryReporter(anyString(),
any())).thenReturn(Optional.of(clientTelemetryReporter));
+
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+
+ KafkaMetric existingMetric = (KafkaMetric)
consumer.metrics().entrySet().iterator().next().getValue();
+ consumer.unregisterMetricFromSubscription(existingMetric);
+ Mockito.verify(clientTelemetryReporter,
never()).metricRemoval(existingMetric);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void
testUnSubscribingNonExisingMetricsDoesntCauseError(GroupProtocol groupProtocol)
{
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+
+ Map<MetricName, KafkaMetric> customMetrics = customMetrics();
+ // Metrics that were never registered but are unregistered should not cause an error
+ customMetrics.forEach((name, metric) -> assertDoesNotThrow(() ->
consumer.unregisterMetricFromSubscription(metric)));
+ }
+
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testMetricsReporterAutoGeneratedClientId(GroupProtocol
groupProtocol) {
@@ -3501,6 +3613,17 @@ public void
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
return client.requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().equals(apiKey));
}
+ private Map<MetricName, KafkaMetric> customMetrics() {
+ MetricConfig metricConfig = new MetricConfig();
+ Object lock = new Object();
+ MetricName metricNameOne = new MetricName("metricOne",
"stream-metrics", "description for metric one", new HashMap<>());
+ MetricName metricNameTwo = new MetricName("metricTwo",
"stream-metrics", "description for metric two", new HashMap<>());
+
+ KafkaMetric streamClientMetricOne = new KafkaMetric(lock,
metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
+ KafkaMetric streamClientMetricTwo = new KafkaMetric(lock,
metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
+ return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo,
streamClientMetricTwo);
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements
Deserializer<byte[]> {
@Override