Javier Holguera created CAMEL-16181:
---------------------------------------
Summary: KafkaIdempotentRepository cache incorrectly flagged as
ready
Key: CAMEL-16181
URL: https://issues.apache.org/jira/browse/CAMEL-16181
Project: Camel
Issue Type: Improvement
Components: camel-kafka
Affects Versions: 3.7.2
Reporter: Javier Holguera
Fix For: 3.8.0
The `KafkaIdempotentRepository` initialises its cache off the back of the
pre-existing Kafka topic with previous entries, with the following code:
{code:java} log.debug("Subscribing consumer to {}", topic);
consumer.subscribe(Collections.singleton(topic));
log.debug("Seeking to beginning");
consumer.seekToBeginning(consumer.assignment()); POLL_LOOP: while
(running.get()) \{ log.trace("Polling");
ConsumerRecords<String, String> consumerRecords =
consumer.poll(pollDurationMs); if (consumerRecords.isEmpty()) {
// the first time this happens, we can assume that we have
// consumed all // messages up to this
point log.trace("0 messages fetched on poll");
if (cacheReadyLatch.getCount() > 0) {
log.debug("Cache warmed up");
cacheReadyLatch.countDown(); } } \{code}
The problem with this code is: # `consumer.subscribe` doesn't instantaneously
assign partitions to the consumer # When `consumer.seekToBeginning` is called,
the operation doesn't do anything because it has no partitions yet (see
[seekToBeginning doesn't work without auto.offset.reset
(apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e]
# When later the first `consumer.poll` is issued, it returns nothing,
triggering the sequence to *confirm the cache as ready when it isn't yet*. That
can cause upstream messages not been correctly de-duplicated. The solution
is: # Use a different overload of `consumer.subscribe` that accepts an
implementation of the `ConsumerRebalanceListener`. When partitions are assigned
to the `consumer` instance, call `seekToBeginning` there. # Doing an initial
`poll(0)` that will never return records but will force the partition
assignment process
--
This message was sent by Atlassian Jira
(v8.3.4#803005)