This is an automated email from the ASF dual-hosted git repository.
klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 853477a2d0e CAMEL-18350: fix bug causing all messages to be reconsumed
(#8447)
853477a2d0e is described below
commit 853477a2d0e38dcbc0d399baa3c3e4a56c973b42
Author: klease <[email protected]>
AuthorDate: Thu Sep 29 09:47:19 2022 +0200
CAMEL-18350: fix bug causing all messages to be reconsumed (#8447)
When "breakOnFirstError" = "true", if an error occurs on the first message
in a set of polled messages, then the offset is set to -1, causing
all messages to be refetched and not only the one which failed.
Modify KafkaFetchRecords to remember the result returned from
the processing of the previous batch when processing a new batch
of polled messages.
---
.../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 5 ++++-
.../kafka/consumer/support/KafkaRecordProcessorFacade.java | 7 ++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 8cab5bcd76a..5684375bbdd 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -307,6 +307,7 @@ public class KafkaFetchRecords implements Runnable {
kafkaConsumer, threadId, commitManager, consumerListener);
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+ ProcessingResult lastResult = null;
while (isKafkaConsumerRunnableAndNotStopped() && isConnected() &&
pollExceptionStrategy.canContinue()) {
ConsumerRecords<Object, Object> allRecords =
consumer.poll(pollDuration);
if (consumerListener != null) {
@@ -315,13 +316,15 @@ public class KafkaFetchRecords implements Runnable {
}
}
- ProcessingResult result =
recordProcessorFacade.processPolledRecords(allRecords);
+ ProcessingResult result =
recordProcessorFacade.processPolledRecords(allRecords, lastResult);
if (result.isBreakOnErrorHit()) {
LOG.debug("We hit an error ... setting flags to force
reconnect");
// force re-connect
setReconnect(true);
setConnected(false);
+ } else {
+ lastResult = result;
}
updateTaskState();
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 53e519523e7..fbf6f3d09a8 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,16 +54,17 @@ public class KafkaRecordProcessorFacade {
return camelKafkaConsumer.isStopping();
}
- public ProcessingResult processPolledRecords(ConsumerRecords<Object,
Object> allRecords) {
+ public ProcessingResult processPolledRecords(
+ ConsumerRecords<Object, Object> allRecords, ProcessingResult
resultFromPreviousPoll) {
logRecords(allRecords);
Set<TopicPartition> partitions = allRecords.partitions();
Iterator<TopicPartition> partitionIterator = partitions.iterator();
- ProcessingResult lastResult = ProcessingResult.newUnprocessed();
+ ProcessingResult lastResult
+ = resultFromPreviousPoll == null ?
ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
while (partitionIterator.hasNext() && !isStopping()) {
- lastResult = ProcessingResult.newUnprocessed();
TopicPartition partition = partitionIterator.next();
List<ConsumerRecord<Object, Object>> partitionRecords =
allRecords.records(partition);