[ 
https://issues.apache.org/jira/browse/CAMEL-19827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773795#comment-17773795
 ] 

Claus Ibsen commented on CAMEL-19827:
-------------------------------------

Let's see if we can validate those Kafka configuration options more eagerly, 
so that the route fails on startup. However, this won't catch everything: if 
there are network connectivity problems, Camel will keep retrying to reconnect 
(as it has always done).
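
As a rough illustration of what validating eagerly could look like, here is a 
minimal, stdlib-only sketch that checks a brokers string before any consumer 
thread is started. The class BrokerAddressValidator and its validate method 
are hypothetical names invented for this example; this is not the code in 
camel-kafka or kafka-clients, and the actual fix may look quite different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of camel-kafka or kafka-clients.
public class BrokerAddressValidator {

    // Parses a comma-separated "host:port" list and throws on the first malformed
    // entry, so that a caller can fail startup instead of retrying forever.
    public static List<String> validate(String brokers) {
        if (brokers == null || brokers.trim().isEmpty()) {
            throw new IllegalArgumentException("brokers must not be empty");
        }
        List<String> valid = new ArrayList<>();
        for (String entry : brokers.split(",")) {
            String address = entry.trim();
            // Use the last colon so bracketed IPv6 hosts such as [::1]:9092 still split correctly.
            int idx = address.lastIndexOf(':');
            if (idx <= 0 || idx == address.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + address);
            }
            int port;
            try {
                port = Integer.parseInt(address.substring(idx + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in brokers: " + address, e);
            }
            // Valid TCP ports are 1..65535; 119092 from the report falls outside this range.
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Invalid port in brokers: " + address);
            }
            valid.add(address);
        }
        return valid;
    }

    public static void main(String[] args) {
        System.out.println(validate("localhost:9092"));
        try {
            validate("localhost:119092");
        } catch (IllegalArgumentException e) {
            System.out.println("Failing fast: " + e.getMessage());
        }
    }
}
```

A check like this only catches configuration that is invalid on its face. A 
well-formed address that is simply unreachable would still pass, which is the 
case where retrying to reconnect remains the expected behaviour.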

> Kafka Component generates huge logs infinitely when invalid configuration is 
> provided.
> --------------------------------------------------------------------------------------
>
>                 Key: CAMEL-19827
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19827
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.14.9
>         Environment: 3.14.9 Issue seen.
> 3.13.0 Works fine.
>            Reporter: Kartik
>            Priority: Minor
>             Fix For: 4.x
>
>
> When Camel-Kafka version 3.13.0 is used and we define a Kafka endpoint 
> with an invalid configuration, such as the out-of-range port 1234556, an 
> exception is thrown and propagated back to a higher layer.
> Exception:
>  
> {code:java}
> 15:24:42.513 [main] [{}] INFO  
> org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.13.0 
> (camel-1) shutting down (timeout:45s)
> 15:24:42.516 [main] [{}] DEBUG 
> org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager - Shutting 
> down with no inflight threads.
> 15:24:42.518 [main] [{}] DEBUG 
> org.apache.camel.impl.engine.AbstractCamelContext - Route: route1 which 
> failed to startup will be stopped
> 15:24:42.520 [main] [{}] DEBUG org.apache.camel.support.DefaultConsumer - 
> Shutting down consumer: Consumer[kafka:xyz?brokers=localhost:119092]
> 15:24:42.524 [main] [{}] INFO  
> org.apache.camel.impl.engine.AbstractCamelContext - Routes shutdown summary 
> (total:1 stopped:1)
> 15:24:42.524 [main] [{}] INFO  
> org.apache.camel.impl.engine.AbstractCamelContext -     Stopped route1 
> (kafka:xyz)
> 15:24:42.525 [main] [{}] DEBUG 
> org.apache.camel.impl.engine.DefaultInflightRepository - Shutting down with 
> no inflight exchanges.
> 15:24:42.529 [main] [{}] INFO  
> org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.13.0 
> (camel-1) shutdown in 16ms (uptime:1s575ms)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to 
> construct kafka consumer
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626)
>     at 
> org.apache.camel.component.kafka.DefaultKafkaClientFactory.getConsumer(DefaultKafkaClientFactory.java:34)
>     at 
> org.apache.camel.component.kafka.KafkaFetchRecords.createConsumer(KafkaFetchRecords.java:121)
>     at 
> org.apache.camel.component.kafka.KafkaFetchRecords.preInit(KafkaFetchRecords.java:80)
>     at 
> org.apache.camel.component.kafka.KafkaConsumer.doStart(KafkaConsumer.java:123)
>     at 
> org.apache.camel.support.service.BaseService.start(BaseService.java:119)
>     at 
> org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
>     at 
> org.apache.camel.impl.engine.AbstractCamelContext.startService(AbstractCamelContext.java:3498)
>     at 
> org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRouteConsumers(InternalRouteStartupManager.java:401)
>     at 
> org.apache.camel.impl.engine.InternalRouteStartupManager.doStartRouteConsumers(InternalRouteStartupManager.java:319)
>     at 
> org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:213)
>     at 
> org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)
>     at 
> org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:3201)
>     at 
> org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2863)
>     at 
> org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2814)
>     at 
> org.apache.camel.support.service.BaseService.start(BaseService.java:119)
>     at 
> org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2510)
>     at 
> org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:246)
>     at org.example.camel.MyCamelExample.main(MyCamelExample.java:22)
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid port in 
> bootstrap.servers: localhost:119092
>     at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:82)
>     at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:731)
>     ... 21 more {code}
>  
> If we upgrade the library to version 3.14.9, this exception is no longer 
> propagated to a higher layer. Instead, the consumer retries indefinitely, 
> growing the logs to gigabytes and filling up disk space.
>  
> {code:java}
> 15:27:10.603 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] INFO  
> org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
> 15:27:10.603 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] INFO  
> org.apache.kafka.common.metrics.Metrics - Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter
> 15:27:10.604 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] INFO  
> org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
> 15:27:10.604 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] INFO  
> org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for 
> consumer-df1a4884-a933-41fa-81b8-e1831627f0f6-239 unregistered
> 15:27:10.604 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] DEBUG 
> org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
> clientId=consumer-df1a4884-a933-41fa-81b8-e1831627f0f6-239, 
> groupId=df1a4884-a933-41fa-81b8-e1831627f0f6] Kafka consumer has been closed
> 15:27:10.604 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] WARN  
> org.apache.camel.component.kafka.KafkaFetchRecords - Error creating 
> org.apache.kafka.clients.consumer.KafkaConsumer due Failed to construct kafka 
> consumer
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
>  ~[kafka-clients-2.8.1.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
>  ~[kafka-clients-2.8.1.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>  ~[kafka-clients-2.8.1.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626)
>  ~[kafka-clients-2.8.1.jar:?]
>     at 
> org.apache.camel.component.kafka.DefaultKafkaClientFactory.getConsumer(DefaultKafkaClientFactory.java:34)
>  ~[camel-kafka-3.14.9.jar:3.14.9]
>     at 
> org.apache.camel.component.kafka.KafkaFetchRecords.createConsumer(KafkaFetchRecords.java:126)
>  ~[camel-kafka-3.14.9.jar:3.14.9]
>     at 
> org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:89)
>  ~[camel-kafka-3.14.9.jar:3.14.9]
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>     at java.lang.Thread.run(Thread.java:829) ~[?:?]
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid port in 
> bootstrap.servers: localhost:119092
>     at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:82)
>  ~[kafka-clients-2.8.1.jar:?]
>     at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
>  ~[kafka-clients-2.8.1.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:731)
>  ~[kafka-clients-2.8.1.jar:?]
>     ... 11 more
> 15:27:10.604 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] INFO  
> org.apache.camel.component.kafka.KafkaFetchRecords - Connecting Kafka 
> consumer thread ID xyz-Thread 0 with poll timeout of 5000 ms
> 15:27:10.605 [Camel (camel-1) thread #1 - KafkaConsumer[xyz]] [{}] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>     allow.auto.create.topics = true
>     auto.commit.interval.ms = 5000
>     auto.offset.reset = latest
>     bootstrap.servers = [localhost:119092]
>     check.crcs = true
>     client.dns.lookup = use_all_dns_ips
>     client.id = consumer-df1a4884-a933-41fa-81b8-e1831627f0f6-240
>     client.rack = 
>     connections.max.idle.ms = 540000
>     default.api.timeout.ms = 60000
>     enable.auto.commit = true {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
