This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.11.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.11.x by this push:
new afc17b6 CAMEL-17131: camel-kafka - Consumer should not be in
reconnect state on startup, as this causes the Kafka consumer to be created
twice on startup, which in turn causes a JMX duplicate MBean error, not to
mention the unnecessary second creation of the consumer.
afc17b6 is described below
commit afc17b6520018c1beac28c388d28e8b5e22f0d0e
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Nov 16 10:00:51 2021 +0100
CAMEL-17131: camel-kafka - Consumer should not be in reconnect state on
startup, as this causes the Kafka consumer to be created twice on startup,
which in turn causes a JMX duplicate MBean error, not to mention the
unnecessary second creation of the consumer.
---
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 9 +--------
1 file changed, 1 insertion(+), 8 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 0378524..59fcec2 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -145,9 +145,6 @@ public class KafkaConsumer extends DefaultConsumer {
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount();
i++) {
KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i +
"", getProps());
- // pre-initialize task during startup so if there is any error we
- // have it thrown asap
- task.preInit();
executor.submit(task);
tasks.add(task);
}
@@ -214,7 +211,7 @@ public class KafkaConsumer extends DefaultConsumer {
// re-initialize on re-connect so we have a fresh
consumer
doInit();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
// ensure this is logged so users can see the problem
LOG.warn("Error creating
org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
}
@@ -252,10 +249,6 @@ public class KafkaConsumer extends DefaultConsumer {
LOG.info("Terminating KafkaConsumer thread: {} receiving from
topic: {}", threadId, topicName);
}
- void preInit() {
- doInit();
- }
-
protected void doInit() {
// create consumer
ClassLoader threadClassLoader =
Thread.currentThread().getContextClassLoader();