[
https://issues.apache.org/jira/browse/CAMEL-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-22150:
--------------------------------
Affects Version/s: 4.12.0
> camel-kafka - Issue with Batching: Missed Records Due to
> hasExpiredRecords check
> ------------------------------------------------------------------------------------
>
> Key: CAMEL-22150
> URL: https://issues.apache.org/jira/browse/CAMEL-22150
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 4.10.5, 4.11.0, 4.12.0
> Reporter: Adithya Kashyap H M
> Priority: Major
> Fix For: 4.10.6, 4.13.0
>
>
> When using the batching approach in Camel-Kafka, we observed an issue related
> to how polled {{ConsumerRecords}} and the {{exchangeList}} are handled.
> Inside the {{hasExpiredRecords}} method, two conditions are checked:
> {code:java}
> private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
>     // no records in batch
>     if (exchangeList.isEmpty()) {
>         return false;
>     }
>     // timeout is only triggered if there are no new records
>     boolean timeout = consumerRecords.isEmpty() && timeoutWatch.taken() >= configuration.getPollTimeoutMs();
>     // interval is triggered if enabled, and it has been X time since last batch completion
>     boolean interval = configuration.getBatchingIntervalMs() != null
>             && intervalWatch.taken() >= configuration.getBatchingIntervalMs();
>     return timeout || interval;
> } {code}
> If the {{intervalWatch}} exceeds the configured {{{}batchingIntervalMs{}}},
> the condition evaluates to {{{}true{}}}. At this point, Camel proceeds to
> process the existing messages in the {{{}exchangeList{}}}.
> However, any new records fetched in the current Kafka poll
> ({{{}consumerRecords{}}}) are *not* *added* to the {{exchangeList}} for
> processing and it *returns* the result. Due to Kafka's polling behaviour, the
> next poll starts *after* the offset of these already-fetched-but-unprocessed
> records. As a result, these records are effectively skipped.
> If we continue to process and commit newly polled messages in the next
> iteration, the previously fetched (but unprocessed) messages are lost.
> This currently resulting in {*}data loss{*}, and we believe this is a
> critical issue in the batching logic.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)