Viswa Ramamoorthy created CAMEL-13339:
-----------------------------------------

             Summary: StateRepository implemented to save offset state outside 
Kafka, message loss occurs upon partition revoke
                 Key: CAMEL-13339
                 URL: https://issues.apache.org/jira/browse/CAMEL-13339
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 2.23.0
            Reporter: Viswa Ramamoorthy


The current implementation of onPartitionsRevoked in 
org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords uses 
org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to decide 
which offset to save. This approach causes message loss when multiple 
processes listen to the same topic in a point-to-point (queue-like) messaging 
setup.

 

The issue shows up when a partition is assigned and then revoked in quick 
succession. Suppose that on assignment, at the beginning of processing, the 
offset is set to 0, and that no poll happens for this partition (for example 
because an earlier poll brought in a batch of records that are still being 
processed). Then the partition is revoked before it is ever polled.

In this case onPartitionsRevoked uses 
org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to save the 
offset state, so 0 is saved in the StateRepository implementation. Note that 
position() returns the offset of the next record to be fetched, not the offset 
of the last processed record. When the same partition gets assigned again, 
StateRepository.getState returns 0. Since Camel's KafkaConsumer treats this 
value as the last processed offset, it adds 1 and moves the pointer to offset 
1. As a result, the message at offset 0 never gets processed.
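
The arithmetic above can be reproduced with a small stand-alone model. This is 
only a sketch: it does not use the Camel or Kafka classes, the map stands in 
for a StateRepository, and the names RevokeOffsetSketch, saveOnRevoke and 
resumeOffset are hypothetical. Starting at offset 0 when nothing is saved is 
also an assumption made for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class RevokeOffsetSketch {
    // Stand-in for a StateRepository: partition key -> saved offset (hypothetical).
    static final Map<String, Long> repository = new HashMap<>();

    // Models the reported behaviour: on revoke, the consumer's current
    // position (offset of the NEXT record to fetch) is saved as the state.
    static void saveOnRevoke(String partitionKey, long position) {
        repository.put(partitionKey, position);
    }

    // Models the resume logic: the saved value is treated as the last
    // processed offset, so consumption restarts at saved + 1.
    // Assumption: with no saved state, start at offset 0.
    static long resumeOffset(String partitionKey) {
        Long saved = repository.get(partitionKey);
        return saved == null ? 0L : saved + 1;
    }

    public static void main(String[] args) {
        String key = "mytopic/0";
        // Partition assigned at position 0, never polled, then revoked.
        saveOnRevoke(key, 0L);
        // On reassignment the consumer resumes past the unread record.
        System.out.println("resume at offset " + resumeOffset(key));
    }
}
```

In this model the record at offset 0 was never fetched, yet the consumer 
resumes at offset 1, which matches the loss described above.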

 

Two fixes might be needed:
 # a) onPartitionsRevoked should use the last processed offset (possibly by 
keeping the 'last processed offset' for each topic/partition in an in-memory 
map) when saving the offset.
 # b) Currently onPartitionsRevoked saves offset state only when an 
implementation of StateRepository is configured. Ideally it should call 
KafkaFetchRecords.commitOffset, so that the commitSync call goes through when 
a partition is revoked and no StateRepository implementation is configured.
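
Fix a) could look roughly like the following stand-alone sketch. It is not the 
actual Camel code: the class LastProcessedOffsetSketch and its methods are 
hypothetical, the second map stands in for a StateRepository, and starting at 
offset 0 when nothing is saved is an assumption made for the illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LastProcessedOffsetSketch {
    // In-memory map of the last processed offset per topic/partition key.
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();
    // Stand-in for a StateRepository (hypothetical).
    private final Map<String, Long> repository = new ConcurrentHashMap<>();

    // Called after a record has been fully processed.
    void recordProcessed(String partitionKey, long offset) {
        lastProcessed.put(partitionKey, offset);
    }

    // On revoke, save only an offset that was really processed.
    // If nothing was processed since assignment, leave the saved state alone.
    void onRevoked(String partitionKey) {
        Long offset = lastProcessed.remove(partitionKey);
        if (offset != null) {
            repository.put(partitionKey, offset);
        }
    }

    // Resume after the last processed offset.
    // Assumption: with no saved state, start at offset 0.
    long resumeOffset(String partitionKey) {
        Long saved = repository.get(partitionKey);
        return saved == null ? 0L : saved + 1;
    }

    public static void main(String[] args) {
        LastProcessedOffsetSketch sketch = new LastProcessedOffsetSketch();
        // Assigned, never polled, revoked: nothing is saved.
        sketch.onRevoked("mytopic/0");
        System.out.println("resume at offset " + sketch.resumeOffset("mytopic/0"));
    }
}
```

With this approach a partition that is revoked before any record was processed 
leaves the saved state untouched, so the record at offset 0 is still read after 
reassignment.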



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
