Rafał Gała created CAMEL-12031:
----------------------------------
Summary: KafkaConsumer stops consuming messages when exception
occurs during offset commit
Key: CAMEL-12031
URL: https://issues.apache.org/jira/browse/CAMEL-12031
Project: Camel
Issue Type: Bug
Components: camel-kafka
Reporter: Rafał Gała
When processing of messages takes longer than max session timeout, the consumer
thread will and after receiving the
*org.apache.kafka.clients.consumer.CommitFailedException*.
{code:java}
@Override
public void run() {
boolean first = true;
boolean reConnect = true;
while (reConnect) {
// create consumer
ClassLoader threadClassLoader =
Thread.currentThread().getContextClassLoader();
try {
// Kafka uses reflection for loading authentication
settings, use its classloader
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
this.consumer = new
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
} finally {
Thread.currentThread().setContextClassLoader(threadClassLoader);
}
if (!first) {
// skip one poll timeout before trying again
long delay = endpoint.getConfiguration().getPollTimeoutMs();
log.info("Reconnecting {} to topic {} after {} ms",
threadId, topicName, delay);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
first = false;
// doRun keeps running until we either shutdown or is told to
re-connect
reConnect = doRun();
}
}
{code}
The *doRun()* method returns false and the loop ends. It should be possible to
let the proces continue after failed offset commit.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)