This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.11.x by this push:
     new afc17b6  CAMEL-17131: camel-kafka - Consumer should not be in
reconnect state on startup, as this causes the Kafka consumer to be created
twice on startup, which also causes a JMX duplicate MBean error, not to
mention the redundant creation of the consumer.
afc17b6 is described below

commit afc17b6520018c1beac28c388d28e8b5e22f0d0e
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Nov 16 10:00:51 2021 +0100

    CAMEL-17131: camel-kafka - Consumer should not be in reconnect state on
startup, as this causes the Kafka consumer to be created twice on startup,
which also causes a JMX duplicate MBean error, not to mention the redundant
creation of the consumer.
---
 .../java/org/apache/camel/component/kafka/KafkaConsumer.java     | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 0378524..59fcec2 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -145,9 +145,6 @@ public class KafkaConsumer extends DefaultConsumer {
 
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); 
i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i + 
"", getProps());
-            // pre-initialize task during startup so if there is any error we
-            // have it thrown asap
-            task.preInit();
             executor.submit(task);
             tasks.add(task);
         }
@@ -214,7 +211,7 @@ public class KafkaConsumer extends DefaultConsumer {
                         // re-initialize on re-connect so we have a fresh 
consumer
                         doInit();
                     }
-                } catch (Exception e) {
+                } catch (Throwable e) {
                     // ensure this is logged so users can see the problem
                     LOG.warn("Error creating 
org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
                 }
@@ -252,10 +249,6 @@ public class KafkaConsumer extends DefaultConsumer {
             LOG.info("Terminating KafkaConsumer thread: {} receiving from 
topic: {}", threadId, topicName);
         }
 
-        void preInit() {
-            doInit();
-        }
-
         protected void doInit() {
             // create consumer
             ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();

Reply via email to