vamossagar12 opened a new pull request, #12802: URL: https://github.com/apache/kafka/pull/12802
When the DistributedHerder::stop() method is called, it triggers an asynchronous shutdown of the background herder thread and then continues with a synchronous shutdown of some other resources, including the startAndStopExecutor. This executor is responsible for cleanly stopping connectors and tasks, which it does on behalf of the DistributedHerder::halt() method. There is a race condition between the halt() method asynchronously submitting these connector/task stop jobs and the stop() method terminating the executor. If the executor is terminated first, this exception appears:

```
[2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
    at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
```

This PR aims to handle the above exception cleanly and log the error.
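For illustration only, the race can be reproduced outside Kafka with a plain `ExecutorService`: once the executor has been shut down, `invokeAll` throws `RejectedExecutionException`, which the caller can catch and log instead of letting it escape the thread. This is a minimal sketch, not the change made in this PR; the class name `HaltRaceSketch` and the helper `runStopJobs` are hypothetical and do not exist in `DistributedHerder`.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class HaltRaceSketch {

    // Hypothetical helper (not Kafka code): submits the stop jobs and waits for them.
    // Returns true if the jobs were accepted and run, false if the executor had
    // already been shut down (or the calling thread was interrupted).
    static boolean runStopJobs(ExecutorService executor, List<Callable<Void>> stopJobs) {
        try {
            executor.invokeAll(stopJobs);
            return true;
        } catch (RejectedExecutionException e) {
            // The executor was shut down before the jobs could be submitted:
            // log the condition instead of propagating the exception.
            System.err.println("Executor already shut down, skipping "
                    + stopJobs.size() + " stop job(s): " + e);
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Callable<Void> stopJob = () -> {
            System.out.println("stopping connector");
            return null;
        };
        List<Callable<Void>> jobs = List.of(stopJob);

        // Normal ordering: jobs are submitted before the executor is shut down.
        ExecutorService live = Executors.newSingleThreadExecutor();
        System.out.println("accepted: " + runStopJobs(live, jobs));
        live.shutdown();

        // Racy ordering: the executor is shut down first, so submission is rejected.
        ExecutorService terminated = Executors.newSingleThreadExecutor();
        terminated.shutdown();
        System.out.println("accepted: " + runStopJobs(terminated, jobs));
    }
}
```

Catching the exception at the submission site keeps the shutdown path quiet, but it also means the stop jobs are skipped in the racy ordering; whether that is acceptable depends on what else cleans up the connectors and tasks.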
