[
https://issues.apache.org/jira/browse/CAMEL-19650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kartik updated CAMEL-19650:
---------------------------
Description:
When we set the "{*}workerPool"{*} property in the Kafka configuration it's not
honored and "KafkaProducer" falls back to creating a new thread pool with the
default 10 threads.
As mentioned in
[https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options]
There is a component option to provide custom thread pool "{*}workerPool
(producer){*}"
So in KafkaProducer.java
{code:java}
// code placeholder
@Override
@SuppressWarnings("rawtypes")
protected void doStart() throws Exception {
Properties props = getProps();
if (kafkaProducer == null) {
createProducer(props);
}
// if we are in asynchronous mode we need a worker pool
if (!configuration.isSynchronous() && workerPool == null) {
workerPool = endpoint.createProducerExecutor();
// we create a thread pool so we should also shut it down
shutdownWorkerPool = true;
}
}
{code}
" _*if (!configuration.isSynchronous() && workerPool == null) {*_ "
we only check for synchronization, since the worker pool is "null" when
created, it goes ahead and creates a new pool with default threads by calling
"createProducerExecutor()"
Ideally, we should check if "configuration.getWorkerPool()" and if it's not
null, we should assign this to the worker pool instead of creating the new one.
Fix.
{code:java}
// code placeholder
protected void doStart() throws Exception {
Properties props = this.getProps();
if (this.kafkaProducer == null) {
this.createProducer(props);
}
if (!this.configuration.isSynchronous() && this.workerPool == null) {
if (this.configuration.getWorkerPool() != null) {
this.workerPool = this.configuration.getWorkerPool();
this.shutdownWorkerPool = false;
} else {
this.workerPool = this.endpoint.createProducerExecutor();
this.shutdownWorkerPool = true;
}
}
} {code}
was:
When we set the "{*}workerPool"{*} property in the Kafka configuration it's not
honored and "KafkaProducer" falls back to creating a new thread pool with the
default 10 threads.
So in KafkaProducer.java
{code:java}
// code placeholder
@Override
@SuppressWarnings("rawtypes")
protected void doStart() throws Exception {
Properties props = getProps();
if (kafkaProducer == null) {
createProducer(props);
}
// if we are in asynchronous mode we need a worker pool
if (!configuration.isSynchronous() && workerPool == null) {
workerPool = endpoint.createProducerExecutor();
// we create a thread pool so we should also shut it down
shutdownWorkerPool = true;
}
}
{code}
" _*if (!configuration.isSynchronous() && workerPool == null) {*_ "
we only check for synchronization, since the worker pool is "null" when
created, it goes ahead and creates a new pool with default threads by calling
"createProducerExecutor()"
Ideally, we should check if "configuration.getWorkerPool()" and if it's not
null, we should assign this to the worker pool instead of creating the new one.
Fix.
{code:java}
// code placeholder
protected void doStart() throws Exception {
Properties props = this.getProps();
if (this.kafkaProducer == null) {
this.createProducer(props);
}
if (!this.configuration.isSynchronous() && this.workerPool == null) {
if (this.configuration.getWorkerPool() != null) {
this.workerPool = this.configuration.getWorkerPool();
this.shutdownWorkerPool = false;
} else {
this.workerPool = this.endpoint.createProducerExecutor();
this.shutdownWorkerPool = true;
}
}
} {code}
> Camel Kafka doesn't honor 'workerPool' configuration
> ----------------------------------------------------
>
> Key: CAMEL-19650
> URL: https://issues.apache.org/jira/browse/CAMEL-19650
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.14.9
> Environment: Java - 8
> Camel 3.14.9
> Kafka 3.x broker
> Reporter: Kartik
> Priority: Major
> Labels: camel-component
> Fix For: 3.14.10
>
>
> When we set the "{*}workerPool"{*} property in the Kafka configuration it's
> not honored and "KafkaProducer" falls back to creating a new thread pool with
> the default 10 threads.
>
> As mentioned in
> [https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options]
> There is a component option to provide custom thread pool "{*}workerPool
> (producer){*}"
>
> So in KafkaProducer.java
>
> {code:java}
> // code placeholder
> @Override
> @SuppressWarnings("rawtypes")
> protected void doStart() throws Exception {
> Properties props = getProps();
> if (kafkaProducer == null) {
> createProducer(props);
> }
> // if we are in asynchronous mode we need a worker pool
> if (!configuration.isSynchronous() && workerPool == null) {
> workerPool = endpoint.createProducerExecutor();
> // we create a thread pool so we should also shut it down
> shutdownWorkerPool = true;
> }
> }
> {code}
> " _*if (!configuration.isSynchronous() && workerPool == null) {*_ "
> we only check for synchronization, since the worker pool is "null" when
> created, it goes ahead and creates a new pool with default threads by calling
> "createProducerExecutor()"
>
> Ideally, we should check if "configuration.getWorkerPool()" and if it's not
> null, we should assign this to the worker pool instead of creating the new
> one.
>
> Fix.
>
> {code:java}
> // code placeholder
> protected void doStart() throws Exception {
> Properties props = this.getProps();
> if (this.kafkaProducer == null) {
> this.createProducer(props);
> }
> if (!this.configuration.isSynchronous() && this.workerPool == null) {
> if (this.configuration.getWorkerPool() != null) {
> this.workerPool = this.configuration.getWorkerPool();
> this.shutdownWorkerPool = false;
> } else {
> this.workerPool = this.endpoint.createProducerExecutor();
> this.shutdownWorkerPool = true;
> }
> }
> } {code}
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)