This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new df96b7a  KAFKA-7660: fix streams and Metrics memory leaks (#5983)
df96b7a is described below

commit df96b7a46fd2854aeba7176a3bb9d1f0b76e85d7
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Wed Dec 5 19:17:44 2018 -0600

    KAFKA-7660: fix streams and Metrics memory leaks (#5983)
---
 .../java/org/apache/kafka/common/metrics/Metrics.java    |  6 ++++++
 .../java/org/apache/kafka/common/metrics/Sensor.java     | 10 ++++++++--
 .../org/apache/kafka/common/metrics/MetricsTest.java     | 16 ++++++++++++++++
 .../streams/processor/internals/StreamsMetricsImpl.java  |  5 ++---
 .../processor/internals/StreamsMetricsImplTest.java      | 14 +++++++-------
 5 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index c4cd676..f6b6a4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -435,6 +435,12 @@ public class Metrics implements Closeable {
                             removeMetric(metric.metricName());
                         log.debug("Removed sensor with name {}", name);
                         childSensors = childrenSensors.remove(sensor);
+                        for (final Sensor parent : sensor.parents()) {
+                            final List<Sensor> peers = childrenSensors.get(parent);
+                            if (peers != null) {
+                                peers.remove(sensor);
+                            }
+                        }
                     }
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ae331e7..47f3fba 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -22,13 +22,15 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
 /**
  * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -132,6 +134,10 @@ public final class Sensor {
         return this.name;
     }
 
+    List<Sensor> parents() {
+        return unmodifiableList(asList(parents));
+    }
+
     /**
      * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
      */
@@ -266,6 +272,6 @@ public final class Sensor {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
+        return unmodifiableList(this.metrics);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 0904a41..1a0efa3 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -174,6 +176,20 @@ public class MetricsTest {
     }
 
     @Test
+    public void testRemoveChildSensor() {
+        final Metrics metrics = new Metrics();
+
+        final Sensor parent = metrics.sensor("parent");
+        final Sensor child = metrics.sensor("child", parent);
+
+        assertEquals(singletonList(child), metrics.childrenSensors().get(parent));
+
+        metrics.removeSensor("child");
+
+        assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+    }
+
+    @Test
     public void testRemoveSensor() {
         int size = metrics.metrics().size();
         Sensor parent1 = metrics.sensor("test.parent1");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 7f269e0..7871ec4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -209,14 +209,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
      */
     @Override
     public void removeSensor(Sensor sensor) {
-        Sensor parent = null;
         Objects.requireNonNull(sensor, "Sensor is null");
 
         metrics.removeSensor(sensor.name());
-        parent = parentSensors.get(sensor);
+
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
-            parentSensors.remove(sensor);
         }
 
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index 0f91ae1..7f16841 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -30,6 +29,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import static java.util.Collections.unmodifiableMap;
 import static org.junit.Assert.assertEquals;
 
 public class StreamsMetricsImplTest {
@@ -58,26 +58,26 @@ public class StreamsMetricsImplTest {
         String operation = "put";
         Map<String, String> tags = new HashMap<>();
         final Metrics metrics = new Metrics();
-        final Map<MetricName, KafkaMetric> initialMetrics = Collections.unmodifiableMap(new LinkedHashMap<>(metrics.metrics()));
+        final Map<MetricName, KafkaMetric> initialMetrics = unmodifiableMap(new LinkedHashMap<>(metrics.metrics()));
         StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, groupName, tags);
 
         Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor1);
-        Assert.assertEquals(initialMetrics, metrics.metrics());
+        assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1);
         streamsMetrics.removeSensor(sensor1a);
-        Assert.assertEquals(initialMetrics, metrics.metrics());
+        assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor2);
-        Assert.assertEquals(initialMetrics, metrics.metrics());
+        assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
-        Assert.assertEquals(initialMetrics, metrics.metrics());
+        assertEquals(initialMetrics, metrics.metrics());
 
-        Assert.assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors);
+        assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors);
     }
 
     @Test

Reply via email to