C0urante commented on code in PR #12295: URL: https://github.com/apache/kafka/pull/12295#discussion_r903123084
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -1904,6 +1906,78 @@ public void testConnectorConfigUpdate() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testConnectorConfigUpdateFailedTransformation() throws Exception {
+        // Connector config can be applied without any rebalance
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 1),
+                Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), configTransformer);
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+        EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
+                .andReturn(CONN1_CONFIG).times(2); // once for the connector, once for the single task
+        expectConfigRefreshAndSnapshot(snapshotWithTransform);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        Capture<Callback<TargetState>> onStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        member.wakeup();
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // apply config
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform); // for this test, it doesn't matter if we use the same config snapshot
+        EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
+                .andThrow(new AssertionError("Config transformation should not crash herder"));

Review Comment:
   Nit: I get that if this exception bubbles up to the surface of a unit test, it does count as a failed assertion of sorts. But I usually associate an `AssertionError` with a failed attempt to actively assert some state that we've calculated during the test, like comparing an expected exception message to an actual one, or counting the number of elements in a list. It's also a little unrealistic for a config provider to throw this kind of exception. I'd personally opt to replace this with a `ConnectException` or a `ConfigException` with a self-describing body like "simulated exception thrown from config provider", which might help illustrate the purpose of the test better to fresh eyes.
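As a rough illustration of the suggestion in the comment above, here is a plain-Java sketch that runs without Kafka or EasyMock on the classpath. Everything in it is an assumption made for illustration: the nested `ConnectException` is only a local stand-in for `org.apache.kafka.connect.errors.ConnectException`, `ConfigTransformer` is a hypothetical simplification rather than the real `WorkerConfigTransformer` API, and `applyConfig` is a made-up caller, not the herder's actual logic. It only shows how a self-describing message makes the simulated failure obvious wherever it ends up being reported.

```java
import java.util.Collections;
import java.util.Map;

class SimulatedProviderFailureSketch {

    // Local stand-in for org.apache.kafka.connect.errors.ConnectException,
    // declared here only so the sketch compiles on its own.
    static class ConnectException extends RuntimeException {
        ConnectException(String message) {
            super(message);
        }
    }

    // Hypothetical, simplified transformer; NOT the real WorkerConfigTransformer API.
    interface ConfigTransformer {
        Map<String, String> transform(String connector, Map<String, String> rawConfig);
    }

    // Hypothetical caller: reports a failed status instead of letting the exception escape.
    static String applyConfig(ConfigTransformer transformer, String connector) {
        try {
            transformer.transform(connector, Collections.emptyMap());
            return "RUNNING";
        } catch (ConnectException e) {
            return "FAILED: " + e.getMessage();
        }
    }

    // Builds a transformer that always fails with a self-describing message and applies it.
    public static String runDemo() {
        ConfigTransformer failing = (connector, rawConfig) -> {
            throw new ConnectException("simulated exception thrown from config provider");
        };
        return applyConfig(failing, "conn1");
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In the test under review, the corresponding change would presumably be limited to the argument of `andThrow`, swapping the `AssertionError` for a `ConnectException` (or `ConfigException`) that carries the self-describing message.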
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -1904,6 +1906,78 @@ public void testConnectorConfigUpdate() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testConnectorConfigUpdateFailedTransformation() throws Exception {
+        // Connector config can be applied without any rebalance
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 1),
+                Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), configTransformer);
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+        EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
+                .andReturn(CONN1_CONFIG).times(2); // once for the connector, once for the single task
+        expectConfigRefreshAndSnapshot(snapshotWithTransform);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        Capture<Callback<TargetState>> onStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        member.wakeup();
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // apply config
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform); // for this test, it doesn't matter if we use the same config snapshot
+        EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject()))
+                .andThrow(new AssertionError("Config transformation should not crash herder"));
+        worker.stopAndAwaitConnector(CONN1);
+        PowerMock.expectLastCall();
+        Capture<ConnectorStatus> failedStatus = newCapture();
+        statusBackingStore.putSafe(capture(failedStatus));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            assertEquals(CONN1, failedStatus.getValue().id());
+            assertEquals(FAILED, failedStatus.getValue().state());

Review Comment:
   Nit: it's a little strange to do these assertions in an answer to the method invocation. If the herder has logic to handle exceptions thrown by `statusBackingStore::putSafe`, or if the call to that method takes place on a different thread, a failed assertion might get accidentally swallowed and missed by the test. It might be better to move this logic into the top-level body of the unit test method, probably near the call to `PowerMock::verifyAll`. It'd be less susceptible to false negatives, and IMO would be a little easier to follow.
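The risk described in the second comment can be reproduced in plain Java, without EasyMock. The sketch below is only an illustration under stated assumptions: `reportStatus` is a hypothetical stand-in for a component that calls the status store on another thread and never inspects the result, and it is not how the herder is actually implemented. It contrasts asserting inside the callback with capturing the value and asserting afterwards on the test thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class CaptureThenAssertSketch {

    // Hypothetical component under test: hands the status to the store on a background
    // thread and ignores the returned Future, so anything thrown by the store is lost.
    static void reportStatus(Consumer<String> statusStore, String status) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> statusStore.accept(status));
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Pattern 1: assert inside the callback. The AssertionError is raised on the worker
    // thread and stored in the ignored Future, so this method still returns normally.
    public static boolean assertionInCallbackIsSwallowed() {
        reportStatus(actual -> {
            if (!"FAILED".equals(actual)) {
                throw new AssertionError("expected FAILED but was " + actual);
            }
        }, "RUNNING");
        return true; // reached even though the status was wrong
    }

    // Pattern 2: capture in the callback, assert afterwards on the calling thread.
    // The same wrong status now produces a visible AssertionError.
    public static boolean assertionAfterCaptureFails() {
        AtomicReference<String> captured = new AtomicReference<>();
        reportStatus(captured::set, "RUNNING");
        try {
            if (!"FAILED".equals(captured.get())) {
                throw new AssertionError("expected FAILED but was " + captured.get());
            }
            return false;
        } catch (AssertionError expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("in-callback assertion swallowed: " + assertionInCallbackIsSwallowed());
        System.out.println("post-capture assertion fails:    " + assertionAfterCaptureFails());
    }
}
```

Applied to the test in this PR, that would mean keeping the `Capture<ConnectorStatus>` on the `putSafe` expectation and running the two `assertEquals` checks in the test body, near `PowerMock.verifyAll()`.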