[ 
https://issues.apache.org/jira/browse/CAMEL-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360638#comment-15360638
 ] 

ASF GitHub Bot commented on CAMEL-10115:
----------------------------------------

GitHub user anoordover opened a pull request:

    https://github.com/apache/camel/pull/1057

    CAMEL-10115: introduced pollTimeOutMs with default 30000

    Please provide comment about two things:
    - the default time of 30000ms for pollTimeOutMs;
    - the assumption that the configuration of the endpoint (see line 50 in 
KafkaConsumer) is never null.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/anoordover/camel CAMEL-10115

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/camel/pull/1057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1057
    
----
commit 2ee1ed1f2ef86310e750d7ce86f786add0b4535a
Author: Arno Noordover <[email protected]>
Date:   2016-07-03T18:30:03Z

    CAMEL-10115: introduced pollTimeOutMs with default 30000

----


> Kafka consumer stays running if no messages were received after shutdown start
> ------------------------------------------------------------------------------
>
>                 Key: CAMEL-10115
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10115
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.17.1
>         Environment: Spring Boot 1.4.0.M3 with Tomcat
> Java 8
> Camel 2.17.1
>            Reporter: Vadym Chekrii
>
> After triggering {{CamelContext#close()}} method the execution will reach 
> {{org.apache.camel.component.kafka.KafkaConsumer#doStop}} where the shutdown 
> of the executor instance will be triggered and where in it's turn the 
> interruption of the submitted to the executor threads should happen (by 
> reaching the native implementation of Thread#interrupt())
> According to 
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#interrupt-- 
> interrupt method will only set a corresponding status to the thread, but will 
> not terminate it. 
> Problem is in the line {{KafkaConsumer.java:108}}:
> {{ConsumerRecords<Object, Object> records = consumer.poll(Long.MAX_VALUE);}}
> In the Kafka implementation of the poll method this will lead to almost 
> infinite {{while}} loop which is not checking the thread status and this loop 
> will exit only in case of receiving a message from a broker. Only after 
> exciting the loop the interrupted status of the thread will be discovered and 
> the thread will be terminated.
> This leads to a couple of problems:
> 1. The KafkaConsumers remain alive until receiving at least one more message 
> from the broker.
> 2. As the CamelContext at this point of time is most likely already shut 
> down, the received message is not going to be processed, but will be 
> acknowledged to the broker. So effectively the message gets lost.
> A potential fix would be to either make the poll timeout reasonably small or 
> configurable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to