Otavio Rodolfo Piske created CAMEL-18003:
--------------------------------------------

             Summary: camel-kafka: manual pause of consumer causes ConcurrentModificationException
                 Key: CAMEL-18003
                 URL: https://issues.apache.org/jira/browse/CAMEL-18003
             Project: Camel
          Issue Type: Task
          Components: camel-health, camel-kafka
            Reporter: Otavio Rodolfo Piske
            Assignee: Otavio Rodolfo Piske


The Kafka consumer is not safe for concurrent access. We need to guard the 
consumer instance held by the component consumer against access from more than 
one thread. Otherwise, pausing the consumer from another thread fails with:
{code:java}
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2469) ~[kafka-clients-3.1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2453) ~[kafka-clients-3.1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:891) ~[kafka-clients-3.1.0.jar:?]
        at org.apache.camel.component.kafka.KafkaFetchRecords.pause(KafkaFetchRecords.java:519) ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
        at org.apache.camel.component.kafka.KafkaConsumer.doSuspend(KafkaConsumer.java:221) ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
        at org.apache.camel.support.service.BaseService.suspend(BaseService.java:189) ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
        at org.apache.camel.support.service.ServiceHelper.suspendService(ServiceHelper.java:404) ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
        at org.apache.camel.impl.engine.DefaultShutdownStrategy.suspendNow(DefaultShutdownStrategy.java:447) ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
        at org.apache.camel.impl.engine.DefaultShutdownStrategy$ShutdownTask.run(DefaultShutdownStrategy.java:620) ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
{code}
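
One possible way to avoid this kind of failure is to let only the polling thread touch the client: other threads record a pause request, and the polling thread applies it between polls. The sketch below is an illustration under assumptions, not the change made for this issue. `SingleThreadedClient`, `FetchLoop` and `requestPause` are hypothetical names, and `SingleThreadedClient` is a stand-in that only imitates the ownership check seen in the trace. With the real client, the polling thread would call something like `consumer.pause(consumer.assignment())`, and `KafkaConsumer.wakeup()`, which the Kafka client documents as safe to call from another thread, could be used to interrupt a long poll.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class PauseHandoffSketch {

    /** Hypothetical stand-in for the Kafka client: rejects overlapping access by a second thread. */
    static final class SingleThreadedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>();
        private volatile boolean paused;

        private void acquire() {
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new ConcurrentModificationException("client is not safe for multi-threaded access");
            }
        }

        private void release() {
            owner.set(null);
        }

        void poll() throws InterruptedException {
            acquire();
            try {
                Thread.sleep(20); // simulate a blocking poll
            } finally {
                release();
            }
        }

        void pause() {
            acquire();
            try {
                paused = true;
            } finally {
                release();
            }
        }

        boolean isPaused() {
            return paused;
        }
    }

    /** Polling task: the only code path that ever calls the client. */
    static final class FetchLoop implements Runnable {
        private final SingleThreadedClient client;
        private final AtomicBoolean pauseRequested = new AtomicBoolean();
        private final CountDownLatch pausedLatch = new CountDownLatch(1);

        FetchLoop(SingleThreadedClient client) {
            this.client = client;
        }

        /** Safe from any thread: records the request without touching the client. */
        void requestPause() {
            pauseRequested.set(true);
        }

        boolean awaitPaused(long timeoutMs) throws InterruptedException {
            return pausedLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    if (pauseRequested.compareAndSet(true, false)) {
                        client.pause(); // applied on the polling thread, between polls
                        pausedLatch.countDown();
                        return; // the sketch stops once the pause is applied
                    }
                    client.poll();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts the loop, requests a pause from the calling thread, reports whether it was applied. */
    static boolean demo() {
        SingleThreadedClient client = new SingleThreadedClient();
        FetchLoop loop = new FetchLoop(client);
        Thread poller = new Thread(loop, "poller");
        poller.start();
        try {
            loop.requestPause();
            boolean applied = loop.awaitPaused(2000);
            poller.join(2000);
            return applied && client.isPaused();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("pause applied by polling thread: " + demo());
    }
}
```

An alternative with the same goal would be to wrap every client call in a shared lock. The hand-off shown here avoids blocking the suspending thread for the duration of a poll, at the cost of applying the pause asynchronously.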



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
