[ 
https://issues.apache.org/jira/browse/CAMEL-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vadym Chekrii updated CAMEL-10115:
----------------------------------
    Description: 
After triggering {{CamelContext#close()}} method the execution will reach 
{{org.apache.camel.component.kafka.KafkaConsumer#doStop}} where the shutdown of 
the executor instance will be triggered and where in it's turn the interruption 
of the submitted to the executor threads should happen (by reaching the native 
implementation of Thread#interrupt())

According to 
https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt-- 
interrupt method will only set a corresponding status to the thread, but will 
not terminate it. 

Problem is in the line {{KafkaConsumer.java:108}}:
{{ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);}}

In the Kafka implementation of the poll method this will lead to almost 
infinite {{while}} loop which is not checking the thread status and this loop 
will exit only in case of receiving a message from a broker. Only after 
exciting the loop the interrupted status of the thread will be discovered and 
the thread will be terminated.

This leads to a couple of problems:
1. The KafkaConsumers remain alive until receiving at least one more message 
from the broker.
2. As the CamelContext at this point of time is most likely already shut down, 
the received message is not going to be processed, but will be acknowledged to 
the broker. So effectively the message gets lost.

A potential fix would be to either make the poll timeout reasonably small or 
configurable.


  was:
After triggering {{CamelContext#close()}} method the execution will reach 
{{org.apache.camel.component.kafka.KafkaConsumer#doStop}} where the shutdown of 
the executor instance will be triggered and where in it's turn the interruption 
of the submitted to the executor threads should happen (by reaching the native 
implementation of Thread#interrupt())

According to 
https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt-- 
interrupt method will only set a corresponding status to the thread, but will 
not terminate it. 

Problem is in the line {{KafkaConsumer.java:108}}
{{ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);}}

In the Kafka implementation of the poll method this will lead to almost 
infinite `while` loop which is not checking the thread status and this loop 
will exit only in case of receiving a message from a broker. Only after 
exciting the loop the interupted status of the thread will be discovered and 
the thread will be terminated.

This leads to a couple of problems:
1. The KafkaConsumers remain alive until receiving at least one more message 
from the broker.
2. As the CamelContext at this point of time is most likely already shut down, 
the received message is not going to be processed, but will be acknwoledged to 
the broker. So effectively the message gets lost.

A potential fix would be to either make poll timeout reasonably small, or make 
it configurable.



> Kafka consumer is being left running if no message were received after 
> shutdown start
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-10115
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10115
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.17.1
>         Environment: Spring Boot 1.4.0.M3 with Tomcat
> Java 8
> Camel 2.17.1
>            Reporter: Vadym Chekrii
>
> After triggering {{CamelContext#close()}} method the execution will reach 
> {{org.apache.camel.component.kafka.KafkaConsumer#doStop}} where the shutdown 
> of the executor instance will be triggered and where in it's turn the 
> interruption of the submitted to the executor threads should happen (by 
> reaching the native implementation of Thread#interrupt())
> According to 
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt-- 
> interrupt method will only set a corresponding status to the thread, but will 
> not terminate it. 
> Problem is in the line {{KafkaConsumer.java:108}}:
> {{ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);}}
> In the Kafka implementation of the poll method this will lead to almost 
> infinite {{while}} loop which is not checking the thread status and this loop 
> will exit only in case of receiving a message from a broker. Only after 
> exciting the loop the interrupted status of the thread will be discovered and 
> the thread will be terminated.
> This leads to a couple of problems:
> 1. The KafkaConsumers remain alive until receiving at least one more message 
> from the broker.
> 2. As the CamelContext at this point of time is most likely already shut 
> down, the received message is not going to be processed, but will be 
> acknowledged to the broker. So effectively the message gets lost.
> A potential fix would be to either make the poll timeout reasonably small or 
> configurable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to