This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a5d71e1550 MINOR: Fix static mock usage in ThreadMetricsTest (#12454) a5d71e1550 is described below commit a5d71e1550c5a2b670347c0c3bafb0b195bf916c Author: Bruno Cadonna <cado...@apache.org> AuthorDate: Thu Jul 28 22:32:46 2022 +0200 MINOR: Fix static mock usage in ThreadMetricsTest (#12454) Before this PR, the calls to the static methods on StreamsMetricsImpl in the test were plain invocations rather than verifications on the mock. This oversight was introduced during the switch from EasyMock to Mockito. Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../internals/metrics/ThreadMetricsTest.java | 422 +++++++++++---------- 1 file changed, 224 insertions(+), 198 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 6ed97ebf7c..3d2aaa20c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -24,12 +24,14 @@ import org.junit.Test; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Collections; import java.util.Map; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; @@ -54,17 +56,20 @@ public class ThreadMetricsTest { final String ratioDescription = "The fraction of time the thread spent on processing active tasks"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, 
RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addValueMetricToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - ratioDescription - ); - - final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + ratioDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -74,18 +79,21 @@ public class ThreadMetricsTest { final String maxDescription = "The maximum number of records processed within an iteration"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - avgDescription, - maxDescription - ); - final Sensor sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + avgDescription, + maxDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -95,18 +103,21 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum process latency"; 
when(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operationLatency, - avgLatencyDescription, - maxLatencyDescription - ); - final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operationLatency, + avgLatencyDescription, + maxLatencyDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -117,18 +128,21 @@ public class ThreadMetricsTest { final String rateDescription = "The average per-second number of calls to process"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operationRate, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - - final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + assertThat(sensor, 
is(expectedSensor)); + } } @Test @@ -137,17 +151,20 @@ public class ThreadMetricsTest { final String ratioDescription = "The fraction of time the thread spent on polling records from consumer"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addValueMetricToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - ratioDescription - ); - - final Sensor sensor = ThreadMetrics.pollRatioSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.pollRatioSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + ratioDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -157,18 +174,21 @@ public class ThreadMetricsTest { final String maxDescription = "The maximum number of records polled from consumer within an iteration"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - avgDescription, - maxDescription - ); - - final Sensor sensor = ThreadMetrics.pollRecordsSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.pollRecordsSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + 
tagMap, + operation, + avgDescription, + maxDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -181,26 +201,31 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum poll latency"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operationLatency, - avgLatencyDescription, - maxLatencyDescription - ); - - final Sensor sensor = ThreadMetrics.pollSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.pollSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operationLatency, + avgLatencyDescription, + maxLatencyDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -213,25 +238,31 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum commit latency"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - 
StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operationLatency, - avgLatencyDescription, - maxLatencyDescription); - - final Sensor sensor = ThreadMetrics.commitSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.commitSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operationLatency, + avgLatencyDescription, + maxLatencyDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -240,17 +271,20 @@ public class ThreadMetricsTest { final String ratioDescription = "The fraction of time the thread spent on committing all tasks"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addValueMetricToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - ratioDescription - ); - final Sensor sensor = ThreadMetrics.commitRatioSensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.commitRatioSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + ratioDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -262,18 +296,21 @@ public 
class ThreadMetricsTest { "The average per-second number of calls to commit over all tasks assigned to one stream thread"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).thenReturn(expectedSensor); when(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - TASK_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - final Sensor sensor = ThreadMetrics.commitOverTasksSensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.commitOverTasksSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + expectedSensor, + TASK_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -286,26 +323,31 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum punctuate latency"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operationLatency, - avgLatencyDescription, - maxLatencyDescription - ); - final Sensor sensor = ThreadMetrics.punctuateSensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = 
ThreadMetrics.punctuateSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operationLatency, + avgLatencyDescription, + maxLatencyDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -314,39 +356,20 @@ public class ThreadMetricsTest { final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addValueMetricToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - ratioDescription - ); - - final Sensor sensor = ThreadMetrics.punctuateRatioSensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); - } - - @Test - public void shouldGetSkipRecordSensor() { - final String operation = "skipped-records"; - final String totalDescription = "The total number of skipped records"; - final String rateDescription = "The average per-second number of skipped records"; - when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)) - .thenReturn(expectedSensor); - when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - - final Sensor sensor = ThreadMetrics.skipRecordSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor 
sensor = ThreadMetrics.punctuateRatioSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + ratioDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -357,19 +380,20 @@ public class ThreadMetricsTest { when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - - - final Sensor sensor = ThreadMetrics.createTaskSensor(THREAD_ID, streamsMetrics); - - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.createTaskSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test @@ -379,19 +403,21 @@ public class ThreadMetricsTest { final String rateDescription = "The average per-second number of closed tasks"; when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor); when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, - THREAD_LEVEL_GROUP, - tagMap, - operation, - rateDescription, - totalDescription - ); - - - final Sensor sensor = ThreadMetrics.closeTaskSensor(THREAD_ID, streamsMetrics); - assertThat(sensor, is(expectedSensor)); + try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = 
mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = ThreadMetrics.closeTaskSensor(THREAD_ID, streamsMetrics); + streamsMetricsStaticMock.verify( + () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + operation, + rateDescription, + totalDescription + ) + ); + assertThat(sensor, is(expectedSensor)); + } } @Test