[ 
https://issues.apache.org/jira/browse/CAMEL-19650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Filotto resolved CAMEL-19650.
-------------------------------------
    Resolution: Fixed

> Camel Kafka doesn't honor 'workerPool' configuration
> ----------------------------------------------------
>
>                 Key: CAMEL-19650
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19650
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.14.9
>         Environment: Java - 8
> Camel 3.14.9
> Kafka 3.x broker
>            Reporter: Kartik
>            Priority: Minor
>              Labels: camel-component
>             Fix For: 3.14.10, 3.20.7, 3.21.1, 3.22.0, 4.0.0
>
>
> When we set the "{*}workerPool"{*} property in the Kafka configuration it's 
> not honored and "KafkaProducer" falls back to creating a new thread pool with 
> the default 10 threads.
>  
> As mentioned in 
> [https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options]
>  There is a component option to provide custom thread pool "{*}workerPool 
> (producer){*}"
>  
> So in KafkaProducer.java 
>  
> {code:java}
> // code placeholder
>     @Override
>     @SuppressWarnings("rawtypes")
>     protected void doStart() throws Exception {
>         Properties props = getProps();
>         if (kafkaProducer == null) {
>             createProducer(props);
>         }
>         // if we are in asynchronous mode we need a worker pool
>         if (!configuration.isSynchronous() && workerPool == null) {
>             workerPool = endpoint.createProducerExecutor();
>             // we create a thread pool so we should also shut it down
>             shutdownWorkerPool = true;
>         }
>     }
> {code}
> " _*if (!configuration.isSynchronous() && workerPool == null) {*_ "
> we only check for synchronization, since the worker pool is "null" when 
> created, it goes ahead and creates a new pool with default threads by calling 
> "createProducerExecutor()"
>  
> Ideally, we should check if "configuration.getWorkerPool()" and if it's not 
> null, we should assign this to the worker pool instead of creating the new 
> one.
>  
> Fix.
>  
> {code:java}
> // code placeholder
> protected void doStart() throws Exception {
>     Properties props = this.getProps();
>     if (this.kafkaProducer == null) {
>         this.createProducer(props);
>     }
>     if (!this.configuration.isSynchronous() && this.workerPool == null) {
>         if (this.configuration.getWorkerPool() != null) {
>             this.workerPool = this.configuration.getWorkerPool();
>             this.shutdownWorkerPool = false;
>         } else {
>             this.workerPool = this.endpoint.createProducerExecutor();
>             this.shutdownWorkerPool = true;
>         }
>     }
> } {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to