James Netherton created CAMEL-10594:
---------------------------------------

             Summary: Kafka consumer stays alive when camel context is shut down
                 Key: CAMEL-10594
                 URL: https://issues.apache.org/jira/browse/CAMEL-10594
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
            Reporter: James Netherton


I happened to be running some camel-kafka unit tests with the log level set to 
DEBUG and noticed that the KafkaConsumer is not shut down correctly.

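When the Camel Kafka consumer is stopped, it invokes shutdownNow() on the 
ExecutorService. However, shutdownNow() only makes a best-effort attempt to 
stop running tasks, typically by interrupting their threads. It does not 
guarantee that any running thread will actually terminate.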
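
To illustrate the point about shutdownNow(), here is a minimal, JDK-only sketch. It is not the camel-kafka code: the class name ShutdownNowDemo, the release flag and the sleep-based loop are hypothetical stand-ins for a poll loop that swallows interrupts.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowDemo {

    /** Returns true if the worker was still running after shutdownNow(). */
    static boolean workerSurvivesShutdownNow() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Escape hatch so the demo can end; a real runaway task has none.
        AtomicBoolean release = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        executor.submit(() -> {
            started.countDown();
            // Hypothetical stand-in for a poll loop that ignores interrupts.
            while (!release.get()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // The interrupt is swallowed, so the loop carries on.
                }
            }
        });

        started.await();
        executor.shutdownNow(); // interrupts the worker, nothing more
        boolean terminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);

        release.set(true); // let the worker exit so the JVM can stop
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return !terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker survived shutdownNow(): " + workerSurvivesShutdownNow());
    }
}
```

Checking the result of awaitTermination() after shutdownNow() is one way for a stop method to find out whether its worker really exited.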

This is a problem when Camel runs in a container like Karaf or WildFly, 
because the 
[KafkaFetchRecords|https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L94]
thread keeps running for the lifetime of the JVM, even though the Camel 
context that started it has been stopped.

It's simple to reproduce in a unit test:

* Enable DEBUG log level
* Start a Camel context with a Kafka consumer endpoint
* Stop the Camel context
* Thread.sleep for some time (10 seconds, for example), then notice the exception in the log output:

{code}
07:09:44,247 DEBUG [org.apache.kafka.clients.NetworkClient] (Camel (camel-36) thread #134 - KafkaConsumer[test]) Error connecting to node 1 at localhost:9092:: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659)
        at org.apache.kafka.common.network.Selector.connect(Selector.java:168)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498)
        at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:645)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}
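
As an alternative to watching the DEBUG log, a test could look for the lingering thread directly. The following is a rough sketch using only the JDK. The class and method names are hypothetical, and the "KafkaConsumer" name fragment is taken from the thread name in the log above, which may differ in other Camel versions.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LingeringThreads {

    /** Names of all live threads whose name contains the given fragment. */
    static List<String> liveThreadNamesContaining(String fragment) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.contains(fragment))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumption: consumer threads carry "KafkaConsumer" in their name,
        // as in "Camel (camel-36) thread #134 - KafkaConsumer[test]".
        System.out.println(liveThreadNamesContaining("KafkaConsumer"));
    }
}
```

Called after the Camel context has been stopped and the sleep has elapsed, a non-empty result would indicate that a consumer thread is still alive.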



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
