davsclaus commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1464521542
##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");
Review Comment:
Other components use consumer.getExceptionHandler().handleException(...),
which is usually invoked when an exception was thrown and left unhandled; the
default implementation just logs a WARN.
So if you have a route like
from file
throw exception
it will also keep being noisy, but only every 0.5 sec (by default).
Is Kafka not a lot faster, or how slow is that rate?
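A minimal sketch of the delegation suggested above, written in plain Java with no Camel dependency so it can be run on its own. The `ExceptionHandler` interface here is a simplified stand-in for Camel's, and `LoggingExceptionHandler`, the `CommitSynchronization` class, and the `onFailure(Throwable)` signature are hypothetical names for illustration only; the real `Synchronization` callback receives an `Exchange`.

```java
import java.util.ArrayList;
import java.util.List;

public class ExceptionHandlerSketch {

    // Simplified stand-in for Camel's exception handler contract (assumption:
    // reduced to a single method for this sketch).
    interface ExceptionHandler {
        void handleException(String message, Throwable cause);
    }

    // Hypothetical default handler: it only logs a WARN line, mirroring the
    // default behaviour described in the comment. Lines are also kept in a
    // list so the behaviour can be inspected.
    static final class LoggingExceptionHandler implements ExceptionHandler {
        final List<String> warnings = new ArrayList<>();

        @Override
        public void handleException(String message, Throwable cause) {
            String line = "WARN " + message + ": " + cause.getMessage();
            warnings.add(line);
            System.out.println(line);
        }
    }

    // Hypothetical failure callback that delegates to the handler instead of
    // calling LOG.warn directly.
    static final class CommitSynchronization {
        private final ExceptionHandler exceptionHandler;

        CommitSynchronization(ExceptionHandler exceptionHandler) {
            this.exceptionHandler = exceptionHandler;
        }

        void onFailure(Throwable cause) {
            exceptionHandler.handleException(
                    "Skipping auto-commit on the batch because processing has failed", cause);
        }
    }

    public static void main(String[] args) {
        LoggingExceptionHandler handler = new LoggingExceptionHandler();
        CommitSynchronization sync = new CommitSynchronization(handler);

        // Simulate one failed batch.
        sync.onFailure(new IllegalStateException("simulated processing failure"));
    }
}
```

Routing the failure through the handler keeps the logging policy in one place, so a user who finds the WARN output too noisy at Kafka's polling rate could swap in a quieter handler without touching the processor.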