clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014084531


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new 
IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to 
close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws 
IOException {

Review Comment:
   This test appears to be just wrong, unless I am reading the code 
incorrectly. Because we use 
`expect(stateDirectory.lock(taskId)).andReturn(false);` we never reach the code 
path which would be using `Utils.delete`. As such, I just removed the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to