vamossagar12 commented on code in PR #13530:
URL: https://github.com/apache/kafka/pull/13530#discussion_r1163797689


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -296,7 +301,12 @@ public synchronized void restartConnector(String connName, 
Callback<Void> cb) {
 
         startConnector(connName, (error, targetState) -> {
             if (targetState == TargetState.STARTED) {
-                requestTaskReconfiguration(connName);
+                try {
+                    requestTaskReconfiguration(connName);
+                } catch (Throwable t) {
+                    cb.onCompletion(t, null);

Review Comment:
   Same here. Should we log it?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -229,7 +229,12 @@ private synchronized void putConnectorConfig(String 
connName,
                 }
 
                 requestExecutorService.submit(() -> {
-                    updateConnectorTasks(connName);
+                    try {
+                        updateConnectorTasks(connName);
+                    } catch (Throwable t) {
+                        callback.onCompletion(t, null);

Review Comment:
   Should we log the exception as well? I think that's what the JIRA ticket 
also talks about.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -402,6 +403,44 @@ public void testRestartConnectorFailureOnStart() throws 
Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testRestartConnectorWithTaskConfigsException() throws 
Exception {
+        expectAdd(SourceSink.SOURCE);
+
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        worker.stopAndAwaitConnector(CONNECTOR_NAME);
+        EasyMock.expectLastCall();
+
+        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
+        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), 
EasyMock.capture(onStart));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONNECTOR_NAME));
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+
+        ConnectException taskConfigsException = new ConnectException("Test 
exception");
+        EasyMock.expect(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), 
anyObject(ConnectorConfig.class))).andThrow(taskConfigsException);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());

Review Comment:
   I may be missing something here but shouldn't the callback have an error 
here as well because `worker.connectorTaskConfigs` always throws a 
taskConfigsException? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to