This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 46c57a5e CAMEL-12110: camel-kafka consumer swallows exception if error
creating KafkaConsumer.
46c57a5e is described below
commit 46c57a5e672984df028f55ec3f16e25916df80ef
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jan 3 11:52:42 2018 +0100
CAMEL-12110: camel-kafka consumer swallows exception if error creating
KafkaConsumer.
---
.../camel/component/kafka/KafkaConsumer.java | 34 +++++++++++++++++-----
1 file changed, 26 insertions(+), 8 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 769a5b8..72310e5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -111,6 +111,8 @@ public class KafkaConsumer extends DefaultConsumer {
for (int i = 0; i < endpoint.getConfiguration().getConsumersCount();
i++) {
KafkaFetchRecords task = new KafkaFetchRecords(topic, pattern, i +
"", getProps());
+ // pre-initialize task during startup so if there is any error we
have it thrown asap
+ task.preInit();
executor.submit(task);
tasks.add(task);
}
@@ -158,15 +160,14 @@ public class KafkaConsumer extends DefaultConsumer {
boolean reConnect = true;
while (reConnect) {
-
- // create consumer
- ClassLoader threadClassLoader =
Thread.currentThread().getContextClassLoader();
try {
- // Kafka uses reflection for loading authentication
settings, use its classloader
-
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
- this.consumer = new
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
- } finally {
-
Thread.currentThread().setContextClassLoader(threadClassLoader);
+ if (!first) {
+ // re-initialize on re-connect so we have a fresh
consumer
+ doInit();
+ }
+ } catch (Throwable e) {
+ // ensure this is logged so users can see the problem
+ log.warn("Error creating
org.apache.kafka.clients.consumer.KafkaConsumer due " + e.getMessage(), e);
}
if (!first) {
@@ -187,6 +188,23 @@ public class KafkaConsumer extends DefaultConsumer {
}
}
+ void preInit() {
+ doInit();
+ }
+
+ protected void doInit() {
+ // create consumer
+ ClassLoader threadClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ // Kafka uses reflection for loading authentication settings,
use its classloader
+
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+ // this may throw an exception if something is wrong with
kafka consumer
+ this.consumer = new
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+ } finally {
+
Thread.currentThread().setContextClassLoader(threadClassLoader);
+ }
+ }
+
@SuppressWarnings("unchecked")
protected boolean doRun() {
// allow to re-connect thread in case we use that to retry failed
messages
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].