This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit aef4ddd7491ffb78642f9943dea90e9c378e2d99
Author: codesmell <[email protected]>
AuthorDate: Wed Nov 15 13:16:10 2023 -0500

    CAMEL-20044: add extra logging for BreakOnFirstError (#11920)
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 32 +++++++++++++++
 .../kafka/consumer/AbstractCommitManager.java      |  2 +
 .../consumer/support/KafkaRecordProcessor.java     | 45 ++++++++++++++++------
 .../support/KafkaRecordProcessorFacade.java        |  3 ++
 .../kafka/consumer/support/ProcessingResult.java   | 13 ++++++-
 5 files changed, 81 insertions(+), 14 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 987a4a36c21..9fb130f6697 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -319,7 +319,9 @@ public class KafkaFetchRecords implements Runnable {
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+
             ProcessingResult lastResult = null;
+
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && 
pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -328,7 +330,32 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
 
+                if (lastResult != null) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("This polling iteration is using lastresult 
on partition {} and offset {}",
+                                lastResult.getPartition(), 
lastResult.getPartitionLastOffset());
+                    }
+
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("This polling iteration is using lastresult 
of null");
+                    }
+                }
+
                 ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords, lastResult);
+
+                if (result != null) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("This polling iteration had a result 
returned for partition {} and offset {}",
+                                result.getPartition(), 
result.getPartitionLastOffset());
+                    }
+
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("This polling iteration had a result 
returned as null");
+                    }
+                }
+
                 updateTaskState();
                 if (result.isBreakOnErrorHit() && 
!this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force 
reconnect");
@@ -337,6 +364,11 @@ public class KafkaFetchRecords implements Runnable {
                     setConnected(false);
                 } else {
                     lastResult = result;
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("setting lastresult to partition {} and 
offset {}",
+                                lastResult.getPartition(), 
lastResult.getPartitionLastOffset());
+                    }
                 }
 
             }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index fe5abc3e403..a1da9823be4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AbstractCommitManager implements CommitManager {
     public static final long START_OFFSET = -1;
+    public static final long NON_PARTITION = -1;
+
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractCommitManager.class);
 
     protected final KafkaConsumer kafkaConsumer;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 1afe53cbe2b..97875b097f1 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -63,6 +63,9 @@ public class KafkaRecordProcessor {
             message.setHeader(KafkaConstants.KEY, record.key());
         }
 
+        LOG.debug("setting up the exchange for message from partition {} and 
offset {}",
+                record.partition(), record.offset());
+
         message.setBody(record.value());
     }
 
@@ -82,7 +85,7 @@ public class KafkaRecordProcessor {
     }
 
     public ProcessingResult processExchange(
-            Exchange exchange, TopicPartition partition, boolean 
partitionHasNext,
+            Exchange exchange, TopicPartition topicPartition, boolean 
partitionHasNext,
             boolean recordHasNext, ConsumerRecord<Object, Object> record, 
ProcessingResult lastResult,
             ExceptionHandler exceptionHandler) {
 
@@ -100,7 +103,7 @@ public class KafkaRecordProcessor {
 
         if (configuration.isAllowManualCommit()) {
             // allow Camel users to access the Kafka consumer API to be able 
to do for example manual commits
-            KafkaManualCommit manual = commitManager.getManualCommit(exchange, 
partition, record);
+            KafkaManualCommit manual = commitManager.getManualCommit(exchange, 
topicPartition, record);
 
             message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
             message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext 
&& !partitionHasNext);
@@ -112,30 +115,48 @@ public class KafkaRecordProcessor {
             exchange.setException(e);
         }
         if (exchange.getException() != null) {
-            boolean breakOnErrorExit = processException(exchange, partition, 
lastResult.getPartitionLastOffset(),
+
+            LOG.debug("An exception was thrown for record at partition {} and 
offset {}",
+                    record.partition(), record.offset());
+
+            boolean breakOnErrorExit = processException(exchange, 
topicPartition, record, lastResult,
                     exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartitionLastOffset(), true);
+
+            return new ProcessingResult(breakOnErrorExit, 
lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
         } else {
-            return new ProcessingResult(false, record.offset(), 
exchange.getException() != null);
+            return new ProcessingResult(false, record.partition(), 
record.offset(), exchange.getException() != null);
         }
     }
 
     private boolean processException(
-            Exchange exchange, TopicPartition partition, long 
partitionLastOffset,
+            Exchange exchange, TopicPartition topicPartition,
+            ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
             ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
+
+            if (lastResult.getPartition() != -1 &&
+                    lastResult.getPartition() != record.partition()) {
+                LOG.error("About to process an exception with UNEXPECTED 
partition & offset. Got topic partition {}. " +
+                          " The last result was on partition {} with offset {} 
but was expecting partition {} with offset {}",
+                        topicPartition.partition(), lastResult.getPartition(), 
lastResult.getPartitionLastOffset(),
+                        record.partition(), record.offset());
+            }
+
             // we are failing and we should break out
             if (LOG.isWarnEnabled()) {
-                LOG.warn("Error during processing {} from topic: {}", 
exchange, partition.topic(), exchange.getException());
-                LOG.warn("Will seek consumer to offset {} and start polling 
again.", partitionLastOffset);
+                Exception exc = exchange.getException();
+                LOG.warn("Error during processing {} from topic: {} due to 
{}", exchange, topicPartition.topic(),
+                        exc.getMessage());
+                LOG.warn("Will seek consumer to offset {} on partition {} and 
start polling again.",
+                        lastResult.getPartitionLastOffset(), 
lastResult.getPartition());
             }
 
-            // force commit, so we resume on next poll where we failed except 
when the failure happened
-            // at the first message in a poll
-            if (partitionLastOffset != AbstractCommitManager.START_OFFSET) {
-                commitManager.forceCommit(partition, partitionLastOffset);
+            // force commit, so we resume on next poll where we failed 
+            // except when the failure happened at the first message in a poll
+            if (lastResult.getPartitionLastOffset() != 
AbstractCommitManager.START_OFFSET) {
+                commitManager.forceCommit(topicPartition, 
lastResult.getPartitionLastOffset());
             }
 
             // continue to next partition
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index fbf6f3d09a8..134246891fb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -78,6 +78,9 @@ public class KafkaRecordProcessorFacade {
                 lastResult = processRecord(partition, 
partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
                         kafkaRecordProcessor, record);
 
+                LOG.debug("Processed record on partition {} and offset {} and 
got result for partition {} and offset {}",
+                        record.partition(), record.offset(), 
lastResult.getPartition(), lastResult.getPartitionLastOffset());
+
                 if (consumerListener != null) {
                     if (!consumerListener.afterProcess(lastResult)) {
                         commitManager.commit(partition);
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 36f0c69c8b2..fe3afd6ee8d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -21,14 +21,19 @@ import 
org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
-            = new ProcessingResult(false, AbstractCommitManager.START_OFFSET, 
false);
+            = new ProcessingResult(
+                    false,
+                    AbstractCommitManager.NON_PARTITION,
+                    AbstractCommitManager.START_OFFSET, false);
 
     private final boolean breakOnErrorHit;
+    private final long lastPartition;
     private final long partitionLastOffset;
     private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, long partitionLastOffset, 
boolean failed) {
+    ProcessingResult(boolean breakOnErrorHit, long lastPartition, long 
partitionLastOffset, boolean failed) {
         this.breakOnErrorHit = breakOnErrorHit;
+        this.lastPartition = lastPartition;
         this.partitionLastOffset = partitionLastOffset;
         this.failed = failed;
     }
@@ -41,6 +46,10 @@ public final class ProcessingResult {
         return partitionLastOffset;
     }
 
+    public long getPartition() {
+        return lastPartition;
+    }
+
     public boolean isFailed() {
         return failed;
     }

Reply via email to