Greg Harris created KAFKA-14311:
-----------------------------------
Summary: Connect Worker clean shutdown does not cleanly stop connectors/tasks
Key: KAFKA-14311
URL: https://issues.apache.org/jira/browse/KAFKA-14311
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 3.3.1
Reporter: Greg Harris
When the DistributedHerder::stop() method is called, it triggers an asynchronous shutdown of the background herder thread. It then continues with a synchronous shutdown of some other resources, including the stopAndStartExecutor.
This executor is responsible for cleanly stopping connectors and tasks. The stop jobs are submitted to it by the DistributedHerder::halt() method, which runs on the herder thread. There is a race condition between halt() asynchronously submitting these connector/task stop jobs and stop() terminating the executor. If the executor is terminated first, the following exception appears:
{noformat}
[2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2, groupId=connect-integration-test-connect-cluster-1] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@62878e25[Not completed, task = org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3] rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833){noformat}
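The race can be illustrated outside of Connect with plain java.util.concurrent executors. The sketch below is not the Kafka Connect code. Its halt(), stopAndStartExecutor, and herderExecutor are simplified, hypothetical stand-ins. It first forces the losing interleaving deterministically by shutting the executor down before halt() submits its stop jobs. This produces the same RejectedExecutionException from invokeAll() as in the log above. It then shows one possible ordering that avoids the problem: wait for the herder thread to finish halt() before terminating the executor it depends on. Whether the actual fix for this issue takes this approach is an assumption.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HerderShutdownRace {

    // Counts stop jobs that actually ran (hypothetical "connector/task stopped" signal).
    static final AtomicInteger stopped = new AtomicInteger();

    // Hypothetical stand-in for DistributedHerder::halt(): hands connector/task
    // stop jobs to the executor via invokeAll, as in the stack trace.
    static void halt(ExecutorService stopAndStartExecutor) throws InterruptedException {
        List<Callable<Void>> stopJobs = List.of(
                () -> { stopped.incrementAndGet(); return null; },  // "stop connector"
                () -> { stopped.incrementAndGet(); return null; }); // "stop task"
        stopAndStartExecutor.invokeAll(stopJobs);
    }

    // Losing interleaving, forced deterministically: the executor is already
    // shut down when halt() tries to submit, so execute() rejects the job.
    static boolean racyOrderRejects() throws InterruptedException {
        ExecutorService stopAndStartExecutor = Executors.newFixedThreadPool(2);
        stopAndStartExecutor.shutdown();
        try {
            halt(stopAndStartExecutor);
            return false;
        } catch (RejectedExecutionException e) {
            return true; // same exception type as in the log above
        }
    }

    // One possible ordering (assumption, not the committed fix): let the herder
    // thread finish halt() before shutting down the executor that halt() uses.
    static int orderedShutdown() throws Exception {
        stopped.set(0);
        ExecutorService herderExecutor = Executors.newSingleThreadExecutor();
        ExecutorService stopAndStartExecutor = Executors.newFixedThreadPool(2);
        Future<?> herderThread = herderExecutor.submit(() -> {
            halt(stopAndStartExecutor);
            return null;
        });
        herderExecutor.shutdown();
        herderThread.get(10, TimeUnit.SECONDS);             // halt() has returned
        herderExecutor.awaitTermination(10, TimeUnit.SECONDS);
        stopAndStartExecutor.shutdown();                    // nothing left to submit
        stopAndStartExecutor.awaitTermination(10, TimeUnit.SECONDS);
        return stopped.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("racy order rejected: " + racyOrderRejects());
        System.out.println("stop jobs run with ordered shutdown: " + orderedShutdown());
    }
}
```

Running main() should report that the racy ordering was rejected and that both stop jobs ran under the ordered shutdown. Joining the herder thread before terminating the executor trades a bounded wait during shutdown for the guarantee that every stop job halt() submits is accepted.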