[ 
https://issues.apache.org/jira/browse/CAMEL-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936893#comment-16936893
 ] 

Srikant Mantha edited comment on CAMEL-14010 at 9/24/19 3:00 PM:
-----------------------------------------------------------------

Okay, I was modifying the content, didn't see your comments on the priority. I 
will try with the latest camel-kafka version


was (Author: srikant_mantha):
Okay, I was modifying the context, didn't see your comments on the priority. I 
will try with the latest camel-kafka version

> Camel-Kafka ConsumerCount drops to 1 (default) from the defined value
> ---------------------------------------------------------------------
>
>                 Key: CAMEL-14010
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14010
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>            Reporter: Srikant Mantha
>            Priority: Major
>
> Iam using kafka consumer in our application integrated with Camel. 
>  We consume the messages and send the data to the server for processing.
> There is one topic "*XYZ*" defined with *30* partitions and I have assigned 
> *15* as consumer count on each consumer node (total 2 instances)
> /*** Camel Consumer Configuration ***/
> {code:java}
> kafka.consumersCount=15
>  kafka.consumerStreams=15{code}
> {{I see from the logs that when the consumer starts, there are *15* consumer 
> threads (lets say on 1 node), which is good as configured.I see from the logs 
> that when the consumer starts, there are 15 consumer threads (lets say on 1 
> node), which is good as configured.}}
>  
>  
> {code:java}
> {code}
> {{INFO  Camel (camel-1) thread #2 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #3 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #4 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #5 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #6 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #7 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #8 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #9 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #10 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #11 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #12 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #13 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #14 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #15 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to 
> topic XYZ }}
> {{INFO  Camel (camel-1) thread #16 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to 
> topic XYZ}}
>  
> If server stops responding due to network issue or any other scenario when 
> the server is unavailable, then all the kafka consumers starts unsubscribing 
> which is again an expected behavior (so far good)
> Note: We have defined the Camel
> {code:java}
> ThrottlingExceptionRoutePolicy {code}
> which does a health check call on the server before sending the consumed 
> message.
>  Once the server is back and available, *I see that not all 15 consumer 
> threads are active*, but only 1 (*I guess this is the default value*). 
> From the logs below, I observe that the consumerss are getting subscribed and 
> unsubscribed one by one from the topic and finally the application runs with 
> only a single consumer count. This is really strange to see.
> {{INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{auto.commit.interval.ms = 5000 }}
> {{auto.offset.reset = latest }}
> {{bootstrap.servers = [ListOfDefinedServers] }}
> {{check.crcs = true }}
> {{client.id =  }}
> {{connections.max.idle.ms = 540000 }}
> {{enable.auto.commit = false }}
> {{exclude.internal.topics = true }}
> {{fetch.max.bytes = 52428800 }}
> {{fetch.max.wait.ms = 500 }}
> {{fetch.min.bytes = 1 }}
> {{group.id = XYZ-GroupId-12345 }}
> {{heartbeat.interval.ms = 3000 }}
> {{interceptor.classes = null }}
> {{internal.leave.group.on.close = true }}
> {{isolation.level = read_uncommitted }}
> {{key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer }}
> {{max.partition.fetch.bytes = 1048576 }}
> {{max.poll.interval.ms = 300000 }}
> {{max.poll.records = 500 }}
> {{metadata.max.age.ms = 5000 }}
> {{metric.reporters = [] metrics.num.samples = 2 }}
> {{metrics.recording.level = INFO }}
> {{metrics.sample.window.ms = 30000}}
> {{partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor] }}
> {{receive.buffer.bytes = 65536 }}
> {{reconnect.backoff.max.ms = 1000 }}
> {{reconnect.backoff.ms = 50 }}
> {{request.timeout.ms = 40000 }}
> {{retry.backoff.ms = 100 }}
> {{sasl.jaas.config = null }}
> {{sasl.kerberos.kinit.cmd = /usr/bin/kinit }}
> {{sasl.kerberos.min.time.before.relogin = 60000 }}
> {{sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 
> sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI 
> security.protocol = SSL send.buffer.bytes = 131072 session.timeout.ms = 10000 
> ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
> ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden] 
> ssl.keymanager.algorithm = SunX509 ssl.keystore.location = cert.jks 
> ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS 
> ssl.provider = null ssl.secure.random.implementation = null 
> ssl.trustmanager.algorithm = PKIX ssl.truststore.location = cert.jks 
> ssl.truststore.password = [hidden] ssl.truststore.type = JKS 
> value.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer }}
> {{INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined INFO  Camel 
> (camel-1) thread #18 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to 
> topic XYZ INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] 
> ConsumerConfig values: Prints all the consumer config values 
> defined/undefined INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3 
> from topic XYZ}}
> INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined
> {{INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to 
> topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13 
> from topic XYZ }}
> {{INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig 
> values: Prints all the consumer config values defined/undefined }}
> {{INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] 
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to 
> topic XYZ INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] 
> [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer 
> clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator 
> servername (id: 2147482644 rack: null) INFO  Camel (camel-1) thread #31 - 
> KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] 
> [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking 
> previously assigned partitions [] INFO  Camel (camel-1) thread #31 - 
> KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)] 
> [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group 
> INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] 
> [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer 
> clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group 
> with generation 1 INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ] 
> [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer 
> clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned 
> partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23, 
> XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12, 
> XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10, 
> XYZ-4, XYZ-6]}}
>  
> How to fix this issue ?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to