[ 
https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael J. Kitchin updated CAMEL-8975:
--------------------------------------
    Description: 
These issues center on the Kafka consumer (KafkaConsumer.java, line numbers 
below):
# Exchange exceptions/failures ignored at process() (:148), meaning:
## Automatic offset commit on exchange failure (e.g., processor/endpoint 
exception)
## In-flight exchange loss on Camel context/runtime shutdown (i.e., route 
interrupted -> exception suppressed -> offset committed)
# BatchCommitConsumerTask activations are unbalanced during periods of low 
activity, meaning:
## await() (:165) will time out for active BatchCommitConsumerTask(s) when other 
consumer threads are blocked on it.hasNext() (:145) (a blocking call, despite 
no @throws)
## Any previously activated thread in await() will (a) get a 
TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next 
await() call, and (d) exit
## The process repeats until all consumer stream threads have exited, leaving 
the consumer dead
## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the 
route)
# An ExecutorService is obtained from Camel to handle KafkaStreams, with the 
number of threads set to the consumerStreams param (:77). Since the number of 
KafkaStreams actually created is (consumersCount * consumerStreams) and the 
executor runnables loop indefinitely, a random selection of streams will not be 
serviced if consumersCount > 1.
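
The barrier failure described in the second issue can be reproduced with the JDK 
alone. The sketch below is not Camel code: the class name BarrierTimeoutDemo, 
the two-party barrier and the 100 ms timeout are assumptions chosen for 
illustration. One thread stands in for an active BatchCommitConsumerTask, and 
the second party never arrives, as if it were blocked in it.hasNext().

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only demo; the class and method names are hypothetical, not Camel code.
class BarrierTimeoutDemo {

    // Runs one "active" task against a two-party barrier whose second party
    // never arrives, and returns the sequence of events the task observed.
    static List<String> run() {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final List<String> events = new CopyOnWriteArrayList<String>();

        Thread active = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int attempt = 1; attempt <= 2; attempt++) {
                    try {
                        barrier.await(100, TimeUnit.MILLISECONDS);
                        events.add("passed");
                    } catch (TimeoutException e) {
                        // First attempt: the timeout also breaks the barrier.
                        events.add("TimeoutException");
                    } catch (BrokenBarrierException e) {
                        // Second attempt: fails immediately on the broken barrier.
                        events.add("BrokenBarrierException");
                        return; // stands in for the consumer thread exiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        active.start();
        try {
            active.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for demo thread", e);
        }
        events.add("broken=" + barrier.isBroken());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The task sees the timeout first and the broken-barrier failure on its next 
attempt. Per the CyclicBarrier contract, a broken barrier stays broken until 
reset() is called, so every later await() on it fails the same way.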

Source code URL:
- 
https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

We've troubleshot this extensively and reimplemented the KafkaConsumer class 
with params added to KafkaConfiguration to address these concerns and are happy 
to submit these back to the community, if interested.
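
As a rough illustration of the first issue only, a consumer loop could check 
each exchange for failure before committing offsets. This is a sketch and not 
the reimplementation mentioned above: MessageProcessor, OffsetCommitter and 
processBatch are hypothetical names, and none of them are Camel or Kafka APIs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are Camel or Kafka APIs.
class CommitOnSuccessSketch {

    // Stand-in for handing one message to the route.
    // Returns the failure, or null if processing succeeded.
    interface MessageProcessor {
        Exception process(String message);
    }

    // Stand-in for the consumer's offset commit.
    interface OffsetCommitter {
        void commitOffsets();
    }

    // Processes a batch and commits only if every message succeeded.
    // Returns true when offsets were committed.
    static boolean processBatch(List<String> batch,
                                MessageProcessor processor,
                                OffsetCommitter committer) {
        for (String message : batch) {
            Exception failure = processor.process(message);
            if (failure != null) {
                // Skip the commit so the batch is not marked as consumed.
                return false;
            }
        }
        committer.commitOffsets();
        return true;
    }

    public static void main(String[] args) {
        OffsetCommitter committer = new OffsetCommitter() {
            @Override
            public void commitOffsets() {
                System.out.println("offsets committed");
            }
        };
        MessageProcessor failsOnB = new MessageProcessor() {
            @Override
            public Exception process(String message) {
                return "b".equals(message) ? new IllegalStateException("route failed") : null;
            }
        };
        System.out.println(processBatch(Arrays.asList("a", "b"), failsOnB, committer));
    }
}
```

The design choice here is to treat the commit as the last step of a successful 
batch rather than something that happens regardless of the exchange outcome. 
Whether a failed batch should be retried, skipped or routed elsewhere is a 
separate policy decision that this sketch leaves open.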

  was:
These issues center on the Kafka consumer (KafkaConsumer.java, line numbers 
below):
# Exchange exceptions/failures ignored at process() (:148), meaning:
## Automatic offset commit on exchange failure (e.g., processor/endpoint 
exception)
## In-flight message loss on Camel context shutdown
# BatchCommitConsumerTask activations are unbalanced during periods of low 
activity, meaning:
## await() (:165) will time out for active BatchCommitConsumerTask(s) when other 
consumer threads are blocked on it.hasNext() (:145) (a blocking call, despite 
no @throws)
## Any previously activated thread in await() will (a) get a 
TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next 
await() call, and (d) exit
## The process repeats until all consumer stream threads have exited, leaving 
the consumer dead
## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the 
route)
# An ExecutorService is obtained from Camel to handle KafkaStreams, with the 
number of threads set to the consumerStreams param (:77). Since the number of 
KafkaStreams actually created is (consumersCount * consumerStreams) and the 
executor runnables loop indefinitely, a random selection of streams will not be 
serviced if consumersCount > 1.

Source code URL:
- 
https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

We've troubleshot this extensively and reimplemented the KafkaConsumer class 
with params added to KafkaConfiguration to address these concerns and are happy 
to submit these back to the community, if interested.


> Message loss with batch commit
> ------------------------------
>
>                 Key: CAMEL-8975
>                 URL: https://issues.apache.org/jira/browse/CAMEL-8975
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.15.2
>         Environment: Ubuntu LTS 14.x, Java 7
>            Reporter: Michael J. Kitchin
>
> These issues center on the Kafka consumer (KafkaConsumer.java, line numbers 
> below):
> # Exchange exceptions/failures ignored at process() (:148), meaning:
> ## Automatic offset commit on exchange failure (e.g., processor/endpoint 
> exception)
> ## In-flight exchange loss on Camel context/runtime shutdown (i.e., route 
> interrupted -> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low 
> activity, meaning:
> ## await() (:165) will time out for active BatchCommitConsumerTask(s) when 
> other consumer threads are blocked on it.hasNext() (:145) (a blocking call, 
> despite no @throws)
> ## Any previously activated thread in await() will (a) get a 
> TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next 
> await() call, and (d) exit
> ## The process repeats until all consumer stream threads have exited, leaving 
> the consumer dead
> ## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the 
> route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams, with the 
> number of threads set to the consumerStreams param (:77). Since the number of 
> KafkaStreams actually created is (consumersCount * consumerStreams) and the 
> executor runnables loop indefinitely, a random selection of streams will not 
> be serviced if consumersCount > 1.
> Source code URL:
> - 
> https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
> We've troubleshot this extensively and reimplemented the KafkaConsumer class 
> with params added to KafkaConfiguration to address these concerns and are 
> happy to submit these back to the community, if interested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
