yashmayya commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1146151628
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -215,6 +215,17 @@ public class DistributedHerderTest { Collections.emptyMap(), Collections.emptySet(), Collections.emptySet()); + private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new ClusterConfigState( + 1, + null, + Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), + Collections.singletonMap(CONN1, TargetState.STOPPED), + TASK_CONFIGS_MAP, Review Comment: nit: a stopped connector would be expected to have an empty set of task configs ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -2159,6 +2170,66 @@ public void testConnectorResumed() throws Exception { PowerMock.verifyAll(); } + @Test + public void testConnectorStopped() throws Exception { + // ensure that target state changes are propagated to the worker + + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); + + // join + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectConfigRefreshAndSnapshot(SNAPSHOT); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + Capture<Callback<TargetState>> onStart = newCapture(); + worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onStart)); + PowerMock.expectLastCall().andAnswer(() -> { + onStart.getValue().onCompletion(null, TargetState.STARTED); + return true; + }); + member.wakeup(); + PowerMock.expectLastCall(); + expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // handle the state change + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1); + PowerMock.expectLastCall(); + + Capture<Callback<TargetState>> onStop = newCapture(); + worker.setTargetState(EasyMock.eq(CONN1), EasyMock.eq(TargetState.STOPPED), capture(onStop)); + PowerMock.expectLastCall().andAnswer(() -> { + onStart.getValue().onCompletion(null, TargetState.STOPPED); + return null; + }); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // These will occur just before/during the third tick + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused Review Comment: ```suggestion configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to stopped ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -2159,6 +2170,66 @@ public void testConnectorResumed() throws Exception { PowerMock.verifyAll(); } + @Test + public void testConnectorStopped() throws Exception { + // ensure that target state changes are propagated to the worker + + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); + + // join + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList()); + expectConfigRefreshAndSnapshot(SNAPSHOT); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + Capture<Callback<TargetState>> onStart = newCapture(); + worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onStart)); + PowerMock.expectLastCall().andAnswer(() -> { + onStart.getValue().onCompletion(null, TargetState.STARTED); + return true; + }); + member.wakeup(); + PowerMock.expectLastCall(); + expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // handle the state change + member.wakeup(); + PowerMock.expectLastCall(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1); + PowerMock.expectLastCall(); Review Comment: ```suggestion ``` nit: we're already setting up the expectation in the previous statement -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org