cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1157024108


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();

Review Comment:
   `times(1)` is the default verification mode, so it can be omitted.
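
   For illustration, a minimal sketch of the equivalence, assuming Mockito is on the classpath. `Checkpointer` and `VerifyDefaultSketch` are hypothetical names invented for this example, not Kafka classes:

   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;

   public class VerifyDefaultSketch {

       // Hypothetical interface, used only for illustration.
       public interface Checkpointer {
           void checkpoint();
       }

       // Returns true if both verification forms accept a single invocation.
       public static boolean bothFormsPass() {
           final Checkpointer checkpointer = mock(Checkpointer.class);
           checkpointer.checkpoint();

           // Explicit verification mode.
           verify(checkpointer, times(1)).checkpoint();
           // No mode given: Mockito applies times(1) by default.
           verify(checkpointer).checkpoint();
           return true;
       }
   }
   ```

   Both `verify` calls check for exactly one invocation, so the shorter form should be enough in the test above.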



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();

Review Comment:
   `verify(recordCollector, times(1)).offsets();` is not strictly needed. The 
important thing in this test is that the checkpoint is written.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();

Review Comment:
   ```suggestion
           final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class);
           when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE);
           when(processorStateManager.taskId()).thenReturn(taskId);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;

Review Comment:
   That is fine!



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, times(1)).checkpoint();
+        verify(recordCollector, times(1)).offsets();
+    }
+
+    @Test
+    public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        verify(processorStateManager, never()).checkpoint();
+        verify(processorStateManager, never()).changelogOffsets();
+        verify(recordCollector, never()).offsets();
+    }
+
+    private ProcessorStateManager mockStateManager() {
+        final ProcessorStateManager manager = spy(new ProcessorStateManager(

Review Comment:
   I am afraid I cannot follow here. With Mockito you can define strict 
stubbings, which means Mockito throws an exception if a test defines stubs 
that are never used. To use strict stubs, we usually specify 
   ```java
   @MockitoSettings(strictness = Strictness.STRICT_STUBS)
   ```
   at the test-class level. Here, this does not work, I think (I am not 
sure) because we use
   ```java
   @RunWith(EasyMockRunner.class)
   ```    
   What works is using
   ```java
       @Before
       public void setup() {
           mockito = Mockito.mockitoSession()
               .initMocks(this)
               .strictness(Strictness.STRICT_STUBS)
               .startMocking();
   ...
   ```
   as the first call in the setup method and 
   ```java
       @After
       public void cleanup() throws IOException {
           ...
           mockito.finishMocking();
       }
   ```
   as the last call in the teardown method.
   
   With this the unit test becomes:
   ```java
       @Test
       public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
           final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class);
           when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE);
           when(processorStateManager.taskId()).thenReturn(taskId);
           recordCollector = mock(RecordCollectorImpl.class);
   
           task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, processorStateManager);
           task.initializeIfNeeded();
           task.completeRestoration(noOpResetter -> { });
           verify(processorStateManager, never()).checkpoint();
           verify(processorStateManager, never()).changelogOffsets();
           verify(recordCollector, never()).offsets();
       }
   ```
   
   If you like, you could extract this part into a method and reuse it in both 
unit tests (that is basically what you originally did, but without the spy):
   ```java
           final ProcessorStateManager processorStateManager = mock(ProcessorStateManager.class);
           when(processorStateManager.taskType()).thenReturn(TaskType.ACTIVE);
           when(processorStateManager.taskId()).thenReturn(taskId);
   ```
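   
   To make the strict-stubs behavior mentioned above concrete, here is a minimal standalone sketch. It assumes Mockito 2.7 or later on the classpath and relies on my understanding that `finishMocking()` reports unused stubs with an `UnnecessaryStubbingException`. `StateManager` and `StrictStubsSketch` are hypothetical names, not the Kafka `ProcessorStateManager` or the test class:
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   import org.mockito.Mockito;
   import org.mockito.MockitoSession;
   import org.mockito.exceptions.misusing.UnnecessaryStubbingException;
   import org.mockito.quality.Strictness;

   public class StrictStubsSketch {

       // Hypothetical interface, used only for illustration.
       public interface StateManager {
           String taskId();
       }

       // Returns true if the session reports the stub that is never used.
       public static boolean unusedStubIsReported() {
           final MockitoSession session = Mockito.mockitoSession()
               .strictness(Strictness.STRICT_STUBS)
               .startMocking();

           final StateManager manager = mock(StateManager.class);
           // Stubbed, but taskId() is never called afterwards.
           when(manager.taskId()).thenReturn("0_0");

           try {
               session.finishMocking();
               return false;
           } catch (final UnnecessaryStubbingException e) {
               return true;
           }
       }
   }
   ```
   In a real test, `startMocking()` and `finishMocking()` would live in the setup and teardown methods as shown earlier, so every test method gets the same check without a `try`/`catch`.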



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2511,48 @@ public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+        final ProcessorStateManager processorStateManager = mockStateManager();
+        recordCollector = mock(RecordCollectorImpl.class);
+        doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();

Review Comment:
   This stubbing is not needed since the checkpoint is forced.


