This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new eb20a0d KAFKA-9306: The consumer must close KafkaConsumerMetrics
(#7839)
eb20a0d is described below
commit eb20a0d1b762f44a6922f90fec83a42c00942675
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Dec 19 09:38:26 2019 -0800
KAFKA-9306: The consumer must close KafkaConsumerMetrics (#7839)
Reviewers: Vikas Singh <[email protected]>, Jason Gustafson
<[email protected]>, Shailesh Panwar <[email protected]>
(cherry picked from commit 7e36865541307554ebad410d61c8186f6d641c55)
---
.../kafka/clients/consumer/KafkaConsumer.java | 1 +
.../consumer/internals/KafkaConsumerMetrics.java | 23 +++++++++++++--------
.../kafka/clients/consumer/KafkaConsumerTest.java | 24 ++++++++++++++++++++++
3 files changed, 40 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 77499ff..9d3cf6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2298,6 +2298,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
}
Utils.closeQuietly(fetcher, "fetcher", firstException);
Utils.closeQuietly(interceptors, "consumer interceptors",
firstException);
+ Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics",
firstException);
Utils.closeQuietly(metrics, "consumer metrics", firstException);
Utils.closeQuietly(client, "consumer network client", firstException);
Utils.closeQuietly(keyDeserializer, "consumer key deserializer",
firstException);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
index ae61ff1..aab3133 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@@ -24,11 +25,11 @@ import org.apache.kafka.common.metrics.stats.Max;
import java.util.concurrent.TimeUnit;
-public class KafkaConsumerMetrics {
+public class KafkaConsumerMetrics implements AutoCloseable {
private final Metrics metrics;
-
- private Sensor timeBetweenPollSensor;
- private Sensor pollIdleSensor;
+ private final MetricName lastPollMetricName;
+ private final Sensor timeBetweenPollSensor;
+ private final Sensor pollIdleSensor;
private long lastPollMs;
private long pollStartMs;
private long timeSinceLastPollMs;
@@ -44,10 +45,9 @@ public class KafkaConsumerMetrics {
else
return TimeUnit.SECONDS.convert(now - lastPollMs,
TimeUnit.MILLISECONDS);
};
- metrics.addMetric(metrics.metricName("last-poll-seconds-ago",
- metricGroupName,
- "The number of seconds since the last poll() invocation."),
- lastPoll);
+ this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago",
+ metricGroupName, "The number of seconds since the last poll()
invocation.");
+ metrics.addMetric(lastPollMetricName, lastPoll);
this.timeBetweenPollSensor = metrics.sensor("time-between-poll");
this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg",
@@ -78,4 +78,11 @@ public class KafkaConsumerMetrics {
double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs +
timeSinceLastPollMs);
this.pollIdleSensor.record(pollIdleRatio);
}
+
+ @Override
+ public void close() {
+ metrics.removeMetric(lastPollMetricName);
+ metrics.removeSensor(timeBetweenPollSensor.name());
+ metrics.removeSensor(pollIdleSensor.name());
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1fd9fb3..a31e77a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2258,4 +2258,28 @@ public class KafkaConsumerTest {
// Avg of three data points
assertEquals((1.0d + 0.0d + 0.5d) / 3,
consumer.metrics().get(pollIdleRatio).metricValue());
}
+
+ private static boolean consumerMetricPresent(KafkaConsumer consumer,
String name) {
+ MetricName metricName = new MetricName(name, "consumer-metrics", "",
Collections.emptyMap());
+ return consumer.metrics.metrics().containsKey(metricName);
+ }
+
+ @Test
+ public void testClosingConsumerUnregistersConsumerMetrics() {
+ Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ KafkaConsumer<String, String> consumer = newConsumer(time, client,
subscription, metadata,
+ new RoundRobinAssignor(), true, groupInstanceId);
+ consumer.subscribe(singletonList(topic));
+ assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
+ assertTrue(consumerMetricPresent(consumer, "time-between-poll-avg"));
+ assertTrue(consumerMetricPresent(consumer, "time-between-poll-max"));
+ consumer.close();
+ assertFalse(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
+ assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg"));
+ assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
+ }
}