Kartik created CAMEL-19650:
------------------------------

             Summary: Camel Kafka doesn't honor 'workerPool' configuration
                 Key: CAMEL-19650
                 URL: https://issues.apache.org/jira/browse/CAMEL-19650
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 3.14.9
         Environment: Java 8, Camel 3.14.9, Kafka 3.x broker
            Reporter: Kartik
             Fix For: 3.14.10


When the "workerPool" property is set in the Kafka configuration, it is not 
honored: "KafkaProducer" falls back to creating a new thread pool with the 
default 10 threads.

The relevant code is in KafkaProducer.java:

{code:java}
    @Override
    @SuppressWarnings("rawtypes")
    protected void doStart() throws Exception {
        Properties props = getProps();
        if (kafkaProducer == null) {
            createProducer(props);
        }

        // if we are in asynchronous mode we need a worker pool
        if (!configuration.isSynchronous() && workerPool == null) {
            workerPool = endpoint.createProducerExecutor();
            // we create a thread pool so we should also shut it down
            shutdownWorkerPool = true;
        }
    }

{code}
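The condition "if (!configuration.isSynchronous() && workerPool == null)" only 
checks whether the producer is asynchronous and whether its own "workerPool" 
field is set. Since that field is null when the producer is created, doStart() 
goes ahead and creates a new pool with the default number of threads by calling 
"createProducerExecutor()", without ever consulting the configured pool.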

Ideally, doStart() should check "configuration.getWorkerPool()" and, if it is 
not null, assign it to the worker pool instead of creating a new one.

Proposed fix:

{code:java}
    @Override
    protected void doStart() throws Exception {
        Properties props = getProps();
        if (kafkaProducer == null) {
            createProducer(props);
        }

        // if we are in asynchronous mode we need a worker pool
        if (!configuration.isSynchronous() && workerPool == null) {
            if (configuration.getWorkerPool() != null) {
                // use the pool supplied through the configuration; we did not
                // create it, so we must not shut it down
                workerPool = configuration.getWorkerPool();
                shutdownWorkerPool = false;
            } else {
                workerPool = endpoint.createProducerExecutor();
                // we create a thread pool so we should also shut it down
                shutdownWorkerPool = true;
            }
        }
    }
{code}
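
The ownership rule in the proposed fix can be checked in isolation. Below is a 
minimal, self-contained sketch that models the logic with plain 
java.util.concurrent classes only. "WorkerPoolSketch", "SketchProducer" and 
their methods are hypothetical stand-ins, not Camel classes, and the 10-thread 
fixed pool is only an assumption standing in for whatever 
"endpoint.createProducerExecutor()" actually builds.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerPoolSketch {

    /** Hypothetical stand-in for the producer; NOT the real KafkaProducer. */
    static final class SketchProducer {
        private final ExecutorService configuredPool; // models configuration.getWorkerPool()
        private final boolean synchronous;            // models configuration.isSynchronous()
        private ExecutorService workerPool;
        private boolean shutdownWorkerPool;

        SketchProducer(ExecutorService configuredPool, boolean synchronous) {
            this.configuredPool = configuredPool;
            this.synchronous = synchronous;
        }

        void start() {
            // a worker pool is only needed in asynchronous mode
            if (!synchronous && workerPool == null) {
                if (configuredPool != null) {
                    // caller-supplied pool: use it, but leave its lifecycle to the caller
                    workerPool = configuredPool;
                    shutdownWorkerPool = false;
                } else {
                    // assumption: a fixed pool of 10 threads approximates the default executor
                    workerPool = Executors.newFixedThreadPool(10);
                    shutdownWorkerPool = true;
                }
            }
        }

        void stop() {
            // only shut down a pool that this producer created itself
            if (workerPool != null && shutdownWorkerPool) {
                workerPool.shutdown();
            }
            workerPool = null;
        }

        ExecutorService getWorkerPool() {
            return workerPool;
        }

        boolean ownsWorkerPool() {
            return shutdownWorkerPool;
        }
    }

    public static void main(String[] args) {
        // case 1: a pool is configured, so it is reused and survives stop()
        ExecutorService userPool = Executors.newFixedThreadPool(2);
        SketchProducer configured = new SketchProducer(userPool, false);
        configured.start();
        System.out.println("uses configured pool: " + (configured.getWorkerPool() == userPool));
        configured.stop();
        System.out.println("configured pool shut down by producer: " + userPool.isShutdown());
        userPool.shutdown();

        // case 2: nothing configured, so the producer creates and later shuts down its own pool
        SketchProducer fallback = new SketchProducer(null, false);
        fallback.start();
        ExecutorService created = fallback.getWorkerPool();
        fallback.stop();
        System.out.println("fallback pool shut down by producer: " + created.isShutdown());
    }
}
```

The point of keeping "shutdownWorkerPool" false for a configured pool is that 
the same executor may be shared with other producers or application code, so 
stopping one producer should not terminate it.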

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
