[ 
https://issues.apache.org/jira/browse/CAMEL-13140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksei Vasilevskii updated CAMEL-13140:
----------------------------------------
    Attachment: Kafka_Brokers_CPU.png

> camel-kafka - consumer does not respect auto.commit.interval.ms with 
> AutoCommitEnabled=true
> -------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-13140
>                 URL: https://issues.apache.org/jira/browse/CAMEL-13140
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.23.0
>            Reporter: Aleksei Vasilevskii
>            Priority: Major
>         Attachments: Kafka_Brokers_CPU.png
>
>
> This is probably a side effect of CAMEL-12454: when auto commit is enabled, 
> the Kafka consumer commits offsets as soon as an exchange completes, with 
> no regard for the auto.commit.interval.ms setting. This may cause additional, 
> unbalanced load on the Kafka cluster (see the attached screenshot showing 
> Kafka broker CPU load right after the upgrade from 2.20.3 to 2.23.0).
> Here is an example with debug logging turned on for 
> org.apache.camel.component.kafka.KafkaConsumer. The Kafka consumer commits 
> every 10-500 ms instead of once every 5 seconds:
> {code:java}
> 2019-01-28 10:46:55.025  INFO 3324 --- [ontext_Worker-3] 
> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
>       auto.commit.interval.ms = 5000
>       auto.offset.reset = latest
>       bootstrap.servers = [192.168.56.10:9093]
>       check.crcs = true
>       client.id = 
>       connections.max.idle.ms = 540000
>       enable.auto.commit = true
>       exclude.internal.topics = true
>       fetch.max.bytes = 52428800
>       fetch.max.wait.ms = 500
>       fetch.min.bytes = 1
>       group.id = service_new
>       heartbeat.interval.ms = 3000
>       interceptor.classes = null
>       internal.leave.group.on.close = true
>       isolation.level = read_uncommitted
>       key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>       max.partition.fetch.bytes = 1048576
>       max.poll.interval.ms = 300000
>       max.poll.records = 1000
>       metadata.max.age.ms = 300000
>       metric.reporters = []
>       metrics.num.samples = 2
>       metrics.recording.level = INFO
>       metrics.sample.window.ms = 30000
>       partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>       receive.buffer.bytes = 65536
>       reconnect.backoff.max.ms = 1000
>       reconnect.backoff.ms = 50
>       request.timeout.ms = 40000
>       retry.backoff.ms = 100
>       sasl.jaas.config = null
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       sasl.kerberos.min.time.before.relogin = 60000
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       sasl.mechanism = GSSAPI
>       security.protocol = SSL
>       send.buffer.bytes = 131072
>       session.timeout.ms = 30000
>       ssl.cipher.suites = null
>       ssl.enabled.protocols = [TLSv1.2]
>       ssl.endpoint.identification.algorithm = null
>       ssl.key.password = null
>       ssl.keymanager.algorithm = SunX509
>       ssl.keystore.location = /usr/files/server.jks
>       ssl.keystore.password = [hidden]
>       ssl.keystore.type = JCEKS
>       ssl.protocol = TLS
>       ssl.provider = null
>       ssl.secure.random.implementation = null
>       ssl.trustmanager.algorithm = PKIX
>       ssl.truststore.location = /usr/files/truststore.jks
>       ssl.truststore.password = [hidden]
>       ssl.truststore.type = JCEKS
>       value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
> 2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] 
> o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
> 2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] 
> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825
> 2019-01-28 10:46:55.160  INFO 3324 --- [ontext_Worker-3] 
> o.a.camel.spring.SpringCamelContext      : Route: route1 started and 
> consuming from: kafka:topic1,topic2,topic3?brokers=192.168.56.10:9093
> 2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Subscribing 
> topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
> 2019-01-28 10:46:55.161  INFO 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Subscribing 
> topic1,topic2,topic3-Thread 0 to topic topic1,topic2,topic3
> 2019-01-28 10:46:55.313  INFO 3324 --- [uponassignment]] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, 
> groupId=service_new] Discovered group coordinator 192.168.56.10:9093 (id: 
> 2147483646 rack: null)
> 2019-01-28 10:46:55.315  INFO 3324 --- [uponassignment]] 
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, 
> groupId=service_new] Revoking previously assigned partitions []
> 2019-01-28 10:46:55.316  INFO 3324 --- [uponassignment]] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, 
> groupId=service_new] (Re-)joining group
> 2019-01-28 10:46:58.469  INFO 3324 --- [uponassignment]] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, 
> groupId=service_new] Successfully joined group with generation 3
> 2019-01-28 10:46:58.470  INFO 3324 --- [uponassignment]] 
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, 
> groupId=service_new] Setting newly assigned partitions [topic3-1, topic3-0, 
> topic2-1, topic1-0, topic2-0, topic1-1]
> 2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3221
> 2019-01-28 10:47:24.953 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3221
> 2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3222
> 2019-01-28 10:47:24.957 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3222
> 2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3325
> 2019-01-28 10:47:24.964 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3325
> 2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3223
> 2019-01-28 10:47:25.005 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3223
> 2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3326
> 2019-01-28 10:47:25.515 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3326
> 2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3224
> 2019-01-28 10:47:26.005 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3224
> 2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3327
> 2019-01-28 10:47:26.528 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3327
> 2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3225
> 2019-01-28 10:47:27.014 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3225
> 2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3328
> 2019-01-28 10:47:27.512 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3328
> 2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3226
> 2019-01-28 10:47:28.001 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3226
> 2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3329
> 2019-01-28 10:47:28.497 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3329
> 2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3330
> 2019-01-28 10:47:29.024 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3330
> 2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3227
> 2019-01-28 10:47:29.521 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3227
> 2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3228
> 2019-01-28 10:47:30.012 DEBUG 3324 --- [uponassignment]] 
> o.a.camel.component.kafka.KafkaConsumer  : Auto commitSync 
> topic1,topic2,topic3-Thread 0 from topic topic1,topic2,topic3 with offset: 
> 3228
> {code}
> After downgrading to camel-kafka 2.20.3, commits happen correctly (once every 
> auto.commit.interval.ms) and there are no messages from 
> o.a.camel.component.kafka.KafkaConsumer in the debug log.
> Stepping through with a debugger showed that this code is never executed in 
> 2.20.3: 
> https://github.com/apache/camel/blob/2.20.x/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java#L382
>  so that version appears to rely on the auto-commit feature of the Kafka 
> client itself, not on the camel-kafka wrapper.
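
For illustration only, here is a minimal sketch of how a per-exchange commit could be gated so that it fires at most once per auto.commit.interval.ms. The class name IntervalCommitGate and its methods are hypothetical and are not part of camel-kafka or the Kafka client; the clock is injected so the behaviour can be checked without waiting. This is one possible approach under those assumptions, not the actual fix.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not camel-kafka API): allows a commit at most once
 * per configured interval, mirroring what auto.commit.interval.ms promises.
 */
final class IntervalCommitGate {
    private final long intervalMs;       // e.g. the auto.commit.interval.ms value
    private final LongSupplier clockMs;  // injected time source, in milliseconds
    private long lastCommitMs;           // time of the last commit that was allowed

    IntervalCommitGate(long intervalMs, LongSupplier clockMs) {
        this.intervalMs = intervalMs;
        this.clockMs = clockMs;
        // Start the first interval at construction time.
        this.lastCommitMs = clockMs.getAsLong();
    }

    /** Returns true only when at least intervalMs has passed since the last allowed commit. */
    synchronized boolean shouldCommit() {
        long now = clockMs.getAsLong();
        if (now - lastCommitMs >= intervalMs) {
            lastCommitMs = now;
            return true;
        }
        return false;
    }
}
```

A consumer loop could create the gate as new IntervalCommitGate(5000L, System::currentTimeMillis) and call commitSync() only when shouldCommit() returns true. With the log above, that would turn commits every few milliseconds into one commit per 5 seconds.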



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
