vamossagar12 commented on code in PR #12802: URL: https://github.com/apache/kafka/pull/12802#discussion_r1036857522
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1645,6 +1646,8 @@ private void startAndStop(Collection<Callable<Void>> callables) {
             startAndStopExecutor.invokeAll(callables);
         } catch (InterruptedException e) {
             // ignore
+        } catch (RejectedExecutionException re) {
+            log.error("startAndStopExecutor already shutdown or full. Not invoking explicit connector/task shutdown");

Review Comment:
   I did this because, if a `RejectedExecutionException` is thrown here, the rest of the logic in `DistributedHerder#halt` is never executed. Within `halt`, we stop the members, and eventually, when we stop the worker, the connectors and tasks that were not stopped get another opportunity to be stopped. From what I understand, none of that happens today.

   Regarding your suggestion to shut down connectors/tasks _before_ shutting down the `startAndStopExecutor`: I can do that as well, but IMO that would ideally need to happen from within the `stop` method. I went with this approach to keep the changes minimal. Let me know if it makes sense.

   Lastly, I increased `START_AND_STOP_SHUTDOWN_TIMEOUT_MS` from the current 1 second to 5 seconds. IMO that shouldn't do much harm, and it gives the connectors/tasks a longer window to close cleanly.
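
   For illustration only, here is a minimal, self-contained sketch of the failure mode described in this comment. It is not the actual `DistributedHerder` code: the class name `RejectedInvokeAllSketch` and the boolean return value are hypothetical, and the sketch assumes an executor with the JDK's default rejection policy. It shows that `invokeAll` on an executor that is already shut down throws `RejectedExecutionException`, and that catching it lets the caller continue with its remaining shutdown steps.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedInvokeAllSketch {

    // Hypothetical stand-in for startAndStop: returns true if the callables
    // were accepted and run, false if the executor rejected them.
    static boolean startAndStop(ExecutorService executor,
                                Collection<Callable<Void>> callables) {
        try {
            executor.invokeAll(callables);
            return true;
        } catch (InterruptedException e) {
            // The PR ignores the interrupt; this sketch restores the flag instead.
            Thread.currentThread().interrupt();
            return false;
        } catch (RejectedExecutionException re) {
            // Executor is already shut down (or saturated): log and carry on
            // so that the caller's remaining steps still execute.
            System.err.println("executor rejected the callables: " + re);
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.shutdown(); // simulate the executor being shut down first

        Callable<Void> stopTask = () -> null; // placeholder for a connector/task stop
        boolean accepted = startAndStop(executor, List.of(stopTask));

        // Without the catch above, the exception would propagate and this
        // line (standing in for the rest of halt) would never be reached.
        System.out.println("accepted=" + accepted + ", continuing with remaining shutdown steps");
    }
}
```

   The design choice mirrored here is to treat rejection as non-fatal: the explicit stop is skipped, and a later step is relied on to stop whatever is still running.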