Michael J. Kitchin created CAMEL-8975:
-----------------------------------------
Summary: Message loss with batch commit
Key: CAMEL-8975
URL: https://issues.apache.org/jira/browse/CAMEL-8975
Project: Camel
Issue Type: Bug
Components: camel-kafka
Affects Versions: 2.15.2
Environment: Unbuntu LTS 14.x, Java 7
Reporter: Michael J. Kitchin
These issues center around Kafka consumer (KafaConsumer.java, line numberrs
below):
# Exchange exceptions/failures ignored at process() (:148), meaning:
## Automatic offset commit on exchange failure (e.g., processor/endpoint
exception)
## In-flight message loss on Camel context shutdown
# BatchCommitConsumerTask activations are unbalanced during periods of low
activity, meaning:
## await() (:165) will timeout for active BatchCommitConsumerTask(s) when other
consumer threads are binding on it.hasNext() (:145) (blocking call, despite no
@throws)
## Any, previously-activated await()'ing thread will (a) get a
TimeoutExeception, (b) loop, and (c) get a BrokenBarrierException on the next
await() call and (d) exit
## Process will repeat until (a) all consumer stream threads have exited, (b)
leaving consumer dead
## Aggravated if process() (:148) blocks (e.g., for delay/redelivery on the
route)
# An ExecutorService is obtained from Camel to handle KafkaStreams with # of
threads set to the consumerStreams param (:77). Since the # of KafkaStreams
actually created is (consumersCount * consumerStreams) and executor runnables
are indefinite loops, a random selection of streams will not be serviced if
consumersCount>1.
Source code URL:
-
https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
We've troubleshot this extensively and reimplemented the KafkaConsumer class
with params added to KafkaConfiguration to address these concerns and are happy
to submit these back to the community, if interested.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)