Javier Holguera created CAMEL-16181:
---------------------------------------

             Summary: KafkaIdempotentRepository cache incorrectly flagged as ready
                 Key: CAMEL-16181
                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
             Project: Camel
          Issue Type: Improvement
          Components: camel-kafka
    Affects Versions: 3.7.2
            Reporter: Javier Holguera
             Fix For: 3.8.0


The `KafkaIdempotentRepository` initialises its cache from the pre-existing entries in the Kafka topic, with the following code:

{code:java}
log.debug("Subscribing consumer to {}", topic);
consumer.subscribe(Collections.singleton(topic));
log.debug("Seeking to beginning");
consumer.seekToBeginning(consumer.assignment());
POLL_LOOP: while (running.get()) {
    log.trace("Polling");
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
    if (consumerRecords.isEmpty()) {
        // the first time this happens, we can assume that we have
        // consumed all messages up to this point
        log.trace("0 messages fetched on poll");
        if (cacheReadyLatch.getCount() > 0) {
            log.debug("Cache warmed up");
            cacheReadyLatch.countDown();
        }
    }
{code}
The problem with this code is:
 # `consumer.subscribe` doesn't instantaneously assign partitions to the consumer
 # When `consumer.seekToBeginning` is called, the operation does nothing because the consumer has no partitions assigned yet (see [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e])
 # When the first `consumer.poll` is later issued, it returns nothing, triggering the sequence that *confirms the cache as ready when it isn't yet*. That can cause upstream messages not to be correctly de-duplicated.

The solution is:
 # Use a different overload of `consumer.subscribe` that accepts an implementation of `ConsumerRebalanceListener`, and call `seekToBeginning` there, once partitions are actually assigned to the `consumer` instance.
 # Do an initial `poll(0)` that will never return records but will force the partition assignment process.
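The first option could look roughly like the sketch below (an illustration of the idea, not the committed patch; it reuses the `consumer`, `topic` and `log` fields from the excerpt above and assumes a single-threaded warm-up loop, since `KafkaConsumer` is not thread-safe):

{code:java}
// Subscribe with a ConsumerRebalanceListener so the seek happens only after
// the group coordinator has actually assigned partitions to this consumer.
consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do for the cache warm-up phase
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Partitions are assigned now, so seekToBeginning has something to act on.
        // The callback runs inside poll(), on the same thread as the poll loop.
        log.debug("Partitions assigned, seeking to beginning of {}", partitions);
        consumer.seekToBeginning(partitions);
    }
});
{code}

With this in place, the existing poll loop can stay as-is: the first `poll` that triggers the rebalance invokes the listener before returning, so an empty result genuinely means the topic has been read to the end rather than that no partitions were assigned yet.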



--
This message was sent by Atlassian Jira
(v8.3.4#803005)