[ https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gregor Zurowski updated CAMEL-20373:
------------------------------------
    Fix Version/s: 3.22.2 (was: 3.22.1)

> camel-kafka - KafkaIdempotentRepository may allow some duplicates after application restart
> -------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-20373
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20373
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.0, 4.2.0
>            Reporter: Arseniy Tashoyan
>            Priority: Major
>             Fix For: 3.22.2, 4.x
>
>
> The current implementation of _KafkaIdempotentRepository_ (after the fix for
> CAMEL-20218) is initialized as follows:
> - Run a separate thread, _TopicPoller_. The _TopicPoller_ calls
>   _KafkaConsumer.poll()_ in a while loop to retrieve the cached keys from the
>   Kafka topic and populate the local in-memory cache.
> - _TopicPoller_ stops retrieving records from the Kafka topic as soon as
>   _KafkaConsumer.poll()_ returns zero records. An empty result from _poll()_ is
>   treated as a signal that all records have been retrieved.
> - The main thread waits for the _TopicPoller_ thread to finish, but for no
>   longer than 30 seconds (the value is hardcoded).
>
> This implementation can leave the local cache only partially initialized, for
> the following reasons:
> 1. _TopicPoller_ may not manage to consume all records from the Kafka topic
>    within the 30-second interval.
> 2. _KafkaConsumer.poll()_ may return an empty record set even though the
>    consumer has not yet reached the end of the Kafka topic (for example, when no
>    records are fetched before the poll timeout expires).
>
> As a result, after an application restart _KafkaIdempotentRepository_ may fail
> to fully restore its local cache. The consumer will then re-consume input that
> was already processed, which produces duplicates.
>
> h3. Proposed implementation
> - Remove the asynchronous _TopicPoller_ and retrieve all records from Kafka
>   synchronously in _KafkaIdempotentRepository.doStart()_.
> - Read records from the Kafka topic until the end offsets are reached.
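>   Do not rely on the condition "_poll()_ returns an empty record set".
>
> Pseudocode, written as Java against the kafka-clients consumer API
> (_addToLocalCache()_ is a placeholder for populating the local cache):
> {code:java}
> // partitionsFor() returns PartitionInfo; assign() and friends need TopicPartition
> List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
>         .map(info -> new TopicPartition(info.topic(), info.partition()))
>         .collect(Collectors.toList());
> consumer.assign(partitions);
> consumer.seekToBeginning(partitions);
> // Snapshot the end offsets once; everything before them must be restored
> Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
> // Stop when every position has reached its end offset, never on an empty poll()
> while (!endOffsets.entrySet().stream()
>         .allMatch(e -> consumer.position(e.getKey()) >= e.getValue())) {
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
>     addToLocalCache(records);
> }
> {code}
>
> This implementation ensures that _KafkaIdempotentRepository_ is used by a Camel
> application only after all cached records have been restored from persistent
> storage (the Kafka topic) into the local cache, which prevents duplicates after
> an application restart.
>
> For the Idempotent Consumer, the Kafka topic plays the role of a database
> table: what we need is essentially a _"SELECT * FROM app_state"_ operation. It
> makes little sense to run this _SELECT_ asynchronously and rely on a partial
> result set.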
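The difference between the two termination conditions can be reproduced with a small, stdlib-only Java model. This is a sketch under stated assumptions, not Camel or Kafka code: `FakePartition`, its `poll(int)` method and `sampleTopic()` are hypothetical stand-ins for a Kafka partition and consumer, and the empty batch each partition returns on its second `poll()` call simulates reason 2 above (an empty result before the end of the topic). `restoreUntilEmptyPoll` mirrors the current _TopicPoller_ behaviour; `restoreUntilEndOffsets` mirrors the proposed loop.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stdlib-only model. FakePartition is a hypothetical stand-in, NOT the kafka-clients API.
public class RestoreSketch {

    // Hypothetical partition: a fixed list of keys, a read position and a poll() call counter.
    static final class FakePartition {
        final List<String> keys;
        final Set<Integer> emptyPollsAt; // poll() call indices that simulate an empty fetch
        int position = 0;
        int pollCount = 0;

        FakePartition(List<String> keys, Set<Integer> emptyPollsAt) {
            this.keys = keys;
            this.emptyPollsAt = emptyPollsAt;
        }

        long endOffset() {
            return keys.size();
        }

        // Returns up to maxRecords keys, or an empty batch on a simulated slow fetch.
        List<String> poll(int maxRecords) {
            int call = pollCount++;
            if (emptyPollsAt.contains(call)) {
                return List.of();
            }
            int to = Math.min(position + maxRecords, keys.size());
            List<String> batch = new ArrayList<>(keys.subList(position, to));
            position = to;
            return batch;
        }
    }

    // Current behaviour: an empty poll() across all partitions is taken to mean "end of topic".
    static Set<String> restoreUntilEmptyPoll(List<FakePartition> partitions) {
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> batch = new ArrayList<>();
            for (FakePartition p : partitions) {
                batch.addAll(p.poll(2));
            }
            if (batch.isEmpty()) {
                return cache; // may stop before the real end of the topic
            }
            cache.addAll(batch);
        }
    }

    // Proposed behaviour: snapshot end offsets first, then read until every position reaches them.
    static Set<String> restoreUntilEndOffsets(List<FakePartition> partitions) {
        Map<FakePartition, Long> endOffsets = new HashMap<>();
        for (FakePartition p : partitions) {
            endOffsets.put(p, p.endOffset());
        }
        Set<String> cache = new HashSet<>();
        while (!isReachedOffsets(partitions, endOffsets)) {
            for (FakePartition p : partitions) {
                cache.addAll(p.poll(2)); // an empty batch is harmless here
            }
        }
        return cache;
    }

    static boolean isReachedOffsets(List<FakePartition> partitions, Map<FakePartition, Long> endOffsets) {
        for (FakePartition p : partitions) {
            if (p.position < endOffsets.get(p)) {
                return false;
            }
        }
        return true;
    }

    // Two partitions (4 + 2 keys); each returns an empty batch on its second poll() call.
    static List<FakePartition> sampleTopic() {
        return List.of(
                new FakePartition(List.of("a", "b", "c", "d"), Set.of(1)),
                new FakePartition(List.of("e", "f"), Set.of(1)));
    }

    public static void main(String[] args) {
        // prints "empty-poll stop: 4 of 6 keys"
        System.out.println("empty-poll stop: " + restoreUntilEmptyPoll(sampleTopic()).size() + " of 6 keys");
        // prints "end-offset stop: 6 of 6 keys"
        System.out.println("end-offset stop: " + restoreUntilEndOffsets(sampleTopic()).size() + " of 6 keys");
    }
}
```

With a real consumer, the check inside `isReachedOffsets` would presumably become `consumer.position(tp) >= endOffsets.get(tp)` for each assigned `TopicPartition`. Because the end offsets are snapshotted once before reading, the loop waits only for keys that existed at startup, which matches the "SELECT * FROM app_state" reading of the repository.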