[ https://issues.apache.org/jira/browse/CAMEL-12722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577103#comment-16577103 ]
michael elbaz commented on CAMEL-12722: --------------------------------------- Ok done so my conclusion about is that missing some synchronisation. The scenario was: 1 - Increase the producer loop message to 1500 2 - Starting this camel route {code:java} from("kafka:product?brokers=localhost:9092&groupId=testGroup&clientId=testClient&autoCommitEnable=false&allowManualCommit=true&autoOffsetReset=earliest") .routeId("foo") .to("seda:out"); from("seda:out") .to("log:TEST?level=WARN") .process(e -> { KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); manual.commitSync(); }); {code} Then i got the `ConcurrentModificationException` > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > ---------------------------------------------------------------------------------------------- > > Key: CAMEL-12722 > URL: https://issues.apache.org/jira/browse/CAMEL-12722 > Project: Camel > Issue Type: Bug > Components: camel-kafka > Affects Versions: 2.22.0 > Reporter: michael elbaz > Priority: Major > > I got an exception when i try to use manual commit on camel kafka component > > {code:java} > from("kafka:{{kafka.hostname}}:{{kafka.port}}?topic={{kafka.topic}}&groupId={{kafka.consumer.groupid}}&autoCommitEnable=false&allowManualCommit=true") > .process(new Processor() { > @Override > public void process(Exchange exchange) throws Exception { > KafkaManualCommit manual = > > exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, > KafkaManualCommit.class); > manual.commitSync(); > } > }).to("...") > {code} > The exception > {code:java} > Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe > for multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824) > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255) > at > org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60) > at > org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)