yashmayya commented on code in PR #13530:
URL: https://github.com/apache/kafka/pull/13530#discussion_r1164035350

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -229,7 +229,12 @@ private synchronized void putConnectorConfig(String connName,
         }
         requestExecutorService.submit(() -> {
-            updateConnectorTasks(connName);
+            try {
+                updateConnectorTasks(connName);
+            } catch (Throwable t) {
+                callback.onCompletion(t, null);

Review Comment:
This callback comes from [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L146) / [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L227), and the error passed to it is thrown [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java#L66). The exception is then converted into a REST response by the `ConnectExceptionMapper` and is also logged [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L64).

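For readers following the hunk above, here is a minimal, stdlib-only sketch of why the `try`/`catch` matters. An exception thrown inside a task passed to `ExecutorService.submit` is only stored in the returned `Future`; if nobody reads that `Future`, the caller's callback never learns about the failure. `SketchCallback`, `CallbackForwardingSketch`, `submitWithCallback`, and `runFailingTask` are hypothetical names for illustration, not Kafka Connect code, and completing the callback with `"ok"` on success is an assumption of this sketch rather than something the diff shows.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the two-argument callback shape seen in the diff;
// this is NOT the real Kafka Connect interface.
interface SketchCallback<V> {
    void onCompletion(Throwable error, V result);
}

class CallbackForwardingSketch {

    // Runs `work` on the executor and reports the outcome through `callback`.
    // Without the try/catch, an exception thrown by `work` would only be stored
    // in the Future returned by submit(), which is discarded here, so the
    // callback would never be completed.
    static void submitWithCallback(ExecutorService executor,
                                   Runnable work,
                                   SketchCallback<String> callback) {
        executor.submit(() -> {
            try {
                work.run();
                // Assumption of this sketch: report success with a fixed value.
                callback.onCompletion(null, "ok");
            } catch (Throwable t) {
                // Forward the failure instead of letting it vanish into the Future.
                callback.onCompletion(t, null);
            }
        });
    }

    // Submits a task that always fails and returns the error the callback
    // received, or null if the callback was never given one.
    static Throwable runFailingTask() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        try {
            submitWithCallback(
                executor,
                () -> { throw new IllegalStateException("Test exception"); },
                (error, result) -> seen.set(error));
        } finally {
            // Wait for the submitted task to finish before reading the result.
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen.get();
    }
}
```

Calling `CallbackForwardingSketch.runFailingTask()` hands back the `IllegalStateException` that the task threw, because the `catch` block passed it to the callback. Whoever owns the callback can then decide how to surface the error.
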
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -402,6 +403,44 @@ public void testRestartConnectorFailureOnStart() throws Exception {
         PowerMock.verifyAll();
     }
+    @Test
+    public void testRestartConnectorWithTaskConfigsException() throws Exception {
+        expectAdd(SourceSink.SOURCE);
+
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        worker.stopAndAwaitConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+
+        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
+        worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
+        ConnectException taskConfigsException = new ConnectException("Test exception");
+        EasyMock.expect(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), anyObject(ConnectorConfig.class))).andThrow(taskConfigsException);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());

Review Comment:
This test is still using `EasyMock` where expectations are queued up and then replayed in the same order.
The `expectAdd` method here (first line in the test) sets up the following expectation first:
```
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
    .andReturn(singletonList(generatedTaskProps));
```

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -296,7 +301,12 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) {
         startConnector(connName, (error, targetState) -> {
             if (targetState == TargetState.STARTED) {
-                requestTaskReconfiguration(connName);
+                try {
+                    requestTaskReconfiguration(connName);
+                } catch (Throwable t) {
+                    cb.onCompletion(t, null);

Review Comment:
Same as the `putConnectorConfig` comment above: this callback also comes from a REST API call.