vamossagar12 commented on code in PR #13530:
URL: https://github.com/apache/kafka/pull/13530#discussion_r1163797689
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -296,7 +301,12 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) {
         startConnector(connName, (error, targetState) -> {
             if (targetState == TargetState.STARTED) {
-                requestTaskReconfiguration(connName);
+                try {
+                    requestTaskReconfiguration(connName);
+                } catch (Throwable t) {
+                    cb.onCompletion(t, null);

Review Comment:
   Same here. Should we log it?

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -229,7 +229,12 @@ private synchronized void putConnectorConfig(String connName,
         }

         requestExecutorService.submit(() -> {
-            updateConnectorTasks(connName);
+            try {
+                updateConnectorTasks(connName);
+            } catch (Throwable t) {
+                callback.onCompletion(t, null);

Review Comment:
   Should we log the exception as well? I think that's what the JIRA ticket also talks about.

##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -402,6 +403,44 @@ public void testRestartConnectorFailureOnStart() throws Exception {
         PowerMock.verifyAll();
     }
+    @Test
+    public void testRestartConnectorWithTaskConfigsException() throws Exception {
+        expectAdd(SourceSink.SOURCE);
+
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        worker.stopAndAwaitConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+
+        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
+        worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+
+        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
+        ConnectException taskConfigsException = new ConnectException("Test exception");
+        EasyMock.expect(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), anyObject(ConnectorConfig.class))).andThrow(taskConfigsException);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());

Review Comment:
   I may be missing something here but shouldn't the callback have an error here as well because `worker.connectorTaskConfigs` always throws a taskConfigsException?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
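
On the logging question raised in the review comments above, one possible shape is to log the failure first and then hand it to the callback, so the error appears in the log as well as in the callback. The following is only a minimal sketch under stated assumptions, not the PR's code: `SketchCallback`, `LogAndForwardSketch`, and `runAndReport` are hypothetical stand-ins for illustration, and `java.util.logging` is used purely to keep the example self-contained (Kafka Connect itself logs through SLF4J).

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for Connect's Callback<V>; only the onCompletion shape is mirrored.
interface SketchCallback<V> {
    void onCompletion(Throwable error, V result);
}

final class LogAndForwardSketch {
    private static final Logger LOG = Logger.getLogger(LogAndForwardSketch.class.getName());

    // Runs the action. On failure, logs the error and then forwards it to the callback.
    // Returns true only if the action completed without throwing.
    static boolean runAndReport(String connName, Runnable action, SketchCallback<Void> callback) {
        try {
            action.run();
            return true;
        } catch (Throwable t) {
            // Log first so the failure is visible even if the callback drops it.
            LOG.log(Level.SEVERE, "Failed to reconfigure tasks for connector " + connName, t);
            callback.onCompletion(t, null);
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        // Simulate a task reconfiguration that always throws, as in the test under review.
        runAndReport("demo-connector",
                () -> { throw new IllegalStateException("Test exception"); },
                (error, result) -> seen.set(error));
        System.out.println("callback received: " + seen.get());
    }
}
```

With this shape the callback is invoked only on the failure path, so the caller is still responsible for completing it on success.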