This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cc8b6cb CAMEL-17131: avoid duplicate initialization of the Kafka
consumer
cc8b6cb is described below
commit cc8b6cb1afd99415fb7cf10357f2407f129adc27
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Nov 15 13:34:25 2021 +0100
CAMEL-17131: avoid duplicate initialization of the Kafka consumer
---
.../camel/component/kafka/KafkaConsumer.java | 4 --
.../camel/component/kafka/KafkaFetchRecords.java | 44 +++++++++-------------
2 files changed, 18 insertions(+), 30 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ef2e4e9..e3b0c26 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -118,10 +118,6 @@ public class KafkaConsumer extends DefaultConsumer {
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount();
i++) {
KafkaFetchRecords task = new KafkaFetchRecords(
this, pollExceptionStrategy, bridge, topic, pattern, i +
"", getProps());
- // pre-initialize task during startup so if there is any error we
- // have it thrown asap
- task.preInit();
-
executor.submit(task);
tasks.add(task);
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index c0119f4..2cc7e59 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -76,33 +76,25 @@ class KafkaFetchRecords implements Runnable {
this.kafkaProps = kafkaProps;
}
- void preInit() {
- createConsumer();
- }
-
@Override
public void run() {
if (!isKafkaConsumerRunnable()) {
return;
}
- if (isRetrying() || isReconnecting()) {
+ do {
try {
- if (isReconnecting()) {
- // re-initialize on re-connect so we have a fresh consumer
- createConsumer();
- }
+ createConsumer();
+
+ initializeConsumer();
} catch (Exception e) {
// ensure this is logged so users can see the problem
LOG.warn("Error creating
org.apache.kafka.clients.consumer.KafkaConsumer due {}", e.getMessage(), e);
+ continue;
}
- long delay =
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
- String prefix = isReconnecting() ? "Reconnecting" : "Retrying";
- LOG.info("{} {} to topic {} after {} ms", prefix, threadId,
topicName, delay);
-
- doRun();
- }
+ startPolling();
+ } while (isRetrying() || isReconnecting());
LOG.info("Terminating KafkaConsumer thread: {} receiving from topic:
{}", threadId, topicName);
safeUnsubscribe();
@@ -117,6 +109,11 @@ class KafkaFetchRecords implements Runnable {
Thread.currentThread()
.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+ // The Kafka consumer should be null at the first try. For every
other reconnection event, it will not
+ long delay =
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
+ final String prefix = this.consumer == null ? "Connecting" :
"Reconnecting";
+ LOG.info("{} Kafka consumer thread ID {} with poll timeout of {}
ms", prefix, threadId, delay);
+
// this may throw an exception if something is wrong with kafka
consumer
this.consumer =
kafkaConsumer.getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);
} finally {
@@ -124,19 +121,14 @@ class KafkaFetchRecords implements Runnable {
}
}
- protected void doRun() {
- if (isReconnecting()) {
- subscribe();
-
- // set reconnect to false as the connection and resume is done at
this point
- setReconnect(false);
+ private void initializeConsumer() {
+ subscribe();
- // set retry to true to continue polling
- setRetry(true);
- }
+ // set reconnect to false as the connection and resume is done at this
point
+ setReconnect(false);
- // start polling
- startPolling();
+ // set retry to true to continue polling
+ setRetry(true);
}
private void subscribe() {