This is an automated email from the ASF dual-hosted git repository.
klease pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new b42f5a4fcb6 CAMEL-18350: backport the fix for Kafka "breakOnFirst" error (#8451)
b42f5a4fcb6 is described below
commit b42f5a4fcb6ea7e72fb7e597c1a26365c6cb8fa2
Author: klease <[email protected]>
AuthorDate: Thu Sep 29 17:40:31 2022 +0200
CAMEL-18350: backport the fix for Kafka "breakOnFirst" error (#8451)
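    For readers skimming the patch below: processPolledRecords() now returns the full
    KafkaRecordProcessor.ProcessResult and is seeded with the result of the previous poll,
    so the last processed offset is no longer reset between polls. A minimal standalone
    sketch of that carry-over pattern, using simplified stand-in types rather than the
    actual Camel classes (ProcessResult and the offset list below are hypothetical
    simplifications), is:

    // Minimal sketch, not the actual Camel classes: a simplified ProcessResult
    // and poll loop showing why the previous poll's result must be carried over.
    import java.util.List;

    public class CarryLastResultSketch {

        // Hypothetical stand-in for KafkaRecordProcessor.ProcessResult
        static final class ProcessResult {
            static final long UNPROCESSED = -1;
            private final long partitionLastOffset;

            private ProcessResult(long partitionLastOffset) {
                this.partitionLastOffset = partitionLastOffset;
            }

            static ProcessResult newUnprocessed() {
                return new ProcessResult(UNPROCESSED);
            }

            long getPartitionLastOffset() {
                return partitionLastOffset;
            }
        }

        // Mirrors the reworked method shape: start from the previous poll's result
        // instead of a fresh "unprocessed" one, so an empty poll keeps the offset
        // reached by earlier polls.
        static ProcessResult processPolledRecords(List<Long> polledOffsets, ProcessResult resultFromPreviousPoll) {
            ProcessResult lastResult = resultFromPreviousPoll == null
                    ? ProcessResult.newUnprocessed() : resultFromPreviousPoll;

            for (long offset : polledOffsets) {
                lastResult = new ProcessResult(offset); // "process" the record
            }
            return lastResult;
        }

        public static void main(String[] args) {
            ProcessResult lastResult = null;

            lastResult = processPolledRecords(List.of(10L, 11L, 12L), lastResult); // first poll
            lastResult = processPolledRecords(List.of(), lastResult);              // empty poll

            // Prints 12 rather than -1, which is what an error-handling strategy
            // needs when deciding where to resume after a failure.
            System.out.println(lastResult.getPartitionLastOffset());
        }
    }

    The same carry-over is what lets handleAccordingToStrategy() in the patch see a
    meaningful last partition offset even when the failing poll returned no records.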
---
.../apache/camel/component/kafka/KafkaFetchRecords.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index db425700532..f0f9413880a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -156,8 +156,7 @@ class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        long partitionLastOffset = -1;
-
+        KafkaRecordProcessor.ProcessResult lastResult = null;
         try {
             /*
              * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -179,7 +178,7 @@ class KafkaFetchRecords implements Runnable {
 
                 processAsyncCommits();
 
-                partitionLastOffset = processPolledRecords(allRecords, kafkaRecordProcessor);
+                lastResult = processPolledRecords(allRecords, kafkaRecordProcessor, lastResult);
             }
 
             if (!isConnected()) {
@@ -213,7 +212,7 @@ class KafkaFetchRecords implements Runnable {
                         e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage());
             }
 
-            handleAccordingToStrategy(partitionLastOffset, e);
+            handleAccordingToStrategy(lastResult.getPartitionLastOffset(), e);
         } finally {
             lock.unlock();
@@ -338,16 +337,18 @@ class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.getEndpoint().getCamelContext().isStopping() && !kafkaConsumer.isRunAllowed();
     }
 
-    private long processPolledRecords(ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor) {
+    private KafkaRecordProcessor.ProcessResult processPolledRecords(
+            ConsumerRecords<Object, Object> allRecords, KafkaRecordProcessor kafkaRecordProcessor,
+            KafkaRecordProcessor.ProcessResult resultFromPreviousPoll) {
         logRecords(allRecords);
 
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
-        KafkaRecordProcessor.ProcessResult lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
+        KafkaRecordProcessor.ProcessResult lastResult
+                = resultFromPreviousPoll == null ? KafkaRecordProcessor.ProcessResult.newUnprocessed() : resultFromPreviousPoll;
 
         while (partitionIterator.hasNext() && !isStopping()) {
-            lastResult = KafkaRecordProcessor.ProcessResult.newUnprocessed();
             TopicPartition partition = partitionIterator.next();
 
             List<ConsumerRecord<Object, Object>> partitionRecords = allRecords.records(partition);
@@ -377,7 +378,7 @@ class KafkaFetchRecords implements Runnable {
             setRetry(false); // to close the current consumer
         }
 
-        return lastResult.getPartitionLastOffset();
+        return lastResult;
     }
 
     private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {