davsclaus commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1463725683
##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
private final Processor processor;
private final CommitManager commitManager;
+ private record CommitSynchronization(CommitManager commitManager)
implements Synchronization {
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ final List<?> exchanges =
exchange.getMessage().getBody(List.class);
+
+ // Ensure we are actually receiving what we are asked for
+ if (exchanges == null || exchanges.isEmpty()) {
+ LOG.warn("The exchange is {}", exchanges == null ? "not of the
expected type (null)" : "empty");
+ return;
+ }
+
+ LOG.debug("Calling commit on {} exchanges using {}",
exchanges.size(), commitManager.getClass().getSimpleName());
+ commitManager.commit();
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ LOG.warn("Skipping auto-commit on the batch because processing the
exchanged has failed");
Review Comment:
This may be noisy if you have a recurring error and kafka batching will keep
poll the same messages as offset is not moved forward.
##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends
KafkaRecordProcessor {
private final Processor processor;
private final CommitManager commitManager;
+ private record CommitSynchronization(CommitManager commitManager)
implements Synchronization {
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ final List<?> exchanges =
exchange.getMessage().getBody(List.class);
+
+ // Ensure we are actually receiving what we are asked for
+ if (exchanges == null || exchanges.isEmpty()) {
+ LOG.warn("The exchange is {}", exchanges == null ? "not of the
expected type (null)" : "empty");
+ return;
+ }
+
+ LOG.debug("Calling commit on {} exchanges using {}",
exchanges.size(), commitManager.getClass().getSimpleName());
+ commitManager.commit();
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ LOG.warn("Skipping auto-commit on the batch because processing the
exchanged has failed");
Review Comment:
This will be noisy if you have a recurring error and kafka batching will
keep poll the same messages as offset is not moved forward.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]