This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push: new df96b7a KAFKA-7660: fix streams and Metrics memory leaks (#5983) df96b7a is described below commit df96b7a46fd2854aeba7176a3bb9d1f0b76e85d7 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Wed Dec 5 19:17:44 2018 -0600 KAFKA-7660: fix streams and Metrics memory leaks (#5983) --- .../java/org/apache/kafka/common/metrics/Metrics.java | 6 ++++++ .../java/org/apache/kafka/common/metrics/Sensor.java | 10 ++++++++-- .../org/apache/kafka/common/metrics/MetricsTest.java | 16 ++++++++++++++++ .../streams/processor/internals/StreamsMetricsImpl.java | 5 ++--- .../processor/internals/StreamsMetricsImplTest.java | 14 +++++++------- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index c4cd676..f6b6a4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -435,6 +435,12 @@ public class Metrics implements Closeable { removeMetric(metric.metricName()); log.debug("Removed sensor with name {}", name); childSensors = childrenSensors.remove(sensor); + for (final Sensor parent : sensor.parents()) { + final List<Sensor> peers = childrenSensors.get(parent); + if (peers != null) { + peers.remove(sensor); + } + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index ae331e7..47f3fba 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -22,13 +22,15 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + /** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set @@ -132,6 +134,10 @@ public final class Sensor { return this.name; } + List<Sensor> parents() { + return unmodifiableList(asList(parents)); + } + /** * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)} */ @@ -266,6 +272,6 @@ public final class Sensor { } synchronized List<KafkaMetric> metrics() { - return Collections.unmodifiableList(this.metrics); + return unmodifiableList(this.metrics); } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 0904a41..1a0efa3 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.metrics; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -174,6 +176,20 @@ public class MetricsTest { } @Test + public void testRemoveChildSensor() { + final Metrics metrics = new Metrics(); + + final Sensor parent = metrics.sensor("parent"); + final Sensor child = metrics.sensor("child", parent); + + assertEquals(singletonList(child), metrics.childrenSensors().get(parent)); + + metrics.removeSensor("child"); + + assertEquals(emptyList(), metrics.childrenSensors().get(parent)); + } + + @Test public void testRemoveSensor() { int size = metrics.metrics().size(); Sensor parent1 = metrics.sensor("test.parent1"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index 7f269e0..7871ec4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -209,14 +209,13 @@ public class StreamsMetricsImpl implements StreamsMetrics { */ @Override public void removeSensor(Sensor sensor) { - Sensor parent = null; Objects.requireNonNull(sensor, "Sensor is null"); metrics.removeSensor(sensor.name()); - parent = parentSensors.get(sensor); + + final Sensor parent = parentSensors.remove(sensor); if (parent != null) { metrics.removeSensor(parent.name()); - parentSensors.remove(sensor); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java index 0f91ae1..7f16841 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -30,6 +29,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import static java.util.Collections.unmodifiableMap; import static org.junit.Assert.assertEquals; public class StreamsMetricsImplTest { @@ -58,26 +58,26 @@ public class StreamsMetricsImplTest { String operation = "put"; Map<String, String> tags = new HashMap<>(); final Metrics metrics = new Metrics(); - final Map<MetricName, KafkaMetric> initialMetrics = Collections.unmodifiableMap(new LinkedHashMap<>(metrics.metrics())); + final Map<MetricName, KafkaMetric> initialMetrics = unmodifiableMap(new LinkedHashMap<>(metrics.metrics())); StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, groupName, tags); Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor1); - Assert.assertEquals(initialMetrics, metrics.metrics()); + assertEquals(initialMetrics, metrics.metrics()); Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1); streamsMetrics.removeSensor(sensor1a); - Assert.assertEquals(initialMetrics, metrics.metrics()); + assertEquals(initialMetrics, metrics.metrics()); Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor2); - Assert.assertEquals(initialMetrics, metrics.metrics()); + assertEquals(initialMetrics, metrics.metrics()); Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG); streamsMetrics.removeSensor(sensor3); - Assert.assertEquals(initialMetrics, metrics.metrics()); + assertEquals(initialMetrics, metrics.metrics()); - Assert.assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors); + assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors); } @Test