This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cf2c5e9 MINOR: clean up node and store sensors (#5450)
cf2c5e9 is described below
commit cf2c5e9ffc066aad37090fb6f2953a602cd8621b
Author: John Roesler <[email protected]>
AuthorDate: Sat Aug 4 00:43:18 2018 -0500
MINOR: clean up node and store sensors (#5450)
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/ProcessorNode.java | 169 +++++++-------
.../streams/processor/internals/StreamTask.java | 12 +-
.../streams/processor/internals/StreamThread.java | 40 ++--
.../internals/metrics/StreamsMetricsImpl.java | 252 +++++++++++++--------
.../state/internals/MeteredKeyValueStore.java | 93 +++-----
.../state/internals/MeteredSessionStore.java | 43 ++--
.../state/internals/MeteredWindowStore.java | 51 +++--
.../streams/state/internals/metrics/Sensors.java | 48 ++++
.../processor/internals/ProcessorNodeTest.java | 22 +-
.../processor/internals/StreamTaskTest.java | 24 +-
.../internals/StreamsMetricsImplTest.java | 11 +-
.../StreamThreadStateStoreProviderTest.java | 4 +-
.../apache/kafka/streams/TopologyTestDriver.java | 3 +-
13 files changed, 426 insertions(+), 346 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 64ef538..8dc6417 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -31,6 +31,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
public class ProcessorNode<K, V> {
// TODO: 'children' can be removed when #forward() via index is removed
@@ -42,32 +45,6 @@ public class ProcessorNode<K, V> {
private final String name;
private final Time time;
- private K key;
- private V value;
- private final Runnable processDelegate = new Runnable() {
- @Override
- public void run() {
- processor.process(key, value);
- }
- };
- private ProcessorContext context;
- private final Runnable initDelegate = new Runnable() {
- @Override
- public void run() {
- if (processor != null) {
- processor.init(context);
- }
- }
- };
- private final Runnable closeDelegate = new Runnable() {
- @Override
- public void run() {
- if (processor != null) {
- processor.close();
- }
- }
- };
-
public final Set<String> stateStores;
public ProcessorNode(final String name) {
@@ -107,10 +84,13 @@ public class ProcessorNode<K, V> {
}
public void init(final InternalProcessorContext context) {
- this.context = context;
try {
nodeMetrics = new NodeMetrics(context.metrics(), name, context);
- runAndMeasureLatency(time, initDelegate,
nodeMetrics.nodeCreationSensor);
+ final long startNs = time.nanoseconds();
+ if (processor != null) {
+ processor.init(context);
+ }
+ nodeMetrics.nodeCreationSensor.record(time.nanoseconds() -
startNs);
} catch (final Exception e) {
throw new StreamsException(String.format("failed to initialize
processor %s", name), e);
}
@@ -118,7 +98,11 @@ public class ProcessorNode<K, V> {
public void close() {
try {
- runAndMeasureLatency(time, closeDelegate,
nodeMetrics.nodeDestructionSensor);
+ final long startNs = time.nanoseconds();
+ if (processor != null) {
+ processor.close();
+ }
+ nodeMetrics.nodeDestructionSensor.record(time.nanoseconds() -
startNs);
nodeMetrics.removeAllSensors();
} catch (final Exception e) {
throw new StreamsException(String.format("failed to close
processor %s", name), e);
@@ -127,20 +111,15 @@ public class ProcessorNode<K, V> {
public void process(final K key, final V value) {
- this.key = key;
- this.value = value;
-
- runAndMeasureLatency(time, processDelegate,
nodeMetrics.nodeProcessTimeSensor);
+ final long startNs = time.nanoseconds();
+ processor.process(key, value);
+ nodeMetrics.nodeProcessTimeSensor.record(time.nanoseconds() - startNs);
}
public void punctuate(final long timestamp, final Punctuator punctuator) {
- final Runnable punctuateDelegate = new Runnable() {
- @Override
- public void run() {
- punctuator.punctuate(timestamp);
- }
- };
- runAndMeasureLatency(time, punctuateDelegate,
nodeMetrics.nodePunctuateTimeSensor);
+ final long startNs = time.nanoseconds();
+ punctuator.punctuate(timestamp);
+ nodeMetrics.nodePunctuateTimeSensor.record(time.nanoseconds() -
startNs);
}
/**
@@ -180,70 +159,90 @@ public class ProcessorNode<K, V> {
private final Sensor sourceNodeForwardSensor;
private final Sensor nodeCreationSensor;
private final Sensor nodeDestructionSensor;
+ private final String taskName;
+ private final String processorNodeName;
private NodeMetrics(final StreamsMetricsImpl metrics, final String
processorNodeName, final ProcessorContext context) {
this.metrics = metrics;
- // these are all latency metrics
- this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(
- context.taskId().toString(),
- "processor-node",
- processorNodeName,
+ final String group = "stream-processor-node-metrics";
+ final String taskName = context.taskId().toString();
+ final Map<String, String> tagMap = metrics.tagMap("task-id",
context.taskId().toString(), "processor-node-id", processorNodeName);
+ final Map<String, String> allTagMap = metrics.tagMap("task-id",
context.taskId().toString(), "processor-node-id", "all");
+
+ nodeProcessTimeSensor =
createTaskAndNodeLatencyAndThroughputSensors(
"process",
- Sensor.RecordingLevel.DEBUG,
- "task-id", context.taskId().toString()
- );
- this.nodePunctuateTimeSensor =
metrics.addLatencyAndThroughputSensor(
- context.taskId().toString(),
- "processor-node",
+ metrics,
+ group,
+ taskName,
processorNodeName,
- "punctuate",
- Sensor.RecordingLevel.DEBUG,
- "task-id", context.taskId().toString()
+ allTagMap,
+ tagMap
);
- this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(
- context.taskId().toString(),
- "processor-node",
+
+ nodePunctuateTimeSensor =
createTaskAndNodeLatencyAndThroughputSensors(
+ "punctuate",
+ metrics,
+ group,
+ taskName,
processorNodeName,
- "create",
- Sensor.RecordingLevel.DEBUG,
- "task-id", context.taskId().toString()
+ allTagMap,
+ tagMap
);
- this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(
- context.taskId().toString(),
- "processor-node",
+
+ nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors(
+ "create",
+ metrics,
+ group,
+ taskName,
processorNodeName,
- "destroy",
- Sensor.RecordingLevel.DEBUG,
- "task-id", context.taskId().toString()
+ allTagMap,
+ tagMap
);
- this.sourceNodeForwardSensor = metrics.addThroughputSensor(
- context.taskId().toString(),
- "processor-node",
+
+ // note: this metric can be removed in the future, as it is only
recorded before being immediately removed
+ nodeDestructionSensor =
createTaskAndNodeLatencyAndThroughputSensors(
+ "destroy",
+ metrics,
+ group,
+ taskName,
processorNodeName,
+ allTagMap,
+ tagMap
+ );
+
+ sourceNodeForwardSensor =
createTaskAndNodeLatencyAndThroughputSensors(
"forward",
- Sensor.RecordingLevel.DEBUG,
- "task-id", context.taskId().toString()
+ metrics,
+ group,
+ taskName,
+ processorNodeName,
+ allTagMap,
+ tagMap
);
+
+ this.taskName = taskName;
+ this.processorNodeName = processorNodeName;
}
private void removeAllSensors() {
- metrics.removeSensor(nodeProcessTimeSensor);
- metrics.removeSensor(nodePunctuateTimeSensor);
- metrics.removeSensor(sourceNodeForwardSensor);
- metrics.removeSensor(nodeCreationSensor);
- metrics.removeSensor(nodeDestructionSensor);
+ metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
}
- }
- private static void runAndMeasureLatency(final Time time, final Runnable
action, final Sensor sensor) {
- long startNs = -1;
- if (sensor.shouldRecord()) {
- startNs = time.nanoseconds();
- }
- action.run();
- if (startNs != -1) {
- sensor.record(time.nanoseconds() - startNs);
+ private static Sensor
createTaskAndNodeLatencyAndThroughputSensors(final String operation,
+
final StreamsMetricsImpl metrics,
+
final String group,
+
final String taskName,
+
final String processorNodeName,
+
final Map<String, String> taskTags,
+
final Map<String, String> nodeTags) {
+ final Sensor parent = metrics.taskLevelSensor(taskName, operation,
Sensor.RecordingLevel.DEBUG);
+ addAvgMaxLatency(parent, group, taskTags, operation);
+ addInvocationRateAndCount(parent, group, taskTags, operation);
+ final Sensor sensor = metrics.nodeLevelSensor(taskName,
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
+ addAvgMaxLatency(sensor, group, nodeTags, operation);
+ addInvocationRateAndCount(sensor, group, nodeTags, operation);
+ return sensor;
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6f3b031..7835a54 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -77,6 +77,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
private int waits = WAIT_ON_PARTIAL_INPUT;
private final Time time;
private final TaskMetrics taskMetrics;
+ private Sensor closeSensor;
protected static final class TaskMetrics {
final StreamsMetricsImpl metrics;
@@ -158,8 +159,9 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
final StateDirectory stateDirectory,
final ThreadCache cache,
final Time time,
- final Producer<byte[], byte[]> producer) {
- this(id, partitions, topology, consumer, changelogReader, config,
metrics, stateDirectory, cache, time, producer, null);
+ final Producer<byte[], byte[]> producer,
+ final Sensor closeSensor) {
+ this(id, partitions, topology, consumer, changelogReader, config,
metrics, stateDirectory, cache, time, producer, null, closeSensor);
}
public StreamTask(final TaskId id,
@@ -173,11 +175,13 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
final ThreadCache cache,
final Time time,
final Producer<byte[], byte[]> producer,
- final RecordCollector recordCollector) {
+ final RecordCollector recordCollector,
+ final Sensor closeSensor) {
super(id, partitions, topology, consumer, changelogReader, false,
stateDirectory, config);
this.time = time;
this.producer = producer;
+ this.closeSensor = closeSensor;
this.taskMetrics = new TaskMetrics(id, metrics);
final ProductionExceptionHandler productionExceptionHandler =
config.defaultProductionExceptionHandler();
@@ -617,6 +621,8 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
}
}
+ closeSensor.record();
+
if (firstException != null) {
throw firstException;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 42f55ef..968e577 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -31,9 +31,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.LogContext;
@@ -438,8 +436,8 @@ public class StreamThread extends Thread {
stateDirectory,
cache,
time,
- createProducer(taskId)
- );
+ createProducer(taskId),
+ streamsMetrics.tasksClosedSensor);
}
private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -527,36 +525,30 @@ public class StreamThread extends Thread {
final String group = "stream-metrics";
commitTimeSensor = threadLevelSensor("commit-latency",
Sensor.RecordingLevel.INFO);
- commitTimeSensor.add(metrics.metricName("commit-latency-avg",
group, "The average commit time in ms", tags()), new Avg());
- commitTimeSensor.add(metrics.metricName("commit-latency-max",
group, "The maximum commit time in ms", tags()), new Max());
- commitTimeSensor.add(metrics.metricName("commit-rate", group, "The
average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS,
new Count()));
- commitTimeSensor.add(metrics.metricName("commit-total", group,
"The total number of commit calls", tags()), new Count());
+ addAvgMaxLatency(commitTimeSensor, group, tagMap(), "commit");
+ addInvocationRateAndCount(commitTimeSensor, group, tagMap(),
"commit");
pollTimeSensor = threadLevelSensor("poll-latency",
Sensor.RecordingLevel.INFO);
- pollTimeSensor.add(metrics.metricName("poll-latency-avg", group,
"The average poll time in ms", tags()), new Avg());
- pollTimeSensor.add(metrics.metricName("poll-latency-max", group,
"The maximum poll time in ms", tags()), new Max());
- pollTimeSensor.add(metrics.metricName("poll-rate", group, "The
average per-second number of record-poll calls", tags()), new
Rate(TimeUnit.SECONDS, new Count()));
- pollTimeSensor.add(metrics.metricName("poll-total", group, "The
total number of record-poll calls", tags()), new Count());
+ addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
+ // can't use addInvocationRateAndCount due to non-standard
description string
+ pollTimeSensor.add(metrics.metricName("poll-rate", group, "The
average per-second number of record-poll calls", tagMap()), new
Rate(TimeUnit.SECONDS, new Count()));
+ pollTimeSensor.add(metrics.metricName("poll-total", group, "The
total number of record-poll calls", tagMap()), new Count());
processTimeSensor = threadLevelSensor("process-latency",
Sensor.RecordingLevel.INFO);
- processTimeSensor.add(metrics.metricName("process-latency-avg",
group, "The average process time in ms", tags()), new Avg());
- processTimeSensor.add(metrics.metricName("process-latency-max",
group, "The maximum process time in ms", tags()), new Max());
- processTimeSensor.add(metrics.metricName("process-rate", group,
"The average per-second number of process calls", tags()), new
Rate(TimeUnit.SECONDS, new Count()));
- processTimeSensor.add(metrics.metricName("process-total", group,
"The total number of process calls", tags()), new Count());
+ addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
+ addInvocationRateAndCount(processTimeSensor, group, tagMap(),
"process");
punctuateTimeSensor = threadLevelSensor("punctuate-latency",
Sensor.RecordingLevel.INFO);
-
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", group, "The
average punctuate time in ms", tags()), new Avg());
-
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", group, "The
maximum punctuate time in ms", tags()), new Max());
- punctuateTimeSensor.add(metrics.metricName("punctuate-rate",
group, "The average per-second number of punctuate calls", tags()), new
Rate(TimeUnit.SECONDS, new Count()));
- punctuateTimeSensor.add(metrics.metricName("punctuate-total",
group, "The total number of punctuate calls", tags()), new Count());
+ addAvgMaxLatency(punctuateTimeSensor, group, tagMap(),
"punctuate");
+ addInvocationRateAndCount(punctuateTimeSensor, group, tagMap(),
"punctuate");
taskCreatedSensor = threadLevelSensor("task-created",
Sensor.RecordingLevel.INFO);
- taskCreatedSensor.add(metrics.metricName("task-created-rate",
"stream-metrics", "The average per-second number of newly created tasks",
tags()), new Rate(TimeUnit.SECONDS, new Count()));
- taskCreatedSensor.add(metrics.metricName("task-created-total",
"stream-metrics", "The total number of newly created tasks", tags()), new
Total());
+ taskCreatedSensor.add(metrics.metricName("task-created-rate",
"stream-metrics", "The average per-second number of newly created tasks",
tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
+ taskCreatedSensor.add(metrics.metricName("task-created-total",
"stream-metrics", "The total number of newly created tasks", tagMap()), new
Total());
tasksClosedSensor = threadLevelSensor("task-closed",
Sensor.RecordingLevel.INFO);
- tasksClosedSensor.add(metrics.metricName("task-closed-rate",
group, "The average per-second number of closed tasks", tags()), new
Rate(TimeUnit.SECONDS, new Count()));
- tasksClosedSensor.add(metrics.metricName("task-closed-total",
group, "The total number of closed tasks", tags()), new Total());
+ tasksClosedSensor.add(metrics.metricName("task-closed-rate",
group, "The average per-second number of closed tasks", tagMap()), new
Rate(TimeUnit.SECONDS, new Count()));
+ tasksClosedSensor.add(metrics.metricName("task-closed-total",
group, "The total number of closed tasks", tagMap()), new Total());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index d36acdc..56166a4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -31,7 +31,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
@@ -39,14 +38,18 @@ import java.util.concurrent.TimeUnit;
public class StreamsMetricsImpl implements StreamsMetrics {
private final Metrics metrics;
- private final Map<String, String> tags;
private final Map<Sensor, Sensor> parentSensors;
private final Sensor skippedRecordsSensor;
private final String threadName;
private final Deque<String> threadLevelSensors = new LinkedList<>();
private final Map<String, Deque<String>> taskLevelSensors = new
HashMap<>();
+ private final Map<String, Deque<String>> nodeLevelSensors = new
HashMap<>();
private final Map<String, Deque<String>> cacheLevelSensors = new
HashMap<>();
+ private final Map<String, Deque<String>> storeLevelSensors = new
HashMap<>();
+
+ private static final String SENSOR_PREFIX_DELIMITER = ".";
+ private static final String SENSOR_NAME_DELIMITER = ".s.";
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
@@ -54,24 +57,19 @@ public class StreamsMetricsImpl implements StreamsMetrics {
this.metrics = metrics;
-
- final HashMap<String, String> tags = new LinkedHashMap<>();
- tags.put("client-id", threadName);
- this.tags = Collections.unmodifiableMap(tags);
-
this.parentSensors = new HashMap<>();
final String group = "stream-metrics";
skippedRecordsSensor = threadLevelSensor("skipped-records",
Sensor.RecordingLevel.INFO);
- skippedRecordsSensor.add(metrics.metricName("skipped-records-rate",
group, "The average per-second number of skipped records", tags), new
Rate(TimeUnit.SECONDS, new Count()));
- skippedRecordsSensor.add(metrics.metricName("skipped-records-total",
group, "The total number of skipped records", tags), new Total());
+ skippedRecordsSensor.add(new MetricName("skipped-records-rate", group,
"The average per-second number of skipped records", tagMap()), new
Rate(TimeUnit.SECONDS, new Count()));
+ skippedRecordsSensor.add(new MetricName("skipped-records-total",
group, "The total number of skipped records", tagMap()), new Total());
}
public final Sensor threadLevelSensor(final String sensorName,
final Sensor.RecordingLevel
recordingLevel,
final Sensor... parents) {
synchronized (threadLevelSensors) {
- final String fullSensorName = threadName + "." + sensorName;
+ final String fullSensorName = threadSensorPrefix() +
SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName,
recordingLevel, parents);
threadLevelSensors.push(fullSensorName);
@@ -87,17 +85,21 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
+ private String threadSensorPrefix() {
+ return "internal" + SENSOR_PREFIX_DELIMITER + threadName;
+ }
+
public final Sensor taskLevelSensor(final String taskName,
final String sensorName,
final Sensor.RecordingLevel
recordingLevel,
final Sensor... parents) {
- final String key = threadName + "." + taskName;
+ final String key = taskSensorPrefix(taskName);
synchronized (taskLevelSensors) {
if (!taskLevelSensors.containsKey(key)) {
taskLevelSensors.put(key, new LinkedList<>());
}
- final String fullSensorName = key + "." + sensorName;
+ final String fullSensorName = key + SENSOR_NAME_DELIMITER +
sensorName;
final Sensor sensor = metrics.sensor(fullSensorName,
recordingLevel, parents);
@@ -108,7 +110,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public final void removeAllTaskLevelSensors(final String taskName) {
- final String key = threadName + "." + taskName;
+ final String key = taskSensorPrefix(taskName);
synchronized (taskLevelSensors) {
if (taskLevelSensors.containsKey(key)) {
while (!taskLevelSensors.get(key).isEmpty()) {
@@ -119,18 +121,58 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
}
}
+ private String taskSensorPrefix(final String taskName) {
+ return threadSensorPrefix() + SENSOR_PREFIX_DELIMITER + "task" +
SENSOR_PREFIX_DELIMITER + taskName;
+ }
+
+ public Sensor nodeLevelSensor(final String taskName,
+ final String processorNodeName,
+ final String sensorName,
+ final Sensor.RecordingLevel recordingLevel,
+ final Sensor... parents) {
+ final String key = nodeSensorPrefix(taskName, processorNodeName);
+ synchronized (nodeLevelSensors) {
+ if (!nodeLevelSensors.containsKey(key)) {
+ nodeLevelSensors.put(key, new LinkedList<>());
+ }
+
+ final String fullSensorName = key + SENSOR_NAME_DELIMITER +
sensorName;
+
+ final Sensor sensor = metrics.sensor(fullSensorName,
recordingLevel, parents);
+
+ nodeLevelSensors.get(key).push(fullSensorName);
+
+ return sensor;
+ }
+ }
+
+ public final void removeAllNodeLevelSensors(final String taskName, final
String processorNodeName) {
+ final String key = nodeSensorPrefix(taskName, processorNodeName);
+ synchronized (nodeLevelSensors) {
+ if (nodeLevelSensors.containsKey(key)) {
+ while (!nodeLevelSensors.get(key).isEmpty()) {
+ metrics.removeSensor(nodeLevelSensors.get(key).pop());
+ }
+ }
+ }
+ }
+
+ private String nodeSensorPrefix(final String taskName, final String
processorNodeName) {
+ return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "node" +
SENSOR_PREFIX_DELIMITER + processorNodeName;
+ }
+
public final Sensor cacheLevelSensor(final String taskName,
final String cacheName,
final String sensorName,
final Sensor.RecordingLevel
recordingLevel,
final Sensor... parents) {
- final String key = threadName + "." + taskName + "." + cacheName;
+ final String key = cacheSensorPrefix(taskName, cacheName);
synchronized (cacheLevelSensors) {
if (!cacheLevelSensors.containsKey(key)) {
- cacheLevelSensors.put(key, new LinkedList<String>());
+ cacheLevelSensors.put(key, new LinkedList<>());
}
- final String fullSensorName = key + "." + sensorName;
+ final String fullSensorName = key + SENSOR_NAME_DELIMITER +
sensorName;
final Sensor sensor = metrics.sensor(fullSensorName,
recordingLevel, parents);
@@ -141,7 +183,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
public final void removeAllCacheLevelSensors(final String taskName, final
String cacheName) {
- final String key = threadName + "." + taskName + "." + cacheName;
+ final String key = cacheSensorPrefix(taskName, cacheName);
synchronized (cacheLevelSensors) {
if (cacheLevelSensors.containsKey(key)) {
while (!cacheLevelSensors.get(key).isEmpty()) {
@@ -152,8 +194,45 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
- protected final Map<String, String> tags() {
- return tags;
+ private String cacheSensorPrefix(final String taskName, final String
cacheName) {
+ return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "cache"
+ SENSOR_PREFIX_DELIMITER + cacheName;
+ }
+
+ public final Sensor storeLevelSensor(final String taskName,
+ final String storeName,
+ final String sensorName,
+ final Sensor.RecordingLevel
recordingLevel,
+ final Sensor... parents) {
+ final String key = storeSensorPrefix(taskName, storeName);
+ synchronized (storeLevelSensors) {
+ if (!storeLevelSensors.containsKey(key)) {
+ storeLevelSensors.put(key, new LinkedList<>());
+ }
+
+ final String fullSensorName = key + SENSOR_NAME_DELIMITER +
sensorName;
+
+ final Sensor sensor = metrics.sensor(fullSensorName,
recordingLevel, parents);
+
+ storeLevelSensors.get(key).push(fullSensorName);
+
+ return sensor;
+ }
+ }
+
+ public final void removeAllStoreLevelSensors(final String taskName, final
String storeName) {
+ final String key = storeSensorPrefix(taskName, storeName);
+ synchronized (storeLevelSensors) {
+ if (storeLevelSensors.containsKey(key)) {
+ while (!storeLevelSensors.get(key).isEmpty()) {
+ metrics.removeSensor(storeLevelSensors.get(key).pop());
+ }
+ storeLevelSensors.remove(key);
+ }
+ }
+ }
+
+ private String storeSensorPrefix(final String taskName, final String
storeName) {
+ return taskSensorPrefix(taskName) + SENSOR_PREFIX_DELIMITER + "store"
+ SENSOR_PREFIX_DELIMITER + storeName;
}
public final Sensor skippedRecordsSensor() {
@@ -185,22 +264,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
sensor.record(value);
}
-
- private String groupNameFromScope(final String scopeName) {
- return "stream-" + scopeName + "-metrics";
- }
-
- private String sensorName(final String operationName, final String
entityName) {
- if (entityName == null) {
- return operationName;
- } else {
- return entityName + "-" + operationName;
- }
- }
-
- public Map<String, String> tagMap(final String... tags) {
- // extract the additional tags if there are any
- final Map<String, String> tagMap = new HashMap<>(this.tags);
+ public final Map<String, String> tagMap(final String... tags) {
+ final Map<String, String> tagMap = new HashMap<>();
if (tags != null) {
if ((tags.length % 2) != 0) {
throw new IllegalArgumentException("Tags needs to be specified
in key-value pairs");
@@ -209,6 +274,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
for (int i = 0; i < tags.length; i += 2)
tagMap.put(tags[i], tags[i + 1]);
}
+ tagMap.put("client-id", threadName);
return tagMap;
}
@@ -220,6 +286,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return tagMap(updatedTags);
}
+
/**
* @throws IllegalArgumentException if tags is not constructed in
key-value pairs
*/
@@ -229,39 +296,25 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
final String operationName,
final Sensor.RecordingLevel
recordingLevel,
final String... tags) {
-
- return addLatencyAndThroughputSensor(null,
- scopeName,
- entityName,
- operationName,
- recordingLevel,
- tags);
-
- }
-
- public Sensor addLatencyAndThroughputSensor(final String taskName,
- final String scopeName,
- final String entityName,
- final String operationName,
- final Sensor.RecordingLevel
recordingLevel,
- final String... tags) {
+ final String group = groupNameFromScope(scopeName);
final Map<String, String> tagMap = constructTags(scopeName,
entityName, tags);
final Map<String, String> allTagMap = constructTags(scopeName, "all",
tags);
// first add the global operation metrics if not yet, with the global
tags only
- final Sensor parent =
metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName),
null), recordingLevel);
- addLatencyMetrics(scopeName, parent, operationName, allTagMap);
- addThroughputMetrics(scopeName, parent, operationName, allTagMap);
+ final Sensor parent =
metrics.sensor(externalParentSensorName(operationName), recordingLevel);
+ addAvgMaxLatency(parent, group, allTagMap, operationName);
+ addInvocationRateAndCount(parent, group, allTagMap, operationName);
// add the operation metrics with additional tags
- final Sensor sensor =
metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName),
entityName), recordingLevel, parent);
- addLatencyMetrics(scopeName, sensor, operationName, tagMap);
- addThroughputMetrics(scopeName, sensor, operationName, tagMap);
+ final Sensor sensor =
metrics.sensor(externalChildSensorName(operationName, entityName),
recordingLevel, parent);
+ addAvgMaxLatency(sensor, group, tagMap, operationName);
+ addInvocationRateAndCount(sensor, group, tagMap, operationName);
parentSensors.put(sensor, parent);
return sensor;
+
}
/**
@@ -273,33 +326,18 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
final String operationName,
final Sensor.RecordingLevel
recordingLevel,
final String... tags) {
-
- return addThroughputSensor(null,
- scopeName,
- entityName,
- operationName,
- recordingLevel,
- tags);
-
- }
-
- public Sensor addThroughputSensor(final String taskName,
- final String scopeName,
- final String entityName,
- final String operationName,
- final Sensor.RecordingLevel
recordingLevel,
- final String... tags) {
+ final String group = groupNameFromScope(scopeName);
final Map<String, String> tagMap = constructTags(scopeName,
entityName, tags);
final Map<String, String> allTagMap = constructTags(scopeName, "all",
tags);
// first add the global operation metrics if not yet, with the global
tags only
- final Sensor parent =
metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName),
null), recordingLevel);
- addThroughputMetrics(scopeName, parent, operationName, allTagMap);
+ final Sensor parent =
metrics.sensor(externalParentSensorName(operationName), recordingLevel);
+ addInvocationRateAndCount(parent, group, allTagMap, operationName);
// add the operation metrics with additional tags
- final Sensor sensor =
metrics.sensor(sensorName(buildUniqueSensorName(operationName, taskName),
entityName), recordingLevel, parent);
- addThroughputMetrics(scopeName, sensor, operationName, tagMap);
+ final Sensor sensor =
metrics.sensor(externalChildSensorName(operationName, entityName),
recordingLevel, parent);
+ addInvocationRateAndCount(sensor, group, tagMap, operationName);
parentSensors.put(sensor, parent);
@@ -307,46 +345,57 @@ public class StreamsMetricsImpl implements StreamsMetrics
{
}
+ private String externalChildSensorName(final String operationName, final
String entityName) {
+ return "external" + SENSOR_PREFIX_DELIMITER + threadName
+ + SENSOR_PREFIX_DELIMITER + "entity" + SENSOR_PREFIX_DELIMITER +
entityName
+ + SENSOR_NAME_DELIMITER + operationName;
+ }
- private String buildUniqueSensorName(final String operationName, final
String taskName) {
- final String task = taskName == null ? "" : taskName + ".";
- return threadName + "." + task + operationName;
+ private String externalParentSensorName(final String operationName) {
+ return "external" + SENSOR_PREFIX_DELIMITER + threadName +
SENSOR_NAME_DELIMITER + operationName;
}
- private void addLatencyMetrics(final String scopeName, final Sensor
sensor, final String opName, final Map<String, String> tags) {
+
+ public static void addAvgMaxLatency(final Sensor sensor,
+ final String group,
+ final Map<String, String> tags,
+ final String operation) {
sensor.add(
- metrics.metricName(
- opName + "-latency-avg",
- groupNameFromScope(scopeName),
- "The average latency of " + opName + " operation.", tags),
+ new MetricName(
+ operation + "-latency-avg",
+ group,
+ "The average latency of " + operation + " operation.",
+ tags),
new Avg()
);
sensor.add(
- metrics.metricName(
- opName + "-latency-max",
- groupNameFromScope(scopeName),
- "The max latency of " + opName + " operation.",
- tags
- ),
+ new MetricName(
+ operation + "-latency-max",
+ group,
+ "The max latency of " + operation + " operation.",
+ tags),
new Max()
);
}
- private void addThroughputMetrics(final String scopeName, final Sensor
sensor, final String opName, final Map<String, String> tags) {
+ public static void addInvocationRateAndCount(final Sensor sensor,
+ final String group,
+ final Map<String, String>
tags,
+ final String operation) {
sensor.add(
- metrics.metricName(
- opName + "-rate",
- groupNameFromScope(scopeName),
- "The average number of occurrence of " + opName + " operation
per second.",
+ new MetricName(
+ operation + "-rate",
+ group,
+ "The average number of occurrence of " + operation + "
operation per second.",
tags
),
new Rate(TimeUnit.SECONDS, new Count())
);
sensor.add(
- metrics.metricName(
- opName + "-total",
- groupNameFromScope(scopeName),
- "The total number of occurrence of " + opName + " operations.",
+ new MetricName(
+ operation + "-total",
+ group,
+ "The total number of occurrence of " + operation + "
operations.",
tags
),
new Count()
@@ -367,4 +416,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
+ private static String groupNameFromScope(final String scopeName) {
+ return "stream-" + scopeName + "-metrics";
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index fd79543..57458fb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -32,6 +32,10 @@ import org.apache.kafka.streams.state.StateSerdes;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
+import static
org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
/**
* A Metered {@link KeyValueStore} wrapper that is used for recording
operation metrics, and hence its
@@ -59,6 +63,7 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
private Sensor rangeTime;
private Sensor flushTime;
private StreamsMetricsImpl metrics;
+ private String taskName;
MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
final String metricScope,
@@ -77,79 +82,27 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- final String name = name();
- final String tagKey = "task-id";
- final String taskName = context.taskId().toString();
+ this.metrics = (StreamsMetricsImpl) context.metrics();
+
+ taskName = context.taskId().toString();
+ final String metricsGroup = "stream-" + metricScope + "-metrics";
+ final Map<String, String> taskTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", "all");
+ final Map<String, String> storeTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", name());
this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
- this.metrics = (StreamsMetricsImpl) context.metrics();
- this.putTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "put",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "put-if-absent",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.getTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "get",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.deleteTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "delete",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.putAllTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "put-all",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.allTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "all",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.rangeTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "range",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- this.flushTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "flush",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
- final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(
- taskName,
- metricScope,
- name,
- "restore",
- Sensor.RecordingLevel.DEBUG,
- tagKey, taskName);
+ putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put",
metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ putIfAbsentTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"put-if-absent", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ putAllTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"put-all", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ getTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "get",
metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ allTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "all",
metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ rangeTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"range", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ flushTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"flush", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ deleteTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"delete", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ final Sensor restoreTime =
createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "restore", metrics,
metricsGroup, taskName, name(), taskTags, storeTags);
// register and possibly restore the state from the logs
if (restoreTime.shouldRecord()) {
@@ -165,6 +118,12 @@ public class MeteredKeyValueStore<K, V> extends
WrappedStateStore.AbstractStateS
}
@Override
+ public void close() {
+ super.close();
+ metrics.removeAllStoreLevelSensors(taskName, name());
+ }
+
+ @Override
public long approximateNumEntries() {
return inner.approximateNumEntries();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index b285b65..65ab758 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -30,8 +30,12 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
+import java.util.Map;
import java.util.Objects;
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
+import static
org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
+
public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateStore implements SessionStore<K, V> {
private final SessionStore<Bytes, byte[]> inner;
private final String metricScope;
@@ -44,6 +48,7 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
private Sensor fetchTime;
private Sensor flushTime;
private Sensor removeTime;
+ private String taskName;
MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
final String metricScope,
@@ -65,30 +70,40 @@ public class MeteredSessionStore<K, V> extends
WrappedStateStore.AbstractStateSt
this.serdes = new
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
keySerde == null ? (Serde<K>)
context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>)
context.valueSerde() : valueSerde);
- final String tagKey = "task-id";
- final String taskName = context.taskId().toString();
this.metrics = (StreamsMetricsImpl) context.metrics();
- this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "put",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "fetch",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "flush",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- this.removeTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "remove",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
-
- final Sensor restoreTime =
this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(),
"restore",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+
+ taskName = context.taskId().toString();
+ final String metricsGroup = "stream-" + metricScope + "-metrics";
+ final Map<String, String> taskTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", "all");
+ final Map<String, String> storeTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", name());
+
+ putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put",
metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ flushTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"flush", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ removeTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"remove", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ final Sensor restoreTime =
createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "restore", metrics,
metricsGroup, taskName, name(), taskTags, storeTags);
+
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
inner.init(context, root);
} finally {
- this.metrics.recordLatency(restoreTime, startNs,
time.nanoseconds());
+ this.metrics.recordLatency(
+ restoreTime,
+ startNs,
+ time.nanoseconds()
+ );
}
}
@Override
+ public void close() {
+ super.close();
+ metrics.removeAllStoreLevelSensors(taskName, name());
+ }
+
+
+ @Override
public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
final long
earliestSessionEndTime,
final long
latestSessionStartTime) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 62ed6c6..5a27ed4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -31,6 +31,11 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import java.util.Map;
+
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
+import static
org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors;
+
public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateStore implements WindowStore<K, V> {
private final WindowStore<Bytes, byte[]> inner;
@@ -44,6 +49,7 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
private Sensor flushTime;
private StateSerdes<K, V> serdes;
private ProcessorContext context;
+ private String taskName;
MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
final String metricScope,
@@ -65,27 +71,38 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
this.serdes = new
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
name()),
keySerde == null ? (Serde<K>)
context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>)
context.valueSerde() : valueSerde);
- final String tagKey = "task-id";
- final String taskName = context.taskId().toString();
this.metrics = (StreamsMetricsImpl) context.metrics();
- this.putTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "put",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- this.fetchTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "fetch",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- this.flushTime = this.metrics.addLatencyAndThroughputSensor(taskName,
metricScope, name(), "flush",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
- final Sensor restoreTime =
this.metrics.addLatencyAndThroughputSensor(taskName, metricScope, name(),
"restore",
-
Sensor.RecordingLevel.DEBUG, tagKey, taskName);
+
+ taskName = context.taskId().toString();
+ final String metricsGroup = "stream-" + metricScope + "-metrics";
+ final Map<String, String> taskTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", "all");
+ final Map<String, String> storeTags = metrics.tagMap("task-id",
taskName, metricScope + "-id", name());
+
+ putTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "put",
metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ fetchTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"fetch", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ flushTime = createTaskAndStoreLatencyAndThroughputSensors(DEBUG,
"flush", metrics, metricsGroup, taskName, name(), taskTags, storeTags);
+ final Sensor restoreTime =
createTaskAndStoreLatencyAndThroughputSensors(DEBUG, "restore", metrics,
metricsGroup, taskName, name(), taskTags, storeTags);
+
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
inner.init(context, root);
} finally {
- this.metrics.recordLatency(restoreTime, startNs,
time.nanoseconds());
+ this.metrics.recordLatency(
+ restoreTime,
+ startNs,
+ time.nanoseconds()
+ );
}
}
@Override
+ public void close() {
+ super.close();
+ metrics.removeAllStoreLevelSensors(taskName, name());
+ }
+
+ @Override
public void put(final K key, final V value) {
put(key, value, context.timestamp());
}
@@ -134,16 +151,16 @@ public class MeteredWindowStore<K, V> extends
WrappedStateStore.AbstractStateSto
public KeyValueIterator<Windowed<K>, V> all() {
return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime,
metrics, serdes, time);
}
-
+
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
- return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom,
timeTo),
- fetchTime,
- metrics,
- serdes,
+ return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom,
timeTo),
+ fetchTime,
+ metrics,
+ serdes,
time);
}
-
+
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to,
final long timeFrom, final long timeTo) {
return new
MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to),
timeFrom, timeTo),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
new file mode 100644
index 0000000..fdbc7c8
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+import java.util.Map;
+
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
+public final class Sensors {
+
+ private Sensors() {}
+
+ public static Sensor createTaskAndStoreLatencyAndThroughputSensors(final
Sensor.RecordingLevel level,
+ final
String operation,
+ final
StreamsMetricsImpl metrics,
+ final
String metricsGroup,
+ final
String taskName,
+ final
String storeName,
+ final
Map<String, String> taskTags,
+ final
Map<String, String> storeTags) {
+ final Sensor taskSensor = metrics.taskLevelSensor(taskName, operation,
level);
+ addAvgMaxLatency(taskSensor, metricsGroup, taskTags, operation);
+ addInvocationRateAndCount(taskSensor, metricsGroup, taskTags,
operation);
+ final Sensor sensor = metrics.storeLevelSensor(taskName, storeName,
operation, level, taskSensor);
+ addAvgMaxLatency(sensor, metricsGroup, storeTags, operation);
+ addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation);
+ return sensor;
+ }
+}
+
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 65dd022..4e14143 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -111,17 +111,11 @@ public class ProcessorNodeTest {
metricTags.put("task-id", context.taskId().toString());
metricTags.put("client-id", "mock");
-
- for (final String operation : latencyOperations) {
- assertNotNull(metrics.getSensor("name-mock.0_0." + operation));
- }
- assertNotNull(metrics.getSensor("name-mock.0_0." +
throughputOperation));
-
for (final String opName : latencyOperations) {
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-avg", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-max", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-rate", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-total", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-total", groupName, metricTags);
}
assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation +
"-rate", groupName,
"The average
number of occurrence of " + throughputOperation + " operation per second.",
@@ -130,10 +124,10 @@ public class ProcessorNodeTest {
// test "all"
metricTags.put("processor-node-id", "all");
for (final String opName : latencyOperations) {
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-avg", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-max", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-rate", groupName, metricTags);
- StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-total", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-avg", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-latency-max", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-rate", groupName, metricTags);
+ StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(),
opName + "-total", groupName, metricTags);
}
assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation +
"-rate",
groupName,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 146bcb3..8f25c53 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -627,8 +627,8 @@ public class StreamTaskTest {
public void flush() {
flushed.set(true);
}
- }
- );
+ },
+ metrics.sensor("dummy"));
streamTask.flushState();
assertTrue(flushed.get());
}
@@ -972,8 +972,8 @@ public class StreamTaskTest {
stateDirectory,
null,
time,
- producer
- );
+ producer,
+ metrics.sensor("dummy"));
task.initializeStateStores();
task.initializeTopology();
@@ -1041,8 +1041,8 @@ public class StreamTaskTest {
stateDirectory,
null,
time,
- producer
- );
+ producer,
+ metrics.sensor("dummy"));
}
private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
@@ -1063,8 +1063,8 @@ public class StreamTaskTest {
stateDirectory,
null,
time,
- producer
- );
+ producer,
+ metrics.sensor("dummy"));
}
private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
@@ -1089,8 +1089,8 @@ public class StreamTaskTest {
stateDirectory,
null,
time,
- producer
- );
+ producer,
+ metrics.sensor("dummy"));
}
// this task will throw exception when processing (on partition2),
flushing, suspending and closing
@@ -1116,8 +1116,8 @@ public class StreamTaskTest {
stateDirectory,
null,
time,
- producer
- ) {
+ producer,
+ metrics.sensor("dummy")) {
@Override
protected void flushState() {
throw new RuntimeException("KABOOM!");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index a72dc79..b065e2c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -40,7 +40,6 @@ public class StreamsMetricsImplTest {
@Test
public void testRemoveSensor() {
final String sensorName = "sensor1";
- final String taskName = "task";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
@@ -52,10 +51,10 @@ public class StreamsMetricsImplTest {
final Sensor sensor1a = streamsMetrics.addSensor(sensorName,
Sensor.RecordingLevel.DEBUG, sensor1);
streamsMetrics.removeSensor(sensor1a);
- final Sensor sensor2 =
streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity,
operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor2 =
streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation,
Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor2);
- final Sensor sensor3 = streamsMetrics.addThroughputSensor(taskName,
scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope,
entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);
}
@@ -64,12 +63,11 @@ public class StreamsMetricsImplTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new
Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
- final String taskName = "task";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Sensor sensor1 =
streamsMetrics.addLatencyAndThroughputSensor(taskName, scope, entity,
operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor1 =
streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation,
Sensor.RecordingLevel.DEBUG);
// 2 meters and 4 non-meter metrics plus a common metric that keeps
track of total registered metrics in Metrics() constructor
final int meterMetricsCount = 2; // Each Meter is a combination of a
Rate and a Total
@@ -85,12 +83,11 @@ public class StreamsMetricsImplTest {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new
Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
- final String taskName = "task";
final String scope = "scope";
final String entity = "entity";
final String operation = "put";
- final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName,
scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope,
entity, operation, Sensor.RecordingLevel.DEBUG);
final int meterMetricsCount = 2; // Each Meter is a combination of a
Rate and a Total
// 2 meter metrics plus a common metric that keeps track of total
registered metrics in Metrics() constructor
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 4916cb0..75bb219 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -190,8 +190,8 @@ public class StreamThreadStateStoreProviderTest {
stateDirectory,
null,
new MockTime(),
- clientSupplier.getProducer(new HashMap<String, Object>())
- ) {
+ clientSupplier.getProducer(new HashMap<String, Object>()),
+ metrics.sensor("dummy")) {
@Override
protected void updateOffsetLimits() {}
};
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 74fa8ca..d2796db 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -338,7 +338,8 @@ public class TopologyTestDriver implements Closeable {
stateDirectory,
cache,
mockWallClockTime,
- producer);
+ producer,
+ metrics.sensor("dummy"));
task.initializeStateStores();
task.initializeTopology();
context = (InternalProcessorContext) task.context();