CodeSmell commented on code in PR #11920:
URL: https://github.com/apache/camel/pull/11920#discussion_r1384965058
##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java:
##########
@@ -112,30 +115,48 @@ public ProcessingResult processExchange(
exchange.setException(e);
}
if (exchange.getException() != null) {
- boolean breakOnErrorExit = processException(exchange, partition,
lastResult.getPartitionLastOffset(),
+
+ LOG.debug("an exception was thrown for record at partition {} and
offset {}",
+ record.partition(), record.offset());
+
+ boolean breakOnErrorExit = processException(exchange,
topicPartition, record, lastResult,
exceptionHandler);
- return new ProcessingResult(breakOnErrorExit,
lastResult.getPartitionLastOffset(), true);
+
+ return new ProcessingResult(breakOnErrorExit,
lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
} else {
- return new ProcessingResult(false, record.offset(),
exchange.getException() != null);
+ return new ProcessingResult(false, record.partition(),
record.offset(), exchange.getException() != null);
}
}
private boolean processException(
- Exchange exchange, TopicPartition partition, long
partitionLastOffset,
+ Exchange exchange, TopicPartition topicPartition,
+ ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
ExceptionHandler exceptionHandler) {
// processing failed due to an unhandled exception, what should we do
if (configuration.isBreakOnFirstError()) {
+
+ if (lastResult.getPartition() != -1 &&
+ lastResult.getPartition() != record.partition()) {
+ LOG.error("about to process an exception with UNEXPECTED
partition & offset. Got topic partition {}. " +
+ " The last result was on partition {} with offset {}
but was expecting partition {} with offset {}",
+ topicPartition.partition(), lastResult.getPartition(),
lastResult.getPartitionLastOffset(),
+ record.partition(), record.offset());
+ }
+
// we are failing and we should break out
if (LOG.isWarnEnabled()) {
- LOG.warn("Error during processing {} from topic: {}",
exchange, partition.topic(), exchange.getException());
- LOG.warn("Will seek consumer to offset {} and start polling
again.", partitionLastOffset);
+ LOG.warn("Error during processing {} from topic: {}",
exchange, topicPartition.topic(), exchange.getException());
+ LOG.warn("Will seek consumer to offset {} on partition {} and
start polling again.",
+ lastResult.getPartitionLastOffset(),
lastResult.getPartition());
}
- // force commit, so we resume on next poll where we failed except
when the failure happened
- // at the first message in a poll
- if (partitionLastOffset != AbstractCommitManager.START_OFFSET) {
- commitManager.forceCommit(partition, partitionLastOffset);
+ // force commit, so we resume on next poll where we failed
+ // except when the failure happened at the first message in a poll
+ if (lastResult.getPartitionLastOffset() !=
AbstractCommitManager.START_OFFSET) {
+ // should we use record.offset ?
+ //commitManager.forceCommit(topicPartition, record.offset() -
1);
Review Comment:
Cool. Removed the commented-out code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]