cadonna commented on code in PR #13269: URL: https://github.com/apache/kafka/pull/13269#discussion_r1157024108
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { ); } + @Test + public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() { + final ProcessorStateManager processorStateManager = mockStateManager(); + recordCollector = mock(RecordCollectorImpl.class); + doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets(); + + task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + verify(processorStateManager, times(1)).checkpoint(); + verify(recordCollector, times(1)).offsets(); Review Comment: `times(1)` is default and can be omitted. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { ); } + @Test + public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() { + final ProcessorStateManager processorStateManager = mockStateManager(); + recordCollector = mock(RecordCollectorImpl.class); + doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets(); + + task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + verify(processorStateManager, times(1)).checkpoint(); + verify(recordCollector, times(1)).offsets(); Review Comment: `verify(recordCollector, times(1)).offsets();` is not strictly needed. The important thing in this test is that the checkpoint is written. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { ); } + @Test + public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() { + final ProcessorStateManager processorStateManager = mockStateManager(); Review Comment: ```suggestion final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class); when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE); when(processorStateManager.taskId()).thenReturn(taskId); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -44,6 +44,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyConfig; Review Comment: That is fine! ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { ); } + @Test + public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() { + final ProcessorStateManager processorStateManager = mockStateManager(); + recordCollector = mock(RecordCollectorImpl.class); + doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets(); + + task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + verify(processorStateManager, times(1)).checkpoint(); + verify(recordCollector, times(1)).offsets(); + } + + @Test + public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() { + final ProcessorStateManager processorStateManager = mockStateManager(); + recordCollector = mock(RecordCollectorImpl.class); + + task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, 
processorStateManager); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + verify(processorStateManager, never()).checkpoint(); + verify(processorStateManager, never()).changelogOffsets(); + verify(recordCollector, never()).offsets(); + } + + private ProcessorStateManager mockStateManager() { + final ProcessorStateManager manager = spy(new ProcessorStateManager( Review Comment: I am afraid I cannot follow here. With Mockito you can define strict stubbings. That means Mockito will throw an exception if you have defined stubs that are not used. To use strict mocks, we usually specify ```java @MockitoSettings(strictness = Strictness.STRICT_STUBS) ``` at the test class level. Here, this does not work because I think (I am not sure) we use ```java @RunWith(EasyMockRunner.class) ``` What works is using ```java @Before public void setup() { mockito = Mockito.mockitoSession() .initMocks(this) .strictness(Strictness.STRICT_STUBS) .startMocking(); ... ``` as the first call in the setup method and ```java @After public void cleanup() throws IOException { ... mockito.finishMocking(); } ``` as the last call in the teardown method. 
With this, the unit test becomes: ```java @Test public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() { final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class); when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE); when(processorStateManager.taskId()).thenReturn(taskId); recordCollector = mock(RecordCollectorImpl.class); task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); verify(processorStateManager, never()).checkpoint(); verify(processorStateManager, never()).changelogOffsets(); verify(recordCollector, never()).offsets(); } ``` If you like, you could extract this part into a method and reuse it in both unit tests (that is basically what you originally did, but without the spy): ```java final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class); when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE); when(processorStateManager.taskId()).thenReturn(taskId); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() { ); } + @Test + public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() { + final ProcessorStateManager processorStateManager = mockStateManager(); + recordCollector = mock(RecordCollectorImpl.class); + doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets(); Review Comment: This is not needed since the checkpoint is forced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org