[ 
https://issues.apache.org/jira/browse/CAMEL-12722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577103#comment-16577103
 ] 

michael elbaz edited comment on CAMEL-12722 at 8/11/18 8:33 PM:
----------------------------------------------------------------

OK, done. In my opinion this is caused by missing synchronisation (is the same 
KafkaConsumer used in the from?).

The scenario was:

1 - Increase the producer loop to 1500 messages

2 - Start this Camel route (the same as in the test; I just created another one 
to be sure)
{code:java}
from("kafka:product?brokers=localhost:9092&groupId=testGroup&clientId=testClient&autoCommitEnable=false&allowManualCommit=true&autoOffsetReset=earliest")
        .routeId("foo")
        .to("seda:out");

from("seda:out")
        .to("log:TEST?level=WARN")
        .process(e -> {
            KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            manual.commitSync();
        });
{code}
Then I got the `ConcurrentModificationException`.

(To be more precise: it does not happen every time, as I said in my first message.)
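
To illustrate the kind of race I suspect, here is a minimal, self-contained sketch. It is not Camel or Kafka code: `GuardedClient`, its `poll`/`commitSync` methods and the latches are hypothetical stand-ins that only mimic a client which rejects access from a second thread while another thread is inside it. The assumption is that the seda consumer thread calls `commitSync()` while the Kafka consumer thread is still polling; the latches force that overlap so the failure is reproducible here, whereas in the real route it depends on timing.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadGuardDemo {

    /** Hypothetical stand-in for a client that allows only one thread inside it at a time. */
    static class GuardedClient {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);

        // Claim the client for the calling thread, or fail if another thread holds it.
        private void acquire() {
            long me = Thread.currentThread().getId();
            if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
        }

        private void release() {
            owner.set(NO_OWNER);
        }

        // Simulates a poll that is still in progress: signals 'inside', then waits for 'mayLeave'.
        void poll(CountDownLatch inside, CountDownLatch mayLeave) throws InterruptedException {
            acquire();
            try {
                inside.countDown();
                mayLeave.await();
            } finally {
                release();
            }
        }

        // Simulates a commit; it needs the same guard as poll.
        void commitSync() {
            acquire();
            try {
                // a real client would send the offsets here
            } finally {
                release();
            }
        }
    }

    /** Returns true if a commit from a second thread is rejected while the poll is in progress. */
    public static boolean commitFromOtherThreadFails() throws InterruptedException {
        GuardedClient client = new GuardedClient();
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch mayLeave = new CountDownLatch(1);

        // Plays the role of the consumer thread that owns the client.
        Thread poller = new Thread(() -> {
            try {
                client.poll(inside, mayLeave);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        inside.await(); // the poller is now inside the client

        boolean failed = false;
        try {
            client.commitSync(); // plays the role of the seda thread
        } catch (ConcurrentModificationException e) {
            failed = true;
        } finally {
            mayLeave.countDown();
            poller.join();
        }
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("commit from another thread rejected: " + commitFromOtherThreadFails());
    }
}
```

If the commit only sometimes overlaps with a poll, the exception would only show up sometimes, which would match what I observe.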


was (Author: michael992):
OK, done. My conclusion is that some synchronisation is missing.

The scenario was:

1 - Increase the producer loop to 1500 messages

2 - Start this Camel route (the same as in the test; I just created another one 
to be sure)
{code:java}
from("kafka:product?brokers=localhost:9092&groupId=testGroup&clientId=testClient&autoCommitEnable=false&allowManualCommit=true&autoOffsetReset=earliest")
        .routeId("foo")
        .to("seda:out");

from("seda:out")
        .to("log:TEST?level=WARN")
        .process(e -> {
            KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            manual.commitSync();
        });
{code}
Then I got the `ConcurrentModificationException`.

(To be more precise: it does not happen every time, as I said in my first message.)

> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
> ----------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-12722
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12722
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>            Reporter: michael elbaz
>            Priority: Major
>
> I get an exception when I try to use manual commit with the camel-kafka component
>  
> {code:java}
> from("kafka:{{kafka.hostname}}:{{kafka.port}}?topic={{kafka.topic}}&groupId={{kafka.consumer.groupid}}&autoCommitEnable=false&allowManualCommit=true")
>             .process(new Processor() {
>                 @Override
>                 public void process(Exchange exchange) throws Exception {
>                     KafkaManualCommit manual =
>                             exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>                     manual.commitSync();
>                 }
>             }).to("...")
> {code}
> The exception
> {code:java}
> Trace: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
>     at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitOffset(DefaultKafkaManualCommit.java:60)
>     at org.apache.camel.component.kafka.DefaultKafkaManualCommit.commitSync(DefaultKafkaManualCommit.java:51)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
