[ 
https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gregor Zurowski updated CAMEL-20373:
------------------------------------
    Fix Version/s: 3.22.2
                       (was: 3.22.1)

> camel-kafka - KafkaIdempotentRepository may allow some duplicates after 
> application restart
> -------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-20373
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20373
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.0, 4.2.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>             Fix For: 3.22.2, 4.x
>
>
> The current implementation of _KafkaIdempotentRepository_ is initialized as 
> follows (after the fix for CAMEL-20218):
> - Run a separate thread: _TopicPoller_. The _TopicPoller_ executes the 
> _KafkaConsumer.poll()_ method in a while-loop to retrieve the cached keys 
> from the Kafka topic and to populate the local in-memory cache.
> - _TopicPoller_ stops retrieving records from the Kafka topic when 
> _KafkaConsumer.poll()_ returns zero records. An empty result from _poll()_ 
> is treated as a signal that all records have been retrieved.
> - The main thread waits for the _TopicPoller_ thread to finish, but no longer 
> than 30 seconds (the value is hardcoded).
> This implementation can leave the local cache partially initialized in two 
> cases:
> 1. _TopicPoller_ does not manage to consume all records from the Kafka topic 
> within the 30-second interval.
> 2. _KafkaConsumer.poll()_ returns an empty record set even though it has not 
> yet reached the end of the Kafka topic (this is possible).
> As a result, after an application restart _KafkaIdempotentRepository_ may 
> fail to fully restore the local cache. The consumer then re-consumes already 
> processed input, which causes duplicates.
> h3. Proposed implementation
> - Remove the asynchronous _TopicPoller_ and retrieve all records from Kafka 
> synchronously in _KafkaIdempotentRepository.doStart()_.
> - Read records from the Kafka topic until the end offsets are reached. Do not 
> rely on the condition "_poll()_ returns an empty record set".
> Pseudocode:
> {code:java}
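> // partitionsFor() returns PartitionInfo; assign() needs TopicPartition.
> List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
>     .map(p -> new TopicPartition(p.topic(), p.partition()))
>     .collect(Collectors.toList());
> consumer.assign(partitions);
> consumer.seekToBeginning(partitions);
> Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
> // isReachedOffsets and addToLocalCache are helpers left undefined here.
> while (!isReachedOffsets(consumer, endOffsets)) {
>   // The poll timeout is illustrative; an empty result does not end the loop.
>   var consumerRecords = consumer.poll(Duration.ofMillis(100));
>   addToLocalCache(consumerRecords);
> }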
> {code}
> This implementation ensures that _KafkaIdempotentRepository_ is used in a 
> Camel application only after all cached records have been restored from the 
> persistent storage (Kafka topic) to the local cache. It prevents duplicates 
> from occurring after the application has restarted.
> In the case of IdempotentConsumer, a Kafka topic plays the role of a database 
> table. We basically need a _"SELECT * FROM app_state"_ operation. It makes 
> little sense to run this _SELECT_ asynchronously and rely on a partial result 
> set.
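
The difference between the two stop conditions described in the issue can be
illustrated with a small, self-contained sketch. It does not use the Kafka
client or Camel code: FakeLog is a hypothetical in-memory stand-in for a single
topic partition, and its poll() deliberately returns an empty batch on every
second call to mimic a poll that comes back empty before the end of the topic.
All class and method names here are assumptions made for illustration only.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EndOffsetRestoreSketch {

    /** Hypothetical in-memory stand-in for one topic partition (not the Kafka API). */
    static final class FakeLog {
        private final List<String> records;
        private int position = 0;
        private int pollCount = 0;

        FakeLog(List<String> records) {
            this.records = List.copyOf(records);
        }

        int endOffset() {
            return records.size();
        }

        int position() {
            return position;
        }

        /** Returns up to maxRecords records; every second call returns an empty batch. */
        List<String> poll(int maxRecords) {
            pollCount++;
            if (pollCount % 2 == 0) {
                return List.of(); // simulated early-empty poll
            }
            int to = Math.min(position + maxRecords, records.size());
            List<String> batch = records.subList(position, to);
            position = to;
            return batch;
        }
    }

    /** Stop condition criticized in the issue: the first empty poll ends the restore. */
    static Set<String> restoreUntilEmptyPoll(FakeLog log) {
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> batch = log.poll(2);
            if (batch.isEmpty()) {
                break;
            }
            cache.addAll(batch);
        }
        return cache;
    }

    /** Proposed stop condition: poll until the end offset captured at start is reached. */
    static Set<String> restoreUntilEndOffset(FakeLog log) {
        Set<String> cache = new HashSet<>();
        int endOffset = log.endOffset();
        while (log.position() < endOffset) {
            cache.addAll(log.poll(2)); // empty batches are simply ignored
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k2", "k3", "k4", "k5");
        System.out.println(restoreUntilEmptyPoll(new FakeLog(keys)).size()); // prints 2
        System.out.println(restoreUntilEndOffset(new FakeLog(keys)).size()); // prints 5
    }
}
```

With five keys in the fake log, the empty-poll strategy restores only the first
batch, while the end-offset strategy restores all five. A real implementation
would compare the consumer position with the end offset of every assigned
partition and would probably also need an overall timeout, which this sketch
leaves out.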



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
