[ https://issues.apache.org/jira/browse/CAMEL-12031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rafał Gała updated CAMEL-12031:
-------------------------------
    Description: 
When message processing takes longer than the maximum session timeout, the
consumer thread terminates after receiving an
*org.apache.kafka.clients.consumer.CommitFailedException*.

{code:java}
@Override
public void run() {
    boolean first = true;
    boolean reConnect = true;

    while (reConnect) {

        // create consumer
        ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            // Kafka uses reflection for loading authentication settings, use its classloader
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
            this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
        } finally {
            Thread.currentThread().setContextClassLoader(threadClassLoader);
        }

        if (!first) {
            // skip one poll timeout before trying again
            long delay = endpoint.getConfiguration().getPollTimeoutMs();
            log.info("Reconnecting {} to topic {} after {} ms", threadId, topicName, delay);
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        first = false;

        // doRun keeps running until we either shutdown or is told to re-connect
        reConnect = doRun();
    }
}
{code}

The *doRun()* method returns false, so *reConnect* becomes false and the loop
ends. It should be possible to let the process continue after a failed offset
commit.

I think the catch blocks inside the *doRun()* method should look like this:


{code:java}
    ...
    } catch (InterruptException e) {
        getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
        log.info("Unsubscribing {} from topic {}", threadId, topicName);
        consumer.unsubscribe();
        Thread.currentThread().interrupt();
    } catch (org.apache.kafka.clients.consumer.CommitFailedException e) { // or even org.apache.kafka.common.KafkaException
        getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
        reConnect = true;
    } catch (Exception e) {
        getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
    } finally {
        log.debug("Closing {} ", threadId);
        IOHelper.close(consumer);
    }
    ...
{code}
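
To illustrate the difference, here is a minimal, self-contained sketch of the loop with and without the proposed catch block. It is not the camel-kafka code: the class name *ReconnectSketch*, the *reconnectOnCommitFailure* flag, and the nested *CommitFailedException* are hypothetical stand-ins so that the example runs without Kafka on the classpath, and the commit failure is simulated on the first attempt only.

```java
import java.util.ArrayList;
import java.util.List;

public class ReconnectSketch {

    /** Hypothetical stand-in for org.apache.kafka.clients.consumer.CommitFailedException. */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    /** Records what happened so the behaviour can be inspected afterwards. */
    final List<String> events = new ArrayList<>();

    private final boolean reconnectOnCommitFailure;
    private int attempts = 0;

    ReconnectSketch(boolean reconnectOnCommitFailure) {
        this.reconnectOnCommitFailure = reconnectOnCommitFailure;
    }

    int attempts() {
        return attempts;
    }

    /** Same shape as run() above: loop for as long as doRun() asks for a reconnect. */
    void run() {
        boolean reConnect = true;
        while (reConnect) {
            attempts++;
            events.add("consumer created (attempt " + attempts + ")");
            reConnect = doRun();
        }
    }

    /** Simulated doRun(): the first commit fails, later attempts end with a normal shutdown. */
    boolean doRun() {
        boolean reConnect = false;
        try {
            if (attempts == 1) {
                throw new CommitFailedException("simulated commit failure");
            }
            events.add("polled and committed, shutting down");
        } catch (CommitFailedException e) {
            events.add("commit failed");
            // With the flag set, this mirrors the proposed "reConnect = true".
            reConnect = reconnectOnCommitFailure;
        } catch (Exception e) {
            events.add("other error");
        } finally {
            events.add("consumer closed");
        }
        return reConnect;
    }

    public static void main(String[] args) {
        ReconnectSketch current = new ReconnectSketch(false);
        current.run();
        System.out.println("without reconnect: " + current.attempts() + " attempt(s)");

        ReconnectSketch proposed = new ReconnectSketch(true);
        proposed.run();
        System.out.println("with reconnect:    " + proposed.attempts() + " attempt(s)");
    }
}
```

Without the reconnect flag the loop stops after the first failed commit. With it, a second consumer is created and consumption continues. Whether messages from the failed batch are redelivered after the reconnect depends on the committed offsets and is not modelled here.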


> KafkaConsumer stops consuming messages when exception occurs during offset 
> commit
> ---------------------------------------------------------------------------------
>
>                 Key: CAMEL-12031
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12031
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>            Reporter: Rafał Gała



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
