C0urante commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1376416990
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -528,72 +483,61 @@ public void testRestartConnectorAndTasksNoStatus() throws
Exception {
ExecutionException ee = assertThrows(ExecutionException.class, () ->
restartCallback.get(1000L, TimeUnit.MILLISECONDS));
assertTrue(ee.getCause() instanceof NotFoundException);
assertTrue(ee.getMessage().contains("Status for connector"));
- PowerMock.verifyAll();
}
@Test
public void testRestartConnectorAndTasksNoRestarts() throws Exception {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME,
false, true);
- RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
- ConnectorStateInfo connectorStateInfo =
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
- EasyMock.expect(herder.buildRestartPlan(restartRequest))
- .andReturn(Optional.of(restartPlan)).anyTimes();
-
- connector = PowerMock.createMock(BogusSinkConnector.class);
+ RestartPlan restartPlan = mock(RestartPlan.class);
+ ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+ when(restartPlan.shouldRestartConnector()).thenReturn(false);
+ when(restartPlan.shouldRestartTasks()).thenReturn(false);
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
+
+ connector = mock(BogusSinkConnector.class);
expectAdd(SourceSink.SINK);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SINK);
- Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+ Connector connectorMock = mock(SinkConnector.class);
expectConfigValidation(connectorMock, true, connectorConfig);
- PowerMock.replayAll();
-
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SINK), connectorInfo.result());
FutureCallback<ConnectorStateInfo> restartCallback = new
FutureCallback<>();
herder.restartConnectorAndTasks(restartRequest, restartCallback);
assertEquals(connectorStateInfo, restartCallback.get(1000L,
TimeUnit.MILLISECONDS));
- PowerMock.verifyAll();
}
@Test
public void testRestartConnectorAndTasksOnlyConnector() throws Exception {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME,
false, true);
- RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
- ConnectorStateInfo connectorStateInfo =
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(true).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
- EasyMock.expect(herder.buildRestartPlan(restartRequest))
- .andReturn(Optional.of(restartPlan)).anyTimes();
+ RestartPlan restartPlan = mock(RestartPlan.class);
+ ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+ when(restartPlan.shouldRestartConnector()).thenReturn(true);
+ when(restartPlan.shouldRestartTasks()).thenReturn(false);
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
herder.onRestart(CONNECTOR_NAME);
Review Comment:
We can remove this.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -602,36 +546,37 @@ public void testRestartConnectorAndTasksOnlyConnector()
throws Exception {
FutureCallback<ConnectorStateInfo> restartCallback = new
FutureCallback<>();
herder.restartConnectorAndTasks(restartRequest, restartCallback);
assertEquals(connectorStateInfo, restartCallback.get(1000L,
TimeUnit.MILLISECONDS));
- PowerMock.verifyAll();
+
+ verifyConnectorStatusRestart();
}
@Test
public void testRestartConnectorAndTasksOnlyTasks() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME,
false, true);
- RestartPlan restartPlan = PowerMock.createMock(RestartPlan.class);
- ConnectorStateInfo connectorStateInfo =
PowerMock.createMock(ConnectorStateInfo.class);
-
EasyMock.expect(restartPlan.shouldRestartConnector()).andReturn(false).anyTimes();
-
EasyMock.expect(restartPlan.shouldRestartTasks()).andReturn(true).anyTimes();
-
EasyMock.expect(restartPlan.restartTaskCount()).andReturn(1).anyTimes();
- EasyMock.expect(restartPlan.totalTaskCount()).andReturn(1).anyTimes();
-
EasyMock.expect(restartPlan.taskIdsToRestart()).andReturn(Collections.singletonList(taskId)).anyTimes();
-
EasyMock.expect(restartPlan.restartConnectorStateInfo()).andReturn(connectorStateInfo).anyTimes();
- EasyMock.expect(herder.buildRestartPlan(restartRequest))
- .andReturn(Optional.of(restartPlan)).anyTimes();
+ RestartPlan restartPlan = mock(RestartPlan.class);
+ ConnectorStateInfo connectorStateInfo = mock(ConnectorStateInfo.class);
+ when(restartPlan.shouldRestartConnector()).thenReturn(false);
+ when(restartPlan.shouldRestartTasks()).thenReturn(true);
+ when(restartPlan.restartTaskCount()).thenReturn(1);
+ when(restartPlan.totalTaskCount()).thenReturn(1);
+
when(restartPlan.taskIdsToRestart()).thenReturn(Collections.singletonList(taskId));
+
when(restartPlan.restartConnectorStateInfo()).thenReturn(connectorStateInfo);
+
doReturn(Optional.of(restartPlan)).when(herder).buildRestartPlan(restartRequest);
herder.onRestart(taskId);
- EasyMock.expectLastCall();
+ verify(statusBackingStore).put(new TaskStatus(new
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.RESTARTING, WORKER_ID,
0));
- connector = PowerMock.createMock(BogusSinkConnector.class);
+ doNothing().when(herder).onRestart(taskId);
Review Comment:
We can remove this.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
- public void setup() {
- worker = PowerMock.createMock(Worker.class);
- String[] methodNames = new String[]{"connectorType",
"buildRestartPlan", "recordRestarting"};
- herder = PowerMock.createPartialMock(StandaloneHerder.class,
methodNames,
- worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy,
new MockTime());
+ public void setup() throws ExecutionException, InterruptedException {
+ worker = mock(Worker.class);
+ herder = mock(StandaloneHerder.class, withSettings()
+ .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
+ .defaultAnswer(CALLS_REAL_METHODS));
createCallback = new FutureCallback<>();
- plugins = PowerMock.createMock(Plugins.class);
- pluginLoader = PowerMock.createMock(PluginClassLoader.class);
- loaderSwap = PowerMock.createMock(LoaderSwap.class);
- PowerMock.mockStatic(WorkerConnector.class);
- Capture<Map<String, String>> configCapture = Capture.newInstance();
- EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME),
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+ plugins = mock(Plugins.class);
+ pluginLoader = mock(PluginClassLoader.class);
+ loaderSwap = mock(LoaderSwap.class);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+ when(transformer.transform(eq(CONNECTOR_NAME),
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+ }
+
+ @After
+ public void tearDown() {
+ verifyNoMoreInteractions(worker, statusBackingStore);
}
@Test
public void testCreateSourceConnector() throws Exception {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
- PowerMock.replayAll();
-
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorFailedValidation() {
// Basic validation should be performed and return an error, but
should still evaluate the connector's config
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
config.remove(ConnectorConfig.NAME_CONFIG);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ Connector connectorMock = mock(SourceConnector.class);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
- EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+ when(connectorMock.config()).thenReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
- EasyMock.expect(connectorMock.validate(config)).andReturn(new
Config(singletonList(validatedValue)));
+ when(connectorMock.validate(config)).thenReturn(new
Config(singletonList(validatedValue)));
loaderSwap.close();
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
ExecutionException exception = assertThrows(ExecutionException.class,
() -> createCallback.get(1000L, TimeUnit.SECONDS));
if (BadRequestException.class != exception.getCause().getClass()) {
throw new AssertionError(exception.getCause());
}
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorAlreadyExists() throws Throwable {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
// First addition should succeed
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config, config);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
Review Comment:
Isn't all of this already handled by `expectConfigValidation`?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -656,48 +600,40 @@ public void testRestartConnectorAndTasksOnlyTasks()
throws Exception {
FutureCallback<ConnectorStateInfo> restartCallback = new
FutureCallback<>();
herder.restartConnectorAndTasks(restartRequest, restartCallback);
assertEquals(connectorStateInfo, restartCallback.get(1000L,
TimeUnit.MILLISECONDS));
Review Comment:
After this part, we can add a similar assertion to
`verifyConnectorStatusRestart`, but that operates on task statuses instead.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1225,22 +1090,21 @@ private void expectConfigValidation(
Map<String, String>... configs
) {
// config validation
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
if (shouldCreateConnector) {
- EasyMock.expect(worker.getPlugins()).andReturn(plugins);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newConnector(anyString())).thenReturn(connectorMock);
}
- EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+ when(connectorMock.config()).thenReturn(new ConfigDef());
for (Map<String, String> config : configs)
- EasyMock.expect(connectorMock.validate(config)).andReturn(new
Config(Collections.emptyList()));
+ when(connectorMock.validate(config)).thenReturn(new
Config(Collections.emptyList()));
loaderSwap.close();
Review Comment:
Why is this here? Shouldn't we remove it?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1256,4 +1120,11 @@ private abstract class BogusSinkConnector extends
SinkConnector {
private abstract class BogusSinkTask extends SourceTask {
}
+ private void verifyConnectorStatusRestart() {
+ ArgumentCaptor<ConnectorStatus> connectorStatus =
ArgumentCaptor.forClass(ConnectorStatus.class);
+ verify(statusBackingStore,
atLeastOnce()).put(connectorStatus.capture());
Review Comment:
We can remove the `atLeastOnce()` from this part (which gives us stronger
testing guarantees).
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -281,45 +274,38 @@ public void testDestroyConnector() throws Exception {
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NotFoundException);
}
Review Comment:
Nit: while we're in the neighborhood, we can also update things to use
`assertThrows`:
```java
ExecutionException e = assertThrows(
"Should have thrown wrapped NotFoundException",
ExecutionException.class,
() -> failedDeleteCallback.get(1000L, TimeUnit.MILLISECONDS)
);
assertTrue(e.getCause() instanceof NotFoundException);
```
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
- public void setup() {
- worker = PowerMock.createMock(Worker.class);
- String[] methodNames = new String[]{"connectorType",
"buildRestartPlan", "recordRestarting"};
- herder = PowerMock.createPartialMock(StandaloneHerder.class,
methodNames,
- worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy,
new MockTime());
+ public void setup() throws ExecutionException, InterruptedException {
+ worker = mock(Worker.class);
+ herder = mock(StandaloneHerder.class, withSettings()
+ .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
+ .defaultAnswer(CALLS_REAL_METHODS));
createCallback = new FutureCallback<>();
- plugins = PowerMock.createMock(Plugins.class);
- pluginLoader = PowerMock.createMock(PluginClassLoader.class);
- loaderSwap = PowerMock.createMock(LoaderSwap.class);
- PowerMock.mockStatic(WorkerConnector.class);
- Capture<Map<String, String>> configCapture = Capture.newInstance();
- EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME),
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+ plugins = mock(Plugins.class);
+ pluginLoader = mock(PluginClassLoader.class);
+ loaderSwap = mock(LoaderSwap.class);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+ when(transformer.transform(eq(CONNECTOR_NAME),
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+ }
+
+ @After
+ public void tearDown() {
+ verifyNoMoreInteractions(worker, statusBackingStore);
}
@Test
public void testCreateSourceConnector() throws Exception {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
- PowerMock.replayAll();
-
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorFailedValidation() {
// Basic validation should be performed and return an error, but
should still evaluate the connector's config
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
config.remove(ConnectorConfig.NAME_CONFIG);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ Connector connectorMock = mock(SourceConnector.class);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
- EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+ when(connectorMock.config()).thenReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
- EasyMock.expect(connectorMock.validate(config)).andReturn(new
Config(singletonList(validatedValue)));
+ when(connectorMock.validate(config)).thenReturn(new
Config(singletonList(validatedValue)));
loaderSwap.close();
Review Comment:
Why is this here? Should it be verified after the test (i.e.,
`verify(loaderSwap).close();`) instead?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
- public void setup() {
- worker = PowerMock.createMock(Worker.class);
- String[] methodNames = new String[]{"connectorType",
"buildRestartPlan", "recordRestarting"};
- herder = PowerMock.createPartialMock(StandaloneHerder.class,
methodNames,
- worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy,
new MockTime());
+ public void setup() throws ExecutionException, InterruptedException {
+ worker = mock(Worker.class);
+ herder = mock(StandaloneHerder.class, withSettings()
+ .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
+ .defaultAnswer(CALLS_REAL_METHODS));
createCallback = new FutureCallback<>();
- plugins = PowerMock.createMock(Plugins.class);
- pluginLoader = PowerMock.createMock(PluginClassLoader.class);
- loaderSwap = PowerMock.createMock(LoaderSwap.class);
- PowerMock.mockStatic(WorkerConnector.class);
- Capture<Map<String, String>> configCapture = Capture.newInstance();
- EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME),
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+ plugins = mock(Plugins.class);
+ pluginLoader = mock(PluginClassLoader.class);
+ loaderSwap = mock(LoaderSwap.class);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+ when(transformer.transform(eq(CONNECTOR_NAME),
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+ }
+
+ @After
+ public void tearDown() {
+ verifyNoMoreInteractions(worker, statusBackingStore);
}
@Test
public void testCreateSourceConnector() throws Exception {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
- PowerMock.replayAll();
-
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorFailedValidation() {
// Basic validation should be performed and return an error, but
should still evaluate the connector's config
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
config.remove(ConnectorConfig.NAME_CONFIG);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ Connector connectorMock = mock(SourceConnector.class);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
- EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+ when(connectorMock.config()).thenReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
- EasyMock.expect(connectorMock.validate(config)).andReturn(new
Config(singletonList(validatedValue)));
+ when(connectorMock.validate(config)).thenReturn(new
Config(singletonList(validatedValue)));
loaderSwap.close();
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
ExecutionException exception = assertThrows(ExecutionException.class,
() -> createCallback.get(1000L, TimeUnit.SECONDS));
if (BadRequestException.class != exception.getCause().getClass()) {
throw new AssertionError(exception.getCause());
}
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorAlreadyExists() throws Throwable {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
// First addition should succeed
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config, config);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
// No new connector is created
- loaderSwap.close();
- EasyMock.expectLastCall();
Review Comment:
Shouldn't we verify this at the end of the test? Or, better yet, verify once
after the first call to `Herder::putConnectorConfig` and again (with
`times(2)`) after the second call?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -281,45 +274,38 @@ public void testDestroyConnector() throws Exception {
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NotFoundException);
}
-
- PowerMock.verifyAll();
}
@Test
public void testRestartConnectorSameTaskConfigs() throws Exception {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
- worker.stopAndAwaitConnector(CONNECTOR_NAME);
- EasyMock.expectLastCall();
-
- Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
- worker.startConnector(eq(CONNECTOR_NAME), eq(config),
EasyMock.anyObject(HerderConnectorContext.class),
- eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
- EasyMock.expectLastCall().andAnswer(() -> {
+ final ArgumentCaptor<Callback<TargetState>> onStart =
ArgumentCaptor.forClass(Callback.class);
+ doAnswer(invocation -> {
onStart.getValue().onCompletion(null, TargetState.STARTED);
return true;
- });
+ }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config),
any(HerderConnectorContext.class),
+ eq(herder), eq(TargetState.STARTED), onStart.capture());
-
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
- EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+ when(worker.getPlugins()).thenReturn(plugins);
// same task configs as earlier, so don't expect a new set of tasks to
be brought up
- EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new
SourceConnectorConfig(plugins, config,
true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
-
- PowerMock.replayAll();
+ when(worker.connectorTaskConfigs(CONNECTOR_NAME, new
SourceConnectorConfig(plugins, config, true)))
+
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
FutureCallback<Void> restartCallback = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, restartCallback);
+ worker.stopAndAwaitConnector(CONNECTOR_NAME);
restartCallback.get(1000L, TimeUnit.MILLISECONDS);
-
- PowerMock.verifyAll();
+ verify(worker,
atLeastOnce()).stopAndAwaitConnector(eq(CONNECTOR_NAME));
Review Comment:
We can remove the `atLeastOnce()` from this verification.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -127,94 +134,86 @@ private enum SourceSink {
noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
- public void setup() {
- worker = PowerMock.createMock(Worker.class);
- String[] methodNames = new String[]{"connectorType",
"buildRestartPlan", "recordRestarting"};
- herder = PowerMock.createPartialMock(StandaloneHerder.class,
methodNames,
- worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new
MemoryConfigBackingStore(transformer), noneConnectorClientConfigOverridePolicy,
new MockTime());
+ public void setup() throws ExecutionException, InterruptedException {
+ worker = mock(Worker.class);
+ herder = mock(StandaloneHerder.class, withSettings()
+ .useConstructor(worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, new MemoryConfigBackingStore(transformer),
noneConnectorClientConfigOverridePolicy, new MockTime())
+ .defaultAnswer(CALLS_REAL_METHODS));
createCallback = new FutureCallback<>();
- plugins = PowerMock.createMock(Plugins.class);
- pluginLoader = PowerMock.createMock(PluginClassLoader.class);
- loaderSwap = PowerMock.createMock(LoaderSwap.class);
- PowerMock.mockStatic(WorkerConnector.class);
- Capture<Map<String, String>> configCapture = Capture.newInstance();
- EasyMock.expect(transformer.transform(eq(CONNECTOR_NAME),
EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes();
+ plugins = mock(Plugins.class);
+ pluginLoader = mock(PluginClassLoader.class);
+ loaderSwap = mock(LoaderSwap.class);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+ when(transformer.transform(eq(CONNECTOR_NAME),
configCapture.capture())).thenAnswer(invocation -> configCapture.getValue());
+ }
+
+ @After
+ public void tearDown() {
+ verifyNoMoreInteractions(worker, statusBackingStore);
}
@Test
public void testCreateSourceConnector() throws Exception {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
- PowerMock.replayAll();
-
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
-
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorFailedValidation() {
// Basic validation should be performed and return an error, but
should still evaluate the connector's config
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
config.remove(ConnectorConfig.NAME_CONFIG);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
-
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ Connector connectorMock = mock(SourceConnector.class);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newConnector(anyString())).thenReturn(connectorMock);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
- EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
+ when(connectorMock.config()).thenReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
- EasyMock.expect(connectorMock.validate(config)).andReturn(new
Config(singletonList(validatedValue)));
+ when(connectorMock.validate(config)).thenReturn(new
Config(singletonList(validatedValue)));
loaderSwap.close();
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
ExecutionException exception = assertThrows(ExecutionException.class,
() -> createCallback.get(1000L, TimeUnit.SECONDS));
if (BadRequestException.class != exception.getCause().getClass()) {
throw new AssertionError(exception.getCause());
}
- PowerMock.verifyAll();
}
@Test
public void testCreateConnectorAlreadyExists() throws Throwable {
- connector = PowerMock.createMock(BogusSourceConnector.class);
+ connector = mock(BogusSourceConnector.class);
// First addition should succeed
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config, config);
-
EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture =
EasyMock.newCapture();
-
EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-
EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-
EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
+ when(worker.configTransformer()).thenReturn(transformer);
+ final ArgumentCaptor<Map<String, String>> configCapture =
ArgumentCaptor.forClass(Map.class);
+
when(transformer.transform(configCapture.capture())).thenAnswer(invocation ->
configCapture.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+ when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
// No new connector is created
Review Comment:
This comment doesn't really make sense in this place anymore. Maybe move it
to after `// Second should fail`?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -281,45 +274,38 @@ public void testDestroyConnector() throws Exception {
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NotFoundException);
}
-
- PowerMock.verifyAll();
}
@Test
public void testRestartConnectorSameTaskConfigs() throws Exception {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
- Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
- worker.stopAndAwaitConnector(CONNECTOR_NAME);
- EasyMock.expectLastCall();
-
- Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
- worker.startConnector(eq(CONNECTOR_NAME), eq(config),
EasyMock.anyObject(HerderConnectorContext.class),
- eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
- EasyMock.expectLastCall().andAnswer(() -> {
+ final ArgumentCaptor<Callback<TargetState>> onStart =
ArgumentCaptor.forClass(Callback.class);
+ doAnswer(invocation -> {
onStart.getValue().onCompletion(null, TargetState.STARTED);
return true;
- });
+ }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(config),
any(HerderConnectorContext.class),
+ eq(herder), eq(TargetState.STARTED), onStart.capture());
-
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
- EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+ when(worker.getPlugins()).thenReturn(plugins);
// same task configs as earlier, so don't expect a new set of tasks to
be brought up
- EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new
SourceConnectorConfig(plugins, config,
true))).andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
-
- PowerMock.replayAll();
+ when(worker.connectorTaskConfigs(CONNECTOR_NAME, new
SourceConnectorConfig(plugins, config, true)))
+
.thenReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
herder.putConnectorConfig(CONNECTOR_NAME, config, false,
createCallback);
Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
FutureCallback<Void> restartCallback = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, restartCallback);
+ worker.stopAndAwaitConnector(CONNECTOR_NAME);
Review Comment:
This can be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]