Viswa Ramamoorthy created CAMEL-13339:
-----------------------------------------
Summary: StateRepository implemented to save offset state outside
Kafka, message loss occurs upon partition revoke
Key: CAMEL-13339
URL: https://issues.apache.org/jira/browse/CAMEL-13339
Project: Camel
Issue Type: Bug
Components: camel-kafka
Affects Versions: 2.23.0
Reporter: Viswa Ramamoorthy
Current implementation of
org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords's
onPartitionsRevoked, uses
org.apache.kafka.clients.consumer.KafkaConsumer.position(partition). This
approach causes message loss when multiple processes listening to same topic
for point to point messaging (like a QUEUE) type implementation.
Issue is noticed when partition gets assigned and then gets revoked in quick
succession. Upon partition assignment, say at the beginning of processing
offset is set to 0, and say there was no poll for this partition (may be due to
earlier poll brought in bunch of records and they are still being processed).
Subsequently, say partition got revoked, before polling.
In this case, as onPartitionsRevoked looks at
org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to save
offset state and so 0 gets saved in this case in StateRepository
implementation. When the same partition get assigned again,
StateRepository.getState returns 0. Since Camel KafkaConsumer treats this as
last processed offset, it adds 1 to it moving pointer to offset 1. Because of
this, message at offset 0 never gets processed.
Two fixes might be needed
# a) onPartitionsRevoked should look at last processed offset (possibly store
'last processed offset' for each topic/partition in a memory map) and use it to
save offset
# b) Currently onPartitionsRevoked just saves offset state when an
implementation of StateRepository configured. Ideally it should call
KafkaFetchRecords.commitOffset so commitSync call goes through when partition
revoked and no StateRepository implementation configured
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)