Rafał Gała created CAMEL-12031:
----------------------------------

             Summary: KafkaConsumer stops consuming messages when exception 
occurs during offset commit
                 Key: CAMEL-12031
                 URL: https://issues.apache.org/jira/browse/CAMEL-12031
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
            Reporter: Rafał Gała


When processing of messages takes longer than max session timeout, the consumer 
thread will and after receiving the 
*org.apache.kafka.clients.consumer.CommitFailedException*.

{code:java}
       @Override
        public void run() {
            boolean first = true;
            boolean reConnect = true;

            while (reConnect) {

                // create consumer
                ClassLoader threadClassLoader = 
Thread.currentThread().getContextClassLoader();
                try {
                    // Kafka uses reflection for loading authentication 
settings, use its classloader
                    
Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                    this.consumer = new 
org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
                } finally {
                    
Thread.currentThread().setContextClassLoader(threadClassLoader);
                }

                if (!first) {
                    // skip one poll timeout before trying again
                    long delay = endpoint.getConfiguration().getPollTimeoutMs();
                    log.info("Reconnecting {} to topic {} after {} ms", 
threadId, topicName, delay);
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                first = false;

                // doRun keeps running until we either shutdown or is told to 
re-connect
                reConnect = doRun();
            }
        }
{code}

The *doRun()* method returns false and the loop ends. It should be possible to 
let the proces continue after failed offset commit.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to