[ https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17314361#comment-17314361 ]
Javier Holguera commented on CAMEL-16181:
-----------------------------------------
[~valdar] it's been a while since I worked on this PR. From what I recall, the
problem wasn't that the Consumer wouldn't try to fetch the right records (i.e.,
the first record in the topic/partition).
The issue was that the first call to `poll` returned nothing, which the cache
mechanism took to mean there was nothing left to consume. It concluded that the
consumer had caught up with the topic/partition and marked the cache as ready.
The code has a call to `seekToBeginning`, which was meant to have the same effect
as combining `auto.offset.reset=earliest` with a unique `group.id` (e.g., a UUID):
forcing the consumer to read the topic from the beginning.
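For comparison, the configuration-based alternative from the comment above can be sketched with plain consumer properties. This is an illustration under assumptions, not code from `KafkaIdempotentRepository`: the class and helper names, the `idempotent-repo-` prefix and the bootstrap address are hypothetical, while `group.id`, `auto.offset.reset`, `enable.auto.commit` and the deserializer keys are standard Kafka consumer settings. Because a freshly generated group has no committed offsets, `auto.offset.reset=earliest` decides the start position whenever the partitions end up being assigned, so no explicit seek is needed.

```java
import java.util.Properties;
import java.util.UUID;

public class WarmUpConsumerConfig {

    // Hypothetical helper: builds consumer properties that make a brand-new
    // consumer group read the topic from its earliest offsets.
    static Properties warmUpConsumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A unique group id means no committed offsets exist for this group...
        props.put("group.id", "idempotent-repo-" + UUID.randomUUID());
        // ...so the reset policy picks the start position: the earliest offset.
        props.put("auto.offset.reset", "earliest");
        // Assumption: a throwaway group gains nothing from committing offsets.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = warmUpConsumerProps("localhost:9092");
        System.out.println(props.getProperty("auto.offset.reset")); // earliest
    }
}
```

The trade-off is that every instance creates a new, short-lived consumer group on the broker, whereas the explicit seek keeps the starting position under the application's control.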
> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
> Key: CAMEL-16181
> URL: https://issues.apache.org/jira/browse/CAMEL-16181
> Project: Camel
> Issue Type: Improvement
> Components: camel-kafka
> Affects Versions: 3.7.2
> Reporter: Javier Holguera
> Priority: Major
> Fix For: 3.7.3, 3.8.0
>
>
> The `KafkaIdempotentRepository` initialises its cache from the pre-existing
> Kafka topic that holds previous entries, using the following code:
>
> {code:java}
> log.debug("Subscribing consumer to {}", topic);
> consumer.subscribe(Collections.singleton(topic));
> log.debug("Seeking to beginning");
> consumer.seekToBeginning(consumer.assignment());
>
> POLL_LOOP: while (running.get()) {
>     log.trace("Polling");
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
>     if (consumerRecords.isEmpty()) {
>         // the first time this happens, we can assume that we have consumed all
>         // messages up to this point
>         log.trace("0 messages fetched on poll");
>         if (cacheReadyLatch.getCount() > 0) {
>             log.debug("Cache warmed up");
>             cacheReadyLatch.countDown();
>         }
>     }
>     // ... remainder of the loop not shown
> }{code}
>
> The problem with this code is:
> # `consumer.subscribe` doesn't instantaneously assign partitions to the
> consumer
> # When `consumer.seekToBeginning` is called, it does nothing: the consumer has
> no partitions assigned yet, so `consumer.assignment()` is empty (see
> [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3ccakwx9vumpliqtu9o0mpepaupszapw9lm91mwexvafwktgd3...@mail.gmail.com%3e]).
> # When the first `consumer.poll` is later issued, it returns nothing,
> triggering the sequence that *confirms the cache as ready when it isn't yet*.
> That can cause upstream messages not to be correctly de-duplicated.
>
> The solution is:
> # Use a different overload of `consumer.subscribe` that accepts an
> implementation of `ConsumerRebalanceListener`.
> # When partitions are assigned to the `consumer` instance (in the listener's
> `onPartitionsAssigned` callback), call `seekToBeginning` there.
> # Do an initial `poll(0)`, which will never return records but will force
> the partition assignment process.
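The failure and the proposed fix can be compared with a small, self-contained simulation. It does not use the Kafka client: `FakeConsumer`, `warmUp`, `buggy` and `fixed` are hypothetical names, and the fake only models the behaviour described in this issue (subscribing assigns nothing; the first poll performs the assignment, fires the assignment callback and returns no records; seeking with an empty assignment is a no-op). It also assumes that, without a seek, reading starts at the end of the log. With the real client, the callback would be a `ConsumerRebalanceListener` whose `onPartitionsAssigned` calls `seekToBeginning`, registered via `consumer.subscribe(Collections.singleton(topic), listener)`. Each variant returns the cache size at the moment the loop would call `cacheReadyLatch.countDown()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CacheWarmUpSketch {

    // Hypothetical stand-in for a consumer of a single-partition topic. It only
    // models the behaviour described in the issue, not the real Kafka client.
    static class FakeConsumer {
        private final List<String> log;   // records already in the topic
        private boolean assigned = false; // has the partition been assigned yet?
        private int position;             // next offset to read
        private Runnable onAssigned = () -> { };

        FakeConsumer(List<String> log) {
            this.log = log;
            // Assumption: without a seek, reading starts at the end of the log,
            // as for a new group with the default auto.offset.reset=latest.
            this.position = log.size();
        }

        // subscribe() only records the callback; no partition is assigned yet.
        void subscribe(Runnable onAssigned) {
            this.onAssigned = onAssigned;
        }

        // Like seekToBeginning(assignment()): nothing happens until assigned.
        void seekToBeginning() {
            if (assigned) {
                position = 0;
            }
        }

        // The first poll performs the assignment, fires the callback and
        // returns no records; later polls return everything after `position`.
        List<String> poll() {
            if (!assigned) {
                assigned = true;
                onAssigned.run();
                return Collections.emptyList();
            }
            List<String> batch = new ArrayList<>(log.subList(position, log.size()));
            position = log.size();
            return batch;
        }
    }

    // Mirrors the issue's loop: the first empty poll means "caught up".
    // Returns the cache size at the point where the latch would count down.
    static int warmUp(FakeConsumer consumer, Set<String> cache) {
        while (true) {
            List<String> records = consumer.poll();
            if (records.isEmpty()) {
                return cache.size();
            }
            cache.addAll(records);
        }
    }

    // Original sequence: subscribe, seek (a no-op here), then poll.
    static int buggy(List<String> topic) {
        FakeConsumer consumer = new FakeConsumer(topic);
        consumer.subscribe(() -> { });
        consumer.seekToBeginning();
        return warmUp(consumer, new HashSet<>());
    }

    // Proposed sequence: seek from the assignment callback, and discard the
    // result of an initial poll that only exists to trigger the assignment.
    static int fixed(List<String> topic) {
        FakeConsumer consumer = new FakeConsumer(topic);
        consumer.subscribe(consumer::seekToBeginning);
        consumer.poll();
        return warmUp(consumer, new HashSet<>());
    }

    public static void main(String[] args) {
        List<String> topic = List.of("id-1", "id-2", "id-3");
        System.out.println("buggy: ready with " + buggy(topic) + " entries");
        System.out.println("fixed: ready with " + fixed(topic) + " entries");
    }
}
```

Running `main` prints `buggy: ready with 0 entries` followed by `fixed: ready with 3 entries`. The point the sketch makes is that an empty poll only means "caught up" once the assignment and the seek have actually happened, which is why the initial poll's empty result is kept out of the readiness check.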
--
This message was sent by Atlassian Jira
(v8.3.4#803005)