[ 
https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793859#comment-17793859
 ] 

Arseniy Tashoyan commented on CAMEL-16181:
------------------------------------------

This fix does not work. The onPartitionsAssigned() callback is executed only 
after several calls to consumer.poll(). The attached log shows several 
"0 messages fetched on poll" messages before the "Seeking to beginning" 
message printed by the callback. As a result, KafkaIdempotentRepository 
starts with an empty local cache and re-consumes all input files.
 [^kafka-idempotent-repository.log] 

> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
>                 Key: CAMEL-16181
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 3.7.2
>            Reporter: Javier Holguera
>            Priority: Major
>             Fix For: 3.7.3, 3.8.0
>
>         Attachments: kafka-idempotent-repository.log
>
>
> The `KafkaIdempotentRepository` initialises its cache off the back of the 
> pre-existing Kafka topic with previous entries, with the following code:
>  
> {code:java}
> log.debug("Subscribing consumer to {}", topic);
> consumer.subscribe(Collections.singleton(topic));
> log.debug("Seeking to beginning");
> consumer.seekToBeginning(consumer.assignment());
>
> POLL_LOOP: while (running.get()) {
>   log.trace("Polling");
>   ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
>   if (consumerRecords.isEmpty()) {
>     // the first time this happens, we can assume that we have consumed all messages up to this point
>     log.trace("0 messages fetched on poll");
>     if (cacheReadyLatch.getCount() > 0) {
>       log.debug("Cache warmed up");
>       cacheReadyLatch.countDown();
>     }
>   }{code}
>  
> The problems with this code are:
>  # `consumer.subscribe` doesn't instantaneously assign partitions to the 
> consumer.
>  # When `consumer.seekToBeginning` is called, the operation doesn't do 
> anything because the consumer has no partitions yet (see [seekToBeginning 
> doesn't work without auto.offset.reset 
> (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e]).
>  # When the first `consumer.poll` is later issued, it returns nothing, 
> triggering the sequence to *confirm the cache as ready when it isn't yet*. 
> That can cause upstream messages not to be correctly de-duplicated.
>
> The solution is:
>  # Use a different overload of `consumer.subscribe` that accepts an 
> implementation of the `ConsumerRebalanceListener`.
>  # When partitions are assigned to the `consumer` instance, call 
> `seekToBeginning` there.
>  # Do an initial `poll(0)`, which will never return records but will force 
> the partition assignment process.
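
The ordering problem discussed above can be illustrated without a broker. The following is a minimal sketch only, not the Camel or Kafka implementation: `FakeConsumer` and `AssignmentListener` are hypothetical stand-ins (not the Kafka client API), and the fake assigns partitions on the Nth call to `poll()` to mimic the behaviour reported in the comment. It contrasts a warm-up loop that treats the first empty poll as "cache ready" with one that ignores empty polls until the assignment callback has run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class WarmUpSketch {

    /** Hypothetical stand-in for a rebalance listener; NOT the Kafka interface. */
    public interface AssignmentListener {
        void onPartitionsAssigned();
    }

    /** Hypothetical fake consumer: partitions are assigned only on the Nth poll. */
    public static class FakeConsumer {
        private final int pollsBeforeAssignment;
        private final List<String> topicRecords;
        private AssignmentListener listener;
        private int pollCount = 0;
        private int position = -1; // -1 means "no partitions assigned yet"

        public FakeConsumer(int pollsBeforeAssignment, List<String> topicRecords) {
            this.pollsBeforeAssignment = pollsBeforeAssignment;
            this.topicRecords = topicRecords;
        }

        public void subscribe(AssignmentListener listener) {
            this.listener = listener;
        }

        public void seekToBeginning() {
            position = 0;
        }

        public List<String> poll() {
            pollCount++;
            if (position < 0 && pollCount >= pollsBeforeAssignment) {
                listener.onPartitionsAssigned(); // the callback runs inside poll()
            }
            if (position < 0) {
                return new ArrayList<>(); // still unassigned: nothing to fetch
            }
            // Assumption: records arrive in the same poll that completes the assignment.
            List<String> batch =
                    new ArrayList<>(topicRecords.subList(position, topicRecords.size()));
            position = topicRecords.size();
            return batch;
        }
    }

    /** Fills a local cache; optionally ignores empty polls until assignment has happened. */
    public static List<String> warmUp(FakeConsumer consumer, boolean waitForAssignment) {
        List<String> cache = new ArrayList<>();
        CountDownLatch cacheReadyLatch = new CountDownLatch(1);
        AtomicBoolean assigned = new AtomicBoolean(false);

        consumer.subscribe(() -> {
            consumer.seekToBeginning();
            assigned.set(true);
        });

        while (cacheReadyLatch.getCount() > 0) {
            List<String> records = consumer.poll();
            cache.addAll(records);
            boolean mayDeclareReady = !waitForAssignment || assigned.get();
            if (records.isEmpty() && mayDeclareReady) {
                cacheReadyLatch.countDown(); // "Cache warmed up"
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("file-1", "file-2");
        System.out.println("first empty poll wins: "
                + warmUp(new FakeConsumer(3, topic), false));
        System.out.println("gated on assignment:   "
                + warmUp(new FakeConsumer(3, topic), true));
    }
}
```

In this simulation the ungated loop declares the cache ready while it is still empty, whereas the gated loop first loads both entries. The sketch does not model a real consumer returning an empty batch after assignment, so a production fix would likely need a stronger completion check than "first empty poll after assignment".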



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
