[
https://issues.apache.org/jira/browse/KAFKA-14311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Egerton updated KAFKA-14311:
----------------------------------
Fix Version/s: 3.5.0
> Connect Worker clean shutdown does not cleanly stop connectors/tasks
> --------------------------------------------------------------------
>
> Key: KAFKA-14311
> URL: https://issues.apache.org/jira/browse/KAFKA-14311
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.3.1
> Reporter: Greg Harris
> Assignee: Sagar Rao
> Priority: Minor
> Fix For: 3.5.0
>
>
> When the DistributedHerder::stop() method is called, it triggers asynchronous
> shutdown of the background herder thread, and continues with synchronous
> shutdown of some other resources, including the startAndStopExecutor.
> This executor is responsible for cleanly stopping connectors and tasks, which
> happens in the DistributedHerder::halt() method. There is a race condition between
> the halt() method asynchronously submitting these connector/task stop jobs
> and the stop() method terminating the executor. If the executor is terminated
> first, this exception appears:
> {noformat}
> [2022-10-17 16:29:23,396] ERROR [Worker clientId=connect-2,
> groupId=connect-integration-test-connect-cluster-1] Uncaught exception in
> herder work thread, exiting:
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:366)
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.FutureTask@62878e25[Not completed, task =
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$$Lambda$2285/0x00000008015046a8@58deade3]
> rejected from java.util.concurrent.ThreadPoolExecutor@10351ac3[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
> at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
> at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
> at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
> at java.base/java.util.concurrent.AbstractExecutorService.invokeAll(AbstractExecutorService.java:247)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startAndStop(DistributedHerder.java:1667)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:765)
> at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:833)
> {noformat}
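
The failure mode in the description can be illustrated outside of Kafka with plain java.util.concurrent. The following is a minimal sketch, not the Connect runtime code: the class ShutdownRaceSketch, the stopJob helper, and the connector/task names are all hypothetical. It forces the losing order of the race deterministically (executor terminated first, stop jobs submitted second) and contrasts it with the order in which the stop jobs are submitted and completed before the executor is shut down.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownRaceSketch {

    // Hypothetical stand-in for a connector/task stop job.
    static Callable<Void> stopJob(String name) {
        return () -> {
            System.out.println("stopped " + name);
            return null;
        };
    }

    // Losing order: the executor is terminated before the stop jobs are
    // submitted, so invokeAll() is rejected by the default AbortPolicy.
    static boolean submitAfterShutdownIsRejected() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        try {
            executor.invokeAll(List.of(stopJob("connector-a"), stopJob("task-a-0")));
            return false; // not expected: submission should have been rejected
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // Winning order: the stop jobs are submitted and run to completion
    // (invokeAll blocks until they finish), and only then is the executor shut down.
    static boolean submitBeforeShutdownCompletes() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Future<Void>> results =
                executor.invokeAll(List.of(stopJob("connector-a"), stopJob("task-a-0")));
        executor.shutdown();
        boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS);
        return terminated && results.stream().allMatch(Future::isDone);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
        System.out.println("completed before shutdown: " + submitBeforeShutdownCompletes());
    }
}
```

In the sketch the ordering is fixed by construction. In the situation the report describes, the two steps run on different threads, so which order occurs is not guaranteed.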