This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7d0fdd  KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979)
a7d0fdd is described below

commit a7d0fdd534ef55533a868ea7388bbc081ee42718
Author: cadonna <br...@confluent.io>
AuthorDate: Fri Aug 2 18:51:03 2019 +0200

    KAFKA-8578: Add basic functionality to expose RocksDB metrics (#6979)
    
    * Adds RocksDBMetrics class that provides methods to get sensors from the Kafka metrics registry and to set up the sensors to record RocksDB metrics
    
    * Extends StreamsMetricsImpl with functionality to add the required metrics to the sensors.
    
    Reviewers: Boyang Chen <boy...@confluent.io>, Bill Bejeck <b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, John Roesler <vvcep...@users.noreply.github.com>, Guozhang Wang <wangg...@gmail.com>
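
    For context, the intended call pattern for the new classes is roughly the
    following (a minimal sketch against the APIs added below, not code taken
    from this commit; the task, store type, and store names are made up):

        // Build a store-level, DEBUG-recording sensor for one RocksDB store,
        // attach the bytes-written rate/total metrics to it, and record a value.
        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "client-id");
        final RocksDBMetricContext context = new RocksDBMetricContext("task-0", "window", "my-store");
        final Sensor sensor = RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, context);
        sensor.record(1024.0); // e.g. bytes written since the last recording
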
---
 .../internals/metrics/StreamsMetricsImpl.java      |  83 ++++-
 .../state/internals/metrics/RocksDBMetrics.java    | 382 +++++++++++++++++++++
 .../internals/metrics/StreamsMetricsImplTest.java  | 113 +++++-
 .../internals/metrics/RocksDBMetricsTest.java      | 283 +++++++++++++++
 4 files changed, 852 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index b6bfcc5..ae3d953 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -23,9 +23,12 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.streams.StreamsMetrics;
 
 import java.util.Arrays;
@@ -54,17 +57,21 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
     public static final String THREAD_ID_TAG = "client-id";
     public static final String TASK_ID_TAG = "task-id";
+    public static final String STORE_ID_TAG = "state-id";
 
     public static final String ALL_TASKS = "all";
 
     public static final String LATENCY_SUFFIX = "-latency";
     public static final String AVG_SUFFIX = "-avg";
     public static final String MAX_SUFFIX = "-max";
+    public static final String MIN_SUFFIX = "-min";
     public static final String RATE_SUFFIX = "-rate";
     public static final String TOTAL_SUFFIX = "-total";
+    public static final String RATIO_SUFFIX = "-ratio";
 
     public static final String THREAD_LEVEL_GROUP = "stream-metrics";
     public static final String TASK_LEVEL_GROUP = "stream-task-metrics";
+    public static final String STATE_LEVEL_GROUP = "stream-state-metrics";
 
     public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
@@ -123,6 +130,18 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    public Map<String, String> taskLevelTagMap(final String taskName) {
+        final Map<String, String> tagMap = threadLevelTagMap();
+        tagMap.put(TASK_ID_TAG, taskName);
+        return tagMap;
+    }
+
+    public Map<String, String> storeLevelTagMap(final String taskName, final String storeType, final String storeName) {
+        final Map<String, String> tagMap = taskLevelTagMap(taskName);
+        tagMap.put(storeType + "-" + STORE_ID_TAG, storeName);
+        return tagMap;
+    }
+
     public final Sensor taskLevelSensor(final String taskName,
                                         final String sensorName,
                                         final RecordingLevel recordingLevel,
@@ -237,9 +256,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
             if (!storeLevelSensors.containsKey(key)) {
                 storeLevelSensors.put(key, new LinkedList<>());
             }
-
             final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
-
             final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
 
             storeLevelSensors.get(key).push(fullSensorName);
@@ -454,12 +471,62 @@ public class StreamsMetricsImpl implements StreamsMetrics {
                                                  final String group,
                                                  final Map<String, String> tags,
                                                  final String operation) {
-        addInvocationRateAndCount(sensor,
-                                  group,
-                                  tags,
-                                  operation,
-                                  "The total number of " + operation,
-                                  "The average per-second number of " + operation);
+        addInvocationRateAndCount(
+            sensor,
+            group,
+            tags,
+            operation,
+            "The total number of " + operation,
+            "The average per-second number of " + operation
+        );
+    }
+
+    public static void addRateOfSumAndSumMetricsToSensor(final Sensor sensor,
+                                                         final String group,
+                                                         final Map<String, String> tags,
+                                                         final String operation,
+                                                         final String descriptionOfRate,
+                                                         final String descriptionOfTotal) {
+        addRateOfSumMetricToSensor(sensor, group, tags, operation, descriptionOfRate);
+        addSumMetricToSensor(sensor, group, tags, operation, descriptionOfTotal);
+    }
+
+    public static void addRateOfSumMetricToSensor(final Sensor sensor,
+                                                  final String group,
+                                                  final Map<String, String> tags,
+                                                  final String operation,
+                                                  final String description) {
+        sensor.add(new MetricName(operation + RATE_SUFFIX, group, description, tags),
+                   new Rate(TimeUnit.SECONDS, new WindowedSum()));
+    }
+
+    public static void addSumMetricToSensor(final Sensor sensor,
+                                            final String group,
+                                            final Map<String, String> tags,
+                                            final String operation,
+                                            final String description) {
+        sensor.add(new MetricName(operation + TOTAL_SUFFIX, group, description, tags), new CumulativeSum());
+    }
+
+    public static void addValueMetricToSensor(final Sensor sensor,
+                                              final String group,
+                                              final Map<String, String> tags,
+                                              final String name,
+                                              final String description) {
+        sensor.add(new MetricName(name, group, description, tags), new Value());
+    }
+
+    public static void addAvgAndSumMetricsToSensor(final Sensor sensor,
+                                                   final String group,
+                                                   final Map<String, String> tags,
+                                                   final String metricNamePrefix,
+                                                   final String descriptionOfAvg,
+                                                   final String descriptionOfTotal) {
+        sensor.add(new MetricName(metricNamePrefix + AVG_SUFFIX, group, descriptionOfAvg, tags), new Avg());
+        sensor.add(
+            new MetricName(metricNamePrefix + TOTAL_SUFFIX, group, descriptionOfTotal, tags),
+            new CumulativeSum()
+        );
     }
 
     /**
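
The helpers above pair a windowed Rate with a CumulativeSum, so a single
recorded amount feeds both a per-second "-rate" metric and a running
"-total" metric. A stand-alone usage sketch (assuming a plain Metrics
registry; the group, tag, and description strings here are arbitrary):

    final Metrics metrics = new Metrics();
    final Sensor sensor = metrics.sensor("bytes-written");
    StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
        sensor,
        "stream-state-metrics",                        // metrics group
        Collections.singletonMap("task-id", "task-0"), // tags
        "bytes-written",                               // metric name prefix
        "Average number of bytes written per second",  // -rate description
        "Total number of bytes written"                // -total description
    );
    sensor.record(42.0); // updates both bytes-written-rate and bytes-written-total
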
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
new file mode 100644
index 0000000..95cb522
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.AVG_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MAX_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.MIN_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.STATE_LEVEL_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumMetricToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndSumMetricsToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;
+
+public class RocksDBMetrics {
+    private RocksDBMetrics() {}
+
+    private static final String BYTES_WRITTEN_TO_DB = "bytes-written";
+    private static final String BYTES_READ_FROM_DB = "bytes-read";
+    private static final String MEMTABLE_BYTES_FLUSHED = "memtable-bytes-flushed";
+    private static final String MEMTABLE_HIT_RATIO = "memtable-hit" + RATIO_SUFFIX;
+    private static final String MEMTABLE_FLUSH_TIME = "memtable-flush-time";
+    private static final String MEMTABLE_FLUSH_TIME_AVG = MEMTABLE_FLUSH_TIME + AVG_SUFFIX;
+    private static final String MEMTABLE_FLUSH_TIME_MIN = MEMTABLE_FLUSH_TIME + MIN_SUFFIX;
+    private static final String MEMTABLE_FLUSH_TIME_MAX = MEMTABLE_FLUSH_TIME + MAX_SUFFIX;
+    private static final String WRITE_STALL_DURATION = "write-stall-duration";
+    private static final String BLOCK_CACHE_DATA_HIT_RATIO = "block-cache-data-hit" + RATIO_SUFFIX;
+    private static final String BLOCK_CACHE_INDEX_HIT_RATIO = "block-cache-index-hit" + RATIO_SUFFIX;
+    private static final String BLOCK_CACHE_FILTER_HIT_RATIO = "block-cache-filter-hit" + RATIO_SUFFIX;
+    private static final String BYTES_READ_DURING_COMPACTION = "bytes-read-compaction";
+    private static final String BYTES_WRITTEN_DURING_COMPACTION = "bytes-written-compaction";
+    private static final String COMPACTION_TIME = "compaction-time";
+    private static final String COMPACTION_TIME_AVG = COMPACTION_TIME + AVG_SUFFIX;
+    private static final String COMPACTION_TIME_MIN = COMPACTION_TIME + MIN_SUFFIX;
+    private static final String COMPACTION_TIME_MAX = COMPACTION_TIME + MAX_SUFFIX;
+    private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
+    private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors";
+
+    private static final String BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION =
+        "Average number of bytes written per second to the RocksDB state store";
+    private static final String BYTES_WRITTEN_TO_DB_TOTAL_DESCRIPTION =
+        "Total number of bytes written to the RocksDB state store";
+    private static final String BYTES_READ_FROM_DB_RATE_DESCRIPTION =
+        "Average number of bytes read per second from the RocksDB state store";
+    private static final String BYTES_READ_FROM_DB_TOTAL_DESCRIPTION =
+        "Total number of bytes read from the RocksDB state store";
+    private static final String MEMTABLE_BYTES_FLUSHED_RATE_DESCRIPTION =
+        "Average number of bytes flushed per second from the memtable to disk";
+    private static final String MEMTABLE_BYTES_FLUSHED_TOTAL_DESCRIPTION =
+        "Total number of bytes flushed from the memtable to disk";
+    private static final String MEMTABLE_HIT_RATIO_DESCRIPTION =
+        "Ratio of memtable hits relative to all lookups to the memtable";
+    private static final String MEMTABLE_FLUSH_TIME_AVG_DESCRIPTION =
+        "Average time spent on flushing the memtable to disk in ms";
+    private static final String MEMTABLE_FLUSH_TIME_MIN_DESCRIPTION =
+        "Minimum time spent on flushing the memtable to disk in ms";
+    private static final String MEMTABLE_FLUSH_TIME_MAX_DESCRIPTION =
+        "Maximum time spent on flushing the memtable to disk in ms";
+    private static final String WRITE_STALL_DURATION_AVG_DESCRIPTION = "Average duration of write stalls in ms";
+    private static final String WRITE_STALL_DURATION_TOTAL_DESCRIPTION = "Total duration of write stalls in ms";
+    private static final String BLOCK_CACHE_DATA_HIT_RATIO_DESCRIPTION =
+        "Ratio of block cache hits for data relative to all lookups for data to the block cache";
+    private static final String BLOCK_CACHE_INDEX_HIT_RATIO_DESCRIPTION =
+        "Ratio of block cache hits for indexes relative to all lookups for indexes to the block cache";
+    private static final String BLOCK_CACHE_FILTER_HIT_RATIO_DESCRIPTION =
+        "Ratio of block cache hits for filters relative to all lookups for filters to the block cache";
+    private static final String BYTES_READ_DURING_COMPACTION_DESCRIPTION =
+        "Average number of bytes read per second during compaction";
+    private static final String BYTES_WRITTEN_DURING_COMPACTION_DESCRIPTION =
+        "Average number of bytes written per second during compaction";
+    private static final String COMPACTION_TIME_AVG_DESCRIPTION = "Average time spent on compaction in ms";
+    private static final String COMPACTION_TIME_MIN_DESCRIPTION = "Minimum time spent on compaction in ms";
+    private static final String COMPACTION_TIME_MAX_DESCRIPTION = "Maximum time spent on compaction in ms";
+    private static final String NUMBER_OF_OPEN_FILES_DESCRIPTION = "Number of currently open files";
+    private static final String NUMBER_OF_FILE_ERRORS_DESCRIPTION = "Total number of file errors occurred";
+
+    public static class RocksDBMetricContext {
+        private final String taskName;
+        private final String storeType;
+        private final String storeName;
+
+        public RocksDBMetricContext(final String taskName, final String storeType, final String storeName) {
+            this.taskName = taskName;
+            this.storeType = "rocksdb-" + storeType;
+            this.storeName = storeName;
+        }
+
+        public String taskName() {
+            return taskName;
+        }
+        public String storeType() {
+            return storeType;
+        }
+        public String storeName() {
+            return storeName;
+        }
+    }
+
+    public static Sensor bytesWrittenToDatabaseSensor(final StreamsMetricsImpl streamsMetrics,
+                                                      final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_WRITTEN_TO_DB);
+        addRateOfSumAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BYTES_WRITTEN_TO_DB,
+            BYTES_WRITTEN_TO_DB_RATE_DESCRIPTION,
+            BYTES_WRITTEN_TO_DB_TOTAL_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor bytesReadFromDatabaseSensor(final StreamsMetricsImpl streamsMetrics,
+                                                     final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_READ_FROM_DB);
+        addRateOfSumAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BYTES_READ_FROM_DB,
+            BYTES_READ_FROM_DB_RATE_DESCRIPTION,
+            BYTES_READ_FROM_DB_TOTAL_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor memtableBytesFlushedSensor(final StreamsMetricsImpl streamsMetrics,
+                                                    final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_BYTES_FLUSHED);
+        addRateOfSumAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            MEMTABLE_BYTES_FLUSHED,
+            MEMTABLE_BYTES_FLUSHED_RATE_DESCRIPTION,
+            MEMTABLE_BYTES_FLUSHED_TOTAL_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor memtableHitRatioSensor(final StreamsMetricsImpl streamsMetrics,
+                                                final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_HIT_RATIO);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            MEMTABLE_HIT_RATIO,
+            MEMTABLE_HIT_RATIO_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor memtableAvgFlushTimeSensor(final StreamsMetricsImpl streamsMetrics,
+                                                    final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_FLUSH_TIME_AVG);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            MEMTABLE_FLUSH_TIME_AVG,
+            MEMTABLE_FLUSH_TIME_AVG_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor memtableMinFlushTimeSensor(final StreamsMetricsImpl streamsMetrics,
+                                                    final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_FLUSH_TIME_MIN);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            MEMTABLE_FLUSH_TIME_MIN,
+            MEMTABLE_FLUSH_TIME_MIN_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor memtableMaxFlushTimeSensor(final StreamsMetricsImpl streamsMetrics,
+                                                    final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, MEMTABLE_FLUSH_TIME_MAX);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            MEMTABLE_FLUSH_TIME_MAX,
+            MEMTABLE_FLUSH_TIME_MAX_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor writeStallDurationSensor(final StreamsMetricsImpl streamsMetrics,
+                                                  final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, WRITE_STALL_DURATION);
+        addAvgAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            WRITE_STALL_DURATION,
+            WRITE_STALL_DURATION_AVG_DESCRIPTION,
+            WRITE_STALL_DURATION_TOTAL_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor blockCacheDataHitRatioSensor(final StreamsMetricsImpl streamsMetrics,
+                                                      final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BLOCK_CACHE_DATA_HIT_RATIO);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BLOCK_CACHE_DATA_HIT_RATIO,
+            BLOCK_CACHE_DATA_HIT_RATIO_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor blockCacheIndexHitRatioSensor(final StreamsMetricsImpl streamsMetrics,
+                                                       final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BLOCK_CACHE_INDEX_HIT_RATIO);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics.storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BLOCK_CACHE_INDEX_HIT_RATIO,
+            BLOCK_CACHE_INDEX_HIT_RATIO_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor blockCacheFilterHitRatioSensor(final StreamsMetricsImpl streamsMetrics,
+                                                        final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BLOCK_CACHE_FILTER_HIT_RATIO);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BLOCK_CACHE_FILTER_HIT_RATIO,
+            BLOCK_CACHE_FILTER_HIT_RATIO_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor bytesReadDuringCompactionSensor(final StreamsMetricsImpl streamsMetrics,
+                                                         final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_READ_DURING_COMPACTION);
+        addRateOfSumMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BYTES_READ_DURING_COMPACTION,
+            BYTES_READ_DURING_COMPACTION_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor bytesWrittenDuringCompactionSensor(final StreamsMetricsImpl streamsMetrics,
+                                                            final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, BYTES_WRITTEN_DURING_COMPACTION);
+        addRateOfSumMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            BYTES_WRITTEN_DURING_COMPACTION,
+            BYTES_WRITTEN_DURING_COMPACTION_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor compactionTimeAvgSensor(final StreamsMetricsImpl streamsMetrics,
+                                                 final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, COMPACTION_TIME_AVG);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            COMPACTION_TIME_AVG,
+            COMPACTION_TIME_AVG_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor compactionTimeMinSensor(final StreamsMetricsImpl streamsMetrics,
+                                                 final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, COMPACTION_TIME_MIN);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            COMPACTION_TIME_MIN,
+            COMPACTION_TIME_MIN_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor compactionTimeMaxSensor(final StreamsMetricsImpl streamsMetrics,
+                                                 final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, COMPACTION_TIME_MAX);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            COMPACTION_TIME_MAX,
+            COMPACTION_TIME_MAX_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor numberOfOpenFilesSensor(final StreamsMetricsImpl streamsMetrics,
+                                                 final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_OPEN_FILES);
+        addValueMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            NUMBER_OF_OPEN_FILES,
+            NUMBER_OF_OPEN_FILES_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor numberOfFileErrorsSensor(final StreamsMetricsImpl streamsMetrics,
+                                                  final RocksDBMetricContext metricContext) {
+        final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_FILE_ERRORS);
+        addSumMetricToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            streamsMetrics
+                .storeLevelTagMap(metricContext.taskName(), metricContext.storeType(), metricContext.storeName()),
+            NUMBER_OF_FILE_ERRORS,
+            NUMBER_OF_FILE_ERRORS_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    private static Sensor createSensor(final StreamsMetricsImpl streamsMetrics,
+                                       final RocksDBMetricContext metricContext,
+                                       final String sensorName) {
+        return streamsMetrics.storeLevelSensor(
+            metricContext.taskName(),
+            metricContext.storeName(),
+            sensorName,
+            RecordingLevel.DEBUG);
+    }
+}
\ No newline at end of file
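
All sensors above register their metrics in the group "stream-state-metrics"
with the tags produced by storeLevelTagMap: client-id, task-id, and
"<storeType>-state-id", where RocksDBMetricContext has already prefixed the
store type with "rocksdb-". A lookup sketch in the style of the tests below
(the concrete task, store type, and store names are assumptions):

    final KafkaMetric metric = metrics.metric(new MetricName(
        "bytes-written-total",
        "stream-state-metrics",
        "Total number of bytes written to the RocksDB state store",
        streamsMetrics.storeLevelTagMap("task-0", "rocksdb-window", "my-store")));
    // metric is null until the corresponding sensor has been created
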
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index f381a58..678d9f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals.metrics;
 
-
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -28,6 +27,7 @@ import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +37,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -49,6 +51,15 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
     private final static String SENSOR_NAME_DELIMITER = ".s.";
     private final static String INTERNAL_PREFIX = "internal";
 
+    private final Metrics metrics = new Metrics();
+    private final Sensor sensor = metrics.sensor("dummy");
+    private final String metricNamePrefix = "metric";
+    private final String group = "group";
+    private final Map<String, String> tags = mkMap(mkEntry("tag", "value"));
+    private final String description1 = "description number one";
+    private final String description2 = "description number two";
+    private final MockTime time = new MockTime(0);
+
     @Test
     public void shouldGetThreadLevelSensor() {
         final Metrics metrics = mock(Metrics.class);
@@ -230,6 +241,106 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
             assertEquals(i, Math.round(totalMetric.measurable().measure(config, time.milliseconds())));
             sensor.record(latency, time.milliseconds());
         }
+    }
+
+    @Test
+    public void shouldGetStoreLevelTagMap() {
+        final String threadName = "test-thread";
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, threadName);
+        final String taskName = "test-task";
+        final String storeType = "remote-window";
+        final String storeName = "window-keeper";
+
+        final Map<String, String> tagMap = streamsMetrics.storeLevelTagMap(taskName, storeType, storeName);
+
+        assertThat(tagMap.size(), equalTo(3));
+        assertThat(tagMap.get(StreamsMetricsImpl.THREAD_ID_TAG), equalTo(threadName));
+        assertThat(tagMap.get(StreamsMetricsImpl.TASK_ID_TAG), equalTo(taskName));
+        assertThat(tagMap.get(storeType + "-" + StreamsMetricsImpl.STORE_ID_TAG), equalTo(storeName));
+    }
+
+    @Test
+    public void shouldAddAmountRateAndSum() {
+        StreamsMetricsImpl
+            .addRateOfSumAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, description1, description2);
+
+        final double valueToRecord1 = 18.0;
+        final double valueToRecord2 = 72.0;
+        final long defaultWindowSizeInSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds();
+        final double expectedRateMetricValue = (valueToRecord1 + valueToRecord2) / defaultWindowSizeInSeconds;
+        verifyMetric(metricNamePrefix + "-rate", description1, valueToRecord1, valueToRecord2, expectedRateMetricValue);
+        final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * valueToRecord2; // values are recorded once for each metric verification
+        verifyMetric(metricNamePrefix + "-total", description2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
+        assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
+    }
+
+    @Test
+    public void shouldAddSum() {
+        StreamsMetricsImpl.addSumMetricToSensor(sensor, group, tags, metricNamePrefix, description1);
+
+        final double valueToRecord1 = 18.0;
+        final double valueToRecord2 = 42.0;
+        final double expectedSumMetricValue = valueToRecord1 + valueToRecord2;
+        verifyMetric(metricNamePrefix + "-total", description1, valueToRecord1, valueToRecord2, expectedSumMetricValue);
+        assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics
+    }
+
+    @Test
+    public void shouldAddAmountRate() {
+        StreamsMetricsImpl.addRateOfSumMetricToSensor(sensor, group, tags, metricNamePrefix, description1);
+
+        final double valueToRecord1 = 18.0;
+        final double valueToRecord2 = 72.0;
+        final long defaultWindowSizeInSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds();
+        final double expectedRateMetricValue = (valueToRecord1 + valueToRecord2) / defaultWindowSizeInSeconds;
+        verifyMetric(metricNamePrefix + "-rate", description1, valueToRecord1, valueToRecord2, expectedRateMetricValue);
+        assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics
+    }
+
+    @Test
+    public void shouldAddValue() {
+        StreamsMetricsImpl.addValueMetricToSensor(sensor, group, tags, metricNamePrefix, description1);
+
+        final KafkaMetric ratioMetric = metrics.metric(new MetricName(metricNamePrefix, group, description1, tags));
+        assertThat(ratioMetric, is(notNullValue()));
+        final MetricConfig metricConfig = new MetricConfig();
+        final double value1 = 42.0;
+        sensor.record(value1);
+        assertThat(ratioMetric.measurable().measure(metricConfig, time.milliseconds()), equalTo(42.0));
+        final double value2 = 18.0;
+        sensor.record(value2);
+        assertThat(ratioMetric.measurable().measure(metricConfig, time.milliseconds()), equalTo(18.0));
+        assertThat(metrics.metrics().size(), equalTo(1 + 1)); // one metric is added automatically in the constructor of Metrics
+    }
+
+    @Test
+    public void shouldAddAvgAndTotalMetricsToSensor() {
+        StreamsMetricsImpl
+            .addAvgAndSumMetricsToSensor(sensor, group, tags, metricNamePrefix, description1, description2);
+
+        final double valueToRecord1 = 18.0;
+        final double valueToRecord2 = 42.0;
+        final double expectedAvgMetricValue = (valueToRecord1 + valueToRecord2) / 2;
+        verifyMetric(metricNamePrefix + "-avg", description1, valueToRecord1, valueToRecord2, expectedAvgMetricValue);
+        final double expectedSumMetricValue = 2 * valueToRecord1 + 2 * valueToRecord2; // values are recorded once for each metric verification
+        verifyMetric(metricNamePrefix + "-total", description2, valueToRecord1, valueToRecord2, expectedSumMetricValue);
+        assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics
+    }
+
+    private void verifyMetric(final String name,
+                              final String description,
+                              final double valueToRecord1,
+                              final double valueToRecord2,
+                              final double expectedMetricValue) {
+        final KafkaMetric metric = metrics
+            .metric(new MetricName(name, group, description, tags));
+        assertThat(metric, is(notNullValue()));
+        assertThat(metric.metricName().description(), equalTo(description));
+        sensor.record(valueToRecord1, time.milliseconds());
+        sensor.record(valueToRecord2, time.milliseconds());
+        assertThat(
+            metric.measurable().measure(new MetricConfig(), time.milliseconds()),
+            equalTo(expectedMetricValue)
+        );
     }
 }
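
A note on the expected rate values in the tests above: Rate over a
WindowedSum divides everything recorded within the sampling window by the
window length. With the default MetricConfig time window of 30 seconds
(which is what Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds()
evaluates to) and two recordings of 18.0 and 72.0 made at the same
mock-clock instant, shouldAddAmountRate therefore expects
(18.0 + 72.0) / 30 = 3.0 for the "-rate" metric, while the "-total" metric
simply accumulates to 90.0.
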
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
new file mode 100644
index 0000000..6c8b9d8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetrics.RocksDBMetricContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.powermock.api.easymock.PowerMock.createStrictMock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verify;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StreamsMetricsImpl.class)
+public class RocksDBMetricsTest {
+
+    private static final String STATE_LEVEL_GROUP = "stream-state-metrics";
+    private static final String STORE_TYPE_PREFIX = "rocksdb-";
+    private final String taskName = "task";
+    private final String storeType = "storeType";
+    private final String storeName = "store";
+    private final Metrics metrics = new Metrics();
+    private final Sensor sensor = metrics.sensor("dummy");
+    private final StreamsMetricsImpl streamsMetrics = createStrictMock(StreamsMetricsImpl.class);
+    private final Map<String, String> tags = Collections.singletonMap("hello", "world");
+
+    private interface SensorCreator {
+        Sensor sensor(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext);
+    }
+
+    @Test
+    public void shouldGetBytesWrittenSensor() {
+        final String metricNamePrefix = "bytes-written";
+        final String descriptionOfTotal = "Total number of bytes written to 
the RocksDB state store";
+        final String descriptionOfRate = "Average number of bytes written per 
second to the RocksDB state store";
+        verifyRateAndTotalSensor(
+            metricNamePrefix,
+            descriptionOfTotal,
+            descriptionOfRate,
+            RocksDBMetrics::bytesWrittenToDatabaseSensor
+        );
+    }
+
+    @Test
+    public void shouldGetBytesReadSensor() {
+        final String metricNamePrefix = "bytes-read";
+        final String descriptionOfTotal = "Total number of bytes read from the 
RocksDB state store";
+        final String descriptionOfRate = "Average number of bytes read per 
second from the RocksDB state store";
+        verifyRateAndTotalSensor(
+            metricNamePrefix,
+            descriptionOfTotal,
+            descriptionOfRate,
+            RocksDBMetrics::bytesReadFromDatabaseSensor
+        );
+    }
+
+    @Test
+    public void shouldGetMemtableHitRatioSensor() {
+        final String metricNamePrefix = "memtable-hit-ratio";
+        final String description = "Ratio of memtable hits relative to all 
lookups to the memtable";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::memtableHitRatioSensor);
+    }
+
+    @Test
+    public void shouldGetMemtableBytesFlushedSensor() {
+        final String metricNamePrefix = "memtable-bytes-flushed";
+        final String descriptionOfTotal = "Total number of bytes flushed from 
the memtable to disk";
+        final String descriptionOfRate = "Average number of bytes flushed per 
second from the memtable to disk";
+        verifyRateAndTotalSensor(
+            metricNamePrefix,
+            descriptionOfTotal,
+            descriptionOfRate,
+            RocksDBMetrics::memtableBytesFlushedSensor
+        );
+    }
+
+    @Test
+    public void shouldGetMemtableAvgFlushTimeSensor() {
+        final String metricNamePrefix = "memtable-flush-time-avg";
+        final String description = "Average time spent on flushing the 
memtable to disk in ms";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::memtableAvgFlushTimeSensor);
+    }
+
+    @Test
+    public void shouldGetMemtableMinFlushTimeSensor() {
+        final String metricNamePrefix = "memtable-flush-time-min";
+        final String description = "Minimum time spent on flushing the 
memtable to disk in ms";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::memtableMinFlushTimeSensor);
+    }
+
+    @Test
+    public void shouldGetMemtableMaxFlushTimeSensor() {
+        final String metricNamePrefix = "memtable-flush-time-max";
+        final String description = "Maximum time spent on flushing the 
memtable to disk in ms";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::memtableMaxFlushTimeSensor);
+    }
+
+    @Test
+    public void shouldGetWriteStallDurationSensor() {
+        final String metricNamePrefix = "write-stall-duration";
+        final String descriptionOfAvg = "Average duration of write stalls in 
ms";
+        final String descriptionOfTotal = "Total duration of write stalls in 
ms";
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addAvgAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            metricNamePrefix,
+            descriptionOfAvg,
+            descriptionOfTotal
+        );
+
+        replayCallAndVerify(RocksDBMetrics::writeStallDurationSensor);
+    }
+
+    @Test
+    public void shouldGetBlockCacheDataHitRatioSensor() {
+        final String metricNamePrefix = "block-cache-data-hit-ratio";
+        final String description =
+            "Ratio of block cache hits for data relative to all lookups for 
data to the block cache";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::blockCacheDataHitRatioSensor);
+    }
+
+    @Test
+    public void shouldGetBlockCacheIndexHitRatioSensor() {
+        final String metricNamePrefix = "block-cache-index-hit-ratio";
+        final String description =
+            "Ratio of block cache hits for indexes relative to all lookups for 
indexes to the block cache";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::blockCacheIndexHitRatioSensor);
+    }
+
+    @Test
+    public void shouldGetBlockCacheFilterHitRatioSensor() {
+        final String metricNamePrefix = "block-cache-filter-hit-ratio";
+        final String description =
+            "Ratio of block cache hits for filters relative to all lookups for 
filters to the block cache";
+        verifyValueSensor(metricNamePrefix, description, 
RocksDBMetrics::blockCacheFilterHitRatioSensor);
+    }
+
+    @Test
+    public void shouldGetBytesReadDuringCompactionSensor() {
+        final String metricNamePrefix = "bytes-read-compaction";
+        final String description = "Average number of bytes read per second 
during compaction";
+        verifyRateSensor(metricNamePrefix, description, 
RocksDBMetrics::bytesReadDuringCompactionSensor);
+    }
+
+    @Test
+    public void shouldGetBytesWrittenDuringCompactionSensor() {
+        final String metricNamePrefix = "bytes-written-compaction";
+        final String description = "Average number of bytes written per second 
during compaction";
+        verifyRateSensor(metricNamePrefix, description, 
RocksDBMetrics::bytesWrittenDuringCompactionSensor);
+    }
+
+    @Test
+    public void shouldGetCompactionTimeAvgSensor() {
+        final String metricNamePrefix = "compaction-time-avg";
+        final String description = "Average time spent on compaction in ms";
+        verifyValueSensor(metricNamePrefix, description, RocksDBMetrics::compactionTimeAvgSensor);
+    }
+
+    @Test
+    public void shouldGetCompactionTimeMinSensor() {
+        final String metricNamePrefix = "compaction-time-min";
+        final String description = "Minimum time spent on compaction in ms";
+        verifyValueSensor(metricNamePrefix, description, RocksDBMetrics::compactionTimeMinSensor);
+    }
+
+    @Test
+    public void shouldGetCompactionTimeMaxSensor() {
+        final String metricNamePrefix = "compaction-time-max";
+        final String description = "Maximum time spent on compaction in ms";
+        verifyValueSensor(metricNamePrefix, description, RocksDBMetrics::compactionTimeMaxSensor);
+    }
+
+    @Test
+    public void shouldGetNumberOfOpenFilesSensor() {
+        final String metricNamePrefix = "number-open-files";
+        final String description = "Number of currently open files";
+        verifyValueSensor(metricNamePrefix, description, RocksDBMetrics::numberOfOpenFilesSensor);
+    }
+
+    @Test
+    public void shouldGetNumberOfFilesErrors() {
+        final String metricNamePrefix = "number-file-errors";
+        final String description = "Total number of file errors occurred";
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addSumMetricToSensor(sensor, STATE_LEVEL_GROUP, tags, metricNamePrefix, description);
+
+        replayCallAndVerify(RocksDBMetrics::numberOfFileErrorsSensor);
+    }
+
+    private void verifyRateAndTotalSensor(final String metricNamePrefix,
+                                          final String descriptionOfTotal,
+                                          final String descriptionOfRate,
+                                          final SensorCreator sensorCreator) {
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
+            sensor,
+            STATE_LEVEL_GROUP,
+            tags,
+            metricNamePrefix,
+            descriptionOfRate,
+            descriptionOfTotal
+        );
+
+        replayCallAndVerify(sensorCreator);
+    }
+
+    private void verifyRateSensor(final String metricNamePrefix,
+                                  final String description,
+                                  final SensorCreator sensorCreator) {
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addRateOfSumMetricToSensor(sensor, STATE_LEVEL_GROUP, tags, metricNamePrefix, description);
+
+        replayCallAndVerify(sensorCreator);
+    }
+
+    private void verifyValueSensor(final String metricNamePrefix,
+                                   final String description,
+                                   final SensorCreator sensorCreator) {
+        setupStreamsMetricsMock(metricNamePrefix);
+        StreamsMetricsImpl.addValueMetricToSensor(sensor, STATE_LEVEL_GROUP, tags, metricNamePrefix, description);
+
+        replayCallAndVerify(sensorCreator);
+    }
+
+    private void setupStreamsMetricsMock(final String metricNamePrefix) {
+        mockStatic(StreamsMetricsImpl.class);
+        expect(streamsMetrics.storeLevelSensor(
+            taskName,
+            storeName,
+            metricNamePrefix,
+            RecordingLevel.DEBUG
+        )).andReturn(sensor);
+        expect(streamsMetrics.storeLevelTagMap(
+            taskName,
+            STORE_TYPE_PREFIX + storeType,
+            storeName
+        )).andReturn(tags);
+    }
+
+    private void replayCallAndVerify(final SensorCreator sensorCreator) {
+        replayAll();
+        replay(StreamsMetricsImpl.class);
+
+        final Sensor sensor =
+            sensorCreator.sensor(streamsMetrics, new RocksDBMetricContext(taskName, storeType, storeName));
+
+        verifyAll();
+        verify(StreamsMetricsImpl.class);
+
+        assertThat(sensor, is(this.sensor));
+    }
+}
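
The helper trio above (setupStreamsMetricsMock, one of the verify* methods,
and replayCallAndVerify) makes covering a future sensor mechanical. A
hypothetical addition, assuming some new value-style sensor
RocksDBMetrics::myNewSensor existed:

    @Test
    public void shouldGetMyNewSensor() {
        final String metricNamePrefix = "my-new-metric";
        final String description = "Description of the hypothetical metric";
        verifyValueSensor(metricNamePrefix, description, RocksDBMetrics::myNewSensor);
    }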
