[ https://issues.apache.org/jira/browse/CAMEL-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-22150:
--------------------------------
    Priority: Major  (was: Critical)

> Issue with Batching in Camel-Kafka: Missed Records Due to due hasExpiredRecords check
> -------------------------------------------------------------------------------------
>
>                 Key: CAMEL-22150
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22150
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 4.11.0
>            Reporter: Adithya Kashyap H M
>            Priority: Major
>
> When using the batching approach in Camel-Kafka, we observed an issue related
> to how polled {{ConsumerRecords}} and the {{exchangeList}} are handled.
> Inside the {{hasExpiredRecords}} method, two conditions are checked:
> {code:java}
> private boolean hasExpiredRecords(ConsumerRecords<Object, Object> consumerRecords) {
>     // no records in batch
>     if (exchangeList.isEmpty()) {
>         return false;
>     }
>     // timeout is only triggered if we no new records
>     boolean timeout = consumerRecords.isEmpty() && timeoutWatch.taken() >= configuration.getPollTimeoutMs();
>     // interval is triggered if enabled, and it has been X time since last batch completion
>     boolean interval = configuration.getBatchingIntervalMs() != null
>             && intervalWatch.taken() >= configuration.getBatchingIntervalMs();
>     return timeout || interval;
> } {code}
> If the {{intervalWatch}} exceeds the configured {{batchingIntervalMs}},
> the condition evaluates to {{true}}. At this point, Camel proceeds to
> process the existing messages in the {{exchangeList}}.
> However, any new records fetched in the current Kafka poll
> ({{consumerRecords}}) are *not added* to the {{exchangeList}} for
> processing and it *returns* the result. Due to Kafka's polling behaviour, the
> next poll starts *after* the offset of these already-fetched-but-unprocessed
> records. As a result, these records are effectively skipped.
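
The skip described above can be reproduced in a small, Kafka-free simulation. This is only a sketch of the reported behaviour, not the actual camel-kafka code: the class {{BatchingSketch}}, the method {{simulate}} and all of its parameters are hypothetical, the consumer is modelled as a plain integer position, and the interval stopwatch is replaced by a count of polls since the last flush.

```java
import java.util.ArrayList;
import java.util.List;

class BatchingSketch {

    /**
     * Simulates polling offsets 0..total-1 and returns the offsets that were processed.
     * keepPolledOnExpiry=false models the reported behaviour (polled records are dropped
     * when the batch expires); true models one possible remedy (assumption, not the Camel fix).
     */
    public static List<Integer> simulate(int total, int perPoll, int intervalPolls,
                                         boolean keepPolledOnExpiry) {
        List<Integer> processed = new ArrayList<>();
        List<Integer> exchangeList = new ArrayList<>();
        int position = 0;        // next offset the simulated consumer will fetch
        int pollsSinceFlush = 0; // stand-in for intervalWatch.taken()

        while (position < total || !exchangeList.isEmpty()) {
            // "poll": the position advances whether or not the records end up in the batch
            List<Integer> polled = new ArrayList<>();
            for (int i = 0; i < perPoll && position < total; i++) {
                polled.add(position++);
            }
            pollsSinceFlush++;

            // same shape as hasExpiredRecords: empty batch never expires
            boolean timeout = polled.isEmpty();
            boolean interval = pollsSinceFlush >= intervalPolls;
            boolean expired = !exchangeList.isEmpty() && (timeout || interval);

            if (expired) {
                processed.addAll(exchangeList); // process the existing batch
                exchangeList.clear();
                pollsSinceFlush = 0;
                if (keepPolledOnExpiry) {
                    exchangeList.addAll(polled); // carry the fresh records into the next batch
                }
                continue; // without the carry-over, 'polled' is lost here
            }
            exchangeList.addAll(polled);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(6, 2, 2, false)); // prints [0, 1, 4, 5]
        System.out.println(simulate(6, 2, 2, true));  // prints [0, 1, 2, 3, 4, 5]
    }
}
```

With six records, two per poll, and an interval of two polls, the second poll fetches offsets 2 and 3 at the moment the interval fires, so in the first run they never reach the batch even though the position has already moved past them.
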
> If we continue to process and commit newly polled messages in the next
> iteration, the previously fetched (but unprocessed) messages are lost.
> This is currently resulting in *data loss*, and we believe this is a
> critical issue in the batching logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)