[ https://issues.apache.org/jira/browse/CAMEL-19650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kartik updated CAMEL-19650:
---------------------------
    Description: 
When we set the "{*}workerPool{*}" property in the Kafka configuration, it is 
not honored, and "KafkaProducer" falls back to creating a new thread pool with 
the default 10 threads.

 

As mentioned in 
[https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options],
there is a component option for providing a custom thread pool: 
"{*}workerPool (producer){*}".

 

The relevant code is in KafkaProducer.java:

 
{code:java}
    @Override
    @SuppressWarnings("rawtypes")
    protected void doStart() throws Exception {
        Properties props = getProps();
        if (kafkaProducer == null) {
            createProducer(props);
        }

        // if we are in asynchronous mode we need a worker pool
        if (!configuration.isSynchronous() && workerPool == null) {
            workerPool = endpoint.createProducerExecutor();
            // we create a thread pool so we should also shut it down
            shutdownWorkerPool = true;
        }
    }

{code}
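The condition "_*if (!configuration.isSynchronous() && workerPool == null)*_" 
only checks whether the producer is asynchronous and whether its own 
"workerPool" field is null. Because that field is always null when the producer 
is created, the method goes ahead and creates a new pool with the default 
number of threads by calling "createProducerExecutor()", ignoring the 
configured pool.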

 

Ideally, we should check "configuration.getWorkerPool()" and, if it is not 
null, assign it to the worker pool instead of creating a new one.

 

Proposed fix:

 
{code:java}
protected void doStart() throws Exception {
    Properties props = this.getProps();
    if (this.kafkaProducer == null) {
        this.createProducer(props);
    }

    if (!this.configuration.isSynchronous() && this.workerPool == null) {
        if (this.configuration.getWorkerPool() != null) {
            this.workerPool = this.configuration.getWorkerPool();
            this.shutdownWorkerPool = false;
        } else {
            this.workerPool = this.endpoint.createProducerExecutor();
            this.shutdownWorkerPool = true;
        }
    }

}
{code}
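
To make the intended behavior concrete, here is a minimal, self-contained sketch of the pool selection described above. It does not use the real Camel classes: "SketchProducer", "configuredPool", "start()" and "stop()" are hypothetical stand-ins for KafkaProducer, "configuration.getWorkerPool()", "doStart()" and "doStop()". The 10-thread fixed pool only models the default mentioned in this report, and the "stop()" logic is an assumption about how the "shutdownWorkerPool" flag is meant to be used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of the proposed worker pool handling; NOT the real Camel code.
public class WorkerPoolSketch {

    /** Hypothetical stand-in for KafkaProducer, reduced to worker pool handling. */
    static final class SketchProducer {
        // Models configuration.getWorkerPool(); null means "nothing configured".
        private final ExecutorService configuredPool;
        // Models configuration.isSynchronous().
        private final boolean synchronous;
        private ExecutorService workerPool;
        private boolean shutdownWorkerPool;

        SketchProducer(ExecutorService configuredPool, boolean synchronous) {
            this.configuredPool = configuredPool;
            this.synchronous = synchronous;
        }

        void start() {
            if (!synchronous && workerPool == null) {
                if (configuredPool != null) {
                    // The user supplied the pool, so the user keeps ownership of it.
                    workerPool = configuredPool;
                    shutdownWorkerPool = false;
                } else {
                    // Models endpoint.createProducerExecutor() with the default 10 threads.
                    workerPool = Executors.newFixedThreadPool(10);
                    shutdownWorkerPool = true;
                }
            }
        }

        void stop() {
            // Assumption: only a pool created by the producer itself is shut down.
            if (workerPool != null && shutdownWorkerPool) {
                workerPool.shutdown();
            }
            workerPool = null;
        }

        ExecutorService getWorkerPool() {
            return workerPool;
        }
    }

    public static void main(String[] args) {
        ExecutorService custom = Executors.newFixedThreadPool(2);
        SketchProducer withCustom = new SketchProducer(custom, false);
        withCustom.start();
        System.out.println(withCustom.getWorkerPool() == custom); // true
        withCustom.stop();
        System.out.println(custom.isShutdown()); // false, the caller still owns it
        custom.shutdown();

        SketchProducer withDefault = new SketchProducer(null, false);
        withDefault.start();
        ExecutorService created = withDefault.getWorkerPool();
        withDefault.stop();
        System.out.println(created.isShutdown()); // true, the producer created it
    }
}
```

The point of setting the flag to false in the first branch is ownership: a pool passed in through the configuration may be shared with other code, so in this sketch the producer leaves its shutdown to whoever created it.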
 

 

 

 

 

  was:
When we set the "{*}workerPool{*}" property in the Kafka configuration, it is 
not honored, and "KafkaProducer" falls back to creating a new thread pool with 
the default 10 threads.

 

The relevant code is in KafkaProducer.java:

 
{code:java}
    @Override
    @SuppressWarnings("rawtypes")
    protected void doStart() throws Exception {
        Properties props = getProps();
        if (kafkaProducer == null) {
            createProducer(props);
        }

        // if we are in asynchronous mode we need a worker pool
        if (!configuration.isSynchronous() && workerPool == null) {
            workerPool = endpoint.createProducerExecutor();
            // we create a thread pool so we should also shut it down
            shutdownWorkerPool = true;
        }
    }

{code}
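The condition "_*if (!configuration.isSynchronous() && workerPool == null)*_" 
only checks whether the producer is asynchronous and whether its own 
"workerPool" field is null. Because that field is always null when the producer 
is created, the method goes ahead and creates a new pool with the default 
number of threads by calling "createProducerExecutor()", ignoring the 
configured pool.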

 

Ideally, we should check "configuration.getWorkerPool()" and, if it is not 
null, assign it to the worker pool instead of creating a new one.

 

Proposed fix:

 
{code:java}
protected void doStart() throws Exception {
    Properties props = this.getProps();
    if (this.kafkaProducer == null) {
        this.createProducer(props);
    }

    if (!this.configuration.isSynchronous() && this.workerPool == null) {
        if (this.configuration.getWorkerPool() != null) {
            this.workerPool = this.configuration.getWorkerPool();
            this.shutdownWorkerPool = false;
        } else {
            this.workerPool = this.endpoint.createProducerExecutor();
            this.shutdownWorkerPool = true;
        }
    }

}
{code}
 

 

 

 

 


> Camel Kafka doesn't honor 'workerPool' configuration
> ----------------------------------------------------
>
>                 Key: CAMEL-19650
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19650
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.14.9
>         Environment: Java - 8
> Camel 3.14.9
> Kafka 3.x broker
>            Reporter: Kartik
>            Priority: Major
>              Labels: camel-component
>             Fix For: 3.14.10
>
>
> When we set the "{*}workerPool{*}" property in the Kafka configuration, it 
> is not honored, and "KafkaProducer" falls back to creating a new thread pool 
> with the default 10 threads.
>  
> As mentioned in 
> [https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options],
> there is a component option for providing a custom thread pool: 
> "{*}workerPool (producer){*}".
>  
> The relevant code is in KafkaProducer.java:
>  
> {code:java}
>     @Override
>     @SuppressWarnings("rawtypes")
>     protected void doStart() throws Exception {
>         Properties props = getProps();
>         if (kafkaProducer == null) {
>             createProducer(props);
>         }
>         // if we are in asynchronous mode we need a worker pool
>         if (!configuration.isSynchronous() && workerPool == null) {
>             workerPool = endpoint.createProducerExecutor();
>             // we create a thread pool so we should also shut it down
>             shutdownWorkerPool = true;
>         }
>     }
> {code}
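> The condition "_*if (!configuration.isSynchronous() && workerPool == null)*_" 
> only checks whether the producer is asynchronous and whether its own 
> "workerPool" field is null. Because that field is always null when the 
> producer is created, the method goes ahead and creates a new pool with the 
> default number of threads by calling "createProducerExecutor()", ignoring the 
> configured pool.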
>  
> Ideally, we should check "configuration.getWorkerPool()" and, if it is not 
> null, assign it to the worker pool instead of creating a new one.
>  
> Proposed fix:
>  
> {code:java}
> protected void doStart() throws Exception {
>     Properties props = this.getProps();
>     if (this.kafkaProducer == null) {
>         this.createProducer(props);
>     }
>     if (!this.configuration.isSynchronous() && this.workerPool == null) {
>         if (this.configuration.getWorkerPool() != null) {
>             this.workerPool = this.configuration.getWorkerPool();
>             this.shutdownWorkerPool = false;
>         } else {
>             this.workerPool = this.endpoint.createProducerExecutor();
>             this.shutdownWorkerPool = true;
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
