C0urante commented on code in PR #12728: URL: https://github.com/apache/kafka/pull/12728#discussion_r991507787
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -123,91 +134,91 @@ private enum SourceSink { private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); + private MockedStatic<Plugins> pluginsStatic; + + private MockedStatic<WorkerConnector> workerConnectorStatic; @Before public void setup() { - worker = PowerMock.createMock(Worker.class); - String[] methodNames = new String[]{"connectorTypeForClass"/*, "validateConnectorConfig"*/, "buildRestartPlan", "recordRestarting"}; - herder = PowerMock.createPartialMock(StandaloneHerder.class, methodNames, - worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy); + worker = mock(Worker.class); + herder = mock(StandaloneHerder.class, withSettings() + .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy) + .defaultAnswer(CALLS_REAL_METHODS)); createCallback = new FutureCallback<>(); - plugins = PowerMock.createMock(Plugins.class); - pluginLoader = PowerMock.createMock(PluginClassLoader.class); - delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); - PowerMock.mockStatic(Plugins.class); - PowerMock.mockStatic(WorkerConnector.class); - Capture<Map<String, String>> configCapture = Capture.newInstance(); - EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes(); + plugins = mock(Plugins.class); + pluginLoader = mock(PluginClassLoader.class); + delegatingLoader = mock(DelegatingClassLoader.class); + pluginsStatic = mockStatic(Plugins.class); + workerConnectorStatic = mockStatic(WorkerConnector.class); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); 
+ when(transformer.transform(eq(CONNECTOR_NAME), configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + } + + @After + public void tearDown() { + pluginsStatic.close(); + workerConnectorStatic.close(); Review Comment: Can we add a call to `verifyNoMoreInteractions(worker, statusBackingStore)` here? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -123,91 +134,91 @@ private enum SourceSink { private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); + private MockedStatic<Plugins> pluginsStatic; + + private MockedStatic<WorkerConnector> workerConnectorStatic; @Before public void setup() { - worker = PowerMock.createMock(Worker.class); - String[] methodNames = new String[]{"connectorTypeForClass"/*, "validateConnectorConfig"*/, "buildRestartPlan", "recordRestarting"}; - herder = PowerMock.createPartialMock(StandaloneHerder.class, methodNames, - worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy); + worker = mock(Worker.class); + herder = mock(StandaloneHerder.class, withSettings() + .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy) + .defaultAnswer(CALLS_REAL_METHODS)); createCallback = new FutureCallback<>(); - plugins = PowerMock.createMock(Plugins.class); - pluginLoader = PowerMock.createMock(PluginClassLoader.class); - delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); - PowerMock.mockStatic(Plugins.class); - PowerMock.mockStatic(WorkerConnector.class); - Capture<Map<String, String>> configCapture = Capture.newInstance(); - EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes(); + plugins = 
mock(Plugins.class); + pluginLoader = mock(PluginClassLoader.class); + delegatingLoader = mock(DelegatingClassLoader.class); + pluginsStatic = mockStatic(Plugins.class); + workerConnectorStatic = mockStatic(WorkerConnector.class); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(transformer.transform(eq(CONNECTOR_NAME), configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + } + + @After + public void tearDown() { + pluginsStatic.close(); + workerConnectorStatic.close(); } @Test public void testCreateSourceConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result()); - - PowerMock.verifyAll(); } @Test public void testCreateConnectorFailedValidation() { // Basic validation should be performed and return an error, but should still evaluate the connector's config - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); Map<String, String> config = connectorConfig(SourceSink.SOURCE); config.remove(ConnectorConfig.NAME_CONFIG); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); - EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2); - final Capture<Map<String, String>> configCapture = EasyMock.newCapture(); - 
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); - EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); - EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + Connector connectorMock = mock(SourceConnector.class); + when(worker.configTransformer()).thenReturn(transformer); + final ArgumentCaptor<Map<String, String>> configCapture = ArgumentCaptor.forClass(Map.class); + when(transformer.transform(configCapture.capture())).thenAnswer(invocation -> configCapture.getValue()); + when(worker.getPlugins()).thenReturn(plugins); + when(plugins.compareAndSwapLoaders(connectorMock)).thenReturn(delegatingLoader); + when(plugins.newConnector(anyString())).thenReturn(connectorMock); - EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); + when(connectorMock.config()).thenReturn(new ConfigDef()); ConfigValue validatedValue = new ConfigValue("foo.bar"); - EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue))); - EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); - - PowerMock.replayAll(); + when(connectorMock.validate(config)).thenReturn(new Config(singletonList(validatedValue))); + when(Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader); Review Comment: This is mocking a static method. Any idea why these tests work even though we're not using the `pluginsStatic` field to set up these mocks? Does this need to be changed? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -319,35 +318,28 @@ public void testRestartConnectorNewTaskConfigs() throws Exception { Map<String, String> config = connectorConfig(SourceSink.SOURCE); ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); Review Comment: These lines can be removed and replaced with a call to `verify(worker).stopAndAwaitConnector(CONNECTOR_NAME)` at the end of the test case. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -218,43 +229,37 @@ public void testCreateConnectorAlreadyExists() throws Throwable { herder.putConnectorConfig(CONNECTOR_NAME, config, false, failedCreateCallback); ExecutionException exception = assertThrows(ExecutionException.class, () -> failedCreateCallback.get(1000L, TimeUnit.SECONDS)); assertEquals(AlreadyExistsException.class, exception.getCause().getClass()); - PowerMock.verifyAll(); } @Test public void testCreateSinkConnector() throws Exception { - connector = PowerMock.createMock(BogusSinkConnector.class); + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> config = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, config); - PowerMock.replayAll(); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); 
assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); - - PowerMock.verifyAll(); } @Test public void testDestroyConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList()); + when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList()); statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0)); statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); expectDestroy(); Review Comment: Can this be moved further on in the test, right before the invocation on the herder that it's needed for? (I don't think we expect the connector to be destroyed when we invoke `herder::putConnectorConfig`.) ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -123,91 +134,91 @@ private enum SourceSink { private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); + private MockedStatic<Plugins> pluginsStatic; + + private MockedStatic<WorkerConnector> workerConnectorStatic; Review Comment: It doesn't look like this is used in any test cases; can we remove it?
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -319,35 +318,28 @@ public void testRestartConnectorNewTaskConfigs() throws Exception { Map<String, String> config = connectorConfig(SourceSink.SOURCE); ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); Review Comment: This can be removed. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -319,35 +318,28 @@ public void testRestartConnectorNewTaskConfigs() throws Exception { Map<String, String> config = connectorConfig(SourceSink.SOURCE); ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME); - Capture<Callback<TargetState>> onStart = EasyMock.newCapture(); - worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class), - eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart)); - EasyMock.expectLastCall().andAnswer(() -> { + final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { onStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); + }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config), any(HerderConnectorContext.class), + eq(herder), eq(TargetState.STARTED), onStart.capture()); - EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME)); - EasyMock.expect(worker.getPlugins()).andReturn(plugins); + when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME)); + when(worker.getPlugins()).thenReturn(plugins); // changed task 
configs, expect a new set of tasks to be brought up (and the old ones to be stopped) Map<String, String> taskConfigs = taskConfig(SourceSink.SOURCE); taskConfigs.put("k", "v"); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))).andReturn(Collections.singletonList(taskConfigs)); + when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true))) + .thenReturn(Collections.singletonList(taskConfigs)); - worker.stopAndAwaitTasks(Collections.singletonList(taskId)); Review Comment: Don't we still need to verify that this call takes place? I think this test may need to be reordered so that mocking behavior is set up immediately before invoking methods on the herder instance; otherwise, we end up overwriting older mocks, which I believe is happening here. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -272,35 +277,31 @@ public void testDestroyConnector() throws Exception { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NotFoundException); } - - PowerMock.verifyAll(); } @Test public void testRestartConnectorSameTaskConfigs() throws Exception { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); worker.stopAndAwaitConnector(CONNECTOR_NAME); Review Comment: This line can be removed. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -218,43 +229,37 @@ public void testCreateConnectorAlreadyExists() throws Throwable { herder.putConnectorConfig(CONNECTOR_NAME, config, false, failedCreateCallback); ExecutionException exception = assertThrows(ExecutionException.class, () -> failedCreateCallback.get(1000L, TimeUnit.SECONDS)); assertEquals(AlreadyExistsException.class, exception.getCause().getClass()); - PowerMock.verifyAll(); } @Test public void testCreateSinkConnector() throws Exception { - connector = PowerMock.createMock(BogusSinkConnector.class); + connector = mock(BogusSinkConnector.class); expectAdd(SourceSink.SINK); Map<String, String> config = connectorConfig(SourceSink.SINK); - Connector connectorMock = PowerMock.createMock(SinkConnector.class); + Connector connectorMock = mock(SinkConnector.class); expectConfigValidation(connectorMock, true, config); - PowerMock.replayAll(); herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback); Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS); assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result()); - - PowerMock.verifyAll(); } @Test public void testDestroyConnector() throws Exception { - connector = PowerMock.createMock(BogusSourceConnector.class); + connector = mock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); - EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList()); + when(statusBackingStore.getAll(CONNECTOR_NAME)).thenReturn(Collections.emptyList()); statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0)); 
statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0)); Review Comment: Why are we invoking these methods directly on our mock? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -272,35 +277,31 @@ public void testDestroyConnector() throws Exception { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NotFoundException); } - - PowerMock.verifyAll(); } @Test public void testRestartConnectorSameTaskConfigs() throws Exception { expectAdd(SourceSink.SOURCE); Map<String, String> config = connectorConfig(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); + Connector connectorMock = mock(SourceConnector.class); expectConfigValidation(connectorMock, true, config); worker.stopAndAwaitConnector(CONNECTOR_NAME); - EasyMock.expectLastCall(); + verify(worker).stopAndAwaitConnector(CONNECTOR_NAME); Review Comment: This needs to be moved after the call to `herder::restartConnector` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org