[
https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gregor Zurowski updated CAMEL-20373:
------------------------------------
Fix Version/s: 3.22.2
(was: 3.22.1)
> camel-kafka - KafkaIdempotentRepository may allow some duplicates after
> application restart
> -------------------------------------------------------------------------------------------
>
> Key: CAMEL-20373
> URL: https://issues.apache.org/jira/browse/CAMEL-20373
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.22.0, 4.2.0
> Reporter: Arseniy Tashoyan
> Priority: Major
> Fix For: 3.22.2, 4.x
>
>
> The current implementation of _KafkaIdempotentRepository_ is initialized as
> follows (after the CAMEL-20218 fix):
> - A separate thread, _TopicPoller_, is started. It calls
> _KafkaConsumer.poll()_ in a while-loop to retrieve the cached keys
> from the Kafka topic and populate the local in-memory cache.
> - _TopicPoller_ stops retrieving records from the Kafka topic when
> _KafkaConsumer.poll()_ returns zero records. An empty result from _poll()_
> is treated as a signal that all records have been retrieved.
> - The main thread waits for the _TopicPoller_ thread to finish, but no longer
> than 30 seconds (the value is hardcoded).
> This implementation can leave the local cache partially initialized for the
> following reasons:
> 1. _TopicPoller_ does not manage to consume all records from the Kafka
> topic within the 30-second interval.
> 2. _KafkaConsumer.poll()_ returns an empty record set even though it has not
> yet reached the end of the Kafka topic (this is possible).
> Hence, after an application restart, _KafkaIdempotentRepository_ may fail to
> restore the local cache completely. The consumer will then re-consume
> already processed input, which causes duplicates.
> h3. Proposed implementation
> - Remove the asynchronous _TopicPoller_ and retrieve all records from Kafka
> synchronously in _KafkaIdempotentRepository.doStart()_.
> - Read records from the Kafka topic until the end offsets are reached. Do not
> rely on the condition "_poll()_ returns an empty record set".
> Pseudocode:
> {code:java}
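> // Collect all partitions of the topic (as TopicPartition instances)
> partitions = consumer.partitionsFor(topic)
> consumer.assign(partitions)
> consumer.seekToBeginning(partitions)
> // Capture the end offsets once, before polling starts
> endOffsets = consumer.endOffsets(partitions)
> // Poll until the position of every partition has reached its end offset
> while (!isReachedOffsets(consumer, endOffsets)) {
>     consumerRecords = consumer.poll(pollTimeout)
>     addToLocalCache(consumerRecords)
> }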
> {code}
> This implementation makes sure that _KafkaIdempotentRepository_ is used in a
> Camel application only after all cached records have been restored from the
> persistent storage (Kafka topic) to the local cache. It prevents duplicates
> from occurring after the application has restarted.
> In the case of IdempotentConsumer, a Kafka topic plays the role of a database
> table. We basically need the _"SELECT * FROM app_state"_ operation. It makes
> little sense to run this _SELECT_ asynchronously and rely on a partial result
> set.
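
The difference between the two stop conditions described above can be illustrated with a small, self-contained Java sketch. It does not use the Kafka client: `FakeLog` is a hypothetical in-memory stand-in whose `poll()` returns an empty batch on every second call, imitating a poll that times out before the end of the topic. Integer partition ids stand in for `TopicPartition`, and all class and method names other than `isReachedOffsets` are assumptions made for illustration only, not the Camel implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for a consumer of one topic; not a Kafka API. */
final class FakeLog {
    private final List<List<String>> partitions;            // keys stored per partition
    private final Map<Integer, Long> positions = new HashMap<>();
    private int pollCount = 0;

    FakeLog(List<List<String>> partitions) {
        this.partitions = partitions;
        for (int p = 0; p < partitions.size(); p++) {
            positions.put(p, 0L);                            // start from the beginning
        }
    }

    /** End offset of each partition, i.e. the number of records it holds. */
    Map<Integer, Long> endOffsets() {
        Map<Integer, Long> ends = new HashMap<>();
        for (int p = 0; p < partitions.size(); p++) {
            ends.put(p, (long) partitions.get(p).size());
        }
        return ends;
    }

    /** Current read position of each partition. */
    Map<Integer, Long> positions() {
        return positions;
    }

    /** Returns at most one record per partition; every second call returns nothing. */
    List<String> poll() {
        pollCount++;
        List<String> batch = new ArrayList<>();
        if (pollCount % 2 == 0) {
            return batch;                                    // simulated empty poll mid-topic
        }
        for (int p = 0; p < partitions.size(); p++) {
            long pos = positions.get(p);
            if (pos < partitions.get(p).size()) {
                batch.add(partitions.get(p).get((int) pos));
                positions.put(p, pos + 1);
            }
        }
        return batch;
    }
}

final class EndOffsetRestoreSketch {
    /** True once every partition position has reached its captured end offset. */
    static boolean isReachedOffsets(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> end : endOffsets.entrySet()) {
            if (positions.getOrDefault(end.getKey(), 0L) < end.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Stop condition described as the current behavior: stop at the first empty poll. */
    static Set<String> restoreUntilEmptyPoll(FakeLog log) {
        Set<String> cache = new HashSet<>();
        List<String> batch;
        while (!(batch = log.poll()).isEmpty()) {
            cache.addAll(batch);
        }
        return cache;
    }

    /** Proposed stop condition: keep polling until the end offsets are reached. */
    static Set<String> restoreUntilEndOffsets(FakeLog log) {
        Set<String> cache = new HashSet<>();
        Map<Integer, Long> endOffsets = log.endOffsets();    // captured once, before polling
        while (!isReachedOffsets(log.positions(), endOffsets)) {
            cache.addAll(log.poll());                        // an empty batch is simply skipped
        }
        return cache;
    }

    public static void main(String[] args) {
        List<List<String>> data = List.of(List.of("a", "b", "c"), List.of("d"));
        System.out.println("empty-poll stop:  " + restoreUntilEmptyPoll(new FakeLog(data)));
        System.out.println("end-offset stop:  " + restoreUntilEndOffsets(new FakeLog(data)));
    }
}
```

With this fake log, the empty-poll variant stops after the first batch and restores only part of the keys, while the end-offset variant keeps polling through the empty batches until every partition is fully read. A real implementation would presumably compare `KafkaConsumer.position()` against the result of `KafkaConsumer.endOffsets()` for each assigned partition.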