orpiske commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1464809515


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");

Review Comment:
   BTW, based on your suggestion, I think I should also adjust the code
below, where the error handling from streaming mode was used.


