This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc6e922 MINOR: fix NamedCache metrics in Streams (#4917)
fc6e922 is described below
commit fc6e92260c3ff3b5ef72ac05e7a518a8cb2e7090
Author: John Roesler <[email protected]>
AuthorDate: Thu Apr 26 12:01:17 2018 -0500
MINOR: fix NamedCache metrics in Streams (#4917)
* Fixes a bug in which all NamedCache instances in a process shared
one parent metric.
* Also fixes a bug which incorrectly computed the per-cache metric tag
(and which went undetected due to the former bug).
* Drops the StreamsMetricsConventions#xLevelSensorName convention
in favor of StreamsMetricsImpl#xLevelSensor, allowing StreamsMetricsImpl
to track thread- and cache-level sensors so that they may be cleanly
declared from anywhere but still unloaded at the appropriate time. This
was necessary so that the NamedCache could register a thread-level parent
sensor to be unloaded when the thread, not the cache, is closed.
* The above changes made it mostly unnecessary for StreamsMetricsImpl to
expose a reference to the underlying Metrics registry, so I did a little
extra work to remove that reference, including removing inconsistently-used
and unnecessary calls to Metrics#close() in the tests.
The existing tests should be sufficient to verify this change.
Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 3 +-
.../processor/internals/GlobalStreamThread.java | 2 +-
.../streams/processor/internals/StreamTask.java | 53 +++++++++-
.../streams/processor/internals/StreamThread.java | 82 ++++++---------
.../metrics/StreamsMetricsConventions.java | 39 -------
.../internals/metrics/StreamsMetricsImpl.java | 117 ++++++++++++++++++---
.../kafka/streams/state/internals/NamedCache.java | 83 +++++++++------
.../kafka/streams/state/internals/ThreadCache.java | 10 +-
...KStreamSessionWindowAggregateProcessorTest.java | 1 -
.../processor/internals/ProcessorNodeTest.java | 1 -
.../streams/processor/internals/SinkNodeTest.java | 6 --
.../processor/internals/StreamTaskTest.java | 1 -
.../processor/internals/StreamThreadTest.java | 9 --
.../state/internals/AbstractKeyValueStoreTest.java | 1 -
.../state/internals/CachingKeyValueStoreTest.java | 1 -
.../state/internals/CachingSessionStoreTest.java | 1 -
.../state/internals/CachingWindowStoreTest.java | 1 -
.../ChangeLoggingKeyValueBytesStoreTest.java | 1 -
.../state/internals/MeteredWindowStoreTest.java | 6 --
.../streams/state/internals/NamedCacheTest.java | 25 ++---
.../RocksDBKeyValueStoreSupplierTest.java | 1 -
.../internals/RocksDBSegmentedBytesStoreTest.java | 1 -
.../internals/RocksDBSessionStoreSupplierTest.java | 1 -
.../state/internals/RocksDBSessionStoreTest.java | 1 -
.../streams/state/internals/RocksDBStoreTest.java | 1 -
.../internals/RocksDBWindowStoreSupplierTest.java | 1 -
.../state/internals/RocksDBWindowStoreTest.java | 1 -
.../state/internals/SegmentIteratorTest.java | 1 -
.../streams/state/internals/SegmentsTest.java | 1 -
.../state/internals/StoreChangeLoggerTest.java | 6 --
.../kafka/test/InternalMockProcessorContext.java | 6 --
.../org/apache/kafka/test/KStreamTestDriver.java | 8 +-
32 files changed, 258 insertions(+), 214 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a99f8cc..776dde7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -673,7 +673,8 @@ public class KafkaStreams {
             throw new StreamsException(fatal);
         }
-        final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
             .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index af3c7db..1c34897 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -298,7 +298,7 @@ public class GlobalStreamThread extends Thread {
             log.error("Failed to close state maintainer due to the following error:", e);
         }
- streamsMetrics.removeOwnedSensors();
+ streamsMetrics.removeAllThreadLevelSensors();
setState(DEAD);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d9515b1..b975324 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -22,9 +22,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -42,6 +47,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static java.util.Collections.singleton;
@@ -72,16 +78,57 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
protected static final class TaskMetrics {
final StreamsMetricsImpl metrics;
final Sensor taskCommitTimeSensor;
+ private final String taskName;
TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) {
- final String name = id.toString();
+ taskName = id.toString();
this.metrics = metrics;
-            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG);
+            final String group = "stream-task-metrics";
+
+            // first add the global operation metrics if not yet, with the global tags only
+            final Map<String, String> allTagMap = metrics.tagMap("task-id", "all");
+            final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG);
+            parent.add(
+                new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap),
+                new Avg()
+            );
+            parent.add(
+                new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap),
+                new Max()
+            );
+            parent.add(
+                new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap),
+                new Rate(TimeUnit.SECONDS, new Count())
+            );
+            parent.add(
+                new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
+                new Count()
+            );
+
+            // add the operation metrics with additional tags
+            final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);
+            taskCommitTimeSensor = metrics.taskLevelSensor(taskName, "commit", Sensor.RecordingLevel.DEBUG, parent);
+            taskCommitTimeSensor.add(
+                new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap),
+                new Avg()
+            );
+            taskCommitTimeSensor.add(
+                new MetricName("commit-latency-max", group, "The max latency of commit operation.", tagMap),
+                new Max()
+            );
+            taskCommitTimeSensor.add(
+                new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap),
+                new Rate(TimeUnit.SECONDS, new Count())
+            );
+            taskCommitTimeSensor.add(
+                new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
+                new Count()
+            );
}
void removeAllSensors() {
- metrics.removeSensor(taskCommitTimeSensor);
+ metrics.removeAllTaskLevelSensors(taskName);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 39727be..e4ad138 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -52,10 +52,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,7 +62,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singleton;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
public class StreamThread extends Thread {
@@ -509,58 +506,41 @@ public class StreamThread extends Thread {
private final Sensor taskCreatedSensor;
private final Sensor tasksClosedSensor;
-        private final Deque<String> ownedSensors = new LinkedList<>();
-
         StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
             super(metrics, threadName);
-            final String groupName = "stream-metrics";
-
-            commitTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "commit-latency"), Sensor.RecordingLevel.INFO);
-            commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
-            commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
-            commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count());
-            ownedSensors.push(commitTimeSensor.name());
-
-            pollTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "poll-latency"), Sensor.RecordingLevel.INFO);
-            pollTimeSensor.add(metrics.metricName("poll-latency-avg", groupName, "The average poll time in ms", tags()), new Avg());
-            pollTimeSensor.add(metrics.metricName("poll-latency-max", groupName, "The maximum poll time in ms", tags()), new Max());
-            pollTimeSensor.add(metrics.metricName("poll-rate", groupName, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", groupName, "The total number of record-poll calls", tags()), new Count());
-            ownedSensors.push(pollTimeSensor.name());
-
-            processTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "process-latency"), Sensor.RecordingLevel.INFO);
-            processTimeSensor.add(metrics.metricName("process-latency-avg", groupName, "The average process time in ms", tags()), new Avg());
-            processTimeSensor.add(metrics.metricName("process-latency-max", groupName, "The maximum process time in ms", tags()), new Max());
-            processTimeSensor.add(metrics.metricName("process-rate", groupName, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            processTimeSensor.add(metrics.metricName("process-total", groupName, "The total number of process calls", tags()), new Count());
-            ownedSensors.push(processTimeSensor.name());
-
-            punctuateTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "punctuate-latency"), Sensor.RecordingLevel.INFO);
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", groupName, "The average punctuate time in ms", tags()), new Avg());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", groupName, "The maximum punctuate time in ms", tags()), new Max());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", groupName, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            punctuateTimeSensor.add(metrics.metricName("punctuate-total", groupName, "The total number of punctuate calls", tags()), new Count());
-            ownedSensors.push(punctuateTimeSensor.name());
-
-            taskCreatedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-created"), Sensor.RecordingLevel.INFO);
+            final String group = "stream-metrics";
+
+            commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
+            commitTimeSensor.add(metrics.metricName("commit-latency-avg", group, "The average commit time in ms", tags()), new Avg());
+            commitTimeSensor.add(metrics.metricName("commit-latency-max", group, "The maximum commit time in ms", tags()), new Max());
+            commitTimeSensor.add(metrics.metricName("commit-rate", group, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            commitTimeSensor.add(metrics.metricName("commit-total", group, "The total number of commit calls", tags()), new Count());
+
+            pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO);
+            pollTimeSensor.add(metrics.metricName("poll-latency-avg", group, "The average poll time in ms", tags()), new Avg());
+            pollTimeSensor.add(metrics.metricName("poll-latency-max", group, "The maximum poll time in ms", tags()), new Max());
+            pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tags()), new Count());
+
+            processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
+            processTimeSensor.add(metrics.metricName("process-latency-avg", group, "The average process time in ms", tags()), new Avg());
+            processTimeSensor.add(metrics.metricName("process-latency-max", group, "The maximum process time in ms", tags()), new Max());
+            processTimeSensor.add(metrics.metricName("process-rate", group, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            processTimeSensor.add(metrics.metricName("process-total", group, "The total number of process calls", tags()), new Count());
+
+            punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO);
+            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", group, "The average punctuate time in ms", tags()), new Avg());
+            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", group, "The maximum punctuate time in ms", tags()), new Max());
+            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", group, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            punctuateTimeSensor.add(metrics.metricName("punctuate-total", group, "The total number of punctuate calls", tags()), new Count());
+
+            taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO);
             taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
             taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tags()), new Total());
-            ownedSensors.push(taskCreatedSensor.name());
-
-            tasksClosedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-closed"), Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(metrics.metricName("task-closed-rate", groupName, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
-            tasksClosedSensor.add(metrics.metricName("task-closed-total", groupName, "The total number of closed tasks", tags()), new Total());
-            ownedSensors.push(tasksClosedSensor.name());
-        }
-        public void removeOwnedSensors() {
-            synchronized (ownedSensors) {
-                super.removeOwnedSensors();
-                while (!ownedSensors.isEmpty()) {
-                    registry().removeSensor(ownedSensors.pop());
-                }
-            }
+            tasksClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+            tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tags()), new Total());
         }
     }
@@ -1165,7 +1145,7 @@ public class StreamThread extends Thread {
} catch (final Throwable e) {
log.error("Failed to close restore consumer due to the following
error:", e);
}
- streamsMetrics.removeOwnedSensors();
+ streamsMetrics.removeAllThreadLevelSensors();
setState(State.DEAD);
log.info("Shutdown complete");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
deleted file mode 100644
index cfe206c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.metrics;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public final class StreamsMetricsConventions {
- private StreamsMetricsConventions() {
- }
-
-    public static String threadLevelSensorName(final String threadName, final String sensorName) {
-        return "thread." + threadName + "." + sensorName;
-    }
-
-    static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
-        if (tags.containsKey("client-id")) {
-            return tags;
-        } else {
-            final LinkedHashMap<String, String> newTags = new LinkedHashMap<>(tags);
-            newTags.put("client-id", threadName);
-            return newTags;
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 76a1d2b..0251265 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -31,35 +31,125 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
-
public class StreamsMetricsImpl implements StreamsMetrics {
private final Metrics metrics;
private final Map<String, String> tags;
private final Map<Sensor, Sensor> parentSensors;
- private final Deque<String> ownedSensors = new LinkedList<>();
private final Sensor skippedRecordsSensor;
+ private final String threadName;
+
+ private final Deque<String> threadLevelSensors = new LinkedList<>();
+    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
+    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
+ this.threadName = threadName;
this.metrics = metrics;
-        this.tags = StreamsMetricsConventions.threadLevelTags(threadName, Collections.<String, String>emptyMap());
+
+
+ final HashMap<String, String> tags = new LinkedHashMap<>();
+ tags.put("client-id", threadName);
+ this.tags = Collections.unmodifiableMap(tags);
+
this.parentSensors = new HashMap<>();
-        skippedRecordsSensor = metrics.sensor(threadLevelSensorName(threadName, "skipped-records"), Sensor.RecordingLevel.INFO);
-        skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
-        skippedRecordsSensor.add(metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tags), new Total());
-        ownedSensors.push(skippedRecordsSensor.name());
+        final String group = "stream-metrics";
+        skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
+        skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", group, "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
+        skippedRecordsSensor.add(metrics.metricName("skipped-records-total", group, "The total number of skipped records", tags), new Total());
+ }
+
+    public final Sensor threadLevelSensor(final String sensorName,
+                                          final Sensor.RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (threadLevelSensors) {
+            final String fullSensorName = threadName + "." + sensorName;
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+            threadLevelSensors.push(fullSensorName);
+
+            return sensor;
+        }
+    }
+
+ public final void removeAllThreadLevelSensors() {
+ synchronized (threadLevelSensors) {
+ while (!threadLevelSensors.isEmpty()) {
+ metrics.removeSensor(threadLevelSensors.pop());
+ }
+ }
+ }
+
+    public final Sensor taskLevelSensor(final String taskName,
+                                        final String sensorName,
+                                        final Sensor.RecordingLevel recordingLevel,
+                                        final Sensor... parents) {
+        final String key = threadName + "." + taskName;
+        synchronized (taskLevelSensors) {
+            if (!taskLevelSensors.containsKey(key)) {
+                taskLevelSensors.put(key, new LinkedList<String>());
+            }
+
+            final String fullSensorName = key + "." + sensorName;
+
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+
+            taskLevelSensors.get(key).push(fullSensorName);
+
+            return sensor;
+        }
     }
- public final Metrics registry() {
- return metrics;
+ public final void removeAllTaskLevelSensors(final String taskName) {
+ final String key = threadName + "." + taskName;
+ synchronized (taskLevelSensors) {
+ if (taskLevelSensors.containsKey(key)) {
+ while (!taskLevelSensors.get(key).isEmpty()) {
+ metrics.removeSensor(taskLevelSensors.get(key).pop());
+ }
+ taskLevelSensors.remove(key);
+ }
+ }
+ }
+
+    public final Sensor cacheLevelSensor(final String taskName,
+                                         final String cacheName,
+                                         final String sensorName,
+                                         final Sensor.RecordingLevel recordingLevel,
+                                         final Sensor... parents) {
+        final String key = threadName + "." + taskName + "." + cacheName;
+        synchronized (cacheLevelSensors) {
+            if (!cacheLevelSensors.containsKey(key)) {
+                cacheLevelSensors.put(key, new LinkedList<String>());
+            }
+
+            final String fullSensorName = key + "." + sensorName;
+
+            final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
+
+            cacheLevelSensors.get(key).push(fullSensorName);
+
+            return sensor;
+        }
+    }
+
+    public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
+ final String key = threadName + "." + taskName + "." + cacheName;
+ synchronized (cacheLevelSensors) {
+ if (cacheLevelSensors.containsKey(key)) {
+ while (!cacheLevelSensors.get(key).isEmpty()) {
+ metrics.removeSensor(cacheLevelSensors.get(key).pop());
+ }
+ cacheLevelSensors.remove(key);
+ }
+ }
}
protected final Map<String, String> tags() {
@@ -236,11 +326,4 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
- public void removeOwnedSensors() {
- synchronized (ownedSensors) {
- while (!ownedSensors.isEmpty()) {
- metrics.removeSensor(ownedSensors.pop());
- }
- }
- }
}
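Taken together, the new xLevelSensor/removeAllXLevelSensors pairs give each scope owner a
handle for cleanup. A minimal usage sketch (illustration only; the thread and task names
are made up):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

    public class SensorLifecycleSketch {
        public static void main(final String[] args) {
            final Metrics registry = new Metrics();
            final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(registry, "thread-1");

            // Declared from anywhere, tracked per scope by StreamsMetricsImpl:
            final Sensor parent = streamsMetrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG);
            final Sensor child = streamsMetrics.taskLevelSensor("0.0", "commit", Sensor.RecordingLevel.DEBUG, parent);

            // Unloaded at the appropriate time by the owner of each scope:
            streamsMetrics.removeAllTaskLevelSensors("0.0");   // when the task closes
            streamsMetrics.removeAllThreadLevelSensors();      // when the thread dies
            registry.close();
        }
    }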
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index de62a2d..d058c9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ class NamedCache {
private long numOverwrites = 0;
private long numFlushes = 0;
- NamedCache(final String name, final StreamsMetrics metrics) {
+ NamedCache(final String name, final StreamsMetricsImpl metrics) {
this.name = name;
this.namedCacheMetrics = new NamedCacheMetrics(metrics, name);
}
@@ -355,45 +355,66 @@ class NamedCache {
private static class NamedCacheMetrics {
private final StreamsMetricsImpl metrics;
- private final String groupName;
- private final Map<String, String> metricTags;
- private final Map<String, String> allMetricTags;
+
private final Sensor hitRatioSensor;
+ private final String taskName;
+ private final String cacheName;
-        private NamedCacheMetrics(final StreamsMetrics metrics, final String name) {
-            final String scope = "record-cache";
-            final String opName = "hitRatio";
-            final String tagKey = scope + "-id";
-            final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
-            this.groupName = "stream-" + scope + "-metrics";
-            this.metrics = (StreamsMetricsImpl) metrics;
-            this.allMetricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, "all",
-                "task-id", ThreadCache.taskIDfromCacheName(name));
-            this.metricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, tagValue,
-                "task-id", ThreadCache.taskIDfromCacheName(name));
+        private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheName) {
+            taskName = ThreadCache.taskIDfromCacheName(cacheName);
+            this.cacheName = cacheName;
+            this.metrics = metrics;
+            final String group = "stream-record-cache-metrics";
             // add parent
-            final Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
-            parent.add(this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio.", allMetricTags), new Avg());
-            parent.add(this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio.", allMetricTags), new Min());
-            parent.add(this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio.", allMetricTags), new Max());
+            final Map<String, String> allMetricTags = metrics.tagMap(
+                "record-cache-id", "all",
+                "task-id", taskName
+            );
+            final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG);
+            taskLevelHitRatioSensor.add(
+                new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags),
+                new Avg()
+            );
+            taskLevelHitRatioSensor.add(
+                new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", allMetricTags),
+                new Min()
+            );
+            taskLevelHitRatioSensor.add(
+                new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", allMetricTags),
+                new Max()
+            );
             // add child
-            hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
-            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio.", metricTags), new Avg());
-            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio.", metricTags), new Min());
-            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio.", metricTags), new Max());
+            final Map<String, String> metricTags = metrics.tagMap(
+                "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName),
+                "task-id", taskName
+            );
+
+            hitRatioSensor = metrics.cacheLevelSensor(
+                taskName,
+                cacheName,
+                "hitRatio",
+                Sensor.RecordingLevel.DEBUG,
+                taskLevelHitRatioSensor
+            );
+            hitRatioSensor.add(
+                new MetricName("hitRatio-avg", group, "The average cache hit ratio.", metricTags),
+                new Avg()
+            );
+            hitRatioSensor.add(
+                new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", metricTags),
+                new Min()
+            );
+            hitRatioSensor.add(
+                new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", metricTags),
+                new Max()
+            );
}
private void removeAllSensors() {
- metrics.removeSensor(hitRatioSensor);
+ metrics.removeAllCacheLevelSensors(taskName, cacheName);
}
}
}
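The parent/child wiring above relies on a property of the Kafka metrics library: recording
on a child sensor also records on its parents. A standalone sketch of that roll-up
(illustration only; the names are made up, and the registry must run at DEBUG for these
DEBUG-level sensors to record):

    import java.util.Collections;

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    public class ParentChildSensorSketch {
        public static void main(final String[] args) {
            // DEBUG-level sensors only record when the registry itself runs at DEBUG.
            final Metrics registry = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
            final String group = "stream-record-cache-metrics";

            final Sensor parent = registry.sensor("thread.0.0.hitRatio", Sensor.RecordingLevel.DEBUG);
            parent.add(new MetricName("hitRatio-avg", group, "avg over all caches of the task",
                Collections.singletonMap("record-cache-id", "all")), new Avg());

            final Sensor child = registry.sensor("thread.0.0.store.hitRatio", Sensor.RecordingLevel.DEBUG, parent);
            child.add(new MetricName("hitRatio-avg", group, "avg for one cache",
                Collections.singletonMap("record-cache-id", "store")), new Avg());

            child.record(0.75); // updates both the per-cache metric and the "all" rollup
            registry.close();
        }
    }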
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index b1fd198..b947664 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import java.util.Collections;
@@ -39,7 +39,7 @@ import java.util.NoSuchElementException;
public class ThreadCache {
private final Logger log;
private final long maxCacheSizeBytes;
- private final StreamsMetrics metrics;
+ private final StreamsMetricsImpl metrics;
private final Map<String, NamedCache> caches = new HashMap<>();
// internal stats
@@ -52,7 +52,7 @@ public class ThreadCache {
void apply(final List<DirtyEntry> dirty);
}
-    public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetrics metrics) {
+    public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetricsImpl metrics) {
this.maxCacheSizeBytes = maxCacheSizeBytes;
this.metrics = metrics;
this.log = logContext.logger(getClass());
@@ -91,7 +91,7 @@ public class ThreadCache {
* @return
*/
public static String taskIDfromCacheName(final String cacheName) {
- String[] tokens = cacheName.split("-");
+ String[] tokens = cacheName.split("-", 2);
return tokens[0];
}
@@ -101,7 +101,7 @@ public class ThreadCache {
* @return
*/
     public static String underlyingStoreNamefromCacheName(final String cacheName) {
- String[] tokens = cacheName.split("-");
+ String[] tokens = cacheName.split("-", 2);
return tokens[1];
}
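The limit argument matters because store names may themselves contain dashes; an unbounded
split truncates the store-name tag to its first token, which is the tag-computation bug
mentioned in the commit message. A quick illustration (hypothetical cache name):

    public class CacheNameSplitSketch {
        public static void main(final String[] args) {
            final String cacheName = "0.0-my-store"; // "<taskId>-<storeName>"

            // Before: split("-") yields ["0.0", "my", "store"], so tokens[1] is "my"
            System.out.println(cacheName.split("-")[1]);    // my  (truncated tag)

            // After: split("-", 2) yields ["0.0", "my-store"], keeping the full store name
            System.out.println(cacheName.split("-", 2)[1]); // my-store
        }
    }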
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 301d448..8cb2eae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -121,7 +121,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
@After
public void closeStore() {
- context.close();
sessionStore.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 1409d68..a7a2610 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -148,7 +148,6 @@ public class ProcessorNodeTest {
"The average number of occurrence of " + throughputOperation + "
operation per second.", metricTags)));
- context.close();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 0013167..753d26b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,11 +54,6 @@ public class SinkNodeTest {
sink.init(context);
}
- @After
- public void after() {
- context.close();
- }
-
@Test
@SuppressWarnings("unchecked")
     public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 28e0b46..598e47e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -232,7 +232,6 @@ public class StreamTaskTest {
public void testMetrics() {
task = createStatelessTask(createConfig(false));
- assertNotNull(metrics.getSensor("commit"));
assertNotNull(getMetric("%s-latency-avg", "The average latency of %s
operation.", task.id().toString()));
assertNotNull(getMetric("%s-latency-max", "The max latency of %s
operation.", task.id().toString()));
assertNotNull(getMetric("%s-rate", "The average number of occurrence
of %s operation per second.", task.id().toString()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 36a1bce..3ae7acb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -243,17 +243,8 @@ public class StreamThreadTest {
public void testMetricsCreatedAtStartup() {
         final StreamThread thread = createStreamThread(clientId, config, false);
final String defaultGroupName = "stream-metrics";
- final String defaultPrefix = "thread." + thread.getName();
         final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
- assertNotNull(metrics.getSensor(defaultPrefix + ".commit-latency"));
- assertNotNull(metrics.getSensor(defaultPrefix + ".poll-latency"));
- assertNotNull(metrics.getSensor(defaultPrefix + ".process-latency"));
- assertNotNull(metrics.getSensor(defaultPrefix + ".punctuate-latency"));
- assertNotNull(metrics.getSensor(defaultPrefix + ".task-created"));
- assertNotNull(metrics.getSensor(defaultPrefix + ".task-closed"));
- assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records"));
-
         assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
         assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
         assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 51c782a..c4536df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -63,7 +63,6 @@ public abstract class AbstractKeyValueStoreTest {
@After
public void after() {
store.close();
- context.close();
driver.clear();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 8705326..3e0241e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -82,7 +82,6 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@After
public void after() {
super.after();
- context.close();
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index a9a66e9..b77f4e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -82,7 +82,6 @@ public class CachingSessionStoreTest {
@After
public void close() {
- context.close();
cachingStore.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index c25655b..a87b2e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -87,7 +87,6 @@ public class CachingWindowStoreTest {
@After
public void closeStore() {
- context.close();
cachingStore.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 7342c93..5bb0de7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -76,7 +76,6 @@ public class ChangeLoggingKeyValueBytesStoreTest {
@After
public void after() {
- context.close();
store.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index eab523e..19bd523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -83,11 +82,6 @@ public class MeteredWindowStoreTest {
);
}
- @After
- public void after() {
- context.close();
- }
-
@Test
public void shouldRecordRestoreLatencyOnInit() {
innerStoreMock.init(context, store);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 6b410dc..9ae0feb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,12 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Before;
import org.junit.Test;
@@ -44,13 +43,14 @@ import static org.junit.Assert.assertSame;
public class NamedCacheTest {
private NamedCache cache;
- private MockStreamsMetrics streamMetrics;
+ private StreamsMetricsImpl metrics;
private final String taskIDString = "0.0";
private final String underlyingStoreName = "storeName";
+
@Before
public void setUp() {
- streamMetrics = new MockStreamsMetrics(new Metrics());
-        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, streamMetrics);
+        metrics = new MockStreamsMetrics(new Metrics());
+        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics);
}
@Test
@@ -83,18 +83,15 @@ public class NamedCacheTest {
metricTags.put("task-id", taskIDString);
metricTags.put("client-id", "test");
-        assertNotNull(streamMetrics.registry().getSensor("hitRatio"));
-        final Map<MetricName, KafkaMetric> metrics1 = streamMetrics.registry().metrics();
-        getMetricByNameFilterByTags(metrics1, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics1, "hitRatio-min", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics1, "hitRatio-max", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
 
         // test "all"
         metricTags.put("record-cache-id", "all");
-        final Map<MetricName, KafkaMetric> metrics = streamMetrics.registry().metrics();
-        getMetricByNameFilterByTags(metrics, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics, "hitRatio-min", "stream-record-cache-metrics", metricTags);
-        getMetricByNameFilterByTags(metrics, "hitRatio-max", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 098c326..b25b8cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -53,7 +53,6 @@ public class RocksDBKeyValueStoreSupplierTest {
@After
public void close() {
- context.close();
store.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 388a2fc..bd2fa91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -82,7 +82,6 @@ public class RocksDBSegmentedBytesStoreTest {
@After
public void close() {
- context.close();
bytesStore.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 272e0b0..c50dfba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -68,7 +68,6 @@ public class RocksDBSessionStoreSupplierTest {
@After
public void close() {
- context.close();
store.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 6495315..bcb411b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -69,7 +69,6 @@ public class RocksDBSessionStoreTest {
@After
public void close() {
- context.close();
sessionStore.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index a09d87d..b7a9d37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -79,7 +79,6 @@ public class RocksDBStoreTest {
@After
public void tearDown() {
rocksDBStore.close();
- context.close();
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index a6ccfdf..7409a13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -53,7 +53,6 @@ public class RocksDBWindowStoreSupplierTest {
@After
public void close() {
- context.close();
if (store != null) {
store.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index bf556ad..92edbd8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -126,7 +126,6 @@ public class RocksDBWindowStoreTest {
@After
public void closeStore() {
- context.close();
if (windowStore != null) {
windowStore.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index d61218e..7a7b266 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -76,7 +76,6 @@ public class SegmentIteratorTest {
}
segmentOne.close();
segmentTwo.close();
- context.close();
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index ec59a00..bfa317d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -65,7 +65,6 @@ public class SegmentsTest {
@After
public void close() {
- context.close();
segments.close();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 21b5c5c..6bacd91 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
-import org.junit.After;
import org.junit.Test;
import java.util.HashMap;
@@ -68,11 +67,6 @@ public class StoreChangeLoggerTest {
     private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
- @After
- public void after() {
- context.close();
- }
-
@Test
public void testAddRemove() {
context.setTime(1);
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 57e3efb..5e61910 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -51,7 +51,6 @@ import java.util.Map;
 public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
private final File stateDir;
- private final Metrics metrics;
private final RecordCollector.Supplier recordCollectorSupplier;
private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
     private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
@@ -135,7 +134,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
this.stateDir = stateDir;
this.keySerde = keySerde;
this.valSerde = valSerde;
- this.metrics = metrics.registry();
this.recordCollectorSupplier = collectorSupplier;
}
@@ -306,10 +304,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
restoreListener.onRestoreEnd(null, storeName, 0L);
}
- public void close() {
- metrics.close();
- }
-
     private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) {
if (restoreCallback instanceof StateRestoreListener) {
return (StateRestoreListener) restoreCallback;
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 7313414..3daf051 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -147,7 +147,12 @@ public class KStreamTestDriver extends ExternalResource {
     private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
for (final StateStore store : stores) {
- store.init(context, store);
+ try {
+ store.init(context, store);
+ } catch (final RuntimeException e) {
+                new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
+ throw e;
+ }
}
for (final ProcessorNode node : topology.processors()) {
@@ -230,7 +235,6 @@ public class KStreamTestDriver extends ExternalResource {
}
closeState();
- context.close();
}
public Set<String> allProcessorNames() {
--
To stop receiving notification emails like this one, please contact [email protected].