C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1376416990


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -528,72 +483,61 @@ public void testRestartConnectorAndTasksNoStatus() throws 
Exception {
         ExecutionException ee = assertThrows(ExecutionException.class, () -> 
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
         assertTrue(ee.getCause() instanceof NotFoundException);
         assertTrue(ee.getMessage().contains("Status for connector"));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorAndTasksNoRestarts() throws Exception {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
-
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(false);
+        when(restartPlan.shouldRestartTasks()).thenReturn(false);
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+        connector = mock(BogusSinkConnector.class);
         expectAdd(SourceSink.SINK);
 
         Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         expectConfigValidation(connectorMock, true, connectorConfig);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
 
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorAndTasksOnlyConnector() throws Exception {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(true);
+        when(restartPlan.shouldRestartTasks()).thenReturn(false);
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
         herder.onRestart(CONNECTOR_NAME);

Review Comment:
   We can remove this.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -602,36 +546,37 @@ public void testRestartConnectorAndTasksOnlyConnector() 
throws Exception {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));
-        PowerMock.verifyAll();
+
+        verifyConnectorStatusRestart();
     }
 
     @Test
     public void testRestartConnectorAndTasksOnlyTasks() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, 
false, true);
-        RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
-        ConnectorStateInfo connectorStateInfo = 
PowerMock.createMock(ConnectorStateInfo.class);
-        
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-        
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
-        
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
-        EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
-        
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
-        
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
-        EasyMock.expect(herder.buildRestartPlan(restartRequest))
-                .andReturn(Optional.of(restartPlan)).anyTimes();
+        RestartPlan restartPlan = mock(RestartPlan.class);
+        ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+        when(restartPlan.shouldRestartConnector()).thenReturn(false);
+        when(restartPlan.shouldRestartTasks()).thenReturn(true);
+        when(restartPlan.restartTaskCount()).thenReturn(1);
+        when(restartPlan.totalTaskCount()).thenReturn(1);
+        
when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(taskId));
+        
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+        
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
 
         herder.onRestart(taskId);
-        EasyMock.expectLastCall();
+        verify(statusBackingStore).put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.RESTARTING, WORKER_ID, 
0));
 
-        connector = PowerMock.createMock(BogusSinkConnector.class);
+        doNothing().when(herder).onRestart(taskId);

Review Comment:
   We can remove this.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
         noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
-    public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        String[] methodNames = new String[]{"connectorType", 
"buildRestartPlan", "recordRestarting"};
-        herder = PowerMock.createPartialMock(StandaloneHerder.class, 
methodNames,
-                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, 
new MockTime());
+    public void setup() throws ExecutionException, InterruptedException {
+        worker = mock(Worker.class);
+        herder = mock(StandaloneHerder.class, withSettings()
+            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+            .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = PowerMock.createMock(Plugins.class);
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        loaderSwap = PowerMock.createMock(LoaderSwap.class);
-        PowerMock.mockStatic(WorkerConnector.class);
-        Capture<Map<String, String>> configCapture = Capture.newInstance();
-        EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+        plugins = mock(Plugins.class);
+        pluginLoader = mock(PluginClassLoader.class);
+        loaderSwap = mock(LoaderSwap.class);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+    }
+
+    @After
+    public void tearDown() {
+        verifyNoMoreInteractions(worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorFailedValidation() {
         // Basic validation should be performed and return an error, but 
should still evaluate the connector's config
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        Connector connectorMock = mock(SourceConnector.class);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
 
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(singletonList(validatedValue)));
+        when(connectorMock.validate(config)).thenReturn(new 
Config(singletonList(validatedValue)));
         loaderSwap.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
 
         ExecutionException exception = assertThrows(ExecutionException.class, 
() -> createCallback.get(1000L, TimeUnit.SECONDS));
         if (BadRequestException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorAlreadyExists() throws Throwable {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         // First addition should succeed
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);

Review Comment:
   Isn't all of this already handled by `expectConfigValidation`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -656,48 +600,40 @@ public void testRestartConnectorAndTasksOnlyTasks() 
throws Exception {
         FutureCallback<ConnectorStateInfo> restartCallback = new 
FutureCallback<>();
         herder.restartConnectorAndTasks(restartRequest, restartCallback);
         assertEquals(connectorStateInfo, restartCallback.get(1000L, 
TimeUnit.MILLISECONDS));

Review Comment:
   After this part, we can add an assertion similar to 
`verifyConnectorStatusRestart`, but one that operates on task statuses instead.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1225,22 +1090,21 @@ private void expectConfigValidation(
             Map<String, String>... configs
     ) {
         // config validation
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
         if (shouldCreateConnector) {
-            EasyMock.expect(worker.getPlugins()).andReturn(plugins);
-            
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+            when(worker.getPlugins()).thenReturn(plugins);
+            when(plugins.newConnector(anyString())).thenReturn(connectorMock);
         }
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         for (Map<String, String> config : configs)
-            EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(Collections.emptyList()));
+            when(connectorMock.validate(config)).thenReturn(new 
Config(Collections.emptyList()));
         loaderSwap.close();

Review Comment:
   Why is this here? Shouldn't we remove it?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1256,4 +1120,11 @@ private abstract class BogusSinkConnector extends 
SinkConnector {
     private abstract class BogusSinkTask extends SourceTask {
     }
 
+    private void verifyConnectorStatusRestart() {
+        ArgumentCaptor<ConnectorStatus> connectorStatus = 
ArgumentCaptor.forClass(ConnectorStatus.class);
+        verify(statusBackingStore, 
atLeastOnce()).put(connectorStatus.capture());

Review Comment:
   We can remove the `atLeastOnce()` from this part (which gives us stronger 
testing guarantees).



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -281,45 +274,38 @@ public void testDestroyConnector() throws Exception {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof NotFoundException);
         }

Review Comment:
   Nit: while we're in the neighborhood, we can also update things to use 
`assertThrows`:
   ```java
   ExecutionException e = assertThrows(
           "Should have thrown wrapped NotFoundException",
           ExecutionException.class,
           () -> failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS)
   );
   assertTrue(e.getCause() instanceof NotFoundException);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
         noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
-    public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        String[] methodNames = new String[]{"connectorType", 
"buildRestartPlan", "recordRestarting"};
-        herder = PowerMock.createPartialMock(StandaloneHerder.class, 
methodNames,
-                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, 
new MockTime());
+    public void setup() throws ExecutionException, InterruptedException {
+        worker = mock(Worker.class);
+        herder = mock(StandaloneHerder.class, withSettings()
+            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+            .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = PowerMock.createMock(Plugins.class);
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        loaderSwap = PowerMock.createMock(LoaderSwap.class);
-        PowerMock.mockStatic(WorkerConnector.class);
-        Capture<Map<String, String>> configCapture = Capture.newInstance();
-        EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+        plugins = mock(Plugins.class);
+        pluginLoader = mock(PluginClassLoader.class);
+        loaderSwap = mock(LoaderSwap.class);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+    }
+
+    @After
+    public void tearDown() {
+        verifyNoMoreInteractions(worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorFailedValidation() {
         // Basic validation should be performed and return an error, but 
should still evaluate the connector's config
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        Connector connectorMock = mock(SourceConnector.class);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
 
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(singletonList(validatedValue)));
+        when(connectorMock.validate(config)).thenReturn(new 
Config(singletonList(validatedValue)));
         loaderSwap.close();

Review Comment:
   Why is this here? Should it be verified after the test (i.e., 
`verify(loaderSwap).close();`) instead?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
         noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
-    public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        String[] methodNames = new String[]{"connectorType", 
"buildRestartPlan", "recordRestarting"};
-        herder = PowerMock.createPartialMock(StandaloneHerder.class, 
methodNames,
-                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, 
new MockTime());
+    public void setup() throws ExecutionException, InterruptedException {
+        worker = mock(Worker.class);
+        herder = mock(StandaloneHerder.class, withSettings()
+            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+            .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = PowerMock.createMock(Plugins.class);
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        loaderSwap = PowerMock.createMock(LoaderSwap.class);
-        PowerMock.mockStatic(WorkerConnector.class);
-        Capture<Map<String, String>> configCapture = Capture.newInstance();
-        EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+        plugins = mock(Plugins.class);
+        pluginLoader = mock(PluginClassLoader.class);
+        loaderSwap = mock(LoaderSwap.class);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+    }
+
+    @After
+    public void tearDown() {
+        verifyNoMoreInteractions(worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorFailedValidation() {
         // Basic validation should be performed and return an error, but 
should still evaluate the connector's config
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        Connector connectorMock = mock(SourceConnector.class);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
 
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(singletonList(validatedValue)));
+        when(connectorMock.validate(config)).thenReturn(new 
Config(singletonList(validatedValue)));
         loaderSwap.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
 
         ExecutionException exception = assertThrows(ExecutionException.class, 
() -> createCallback.get(1000L, TimeUnit.SECONDS));
         if (BadRequestException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorAlreadyExists() throws Throwable {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         // First addition should succeed
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
         // No new connector is created
-        loaderSwap.close();
-        EasyMock.expectLastCall();

Review Comment:
   Shouldn't we verify this at the end of the test? Or, better yet, verify once 
after the first call to `Herder::putConnectorConfig` and again (with 
`times(2)`) after the second call?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -281,45 +274,38 @@ public void testDestroyConnector() throws Exception {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof NotFoundException);
         }
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorSameTaskConfigs() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
 
-        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        when(worker.getPlugins()).thenReturn(plugins);
         // same task configs as earlier, so don't expect a new set of tasks to 
be brought up
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, 
true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
-
-        PowerMock.replayAll();
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, true)))
+            
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Void> restartCallback = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
+        worker.stopAndAwaitConnector(CONNECTOR_NAME);
         restartCallback.get(1000L, TimeUnit.MILLISECONDS);
-
-        PowerMock.verifyAll();
+        verify(worker, 
atLeastOnce()).stopAndAwaitConnector(eq(CONNECTOR_NAME));

Review Comment:
   We can remove the `atLeastOnce()` from this verification; Mockito's `verify` 
already defaults to `times(1)`.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
         noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
-    public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        String[] methodNames = new String[]{"connectorType", 
"buildRestartPlan", "recordRestarting"};
-        herder = PowerMock.createPartialMock(StandaloneHerder.class, 
methodNames,
-                worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new 
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy, 
new MockTime());
+    public void setup() throws ExecutionException, InterruptedException {
+        worker = mock(Worker.class);
+        herder = mock(StandaloneHerder.class, withSettings()
+            .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, 
statusBackingStore, new MemoryConfigBackingStore(transformer), 
noneConnectorClientConfigOverridePolicy, new MockTime())
+            .defaultAnswer(CALLS_REAL_METHODS));
         createCallback = new FutureCallback<>();
-        plugins = PowerMock.createMock(Plugins.class);
-        pluginLoader = PowerMock.createMock(PluginClassLoader.class);
-        loaderSwap = PowerMock.createMock(LoaderSwap.class);
-        PowerMock.mockStatic(WorkerConnector.class);
-        Capture<Map<String, String>> configCapture = Capture.newInstance();
-        EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), 
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+        plugins = mock(Plugins.class);
+        pluginLoader = mock(PluginClassLoader.class);
+        loaderSwap = mock(LoaderSwap.class);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        when(transformer.transform(eq(CONNECTOR_NAME), 
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+    }
+
+    @After
+    public void tearDown() {
+        verifyNoMoreInteractions(worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        PowerMock.replayAll();
-
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorFailedValidation() {
         // Basic validation should be performed and return an error, but 
should still evaluate the connector's config
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
         config.remove(ConnectorConfig.NAME_CONFIG);
 
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        Connector connectorMock = mock(SourceConnector.class);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
 
-        EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+        when(connectorMock.config()).thenReturn(new ConfigDef());
 
         ConfigValue validatedValue = new ConfigValue("foo.bar");
-        EasyMock.expect(connectorMock.validate(config)).andReturn(new 
Config(singletonList(validatedValue)));
+        when(connectorMock.validate(config)).thenReturn(new 
Config(singletonList(validatedValue)));
         loaderSwap.close();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
 
         ExecutionException exception = assertThrows(ExecutionException.class, 
() -> createCallback.get(1000L, TimeUnit.SECONDS));
         if (BadRequestException.class != exception.getCause().getClass()) {
             throw new AssertionError(exception.getCause());
         }
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testCreateConnectorAlreadyExists() throws Throwable {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
+        connector = mock(BogusSourceConnector.class);
         // First addition should succeed
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config, config);
 
-        
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = 
EasyMock.newCapture();
-        
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-        
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+        when(worker.configTransformer()).thenReturn(transformer);
+        final ArgumentCaptor<Map<String, String>> configCapture = 
ArgumentCaptor.forClass(Map.class);
+        
when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> 
configCapture.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
         // No new connector is created

Review Comment:
   This comment doesn't really make sense in this place anymore. Maybe move it 
to after `// Second should fail`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -281,45 +274,38 @@ public void testDestroyConnector() throws Exception {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof NotFoundException);
         }
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testRestartConnectorSameTaskConfigs() throws Exception {
         expectAdd(SourceSink.SOURCE);
 
         Map<String, String> config = connectorConfig(SourceSink.SOURCE);
-        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        Connector connectorMock = mock(SourceConnector.class);
         expectConfigValidation(connectorMock, true, config);
 
-        worker.stopAndAwaitConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-
-        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
-        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
-            eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
-        EasyMock.expectLastCall().andAnswer(() -> {
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
             onStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
-        });
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), 
any(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), onStart.capture());
 
-        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        when(worker.getPlugins()).thenReturn(plugins);
         // same task configs as earlier, so don't expect a new set of tasks to 
be brought up
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, 
true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
-
-        PowerMock.replayAll();
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new 
SourceConnectorConfig(plugins, config, true)))
+            
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
 
         herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         FutureCallback<Void> restartCallback = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, restartCallback);
+        worker.stopAndAwaitConnector(CONNECTOR_NAME);

Review Comment:
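   This direct call to `worker.stopAndAwaitConnector(CONNECTOR_NAME)` can be 
removed; it is a leftover from the EasyMock expectation setup and now only 
invokes the mock.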



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to