Kartik created CAMEL-19650:
------------------------------
Summary: Camel Kafka doesn't honor 'workerPool' configuration
Key: CAMEL-19650
URL: https://issues.apache.org/jira/browse/CAMEL-19650
Project: Camel
Issue Type: Bug
Components: camel-kafka
Affects Versions: 3.14.9
Environment: Java - 8
Camel 3.14.9
Kafka 3.x broker
Reporter: Kartik
Fix For: 3.14.10
When we set the "workerPool" property in the Kafka configuration, it is not
honored: "KafkaProducer" falls back to creating a new thread pool with the
default 10 threads.
The relevant code is in KafkaProducer.java:
{code:java}
@Override
@SuppressWarnings("rawtypes")
protected void doStart() throws Exception {
    Properties props = getProps();
    if (kafkaProducer == null) {
        createProducer(props);
    }

    // if we are in asynchronous mode we need a worker pool
    if (!configuration.isSynchronous() && workerPool == null) {
        workerPool = endpoint.createProducerExecutor();
        // we create a thread pool so we should also shut it down
        shutdownWorkerPool = true;
    }
}
{code}
The condition "if (!configuration.isSynchronous() && workerPool == null)" only
checks whether the producer is asynchronous. Because the "workerPool" field is
null when the producer is created, doStart() goes ahead and creates a new pool
with the default thread count by calling "createProducerExecutor()"; the pool
set in the configuration is never consulted.
Ideally, we should check "configuration.getWorkerPool()" and, if it is not
null, assign it to the worker pool instead of creating a new one.
Proposed fix:
{code:java}
@Override
@SuppressWarnings("rawtypes")
protected void doStart() throws Exception {
    Properties props = getProps();
    if (kafkaProducer == null) {
        createProducer(props);
    }

    // if we are in asynchronous mode we need a worker pool
    if (!configuration.isSynchronous() && workerPool == null) {
        if (configuration.getWorkerPool() != null) {
            // use the pool supplied in the configuration; its owner shuts it down
            workerPool = configuration.getWorkerPool();
            shutdownWorkerPool = false;
        } else {
            workerPool = endpoint.createProducerExecutor();
            // we create a thread pool so we should also shut it down
            shutdownWorkerPool = true;
        }
    }
}
{code}
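To make the ownership rule in the fix easier to check in isolation, here is a minimal stand-alone sketch that uses only the JDK. It is not Camel code: "WorkerPoolSketch" and "ProducerModel" are hypothetical names, "configuredPool" stands in for "configuration.getWorkerPool()", a fixed pool of 10 threads stands in for "endpoint.createProducerExecutor()", and the stop() logic is an assumption about how "shutdownWorkerPool" is meant to be used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-alone model of the proposed doStart() logic; NOT the real KafkaProducer.
final class WorkerPoolSketch {

    /** Hypothetical stand-in for the producer; field names mirror the issue. */
    static final class ProducerModel {
        private final boolean synchronous;
        private final ExecutorService configuredPool; // models configuration.getWorkerPool()
        private ExecutorService workerPool;
        private boolean shutdownWorkerPool;

        ProducerModel(boolean synchronous, ExecutorService configuredPool) {
            this.synchronous = synchronous;
            this.configuredPool = configuredPool;
        }

        void start() {
            // a worker pool is only needed in asynchronous mode
            if (!synchronous && workerPool == null) {
                if (configuredPool != null) {
                    // user-supplied pool: use it, but never shut it down ourselves
                    workerPool = configuredPool;
                    shutdownWorkerPool = false;
                } else {
                    // assumption: models endpoint.createProducerExecutor()
                    workerPool = Executors.newFixedThreadPool(10);
                    shutdownWorkerPool = true;
                }
            }
        }

        void stop() {
            // assumption: only a pool created by the producer is shut down on stop
            if (shutdownWorkerPool && workerPool != null) {
                workerPool.shutdown();
                workerPool = null;
            }
        }

        ExecutorService workerPool() {
            return workerPool;
        }

        boolean ownsWorkerPool() {
            return shutdownWorkerPool;
        }
    }

    public static void main(String[] args) {
        ExecutorService shared = Executors.newFixedThreadPool(2);
        ProducerModel withPool = new ProducerModel(false, shared);
        withPool.start();
        System.out.println(withPool.workerPool() == shared); // true: configured pool is used
        withPool.stop();
        System.out.println(shared.isShutdown());             // false: shared pool is left running
        shared.shutdown();

        ProducerModel withoutPool = new ProducerModel(false, null);
        withoutPool.start();
        ExecutorService own = withoutPool.workerPool();
        withoutPool.stop();
        System.out.println(own.isShutdown());                // true: self-created pool is shut down
    }
}
```

Setting "shutdownWorkerPool" to false for a configured pool matters because that pool may be shared with other components, so stopping one producer should not shut it down for everyone else.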
--
This message was sent by Atlassian Jira
(v8.20.10#820010)