This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new eb20a0d  KAFKA-9306: The consumer must close KafkaConsumerMetrics (#7839)
eb20a0d is described below

commit eb20a0d1b762f44a6922f90fec83a42c00942675
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Dec 19 09:38:26 2019 -0800

    KAFKA-9306: The consumer must close KafkaConsumerMetrics (#7839)
    
    Reviewers: Vikas Singh <[email protected]>, Jason Gustafson <[email protected]>, Shailesh Panwar <[email protected]>
    (cherry picked from commit 7e36865541307554ebad410d61c8186f6d641c55)
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  1 +
 .../consumer/internals/KafkaConsumerMetrics.java   | 23 +++++++++++++--------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 24 ++++++++++++++++++++++
 3 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 77499ff..9d3cf6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2298,6 +2298,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         }
         Utils.closeQuietly(fetcher, "fetcher", firstException);
         Utils.closeQuietly(interceptors, "consumer interceptors", 
firstException);
+        Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", 
firstException);
         Utils.closeQuietly(metrics, "consumer metrics", firstException);
         Utils.closeQuietly(client, "consumer network client", firstException);
         Utils.closeQuietly(keyDeserializer, "consumer key deserializer", 
firstException);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
index ae61ff1..aab3133 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -24,11 +25,11 @@ import org.apache.kafka.common.metrics.stats.Max;
 
 import java.util.concurrent.TimeUnit;
 
-public class KafkaConsumerMetrics {
+public class KafkaConsumerMetrics implements AutoCloseable {
     private final Metrics metrics;
-
-    private Sensor timeBetweenPollSensor;
-    private Sensor pollIdleSensor;
+    private final MetricName lastPollMetricName;
+    private final Sensor timeBetweenPollSensor;
+    private final Sensor pollIdleSensor;
     private long lastPollMs;
     private long pollStartMs;
     private long timeSinceLastPollMs;
@@ -44,10 +45,9 @@ public class KafkaConsumerMetrics {
             else
                 return TimeUnit.SECONDS.convert(now - lastPollMs, 
TimeUnit.MILLISECONDS);
         };
-        metrics.addMetric(metrics.metricName("last-poll-seconds-ago",
-                metricGroupName,
-                "The number of seconds since the last poll() invocation."),
-                lastPoll);
+        this.lastPollMetricName = metrics.metricName("last-poll-seconds-ago",
+                metricGroupName, "The number of seconds since the last poll() 
invocation.");
+        metrics.addMetric(lastPollMetricName, lastPoll);
 
         this.timeBetweenPollSensor = metrics.sensor("time-between-poll");
         
this.timeBetweenPollSensor.add(metrics.metricName("time-between-poll-avg",
@@ -78,4 +78,11 @@ public class KafkaConsumerMetrics {
         double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + 
timeSinceLastPollMs);
         this.pollIdleSensor.record(pollIdleRatio);
     }
+
+    @Override
+    public void close() {
+        metrics.removeMetric(lastPollMetricName);
+        metrics.removeSensor(timeBetweenPollSensor.name());
+        metrics.removeSensor(pollIdleSensor.name());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 1fd9fb3..a31e77a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2258,4 +2258,28 @@ public class KafkaConsumerTest {
         // Avg of three data points
         assertEquals((1.0d + 0.0d + 0.5d) / 3, 
consumer.metrics().get(pollIdleRatio).metricValue());
     }
+
+    private static boolean consumerMetricPresent(KafkaConsumer consumer, 
String name) {
+        MetricName metricName = new MetricName(name, "consumer-metrics", "", 
Collections.emptyMap());
+        return consumer.metrics.metrics().containsKey(metricName);
+    }
+
+    @Test
+    public void testClosingConsumerUnregistersConsumerMetrics() {
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, 
subscription, metadata,
+            new RoundRobinAssignor(), true, groupInstanceId);
+        consumer.subscribe(singletonList(topic));
+        assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
+        assertTrue(consumerMetricPresent(consumer, "time-between-poll-avg"));
+        assertTrue(consumerMetricPresent(consumer, "time-between-poll-max"));
+        consumer.close();
+        assertFalse(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
+        assertFalse(consumerMetricPresent(consumer, "time-between-poll-avg"));
+        assertFalse(consumerMetricPresent(consumer, "time-between-poll-max"));
+    }
 }

Reply via email to