[ 
https://issues.apache.org/jira/browse/CAMEL-14935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119160#comment-17119160
 ] 

Chris McCarthy edited comment on CAMEL-14935 at 5/28/20, 11:37 PM:
-------------------------------------------------------------------

[~dariusx] I think the vulnerability would be well handled by a change in the 
KafkaConsumer (the camel component) in the method onPartitionsRevoked that 
implements the rebalance listener.  Wrapping the call to the commitOffset in a 
try block and removing the offsetKey from the Map lastProcesssedOffset in the 
corresponding finally block, so exceptions on commit do not prevent the Map 
being cleaned up. Hope that helps and is clear.   


was (Author: chris mccarthy):
[~dariusx] I think the vulnerability would be well handled by a change in the 
KafkaConsumer (the camel component) in the method onPartitionsRevoked that 
implements the rebalance listener.  Wrapping the call to the commitOffset in a 
try block and removing the offsetKey from the Map lastProcesssedOffset in the 
corresponding finally block, so exceptions on commit do not prevent the 
collection being cleaned up. Hope that helps and is clear.   

> KafkaConsumer commits old offset values in a failure scenario causing message 
> replays and offset reset error
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-14935
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14935
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.24.0
>            Reporter: Chris McCarthy
>            Priority: Major
>             Fix For: 3.x
>
>
> We are experiencing unexpected offset reset errors occasionally, as well as 
> occasional replay of messages (without an offset reset error).
> The cause seems to be a failed commit on rebalance, leaving an old value in 
> the hashMap used to store the latest processed offset for a partition. This 
> old value is then re-read and re-committed across rebalances in certain 
> situations.
> Our relevant configuration details are:
> autoCommitEnable=false
>  allowManualCommit=true
>  autoOffsetReset=earliest
> It seems when the KafkaConsumer experiences an Exception committing the 
> offset (CommitFailedException) upon a rebalance, this leaves the old offset 
> value in the lastProcessedOffset hashMap.
> A subsequent rebalance that assigns the same partition to the same consumer, 
> that then thereafter experiences another rebalance (before any messages have 
> been processed successfully as this will over write the invalid value and 
> self correct the problem) will commit this old offset again.  This offset may 
> be very old if there have been many rebalances in between the original 
> rebalance that failed to commit its offset.
> If the old offset is beyond the retention period and the message is no longer 
> available the outcome is an offset reset error.  If the offset is within the 
> retention period all messages are replayed from that offset without an error 
> being thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to