[
https://issues.apache.org/jira/browse/CAMEL-19650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nicolas Filotto resolved CAMEL-19650.
-------------------------------------
Resolution: Fixed
> Camel Kafka doesn't honor 'workerPool' configuration
> ----------------------------------------------------
>
> Key: CAMEL-19650
> URL: https://issues.apache.org/jira/browse/CAMEL-19650
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.14.9
> Environment: Java - 8
> Camel 3.14.9
> Kafka 3.x broker
> Reporter: Kartik
> Priority: Minor
> Labels: camel-component
> Fix For: 3.14.10, 3.20.7, 3.21.1, 3.22.0, 4.0.0
>
>
> When we set the "*workerPool*" property in the Kafka configuration, it is
> not honored: "KafkaProducer" falls back to creating a new thread pool with
> the default 10 threads.
>
> As mentioned in
> [https://camel.apache.org/components/3.14.x/kafka-component.html#_component_options],
> there is a component option, "*workerPool (producer)*", for providing a
> custom thread pool.
>
> The current code in KafkaProducer.java is:
>
> {code:java}
> @Override
> @SuppressWarnings("rawtypes")
> protected void doStart() throws Exception {
>     Properties props = getProps();
>     if (kafkaProducer == null) {
>         createProducer(props);
>     }
>     // if we are in asynchronous mode we need a worker pool
>     if (!configuration.isSynchronous() && workerPool == null) {
>         workerPool = endpoint.createProducerExecutor();
>         // we create a thread pool so we should also shut it down
>         shutdownWorkerPool = true;
>     }
> }
> {code}
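> In the condition "_*if (!configuration.isSynchronous() && workerPool == null) {*_",
> we only check whether the producer is asynchronous. Since the "workerPool"
> field is null when the producer is created, the code goes ahead and creates a
> new pool with the default number of threads by calling
> "createProducerExecutor()". The pool set in the configuration is never read.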
>
> Ideally, we should check "configuration.getWorkerPool()" and, if it is not
> null, assign it to the worker pool instead of creating a new one.
>
> Proposed fix:
>
> {code:java}
> protected void doStart() throws Exception {
>     Properties props = this.getProps();
>     if (this.kafkaProducer == null) {
>         this.createProducer(props);
>     }
>     if (!this.configuration.isSynchronous() && this.workerPool == null) {
>         if (this.configuration.getWorkerPool() != null) {
>             this.workerPool = this.configuration.getWorkerPool();
>             this.shutdownWorkerPool = false;
>         } else {
>             this.workerPool = this.endpoint.createProducerExecutor();
>             this.shutdownWorkerPool = true;
>         }
>     }
> }
> {code}
>
>
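
The selection and ownership rule described in the issue can be tried in isolation with the JDK alone. The following is only a minimal sketch, not the Camel implementation: `WorkerPoolSelection` and `ProducerSketch` are hypothetical stand-ins for the producer, `configuredPool` stands in for "configuration.getWorkerPool()", and the fixed 10-thread pool stands in for whatever "createProducerExecutor()" actually builds. The `stop()` method is an assumption about how "shutdownWorkerPool" would be used.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerPoolSelection {

    /** Hypothetical stand-in for the producer; not a Camel class. */
    static final class ProducerSketch {
        private final ExecutorService configuredPool; // stands in for configuration.getWorkerPool(), may be null
        private final boolean synchronous;            // stands in for configuration.isSynchronous()
        ExecutorService workerPool;
        boolean shutdownWorkerPool;

        ProducerSketch(ExecutorService configuredPool, boolean synchronous) {
            this.configuredPool = configuredPool;
            this.synchronous = synchronous;
        }

        void start() {
            // A worker pool is only needed in asynchronous mode.
            if (!synchronous && workerPool == null) {
                if (configuredPool != null) {
                    // The caller supplied the pool, so the caller keeps ownership of it.
                    workerPool = configuredPool;
                    shutdownWorkerPool = false;
                } else {
                    // Assumed default: a fixed pool of 10 threads, created and owned here.
                    workerPool = Executors.newFixedThreadPool(10);
                    shutdownWorkerPool = true;
                }
            }
        }

        void stop() {
            // Only shut down a pool that this object created itself.
            if (workerPool != null && shutdownWorkerPool) {
                workerPool.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService custom = Executors.newFixedThreadPool(2);

        ProducerSketch withCustom = new ProducerSketch(custom, false);
        withCustom.start();
        withCustom.stop();
        System.out.println("custom pool reused: " + (withCustom.workerPool == custom));
        System.out.println("custom pool shut down by producer: " + custom.isShutdown());

        ProducerSketch withDefault = new ProducerSketch(null, false);
        withDefault.start();
        withDefault.stop();
        System.out.println("default pool shut down by producer: " + withDefault.workerPool.isShutdown());

        custom.shutdown(); // the caller cleans up its own pool
    }
}
```

Setting "shutdownWorkerPool" to false for a configured pool matters as much as the assignment itself: a pool passed in through the configuration may be shared with other code, so the producer should not shut it down when it stops.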
--
This message was sent by Atlassian Jira
(v8.20.10#820010)