vamossagar12 commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1036857522


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1645,6 +1646,8 @@ private void startAndStop(Collection<Callable<Void>> callables) {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
+        } catch (RejectedExecutionException re) {
+            log.error("startAndStopExecutor already shutdown or full. Not invoking explicit connector/task shutdown");

Review Comment:
   I did this because, if `RejectedExecutionException` is thrown, the rest of 
the logic in `DistributedHerder#halt` is skipped. Within `halt`, we stop the 
members and eventually the worker, and stopping the worker gives any connectors 
and tasks that were not stopped yet another opportunity to stop. FWIU, none of 
that runs today. 
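
To illustrate the point, here is a minimal standalone sketch of the behaviour, not the actual herder code. `HaltSketch`, its `steps` list, and the step names are hypothetical; only `ExecutorService#invokeAll` throwing `RejectedExecutionException` after `shutdown()` is standard JDK behaviour. It assumes `halt` runs after the executor has already been shut down.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for the herder; none of this is Kafka Connect code.
class HaltSketch {
    final ExecutorService startAndStopExecutor = Executors.newFixedThreadPool(2);
    final List<String> steps = new ArrayList<>();

    void startAndStop(Collection<Callable<Void>> callables) {
        try {
            startAndStopExecutor.invokeAll(callables);
        } catch (InterruptedException e) {
            // ignore, mirroring the existing catch block
        } catch (RejectedExecutionException re) {
            // Executor already shut down: record it and let the caller continue.
            steps.add("rejected");
        }
    }

    void halt() {
        Callable<Void> stopConnector = () -> {
            steps.add("connector stopped");
            return null;
        };
        startAndStop(Collections.singletonList(stopConnector));
        // Without the catch above, the exception would propagate and skip these steps.
        steps.add("member stopped");
        steps.add("worker stopped");
    }

    public static void main(String[] args) {
        HaltSketch herder = new HaltSketch();
        herder.startAndStopExecutor.shutdown(); // simulate the executor being shut down first
        herder.halt();
        System.out.println(herder.steps);
    }
}
```

With the extra catch block, the remaining steps in `halt` still run even though the explicit stop was rejected.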
   
   Regarding your suggestion to shut down the connectors/tasks _before_ 
shutting down the `startAndStopExecutor`: I can do that as well, but IMO that 
would ideally need to happen from within the `stop` method. I did it this way 
to keep the changes minimal. Let me know if it makes sense.
   
   Lastly, I increased `START_AND_STOP_SHUTDOWN_TIMEOUT_MS` from the current 
1 second to 5 seconds, which IMO shouldn't do much harm but gives the 
connectors/tasks a longer window to close cleanly. 
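
As a rough sketch of why a longer timeout helps, the snippet below uses the usual `shutdown()` / `awaitTermination()` / `shutdownNow()` pattern from the JDK. `ShutdownTimeoutSketch` and its methods are hypothetical, the durations are scaled down for illustration, and this is not how the herder is actually wired.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; class and method names are illustrative only.
class ShutdownTimeoutSketch {
    // Returns true if already-submitted work finished within timeoutMs of shutdown().
    static boolean shutdownAndWait(ExecutorService executor, long timeoutMs)
            throws InterruptedException {
        executor.shutdown(); // no new tasks accepted; submitted tasks keep running
        boolean clean = executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        if (!clean) {
            executor.shutdownNow(); // interrupt whatever is still running
        }
        return clean;
    }

    // Simulates a connector/task whose stop takes stopDurationMs.
    static boolean closesCleanly(long stopDurationMs, long timeoutMs)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                Thread.sleep(stopDurationMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop was cut short
            }
        });
        return shutdownAndWait(executor, timeoutMs);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(closesCleanly(300, 50));   // timeout shorter than the stop
        System.out.println(closesCleanly(300, 3000)); // timeout longer than the stop
    }
}
```

A stop that outlasts the timeout gets interrupted by `shutdownNow()`, while the same stop under a longer timeout is allowed to finish, which is the window the larger value is meant to provide.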


