yashmayya commented on code in PR #13530:
URL: https://github.com/apache/kafka/pull/13530#discussion_r1164035350


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -229,7 +229,12 @@ private synchronized void putConnectorConfig(String connName,
                 }
 
                 requestExecutorService.submit(() -> {
-                    updateConnectorTasks(connName);
+                    try {
+                        updateConnectorTasks(connName);
+                    } catch (Throwable t) {
+                        callback.onCompletion(t, null);

Review Comment:
   This callback comes from [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L146)
   / [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L227),
   and the error passed to it is rethrown [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java#L66).
   The exception is then converted into a REST response by the `ConnectExceptionMapper`, which also logs it
   [here](https://github.com/apache/kafka/blob/e1e3900ba1980ca774b927df4a8713a0328eeb86/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L64).
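
To illustrate the flow described above, here is a minimal, self-contained sketch using only `java.util.concurrent`. It is not the Kafka Connect code: `CallbackPropagationSketch`, its `Callback` interface, `submitReportingFailures` and `await` are hypothetical stand-ins, and the `CompletableFuture` only approximates a future-backed callback. It shows why the `try`/`catch` matters: an exception thrown inside a task passed to `submit()` is otherwise held in the returned `Future`, which nobody reads, so the waiting caller would never see it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins; none of these types are the real Kafka Connect classes.
class CallbackPropagationSketch {

    /** Simplified completion callback: error is non-null on failure. */
    interface Callback<T> {
        void onCompletion(Throwable error, T result);
    }

    /** Runs the task on the executor and reports any failure through the callback. */
    static void submitReportingFailures(ExecutorService executor, Runnable task, Callback<Void> callback) {
        executor.submit(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Without this catch the exception would only be stored in the
                // Future returned by submit(), which is discarded here.
                callback.onCompletion(t, null);
            }
        });
    }

    /** Blocks on the future and rethrows the original failure, as a request handler might. */
    static <T> T await(CompletableFuture<T> future) throws Throwable {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = new CompletableFuture<>();
        // The callback completes the future that the caller is waiting on.
        Callback<Void> callback = (error, result) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(result);
            }
        };
        submitReportingFailures(executor, () -> {
            throw new IllegalStateException("task config failure");
        }, callback);
        try {
            await(future);
        } catch (Throwable t) {
            System.out.println("caller saw: " + t.getMessage());
        } finally {
            executor.shutdown();
        }
    }
}
```

In this sketch the caller gets the original exception back. A layer above it, such as an exception mapper, could then turn it into an error response and log it.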



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -402,6 +403,44 @@ public void testRestartConnectorFailureOnStart() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testRestartConnectorWithTaskConfigsException() throws Exception {
+        expectAdd(SourceSink.SOURCE);
+
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        worker.stopAndAwaitConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+
+        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
+        worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), EasyMock.capture(onStart));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
+        ConnectException taskConfigsException = new ConnectException("Test exception");
+        EasyMock.expect(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), anyObject(ConnectorConfig.class))).andThrow(taskConfigsException);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());

Review Comment:
   This test is still using `EasyMock` where expectations are queued up and 
then replayed in the same order. The `expectAdd` method here (first line in the 
test) sets up the following expectation first:
   
   ```
   EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
               .andReturn(singletonList(generatedTaskProps));
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -296,7 +301,12 @@ public synchronized void restartConnector(String connName, Callback<Void> cb) {
 
         startConnector(connName, (error, targetState) -> {
             if (targetState == TargetState.STARTED) {
-                requestTaskReconfiguration(connName);
+                try {
+                    requestTaskReconfiguration(connName);
+                } catch (Throwable t) {
+                    cb.onCompletion(t, null);

Review Comment:
   Same as above: this callback also comes from a REST API call.


