[
https://issues.apache.org/jira/browse/CAMEL-14010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Srikant Mantha updated CAMEL-14010:
-----------------------------------
Description:
Iam using kafka consumer in our application integrated with Camel.
We consume the messages and send the data to the server for processing.
There is one topic "*XYZ*" defined with *30* partitions and I have assigned
*15* as consumer count on each consumer node (total 2 instances)
/*** Camel Consumer Configuration ***/
{code:java}
kafka.consumersCount=15
kafka.consumerStreams=15{code}
{{I see from the logs that when the consumer starts, there are *15* consumer
threads (lets say on 1 node), which is good as configured.I see from the logs
that when the consumer starts, there are 15 consumer threads (lets say on 1
node), which is good as configured.}}
{code:java}
INFO Camel (camel-1) thread #2 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to
topic XYZ INFO Camel (camel-1) thread #3 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to
topic XYZ INFO Camel (camel-1) thread #4 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to
topic XYZ INFO Camel (camel-1) thread #5 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to
topic XYZ INFO Camel (camel-1) thread #6 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to
topic XYZ INFO Camel (camel-1) thread #7 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to
topic XYZ INFO Camel (camel-1) thread #8 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to
topic XYZ INFO Camel (camel-1) thread #9 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to
topic XYZ INFO Camel (camel-1) thread #10 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to
topic XYZ INFO Camel (camel-1) thread #11 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to
topic XYZ INFO Camel (camel-1) thread #12 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to
topic XYZ INFO Camel (camel-1) thread #13 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to
topic XYZ INFO Camel (camel-1) thread #14 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to
topic XYZ INFO Camel (camel-1) thread #15 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to
topic XYZ INFO Camel (camel-1) thread #16 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to
topic XYZ{code}
If server stops responding due to network issue or any other scenario when the
server is unavailable, then all the kafka consumers starts unsubscribing which
is again an expected behavior (so far good)
Note: We have defined the Camel
{code:java}
ThrottlingExceptionRoutePolicy {code}
which does a health check call on the server before sending the consumed
message.
Once the server is back and available, *I see that not all 15 consumer threads
are active*, but only 1 (*I guess this is the default value*).
>From the logs below, I observe that the consumerss are getting subscribed and
>unsubscribed one by one from the topic and finally the application runs with
>only a single consumer count. This is really strange to see.
{code:java}
INFO Camel (camel-1) thread #17 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers =
[ListOfDefinedServers] check.crcs = true client.id = connections.max.idle.ms =
540000 enable.auto.commit = false exclude.internal.topics = true
fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id
= XYZ-GroupId-12345 heartbeat.interval.ms = 3000 interceptor.classes = null
internal.leave.group.on.close = true isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000
max.poll.records = 500 metadata.max.age.ms = 5000 metric.reporters = []
metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms
= 30000 partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms =
40000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd =
/usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI
security.protocol = SSL send.buffer.bytes = 131072 session.timeout.ms = 10000
ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509 ssl.keystore.location = cert.jks
ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS
ssl.provider = null ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = cert.jks
ssl.truststore.password = [hidden] ssl.truststore.type = JKS value.deserializer
= class org.apache.kafka.common.serialization.StringDeserializer INFO Camel
(camel-1) thread #17 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to
topic XYZ INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator
servername (id: 2147482644 rack: null) INFO Camel (camel-1) thread #31 -
KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)]
[Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously
assigned partitions [] INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group INFO Camel
(camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with
generation 1 INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned
partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23,
XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12,
XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10,
XYZ-4, XYZ-6]
{code}
was:
Iam using kafka consumer in our application integrated with Camel.
We consume the messages and send the data to the server for processing.
There is one topic "*XYZ*" defined with *30* partitions and I have assigned
*15* as consumer count on each consumer node (total 2 instances)
/*** Camel Consumer Configuration ***/
{code:java}
kafka.consumersCount=15
kafka.consumerStreams=15{code}
{{I see from the logs that when the consumer starts, there are *15* consumer
threads (lets say on 1 node), which is good as configured.I see from the logs
that when the consumer starts, there are 15 consumer threads (lets say on 1
node), which is good as configured.}}
{quote}
{code:java}
INFO Camel (camel-1) thread #2 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to
topic XYZ INFO Camel (camel-1) thread #3 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to
topic XYZ INFO Camel (camel-1) thread #4 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to
topic XYZ INFO Camel (camel-1) thread #5 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to
topic XYZ INFO Camel (camel-1) thread #6 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to
topic XYZ INFO Camel (camel-1) thread #7 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to
topic XYZ INFO Camel (camel-1) thread #8 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to
topic XYZ INFO Camel (camel-1) thread #9 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to
topic XYZ INFO Camel (camel-1) thread #10 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to
topic XYZ INFO Camel (camel-1) thread #11 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to
topic XYZ INFO Camel (camel-1) thread #12 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to
topic XYZ INFO Camel (camel-1) thread #13 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to
topic XYZ INFO Camel (camel-1) thread #14 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to
topic XYZ INFO Camel (camel-1) thread #15 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to
topic XYZ INFO Camel (camel-1) thread #16 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to
topic XYZ{code}
{quote}
If server stops responding due to network issue or any other scenario when the
server is unavailable, then all the kafka consumers starts unsubscribing which
is again an expected behavior (so far good)
Note: We have defined the Camel
{code:java}
ThrottlingExceptionRoutePolicy {code}
which does a health check call on the server before sending the consumed
message.
Once the server is back and available, *I see that not all 15 consumer threads
are active*, but only 1 (*I guess this is the default value*).
>From the logs below, I observe that the consumerss are getting subscribed and
>unsubscribed one by one from the topic and finally the application runs with
>only a single consumer count. This is really strange to see.
{code:java}
INFO Camel (camel-1) thread #17 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers =
[ListOfDefinedServers] check.crcs = true client.id = connections.max.idle.ms =
540000 enable.auto.commit = false exclude.internal.topics = true
fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id
= XYZ-GroupId-12345 heartbeat.interval.ms = 3000 interceptor.classes = null
internal.leave.group.on.close = true isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000
max.poll.records = 500 metadata.max.age.ms = 5000 metric.reporters = []
metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms
= 30000 partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms =
40000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd =
/usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI
security.protocol = SSL send.buffer.bytes = 131072 session.timeout.ms = 10000
ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509 ssl.keystore.location = cert.jks
ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS
ssl.provider = null ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = cert.jks
ssl.truststore.password = [hidden] ssl.truststore.type = JKS value.deserializer
= class org.apache.kafka.common.serialization.StringDeserializer INFO Camel
(camel-1) thread #17 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to
topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13
from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values defined/undefined
INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to
topic XYZ INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator
servername (id: 2147482644 rack: null) INFO Camel (camel-1) thread #31 -
KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)]
[Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously
assigned partitions [] INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group INFO Camel
(camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with
generation 1 INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned
partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23,
XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12,
XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10,
XYZ-4, XYZ-6]
{code}
Priority: Critical (was: Major)
> Camel-Kafka ConsumerCount drops to 1 (default) from the defined value
> ---------------------------------------------------------------------
>
> Key: CAMEL-14010
> URL: https://issues.apache.org/jira/browse/CAMEL-14010
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.22.0
> Reporter: Srikant Mantha
> Priority: Critical
>
> Iam using kafka consumer in our application integrated with Camel.
> We consume the messages and send the data to the server for processing.
> There is one topic "*XYZ*" defined with *30* partitions and I have assigned
> *15* as consumer count on each consumer node (total 2 instances)
> /*** Camel Consumer Configuration ***/
> {code:java}
> kafka.consumersCount=15
> kafka.consumerStreams=15{code}
> {{I see from the logs that when the consumer starts, there are *15* consumer
> threads (lets say on 1 node), which is good as configured.I see from the logs
> that when the consumer starts, there are 15 consumer threads (lets say on 1
> node), which is good as configured.}}
> {code:java}
> INFO Camel (camel-1) thread #2 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to
> topic XYZ INFO Camel (camel-1) thread #3 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to
> topic XYZ INFO Camel (camel-1) thread #4 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to
> topic XYZ INFO Camel (camel-1) thread #5 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to
> topic XYZ INFO Camel (camel-1) thread #6 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to
> topic XYZ INFO Camel (camel-1) thread #7 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to
> topic XYZ INFO Camel (camel-1) thread #8 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to
> topic XYZ INFO Camel (camel-1) thread #9 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to
> topic XYZ INFO Camel (camel-1) thread #10 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to
> topic XYZ INFO Camel (camel-1) thread #11 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to
> topic XYZ INFO Camel (camel-1) thread #12 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to
> topic XYZ INFO Camel (camel-1) thread #13 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to
> topic XYZ INFO Camel (camel-1) thread #14 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to
> topic XYZ INFO Camel (camel-1) thread #15 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to
> topic XYZ INFO Camel (camel-1) thread #16 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to
> topic XYZ{code}
>
> If server stops responding due to network issue or any other scenario when
> the server is unavailable, then all the kafka consumers starts unsubscribing
> which is again an expected behavior (so far good)
> Note: We have defined the Camel
> {code:java}
> ThrottlingExceptionRoutePolicy {code}
> which does a health check call on the server before sending the consumed
> message.
> Once the server is back and available, *I see that not all 15 consumer
> threads are active*, but only 1 (*I guess this is the default value*).
> From the logs below, I observe that the consumerss are getting subscribed and
> unsubscribed one by one from the topic and finally the application runs with
> only a single consumer count. This is really strange to see.
> {code:java}
> INFO Camel (camel-1) thread #17 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined auto.commit.interval.ms = 5000 auto.offset.reset = latest
> bootstrap.servers = [ListOfDefinedServers] check.crcs = true client.id =
> connections.max.idle.ms = 540000 enable.auto.commit = false
> exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms =
> 500 fetch.min.bytes = 1 group.id = XYZ-GroupId-12345 heartbeat.interval.ms =
> 3000 interceptor.classes = null internal.leave.group.on.close = true
> isolation.level = read_uncommitted key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000
> max.poll.records = 500 metadata.max.age.ms = 5000 metric.reporters = []
> metrics.num.samples = 2 metrics.recording.level = INFO
> metrics.sample.window.ms = 30000 partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes =
> 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50
> request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name =
> null sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI
> security.protocol = SSL send.buffer.bytes = 131072 session.timeout.ms = 10000
> ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden]
> ssl.keymanager.algorithm = SunX509 ssl.keystore.location = cert.jks
> ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS
> ssl.provider = null ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX ssl.truststore.location = cert.jks
> ssl.truststore.password = [hidden] ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer INFO Camel
> (camel-1) thread #17 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to
> topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values
> defined/undefined INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13
> from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)]
> ConsumerConfig values: Prints all the consumer config values defined/undefined
> INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
> [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to
> topic XYZ INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
> [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
> clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator
> servername (id: 2147482644 rack: null) INFO Camel (camel-1) thread #31 -
> KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)]
> [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking
> previously assigned partitions [] INFO Camel (camel-1) thread #31 -
> KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)]
> [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group
> INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
> [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
> clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group
> with generation 1 INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
> [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer
> clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned
> partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23,
> XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12,
> XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10,
> XYZ-4, XYZ-6]
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)