CodeSmell commented on code in PR #11959:
URL: https://github.com/apache/camel/pull/11959#discussion_r1388410012
##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -149,14 +146,30 @@ private boolean processException(
Exception exc = exchange.getException();
LOG.warn("Error during processing {} from topic: {} due to {}", exchange, topicPartition.topic(),
exc.getMessage());
- LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
- lastResult.getPartitionLastOffset(), lastResult.getPartition());
+ LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
+ record.offset(), record.partition());
}
// force commit, so we resume on next poll where we failed
// except when the failure happened at the first message in a poll
if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
- commitManager.forceCommit(topicPartition, lastResult.getPartitionLastOffset());
+ // the record we are processing had the error
+ // so we will force commit the offset prior
+ // this will enable the current desired behavior to
+ // retry the message 1 more time
+ //
+ // Note: without a more extensive look at handling of breakOnFirstError
+ // we will still need the lastResult so that we don't force
+ // retrying this message over and over
+ // commitManager.forceCommit(topicPartition, record.offset() - 1);
Review Comment:
This change uses record.offset() instead of the lastResult offset, which fixes
CAMEL-19894 and CAMEL-20044 because the offset is now managed correctly.
However, lastResult will still occasionally get corrupted.
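
To illustrate why deriving the offset from the failing record matters, here is a
minimal, Kafka-free sketch. The class and method names are hypothetical and not
part of Camel, and it assumes that force-committing offset N makes the consumer
resume at N + 1, which is how the diff's "offset prior" comment reads. It is not
the actual commit manager logic.

```java
// Hypothetical sketch; none of these names exist in Camel or Kafka.
final class RetryOffsetSketch {

    // Offset to force-commit so that the failed record is polled again:
    // the offset just before the record that raised the error.
    static long offsetBeforeFailure(long failedRecordOffset) {
        return failedRecordOffset - 1;
    }

    // Assumption: after force-committing offset N, polling resumes at N + 1.
    static long resumePosition(long forceCommittedOffset) {
        return forceCommittedOffset + 1;
    }

    public static void main(String[] args) {
        long failedRecordOffset = 42L;
        long staleLastResultOffset = 10L; // a lastResult that no longer matches the record

        // Derived from the record: resumes exactly at the failed record.
        System.out.println(resumePosition(offsetBeforeFailure(failedRecordOffset)));

        // Derived from a stale lastResult: resumes too early and replays records.
        System.out.println(resumePosition(staleLastResultOffset));
    }
}
```

Under these assumptions, the record-based offset resumes at the failed record
itself, while a stale lastResult resumes earlier and replays records that were
already processed.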
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.