dplavcic commented on code in PR #12459: URL: https://github.com/apache/kafka/pull/12459#discussion_r944710092
########## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java: ########## @@ -279,449 +310,335 @@ public void shouldThrowIfDbToAddWasAlreadyAddedForOtherSegment() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); final Throwable exception = assertThrows( - IllegalStateException.class, - () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2) + IllegalStateException.class, + () -> recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd1, cacheToAdd2, statisticsToAdd2) ); assertThat( - exception.getMessage(), - is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 + - " was already added for another segment as a value provider. This is a bug in Kafka Streams. " + - "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues") + exception.getMessage(), + is("DB instance for store " + SEGMENT_STORE_NAME_2 + " of task " + TASK_ID1 + + " was already added for another segment as a value provider. This is a bug in Kafka Streams. 
" + + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues") ); } @Test public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedToNewlyCreatedRecorder() { - recordingTrigger.addMetricsRecorder(recorder); - replay(recordingTrigger); - recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); - verify(recordingTrigger); + verify(recordingTrigger).addMetricsRecorder(recorder); } @Test public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfterLastValueProvidersWereRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - reset(recordingTrigger); - recordingTrigger.addMetricsRecorder(recorder); - replay(recordingTrigger); + + Mockito.reset(recordingTrigger); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - verify(recordingTrigger); + verify(recordingTrigger).addMetricsRecorder(recorder); } @Test public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); - reset(recordingTrigger); - replay(recordingTrigger); + + verify(recordingTrigger).addMetricsRecorder(recorder); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - verify(recordingTrigger); + verifyNoMoreInteractions(recordingTrigger); } @Test public void shouldCloseStatisticsWhenValueProvidersAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); - reset(statisticsToAdd1); - statisticsToAdd1.close(); - replay(statisticsToAdd1); - recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); + verify(statisticsToAdd1).close(); } @Test public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() { 
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); - reset(statisticsToAdd1); - replay(statisticsToAdd1); - recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); + verify(statisticsToAdd1, never()).close(); } @Test public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - reset(recordingTrigger); - replay(recordingTrigger); + + Mockito.reset(recordingTrigger); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - verify(recordingTrigger); + verify(recordingTrigger, never()).removeMetricsRecorder(recorder); - reset(recordingTrigger); - recordingTrigger.removeMetricsRecorder(recorder); - replay(recordingTrigger); + Mockito.reset(recordingTrigger); recorder.removeValueProviders(SEGMENT_STORE_NAME_2); - verify(recordingTrigger); + verify(recordingTrigger).removeMetricsRecorder(recorder); } @Test public void shouldThrowIfValueProvidersToRemoveNotFound() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); assertThrows( - IllegalStateException.class, - () -> recorder.removeValueProviders(SEGMENT_STORE_NAME_2) + IllegalStateException.class, + () -> recorder.removeValueProviders(SEGMENT_STORE_NAME_2) ); } @Test public void shouldRecordStatisticsBasedMetrics() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - reset(statisticsToAdd1); - reset(statisticsToAdd2); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L); - bytesWrittenToDatabaseSensor.record(1 + 2, 0L); - replay(bytesWrittenToDatabaseSensor); - - 
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L); - bytesReadFromDatabaseSensor.record(2 + 3, 0L); - replay(bytesReadFromDatabaseSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L); - memtableBytesFlushedSensor.record(3 + 4, 0L); - replay(memtableBytesFlushedSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L); - memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L); - replay(memtableHitRatioSensor); + final long now = 0L; + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L); + doNothing().when(bytesWrittenToDatabaseSensor).record(1 + 2, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L); + doNothing().when(bytesReadFromDatabaseSensor).record(2 + 3, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L); + doNothing().when(memtableBytesFlushedSensor).record(3 + 4, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L); + 
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L); + doNothing().when(memtableHitRatioSensor).record((double) 4 / (4 + 6), now); final HistogramData memtableFlushTimeData1 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 16.0, 2L, 10L, 3.0); final HistogramData memtableFlushTimeData2 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 20.0, 4L, 8L, 10.0); - expect(statisticsToAdd1.getHistogramData(HistogramType.FLUSH_TIME)).andReturn(memtableFlushTimeData1); - expect(statisticsToAdd2.getHistogramData(HistogramType.FLUSH_TIME)).andReturn(memtableFlushTimeData2); - memtableAvgFlushTimeSensor.record((double) (10 + 8) / (2 + 4), 0L); - replay(memtableAvgFlushTimeSensor); - memtableMinFlushTimeSensor.record(3.0, 0L); - replay(memtableMinFlushTimeSensor); - memtableMaxFlushTimeSensor.record(20.0, 0L); - replay(memtableMaxFlushTimeSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L); - writeStallDurationSensor.record(4 + 5, 0L); - replay(writeStallDurationSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L); - blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L); - replay(blockCacheDataHitRatioSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L); - 
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L); - blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L); - replay(blockCacheIndexHitRatioSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L); - blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L); - replay(blockCacheFilterHitRatioSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L); - bytesWrittenDuringCompactionSensor.record(2 + 4, 0L); - replay(bytesWrittenDuringCompactionSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L); - bytesReadDuringCompactionSensor.record(5 + 6, 0L); - replay(bytesReadDuringCompactionSensor); + when(statisticsToAdd1.getHistogramData(HistogramType.FLUSH_TIME)).thenReturn(memtableFlushTimeData1); + when(statisticsToAdd2.getHistogramData(HistogramType.FLUSH_TIME)).thenReturn(memtableFlushTimeData2); + doNothing().when(memtableAvgFlushTimeSensor).record((double) (10 + 8) / (2 + 4), now); + doNothing().when(memtableMinFlushTimeSensor).record(3.0, now); + doNothing().when(memtableMaxFlushTimeSensor).record(20.0, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L); + 
when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L); + doNothing().when(writeStallDurationSensor).record(4 + 5, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L); + doNothing().when(blockCacheDataHitRatioSensor).record((double) 8 / (8 + 6), now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L); + doNothing().when(blockCacheIndexHitRatioSensor).record((double) 6 / (6 + 6), now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L); + doNothing().when(blockCacheFilterHitRatioSensor).record((double) 5 / (5 + 9), now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L); + doNothing().when(bytesWrittenDuringCompactionSensor).record(2 + 4, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L); + 
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L); + doNothing().when(bytesReadDuringCompactionSensor).record(5 + 6, now); final HistogramData compactionTimeData1 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 16.0, 2L, 8L, 6.0); final HistogramData compactionTimeData2 = new HistogramData(0.0, 0.0, 0.0, 0.0, 0.0, 24.0, 2L, 8L, 4.0); - expect(statisticsToAdd1.getHistogramData(HistogramType.COMPACTION_TIME)).andReturn(compactionTimeData1); - expect(statisticsToAdd2.getHistogramData(HistogramType.COMPACTION_TIME)).andReturn(compactionTimeData2); - compactionTimeAvgSensor.record((double) (8 + 8) / (2 + 2), 0L); - replay(compactionTimeAvgSensor); - compactionTimeMinSensor.record(4.0, 0L); - replay(compactionTimeMinSensor); - compactionTimeMaxSensor.record(24.0, 0L); - replay(compactionTimeMaxSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L); - numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L); - replay(numberOfOpenFilesSensor); - - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L); - numberOfFileErrorsSensor.record(11 + 34, 0L); - replay(numberOfFileErrorsSensor); - - replay(statisticsToAdd1); - replay(statisticsToAdd2); - - recorder.record(0L); - - verify(statisticsToAdd1); - verify(statisticsToAdd2); - verify( - bytesWrittenToDatabaseSensor, - bytesReadFromDatabaseSensor, - memtableBytesFlushedSensor, - memtableHitRatioSensor, - memtableAvgFlushTimeSensor, - memtableMinFlushTimeSensor, - memtableMaxFlushTimeSensor, - writeStallDurationSensor, - blockCacheDataHitRatioSensor, 
- blockCacheIndexHitRatioSensor, - blockCacheFilterHitRatioSensor, - bytesWrittenDuringCompactionSensor, - bytesReadDuringCompactionSensor, - compactionTimeAvgSensor, - compactionTimeMinSensor, - compactionTimeMaxSensor, - numberOfOpenFilesSensor, - numberOfFileErrorsSensor - ); + when(statisticsToAdd1.getHistogramData(HistogramType.COMPACTION_TIME)).thenReturn(compactionTimeData1); + when(statisticsToAdd2.getHistogramData(HistogramType.COMPACTION_TIME)).thenReturn(compactionTimeData2); + doNothing().when(compactionTimeAvgSensor).record((double) (8 + 8) / (2 + 2), now); + doNothing().when(compactionTimeMinSensor).record(4.0, now); + doNothing().when(compactionTimeMaxSensor).record(24.0, now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); + doNothing().when(numberOfOpenFilesSensor).record((5 + 7) - (3 + 4), now); + + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); + doNothing().when(numberOfFileErrorsSensor).record(11 + 34, now); + + recorder.record(now); + + verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class)); + verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class)); Review Comment: @cadonna, @divijvaidya, what do you think we just update this test method with "improved" verification, as @cadonna described 
[here](https://github.com/apache/kafka/pull/12459/files/e4e7549e229b2687f8491842a2352ecaf2563e5f#r944335669), and we create a new ticket/PR to split this test into multiple smaller tests (which would be cool)? IMHO, that way we keep this PR short, simple, and within the scope of the "Replace EasyMock and PowerMock with Mockito.." task 🤔 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org