[ 
https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Javier Holguera updated CAMEL-16181:
------------------------------------
    Description: 
The `KafkaIdempotentRepository` initialises its cache from the pre-existing 
Kafka topic that holds previous entries, using the following code:

 
{code:java}
log.debug("Subscribing consumer to {}", topic);
consumer.subscribe(Collections.singleton(topic));
log.debug("Seeking to beginning");
consumer.seekToBeginning(consumer.assignment());

POLL_LOOP: while (running.get()) {
    log.trace("Polling");
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
    if (consumerRecords.isEmpty()) {
        // the first time this happens, we can assume that we have
        // consumed all messages up to this point
        log.trace("0 messages fetched on poll");
        if (cacheReadyLatch.getCount() > 0) {
            log.debug("Cache warmed up");
            cacheReadyLatch.countDown();
        }
    }
}
{code}
The problems with this code are:
# `consumer.subscribe` does not instantaneously assign partitions to the consumer.
# When `consumer.seekToBeginning` is called, the operation does nothing because the consumer has no partitions assigned yet (see [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e]).
# When the first `consumer.poll` is later issued, it returns nothing, triggering the sequence that *confirms the cache as ready when it isn't yet*. That can cause upstream messages not to be correctly de-duplicated.

The solution is to:
# Use the overload of `consumer.subscribe` that accepts a `ConsumerRebalanceListener` implementation, and call `seekToBeginning` from that listener once partitions are assigned to the `consumer` instance.
# Issue an initial `poll(0)` that will never return records but forces the partition assignment process to complete.
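The race is easy to reproduce in miniature. The sketch below is a self-contained simulation in plain Java; `WarmupRaceDemo`, `FakeConsumer` and `warmUp` are hypothetical stand-ins (not the real Kafka client API) that only model the two behaviours the bug depends on: `subscribe` does not assign partitions immediately, and `seekToBeginning` is a no-op before assignment. It shows why the first empty poll is a false readiness signal, and how seeking from the rebalance callback plus one forced initial poll fixes the ordering:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the real KafkaConsumer, for illustration only.
public class WarmupRaceDemo {

    static class FakeConsumer {
        private final Deque<String> backlog = new ArrayDeque<>(List.of("id-1", "id-2"));
        private boolean partitionsAssigned = false;
        private boolean readFromBeginning = false;
        private Runnable onPartitionsAssigned = () -> { };

        // Mirrors subscribe(topics, ConsumerRebalanceListener): the listener
        // only runs when partitions are actually assigned, not on subscribe.
        void subscribe(Runnable rebalanceListener) {
            this.onPartitionsAssigned = rebalanceListener;
        }

        // Mirrors seekToBeginning(): silently a no-op without an assignment.
        void seekToBeginning() {
            if (partitionsAssigned) {
                readFromBeginning = true;
            }
        }

        List<String> poll() {
            if (!partitionsAssigned) {
                partitionsAssigned = true;   // assignment completes inside poll
                onPartitionsAssigned.run();  // like onPartitionsAssigned(...)
                return List.of();            // and this first poll returns nothing
            }
            if (!readFromBeginning || backlog.isEmpty()) {
                return List.of();
            }
            return List.of(backlog.poll());
        }
    }

    // The warm-up loop from the description: read until the first empty
    // poll, then treat the cache as ready. Returns the entries loaded.
    static int warmUp(FakeConsumer consumer) {
        int loaded = 0;
        while (true) {
            List<String> records = consumer.poll();
            if (records.isEmpty()) {
                return loaded; // "cache warmed up"
            }
            loaded += records.size();
        }
    }

    public static void main(String[] args) {
        // Current behaviour: seek before assignment is a no-op, and the
        // first (empty) poll flags the cache ready with nothing loaded.
        FakeConsumer buggy = new FakeConsumer();
        buggy.subscribe(() -> { });
        buggy.seekToBeginning();
        System.out.println("buggy warm-up loaded " + warmUp(buggy) + " of 2 entries");
        // prints: buggy warm-up loaded 0 of 2 entries

        // Proposed fix: seek from the rebalance listener, and poll once
        // up front to force the assignment before the warm-up loop runs.
        FakeConsumer fixed = new FakeConsumer();
        fixed.subscribe(fixed::seekToBeginning);
        fixed.poll(); // the poll(0): forces assignment, returns no records
        System.out.println("fixed warm-up loaded " + warmUp(fixed) + " of 2 entries");
        // prints: fixed warm-up loaded 2 of 2 entries
    }
}
```

The simulation deliberately keeps the warm-up loop's "first empty poll means ready" rule unchanged: once the assignment (and the seek) are forced to happen before the loop starts, that rule becomes safe.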



> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
>                 Key: CAMEL-16181
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 3.7.2
>            Reporter: Javier Holguera
>            Priority: Major
>             Fix For: 3.8.0
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
