[
https://issues.apache.org/jira/browse/CAMEL-12031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-12031:
--------------------------------
Affects Version/s: 2.20.0
> KafkaConsumer stops consuming messages when exception occurs during offset
> commit
> ---------------------------------------------------------------------------------
>
> Key: CAMEL-12031
> URL: https://issues.apache.org/jira/browse/CAMEL-12031
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.20.0
> Reporter: Rafał Gała
> Fix For: 2.20.2, 2.21.0
>
>
> When processing of messages takes longer than max session timeout, the
> consumer thread will end after receiving the
> *org.apache.kafka.clients.consumer.CommitFailedException*.
> {code:java}
> @Override
> public void run() {
> boolean first = true;
> boolean reConnect = true;
> while (reConnect) {
> // create consumer
> ClassLoader threadClassLoader =
> Thread.currentThread().getContextClassLoader();
> try {
> // Kafka uses reflection for loading authentication
> settings, use its classloader
>
> Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
> this.consumer = new
> org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
> } finally {
>
> Thread.currentThread().setContextClassLoader(threadClassLoader);
> }
> if (!first) {
> // skip one poll timeout before trying again
> long delay =
> endpoint.getConfiguration().getPollTimeoutMs();
> log.info("Reconnecting {} to topic {} after {} ms",
> threadId, topicName, delay);
> try {
> Thread.sleep(delay);
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> }
> }
> first = false;
> // doRun keeps running until we either shutdown or is told to
> re-connect
> reConnect = doRun();
> }
> }
> {code}
> The *doRun()* method returns false and the loop ends. It should be possible
> to let the proces continue after failed offset commit.
> I think the catch block inside *doRun* method should look like this:
> {code:java}
> ...
> } catch (InterruptException e) {
> getExceptionHandler().handleException("Interrupted while
> consuming " + threadId + " from kafka topic", e);
> log.info("Unsubscribing {} from topic {}", threadId,
> topicName);
> consumer.unsubscribe();
> Thread.currentThread().interrupt();
> } catch (org.apache.kafka.clients.consumer.CommitFailedException
> e) { //or even org.apache.kafka.common.KafkaException
> getExceptionHandler().handleException("Error consuming " +
> threadId + " from kafka topic", e);
> reConnect = true;
> } catch (Exception e) {
> getExceptionHandler().handleException("Error consuming " +
> threadId + " from kafka topic", e);
> } finally {
> log.debug("Closing {} ", threadId);
> IOHelper.close(consumer);
> }
> ...
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)