[
https://issues.apache.org/jira/browse/CAMEL-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736711#comment-14736711
]
Jonathan Anstey commented on CAMEL-8975:
----------------------------------------
[~mcoyote] would you mind submitting a PR for this? Sounds like an awesome
improvement!
> camel-kafka - Message loss with batch commit
> --------------------------------------------
>
> Key: CAMEL-8975
> URL: https://issues.apache.org/jira/browse/CAMEL-8975
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.15.2
> Environment: Ubuntu LTS 14.x, Java 7
> Reporter: Michael J. Kitchin
>
> These issues center on the Kafka consumer (KafkaConsumer.java; line numbers
> below):
> # Exchange exceptions/failures are ignored at process() (:148), meaning:
> ## Offsets are committed automatically even when the exchange fails (e.g.,
> processor/endpoint exception)
> ## In-flight exchanges are lost on Camel context/runtime shutdown (i.e.,
> route interrupted -> exception suppressed -> offset committed)
> # BatchCommitConsumerTask activations are unbalanced during periods of low
> activity, meaning:
> ## await() (:165) will time out for active BatchCommitConsumerTask(s) when
> other consumer threads are blocked on it.hasNext() (:145) (a blocking call,
> despite no @throws)
> ## Any previously activated thread waiting in await() will (a) get a
> TimeoutException, (b) loop, (c) get a BrokenBarrierException on the next
> await() call, and (d) exit
> ## This repeats until all consumer stream threads have exited, leaving the
> consumer dead
> ## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the
> route)
> # An ExecutorService is obtained from Camel to handle KafkaStreams, with the
> number of threads set to the consumerStreams param (:77). Since the number of
> KafkaStreams actually created is (consumersCount * consumerStreams) and the
> executor runnables loop indefinitely, an arbitrary subset of streams will
> never be serviced if consumersCount > 1.
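
The starvation described in the last item can be reproduced with the JDK
alone. The sketch below is not the Camel code: the class and method names are
made up for illustration, and a latch that is never released stands in for the
consumer's indefinite loop. It submits more never-returning tasks than the
pool has threads and counts how many ever start.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of camel-kafka.
class StreamStarvationDemo {

    /** Submits streamCount never-returning tasks to a pool of poolSize threads
     *  and returns how many of them ever started running. */
    static int countServiced(int poolSize, int streamCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger started = new AtomicInteger();
        CountDownLatch firstWave = new CountDownLatch(Math.min(poolSize, streamCount));
        CountDownLatch never = new CountDownLatch(1); // never counted down

        for (int i = 0; i < streamCount; i++) {
            executor.execute(() -> {
                started.incrementAndGet();
                firstWave.countDown();
                try {
                    never.await(); // stands in for an indefinite consumer loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        firstWave.await();   // every pool thread is now occupied
        Thread.sleep(200);   // grace period: queued tasks still cannot start
        int serviced = started.get();
        executor.shutdownNow(); // interrupt running tasks, discard queued ones
        return serviced;
    }

    public static void main(String[] args) throws InterruptedException {
        // e.g. consumerStreams = 2 threads, consumersCount * consumerStreams = 4 streams
        System.out.println(countServiced(2, 4) + " of 4 streams serviced");
        // prints "2 of 4 streams serviced"
    }
}
```

Sizing the pool to the number of streams actually created would avoid this,
assuming one long-running task per stream as described above.
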
> Source code URL:
> -
> https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
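
The barrier sequence from the second item (a TimeoutException, then a
BrokenBarrierException on the next await()) is standard
java.util.concurrent.CyclicBarrier behaviour and can be shown in isolation.
This is a minimal sketch with made-up names, not the consumer's code: one
thread waits on a two-party barrier whose second party never arrives, much
like a task blocked in it.hasNext().

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of camel-kafka.
class BarrierTimeoutDemo {

    /** Calls await() twice and returns the simple name of the exception
     *  raised by each call ("none" if the call returned normally). */
    static String[] awaitTwice(CyclicBarrier barrier, long timeoutMs) throws InterruptedException {
        String[] seen = new String[2];
        for (int i = 0; i < 2; i++) {
            try {
                barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
                seen[i] = "none";
            } catch (TimeoutException | BrokenBarrierException e) {
                seen[i] = e.getClass().getSimpleName();
            }
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        String[] seen = awaitTwice(barrier, 50);
        System.out.println(seen[0] + " then " + seen[1]);
        // prints "TimeoutException then BrokenBarrierException"
    }
}
```

A timed-out await() leaves the barrier broken until reset() is called, so any
loop that simply retries await() after a timeout will fail on the next pass.
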
> We've troubleshot this extensively and reimplemented the KafkaConsumer class
> with params added to KafkaConfiguration to address these concerns and are
> happy to submit these back to the community, if interested.
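
For discussion, here is one way the first item could be addressed: commit the
batch only when every message in it was processed without failure. This is a
rough sketch under stated assumptions, not the reimplementation mentioned
above. MessageHandler and OffsetCommitter are hypothetical stand-ins for the
Camel processor call and the Kafka offset commit; in the real consumer the
failure check would presumably inspect the exchange (for example
exchange.getException() != null).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types exist in camel-kafka.
class CommitOnSuccessSketch {

    /** Stand-in for routing one message; returns true if the exchange did not fail. */
    interface MessageHandler {
        boolean handle(String message);
    }

    /** Stand-in for the consumer's offset commit. */
    interface OffsetCommitter {
        void commit();
    }

    /** Processes a batch and commits only if every message succeeded.
     *  Returns true if the commit happened. */
    static boolean processBatch(List<String> batch, MessageHandler handler, OffsetCommitter committer) {
        for (String message : batch) {
            boolean ok;
            try {
                ok = handler.handle(message);
            } catch (RuntimeException e) {
                ok = false; // a thrown exception counts as a failed exchange
            }
            if (!ok) {
                return false; // skip the commit so the batch is not marked consumed
            }
        }
        committer.commit();
        return true;
    }

    public static void main(String[] args) {
        List<String> commits = new ArrayList<>();
        OffsetCommitter committer = () -> commits.add("commit");

        boolean good = processBatch(Arrays.asList("a", "b"), m -> true, committer);
        boolean bad = processBatch(Arrays.asList("a", "poison"), m -> !m.equals("poison"), committer);

        System.out.println(good + " " + bad + " " + commits.size());
        // prints "true false 1"
    }
}
```

What should happen after a failed batch (retry, stop the consumer, or hand off
to an error handler) is a separate design choice that this sketch leaves open.
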
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)