Otavio Rodolfo Piske created CAMEL-18003:
--------------------------------------------
Summary: camel-kafka: manual pause of consumer causes ConcurrentModificationException
Key: CAMEL-18003
URL: https://issues.apache.org/jira/browse/CAMEL-18003
Project: Camel
Issue Type: Task
Components: camel-health, camel-kafka
Reporter: Otavio Rodolfo Piske
Assignee: Otavio Rodolfo Piske
The Kafka consumer is not safe for concurrent access. We need to protect the
instance held by the component consumer from being accessed by several threads
at the same time. Otherwise, a pause requested from another thread fails with:
{code:java}
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2469) ~[kafka-clients-3.1.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2453) ~[kafka-clients-3.1.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:891) ~[kafka-clients-3.1.0.jar:?]
    at org.apache.camel.component.kafka.KafkaFetchRecords.pause(KafkaFetchRecords.java:519) ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.component.kafka.KafkaConsumer.doSuspend(KafkaConsumer.java:221) ~[camel-kafka-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.support.service.BaseService.suspend(BaseService.java:189) ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.support.service.ServiceHelper.suspendService(ServiceHelper.java:404) ~[camel-api-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.impl.engine.DefaultShutdownStrategy.suspendNow(DefaultShutdownStrategy.java:447) ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at org.apache.camel.impl.engine.DefaultShutdownStrategy$ShutdownTask.run(DefaultShutdownStrategy.java:620) ~[camel-base-engine-3.17.0-SNAPSHOT.jar:3.17.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
{code}
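One possible way to provide that protection is to route every call on the client through a single lock. The sketch below is illustrative only and is not the camel-kafka implementation: SingleThreadClient is a hypothetical stand-in that, like KafkaConsumer, rejects overlapping calls from different threads, and GuardedClient is a hypothetical wrapper that serializes access with a ReentrantLock.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in: fails fast when two threads overlap, instead of blocking.
class SingleThreadClient {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private volatile boolean paused;

    private void acquire() {
        // Only one thread may be inside the client at any time.
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException(
                    "client is not safe for multi-threaded access");
        }
    }

    private void release() {
        owner.set(null);
    }

    // Simulates a poll that keeps the client busy for workMillis.
    void poll(long workMillis) throws InterruptedException {
        acquire();
        try {
            Thread.sleep(workMillis);
        } finally {
            release();
        }
    }

    void pause() {
        acquire();
        try {
            paused = true;
        } finally {
            release();
        }
    }

    boolean isPaused() {
        return paused;
    }
}

// Hypothetical wrapper: all access to the delegate goes through one lock.
class GuardedClient {
    private final SingleThreadClient delegate;
    private final ReentrantLock lock = new ReentrantLock();

    GuardedClient(SingleThreadClient delegate) {
        this.delegate = delegate;
    }

    void poll(long workMillis) throws InterruptedException {
        lock.lock();
        try {
            delegate.poll(workMillis);
        } finally {
            lock.unlock();
        }
    }

    // Blocks until any in-flight poll has finished, then pauses.
    void pause() {
        lock.lock();
        try {
            delegate.pause();
        } finally {
            lock.unlock();
        }
    }

    boolean isPaused() {
        return delegate.isPaused();
    }
}

class PauseDemo {
    public static void main(String[] args) throws InterruptedException {
        GuardedClient client = new GuardedClient(new SingleThreadClient());
        Thread poller = new Thread(() -> {
            try {
                client.poll(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        client.pause(); // called from a different thread than the poller
        poller.join();
        System.out.println("paused = " + client.isPaused());
    }
}
```

With this approach a pause request waits for the current poll to return, so it can be delayed by up to the poll timeout. An alternative would be to hand the pause request over to the polling thread and let that thread apply it between polls, which avoids locking around the client altogether.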