[ https://issues.apache.org/jira/browse/CAMEL-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-22150:
--------------------------------
    Priority: Major  (was: Critical)

> Issue with Batching in Camel-Kafka: Missed Records Due to hasExpiredRecords 
> Check
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-22150
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22150
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 4.11.0
>            Reporter: Adithya Kashyap H M
>            Priority: Major
>
> When using the batching approach in Camel-Kafka, we observed an issue in how 
> freshly polled {{ConsumerRecords}} interact with the internal 
> {{exchangeList}}.
> Inside the {{hasExpiredRecords}} method, two conditions are checked:
> {code:java}
> private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
>     // no records in batch
>     if (exchangeList.isEmpty()) {
>         return false;
>     }
>     // timeout is only triggered if there are no new records
>     boolean timeout = consumerRecords.isEmpty()
>             && timeoutWatch.taken() >= configuration.getPollTimeoutMs();
>     // interval is triggered if enabled, and it has been X time since last batch completion
>     boolean interval = configuration.getBatchingIntervalMs() != null
>             && intervalWatch.taken() >= configuration.getBatchingIntervalMs();
>     return timeout || interval;
> }
> {code}
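> For context, this logic is exercised by the batching consumer, which might 
> be configured along the following lines (broker, topic, and timing values 
> here are illustrative, not taken from this report):
> {code}
> kafka:myTopic?brokers=localhost:9092&batching=true&maxPollRecords=100&pollTimeoutMs=5000&batchingIntervalMs=1000
> {code}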
> If {{intervalWatch}} exceeds the configured {{batchingIntervalMs}}, the 
> condition evaluates to {{true}}, and Camel proceeds to process the exchanges 
> already accumulated in the {{exchangeList}}.
> However, any new records fetched by the current Kafka poll 
> ({{consumerRecords}}) are *not added* to the {{exchangeList}}; the method 
> simply returns. Because of Kafka's polling behaviour, the next poll resumes 
> *after* the offsets of these already-fetched-but-unprocessed records. As a 
> result, these records are effectively skipped.
> If processing then continues and the messages polled in the next iteration 
> are committed, the previously fetched (but unprocessed) messages are lost 
> for good.
> This currently results in *data loss*, and we believe it is a critical flaw 
> in the batching logic.
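> A possible direction for a fix, sketched against the snippet above, is to 
> fold the freshly polled records into the batch before completing it, so 
> their offsets are not skipped. This is a method-level sketch inside the 
> consumer class; {{createExchange}} and {{processBatch}} are illustrative 
> placeholders, not actual Camel methods:
> {code:java}
> private void onPolledRecords(ConsumerRecords<Object, Object> consumerRecords) {
>     if (hasExpiredRecords(consumerRecords)) {
>         // add the records from the current poll first; otherwise the next
>         // poll resumes after their offsets and they are silently lost
>         for (ConsumerRecord<Object, Object> record : consumerRecords) {
>             exchangeList.add(createExchange(record));
>         }
>         processBatch(exchangeList); // dispatch the batch, then commit offsets
>         exchangeList.clear();
>         intervalWatch.restart();
>     }
> }
> {code}
> Alternatively, the consumer could seek back to the first unprocessed offset 
> before the next poll, so the skipped records are re-fetched.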



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
