[https://issues.apache.org/jira/browse/CAMEL-20373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Arseniy Tashoyan updated CAMEL-20373:
-------------------------------------
Description:
The current implementation of _KafkaIdempotentRepository_ is initialized as
follows (after the fix for CAMEL-20218):
- A separate thread, _TopicPoller_, is started. _TopicPoller_ calls the
_KafkaConsumer.poll()_ method in a while-loop to retrieve the cached keys from
the Kafka topic and populate the local in-memory cache.
- _TopicPoller_ stops retrieving records from the Kafka topic as soon as
_KafkaConsumer.poll()_ returns zero records. An empty result from _poll()_ is
treated as a signal that all records have been retrieved.
- The main thread waits for the _TopicPoller_ thread to finish, but for no
longer than 30 seconds (the timeout is hardcoded).
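The effect of the bounded wait can be reproduced without Kafka. The sketch below is a hypothetical simulation, not Camel code: _BoundedWaitDemo_, _startWithTimeout()_ and the latch-based stall are illustrative assumptions. The poller thread loads a few keys and then stalls (standing in for slow fetches), while the starting thread waits only _timeoutMillis_ (the 30-second timeout, shortened) before it takes a snapshot of the cache.

{code:java}
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical simulation (not Camel code) of the current start-up sequence:
// a poller thread fills the cache, the starting thread waits with a timeout.
class BoundedWaitDemo {

    static Set<String> startWithTimeout(List<String> topicKeys, int keysBeforeStall,
                                        long timeoutMillis) {
        Set<String> cache = ConcurrentHashMap.newKeySet();
        CountDownLatch stalled = new CountDownLatch(1); // poller stalled or finished
        CountDownLatch resume = new CountDownLatch(1);  // releases a stalled poller

        Thread poller = new Thread(() -> {
            for (int i = 0; i < topicKeys.size(); i++) {
                if (i == keysBeforeStall) {
                    stalled.countDown();
                    try {
                        resume.await(); // stands in for fetches that take too long
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                cache.add(topicKeys.get(i));
            }
            stalled.countDown(); // no stall happened: all keys are loaded
        });
        poller.start();
        try {
            stalled.await();
            poller.join(timeoutMillis); // bounded wait, like the hardcoded 30 s
            // Keys available at the moment start-up completes
            return Set.copyOf(cache);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            resume.countDown(); // let the poller finish in the background
        }
    }
}
{code}

With five keys in the topic and _keysBeforeStall_ set to 3, the snapshot holds only k1 to k3, so any message checked against the repository at that point would not see k4 and k5.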
This implementation can leave the local cache only partially initialized, for
the following reasons:
1. _TopicPoller_ does not manage to consume all records from the Kafka topic
within the 30-second interval.
2. _KafkaConsumer.poll()_ returns an empty record set even though the consumer
has not yet reached the end of the Kafka topic. This can happen, for example,
when the poll timeout expires before the next batch has been fetched.
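Reason 2 comes down to the stop condition. In the hypothetical sketch below (not Camel code), each inner list stands for the result of one _poll()_ call, and an empty list in the middle models a poll that timed out although records remain in the topic; _EmptyPollStopDemo_ and _loadUntilEmptyPoll()_ are illustrative names.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the current stop condition: each inner list is the
// result of one poll() call, in order.
class EmptyPollStopDemo {

    static Set<String> loadUntilEmptyPoll(List<List<String>> polls) {
        Set<String> cache = new HashSet<>();
        for (List<String> batch : polls) {
            if (batch.isEmpty()) {
                break; // "no records returned" is taken to mean "end of topic"
            }
            cache.addAll(batch);
        }
        return cache;
    }
}
{code}

With the batches [k1, k2], [], [k3, k4], the loop returns only k1 and k2, so k3 and k4 would be processed again after a restart.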
Hence, after an application restart, _KafkaIdempotentRepository_ may fail to
fully restore the local cache. The consumer then re-consumes input that was
already processed, which causes duplicates.
h3. Proposed implementation
- Remove the asynchronous _TopicPoller_ and retrieve all records from Kafka
synchronously in _KafkaIdempotentRepository.doStart()_.
- Read records from the Kafka topic until the end offsets are reached, instead
of relying on the condition "_poll()_ returns an empty record set".
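Sketch in Java (_isReachedOffsets()_ and _addToLocalCache()_ are helper methods
still to be written):
{code:java}
// partitionsFor() returns PartitionInfo objects; assign() needs TopicPartitions
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(info -> new TopicPartition(info.topic(), info.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
// Snapshot of the end offsets at start-up: the target position per partition
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
while (!isReachedOffsets(consumer, endOffsets)) {
    // Assumes String keys/values; the 100 ms poll timeout is illustrative
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
    addToLocalCache(consumerRecords);
}
{code}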
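The termination check can be modelled without a broker. The sketch below is a hypothetical, dependency-free model, not Camel code: partitions are _int_ values, each partition's log is a list of keys, the end offset is the log size, and the position is the offset of the next record to read (mirroring the meaning of _KafkaConsumer.endOffsets()_ and _KafkaConsumer.position()_). _EndOffsetLoadDemo_ and _pollSchedule_ are illustrative assumptions; the schedule lets some simulated polls return nothing.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, broker-free model of the proposed loading loop.
// Partition -> log of keys; end offset = log size; position = next offset to read.
class EndOffsetLoadDemo {

    // True once every partition's position has reached its end offset
    static boolean isReachedOffsets(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            if (positions.getOrDefault(e.getKey(), 0L) < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    // pollSchedule[i] = max records the i-th simulated poll() returns per partition
    // (0 models an empty poll); it must contain at least one positive entry.
    static Set<String> load(Map<Integer, List<String>> logs, int[] pollSchedule) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        Map<Integer, Long> positions = new HashMap<>();
        for (Map.Entry<Integer, List<String>> e : logs.entrySet()) {
            endOffsets.put(e.getKey(), (long) e.getValue().size()); // like endOffsets()
            positions.put(e.getKey(), 0L);                          // like seekToBeginning()
        }
        Set<String> cache = new HashSet<>();
        int pollNo = 0;
        while (!isReachedOffsets(positions, endOffsets)) {
            int batch = pollSchedule[pollNo++ % pollSchedule.length];
            for (Map.Entry<Integer, List<String>> e : logs.entrySet()) {
                int from = positions.get(e.getKey()).intValue();
                long end = endOffsets.get(e.getKey());
                int to = (int) Math.min((long) from + batch, end);
                cache.addAll(e.getValue().subList(from, to)); // like addToLocalCache()
                positions.put(e.getKey(), (long) to);
            }
        }
        return cache;
    }
}
{code}

With two partitions holding [a, b, c] and [d, e] and the schedule {2, 0, 1}, the second poll returns nothing, yet the loop continues and loads all five keys, whereas a stop-on-empty-poll loop would have stopped with four. Comparing with >= rather than == tolerates a real consumer whose position overshoots the snapshot because records were appended after _endOffsets()_ was called. A production version would probably still want an overall deadline, so that start-up fails visibly instead of hanging if the end offsets are never reached.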
This implementation ensures that _KafkaIdempotentRepository_ is used by a
Camel application only after all cached records have been restored from the
persistent storage (the Kafka topic) into the local cache. This prevents
duplicates after an application restart.
In the case of IdempotentConsumer, the Kafka topic plays the role of a database
table: what we need is essentially a _"SELECT * FROM app_state"_ operation. It
makes little sense to run this _SELECT_ asynchronously and rely on a partial
result set.
> camel-kafka - KafkaIdempotentRepository may allow some duplicates after
> application restart
> -------------------------------------------------------------------------------------------
>
> Key: CAMEL-20373
> URL: https://issues.apache.org/jira/browse/CAMEL-20373
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.22.0, 4.2.0
> Reporter: Arseniy Tashoyan
> Priority: Major
> Fix For: 4.x
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)