cadonna commented on code in PR #12459:
URL: https://github.com/apache/kafka/pull/12459#discussion_r952332188


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java:
##########
@@ -279,449 +310,335 @@ public void shouldThrowIfDbToAddWasAlreadyAddedForOtherSegment() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
         final Throwable exception = assertThrows(
-            IllegalStateException.class,
-            () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
+                IllegalStateException.class,
+                () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2)
         );
         assertThat(
-            exception.getMessage(),
-            is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
-                " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
-                "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
+                exception.getMessage(),
+                is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 +
+                        " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                        "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues")
         );
     }
 
     @Test
     public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedToNewlyCreatedRecorder() {
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
-
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }
 
     @Test
    public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfterLastValueProvidersWereRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        reset(recordingTrigger);
-        recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
+
+        Mockito.reset(recordingTrigger);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger).addMetricsRecorder(recorder);
     }
 
     @Test
     public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+
+        verify(recordingTrigger).addMetricsRecorder(recorder);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
 
-        verify(recordingTrigger);
+        verifyNoMoreInteractions(recordingTrigger);
     }
 
     @Test
     public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        reset(statisticsToAdd1);
-        statisticsToAdd1.close();
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1).close();
    }
 
     @Test
     public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        reset(statisticsToAdd1);
-        replay(statisticsToAdd1);
-
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
+        verify(statisticsToAdd1, never()).close();
     }
 
     @Test
     public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-        reset(recordingTrigger);
-        replay(recordingTrigger);
+
+        Mockito.reset(recordingTrigger);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger, never()).removeMetricsRecorder(recorder);
 
-        reset(recordingTrigger);
-        recordingTrigger.removeMetricsRecorder(recorder);
-        replay(recordingTrigger);
+        Mockito.reset(recordingTrigger);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_2);
 
-        verify(recordingTrigger);
+        verify(recordingTrigger).removeMetricsRecorder(recorder);
     }
 
     @Test
     public void shouldThrowIfValueProvidersToRemoveNotFound() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
 
         assertThrows(
-            IllegalStateException.class,
-            () -> recorder.removeValueProviders(SEGMENT_STORE_NAME_2)
+                IllegalStateException.class,
+                () -> recorder.removeValueProviders(SEGMENT_STORE_NAME_2)
         );
     }
 
     @Test
     public void shouldRecordStatisticsBasedMetrics() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
-        reset(statisticsToAdd1);
-        reset(statisticsToAdd2);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
-        bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-        replay(bytesWrittenToDatabaseSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
-        bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-        replay(bytesReadFromDatabaseSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
-        memtableBytesFlushedSensor.record(3 + 4, 0L);
-        replay(memtableBytesFlushedSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
-        memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-        replay(memtableHitRatioSensor);
+        final long now = 0L;
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
+        doNothing().when(bytesWrittenToDatabaseSensor).record(1 + 2, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
+        doNothing().when(bytesReadFromDatabaseSensor).record(2 + 3, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
+        doNothing().when(memtableBytesFlushedSensor).record(3 + 4, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
+        doNothing().when(memtableHitRatioSensor).record((double) 4 / (4 + 6), now);
 
         final HistogramData memtableFlushTimeData1 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 16.0, 2L, 10L, 3.0);
         final HistogramData memtableFlushTimeData2 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 20.0, 4L, 8L, 10.0);
-        expect(statisticsToAdd1.getHistogramData(HistogramType.FLUSH_TIME)).andReturn(memtableFlushTimeData1);
-        expect(statisticsToAdd2.getHistogramData(HistogramType.FLUSH_TIME)).andReturn(memtableFlushTimeData2);
-        memtableAvgFlushTimeSensor.record((double) (10 + 8) / (2 + 4), 0L);
-        replay(memtableAvgFlushTimeSensor);
-        memtableMinFlushTimeSensor.record(3.0, 0L);
-        replay(memtableMinFlushTimeSensor);
-        memtableMaxFlushTimeSensor.record(20.0, 0L);
-        replay(memtableMaxFlushTimeSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
-        writeStallDurationSensor.record(4 + 5, 0L);
-        replay(writeStallDurationSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
-        blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-        replay(blockCacheDataHitRatioSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
-        blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L);
-        replay(blockCacheIndexHitRatioSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L);
-        blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L);
-        replay(blockCacheFilterHitRatioSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L);
-        bytesWrittenDuringCompactionSensor.record(2 + 4, 0L);
-        replay(bytesWrittenDuringCompactionSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L);
-        bytesReadDuringCompactionSensor.record(5 + 6, 0L);
-        replay(bytesReadDuringCompactionSensor);
+        when(statisticsToAdd1.getHistogramData(HistogramType.FLUSH_TIME)).thenReturn(memtableFlushTimeData1);
+        when(statisticsToAdd2.getHistogramData(HistogramType.FLUSH_TIME)).thenReturn(memtableFlushTimeData2);
+        doNothing().when(memtableAvgFlushTimeSensor).record((double) (10 + 8) / (2 + 4), now);
+        doNothing().when(memtableMinFlushTimeSensor).record(3.0, now);
+        doNothing().when(memtableMaxFlushTimeSensor).record(20.0, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
+        doNothing().when(writeStallDurationSensor).record(4 + 5, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
+        doNothing().when(blockCacheDataHitRatioSensor).record((double) 8 / (8 + 6), now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L);
+        doNothing().when(blockCacheIndexHitRatioSensor).record((double) 6 / (6 + 6), now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L);
+        doNothing().when(blockCacheFilterHitRatioSensor).record((double) 5 / (5 + 9), now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L);
+        doNothing().when(bytesWrittenDuringCompactionSensor).record(2 + 4, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L);
+        doNothing().when(bytesReadDuringCompactionSensor).record(5 + 6, now);
 
         final HistogramData compactionTimeData1 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 16.0, 2L, 8L, 6.0);
         final HistogramData compactionTimeData2 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 24.0, 2L, 8L, 4.0);
-        expect(statisticsToAdd1.getHistogramData(HistogramType.COMPACTION_TIME)).andReturn(compactionTimeData1);
-        expect(statisticsToAdd2.getHistogramData(HistogramType.COMPACTION_TIME)).andReturn(compactionTimeData2);
-        compactionTimeAvgSensor.record((double) (8 + 8) / (2 + 2), 0L);
-        replay(compactionTimeAvgSensor);
-        compactionTimeMinSensor.record(4.0, 0L);
-        replay(compactionTimeMinSensor);
-        compactionTimeMaxSensor.record(24.0, 0L);
-        replay(compactionTimeMaxSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
-        numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L);
-        replay(numberOfOpenFilesSensor);
-
-        expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
-        expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
-        numberOfFileErrorsSensor.record(11 + 34, 0L);
-        replay(numberOfFileErrorsSensor);
-
-        replay(statisticsToAdd1);
-        replay(statisticsToAdd2);
-
-        recorder.record(0L);
-
-        verify(statisticsToAdd1);
-        verify(statisticsToAdd2);
-        verify(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            memtableAvgFlushTimeSensor,
-            memtableMinFlushTimeSensor,
-            memtableMaxFlushTimeSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            compactionTimeAvgSensor,
-            compactionTimeMinSensor,
-            compactionTimeMaxSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
+        when(statisticsToAdd1.getHistogramData(HistogramType.COMPACTION_TIME)).thenReturn(compactionTimeData1);
+        when(statisticsToAdd2.getHistogramData(HistogramType.COMPACTION_TIME)).thenReturn(compactionTimeData2);
+        doNothing().when(compactionTimeAvgSensor).record((double) (8 + 8) / (2 + 2), now);
+        doNothing().when(compactionTimeMinSensor).record(4.0, now);
+        doNothing().when(compactionTimeMaxSensor).record(24.0, now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
+        doNothing().when(numberOfOpenFilesSensor).record((5 + 7) - (3 + 4), now);
+
+        when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
+        when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
+        doNothing().when(numberOfFileErrorsSensor).record(11 + 34, now);
+
+        recorder.record(now);
+
+        verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class));
+        verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class));
+        verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));
+        verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));

Review Comment:
   I think I agree with you!



