This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7663a6c Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161) 7663a6c is described below commit 7663a6c44daae5d72f38cbba79d728416e11167d Author: cadonna <br...@confluent.io> AuthorDate: Tue Aug 6 17:51:08 2019 +0200 Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161) Renames method names in StreamsMetricsImpl to make them consistent. Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../streams/kstream/internals/metrics/Sensors.java | 2 +- .../streams/processor/internals/ProcessorNode.java | 13 +++--- .../internals/metrics/StreamsMetricsImpl.java | 50 +++++++++++----------- .../processor/internals/metrics/ThreadMetrics.java | 30 ++++++------- .../AbstractRocksDBSegmentedBytesStore.java | 4 +- .../state/internals/InMemorySessionStore.java | 8 ++-- .../state/internals/InMemoryWindowStore.java | 4 +- .../streams/state/internals/metrics/Sensors.java | 12 +++--- .../internals/metrics/StreamsMetricsImplTest.java | 20 ++++----- .../internals/metrics/ThreadMetricsTest.java | 26 +++++------ 10 files changed, 85 insertions(+), 84 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java index 038b8ac..363ec6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java @@ -44,7 +44,7 @@ public class Sensors { LATE_RECORD_DROP, Sensor.RecordingLevel.INFO ); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( sensor, PROCESSOR_NODE_METRICS_GROUP, metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 01e3e56..bc66ede 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -33,8 +33,7 @@ import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor; public class ProcessorNode<K, V> { @@ -232,12 +231,14 @@ public class ProcessorNode<K, V> { final Map<String, String> taskTags, final Map<String, String> nodeTags) { final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); - addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addAvgAndMaxLatencyToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + StreamsMetricsImpl + .addInvocationRateAndCountToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent); - addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addAvgAndMaxLatencyToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + StreamsMetricsImpl + .addInvocationRateAndCountToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); return sensor; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index ae3d953..5ac2f33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -344,13 +344,13 @@ public class StreamsMetricsImpl implements StreamsMetrics { // first add the global operation metrics if not yet, with the global tags only final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel); - addAvgMaxLatency(parent, group, allTagMap, operationName); - addInvocationRateAndCount(parent, group, allTagMap, operationName); + addAvgAndMaxLatencyToSensor(parent, group, allTagMap, operationName); + addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName); // add the operation metrics with additional tags final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent); - addAvgMaxLatency(sensor, group, tagMap, operationName); - addInvocationRateAndCount(sensor, group, tagMap, operationName); + addAvgAndMaxLatencyToSensor(sensor, group, tagMap, operationName); + addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName); parentSensors.put(sensor, parent); @@ -374,11 +374,11 @@ public class StreamsMetricsImpl implements StreamsMetrics { // first add the global operation metrics if not yet, with the global tags only final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel); - addInvocationRateAndCount(parent, group, allTagMap, operationName); + addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName); // add the operation metrics with additional tags final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent); - addInvocationRateAndCount(sensor, group, tagMap, operationName); + addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName); parentSensors.put(sensor, parent); @@ -397,10 +397,10 @@ public class StreamsMetricsImpl implements StreamsMetrics { } - public static void addAvgAndMax(final Sensor sensor, - final String group, - final Map<String, String> tags, - final String operation) { + public static void addAvgAndMaxToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation) { sensor.add( new MetricName( operation + AVG_SUFFIX, @@ -419,10 +419,10 @@ public class StreamsMetricsImpl implements StreamsMetrics { ); } - public static void addAvgMaxLatency(final Sensor sensor, - final String group, - final Map<String, String> tags, - final String operation) { + public static void addAvgAndMaxLatencyToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation) { sensor.add( new MetricName( operation + "-latency-avg", @@ -441,12 +441,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { ); } - public static void addInvocationRateAndCount(final Sensor sensor, - final String group, - final Map<String, String> tags, - final String operation, - final String descriptionOfInvocation, - final String descriptionOfRate) { + public static void addInvocationRateAndCountToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation, + final String descriptionOfInvocation, + final String descriptionOfRate) { sensor.add( new MetricName( operation + TOTAL_SUFFIX, @@ -467,11 +467,11 @@ public class StreamsMetricsImpl implements StreamsMetrics { ); } - public static void addInvocationRateAndCount(final Sensor sensor, - final String group, - final Map<String, String> tags, - final String operation) { - addInvocationRateAndCount( + public static void addInvocationRateAndCountToSensor(final Sensor sensor, + final String group, + final Map<String, String> tags, + final String operation) { + addInvocationRateAndCountToSensor( sensor, group, tags, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index e177667..f8b7836 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -26,8 +26,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; public class ThreadMetrics { private ThreadMetrics() {} @@ -74,7 +74,7 @@ public class ThreadMetrics { public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO); - addInvocationRateAndCount(createTaskSensor, + addInvocationRateAndCountToSensor(createTaskSensor, THREAD_LEVEL_GROUP, streamsMetrics.threadLevelTagMap(), CREATE_TASK, @@ -85,7 +85,7 @@ public class ThreadMetrics { public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO); - addInvocationRateAndCount(closeTaskSensor, + addInvocationRateAndCountToSensor(closeTaskSensor, THREAD_LEVEL_GROUP, streamsMetrics.threadLevelTagMap(), CLOSE_TASK, @@ -97,8 +97,8 @@ public class ThreadMetrics { public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(); - addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY); - addInvocationRateAndCount(commitSensor, + addAvgAndMaxToSensor(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY); + addInvocationRateAndCountToSensor(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT, @@ -110,8 +110,8 @@ public class ThreadMetrics { public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(); - addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY); - addInvocationRateAndCount(pollSensor, + addAvgAndMaxToSensor(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY); + addInvocationRateAndCountToSensor(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL, @@ -123,8 +123,8 @@ public class ThreadMetrics { public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(); - addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY); - addInvocationRateAndCount(processSensor, + addAvgAndMaxToSensor(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY); + addInvocationRateAndCountToSensor(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS, @@ -137,8 +137,8 @@ public class ThreadMetrics { public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(); - addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY); - addInvocationRateAndCount(punctuateSensor, + addAvgAndMaxToSensor(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY); + addInvocationRateAndCountToSensor(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE, @@ -150,7 +150,7 @@ public class ThreadMetrics { public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO); - addInvocationRateAndCount(skippedRecordsSensor, + addInvocationRateAndCountToSensor(skippedRecordsSensor, THREAD_LEVEL_GROUP, streamsMetrics.threadLevelTagMap(), SKIP_RECORD, @@ -163,11 +163,11 @@ public class ThreadMetrics { public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) { final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS); - addAvgAndMax(commitOverTasksSensor, + addAvgAndMaxToSensor(commitOverTasksSensor, TASK_LEVEL_GROUP, tagMap, COMMIT_LATENCY); - addInvocationRateAndCount(commitOverTasksSensor, + addInvocationRateAndCountToSensor(commitOverTasksSensor, TASK_LEVEL_GROUP, tagMap, COMMIT, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index ef18d3c..97dc8d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -41,7 +41,7 @@ import java.util.Map; import java.util.Set; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore { private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class); @@ -182,7 +182,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se EXPIRED_WINDOW_RECORD_DROP, Sensor.RecordingLevel.INFO ); - addInvocationRateAndCount( + addInvocationRateAndCountToSensor( expiredRecordSensor, "stream-" + metricScope + "-metrics", metrics.tagMap("task-id", taskName, metricScope + "-id", name()), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index ebe9878..6c64b04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.streams.state.internals; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; - import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -43,6 +40,9 @@ import org.apache.kafka.streams.state.SessionStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; + public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class); @@ -82,7 +82,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> { EXPIRED_WINDOW_RECORD_DROP, Sensor.RecordingLevel.INFO ); - addInvocationRateAndCount( + addInvocationRateAndCountToSensor( expiredRecordSensor, "stream-" + metricScope + "-metrics", metrics.tagMap("task-id", taskName, metricScope + "-id", name()), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 8063410..1a3e26b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -43,7 +43,7 @@ import java.util.Map; import java.util.NoSuchElementException; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes; import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp; @@ -98,7 +98,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> { EXPIRED_WINDOW_RECORD_DROP, Sensor.RecordingLevel.INFO ); - addInvocationRateAndCount( + addInvocationRateAndCountToSensor( expiredRecordSensor, "stream-" + metricScope + "-metrics", metrics.tagMap("task-id", taskName, metricScope + "-id", name()), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java index 13a39c6..8ed4d47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java @@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; public final class Sensors { private Sensors() {} @@ -42,11 +42,11 @@ public final class Sensors { final Map<String, String> taskTags, final Map<String, String> storeTags) { final Sensor taskSensor = metrics.taskLevelSensor(taskName, operation, level); - addAvgMaxLatency(taskSensor, metricsGroup, taskTags, operation); - addInvocationRateAndCount(taskSensor, metricsGroup, taskTags, operation); + addAvgAndMaxLatencyToSensor(taskSensor, metricsGroup, taskTags, operation); + addInvocationRateAndCountToSensor(taskSensor, metricsGroup, taskTags, operation); final Sensor sensor = metrics.storeLevelSensor(taskName, storeName, operation, level, taskSensor); - addAvgMaxLatency(sensor, metricsGroup, storeTags, operation); - addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation); + addAvgAndMaxLatencyToSensor(sensor, metricsGroup, storeTags, operation); + addInvocationRateAndCountToSensor(sensor, metricsGroup, storeTags, operation); return sensor; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 678d9f3..4fd6f88 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -131,14 +131,14 @@ public class StreamsMetricsImplTest extends EasyMockSupport { final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value")); final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); - addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); + addAvgAndMaxLatencyToSensor(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCountToSensor(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); final int numberOfTaskMetrics = registry.metrics().size(); final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1); - addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); + addAvgAndMaxLatencyToSensor(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCountToSensor(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); @@ -147,14 +147,14 @@ public class StreamsMetricsImplTest extends EasyMockSupport { assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG); - addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); - addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); + addAvgAndMaxLatencyToSensor(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation); + addInvocationRateAndCountToSensor(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", ""); assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics)); final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2); - addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); - addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); + addAvgAndMaxLatencyToSensor(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation); + addInvocationRateAndCountToSensor(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", ""); assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 89395d9..739f028 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -59,7 +59,7 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); replayAll(); @@ -81,7 +81,7 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); replayAll(); @@ -104,9 +104,9 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); - StreamsMetricsImpl.addAvgAndMax( + StreamsMetricsImpl.addAvgAndMaxToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); replayAll(); @@ -129,9 +129,9 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); - StreamsMetricsImpl.addAvgAndMax( + StreamsMetricsImpl.addAvgAndMaxToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); replayAll(); @@ -154,9 +154,9 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); - StreamsMetricsImpl.addAvgAndMax( + StreamsMetricsImpl.addAvgAndMaxToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); replayAll(); @@ -179,9 +179,9 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); - StreamsMetricsImpl.addAvgAndMax( + StreamsMetricsImpl.addAvgAndMaxToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency); replayAll(); @@ -203,7 +203,7 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); replayAll(); @@ -226,9 +226,9 @@ public class ThreadMetricsTest { mockStatic(StreamsMetricsImpl.class); expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor); expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap); - StreamsMetricsImpl.addInvocationRateAndCount( + StreamsMetricsImpl.addInvocationRateAndCountToSensor( dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription); - StreamsMetricsImpl.addAvgAndMax( + StreamsMetricsImpl.addAvgAndMaxToSensor( dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency); replayAll();