This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch ko in repository https://gitbox.apache.org/repos/asf/camel.git
commit 13df92ffc6a7b23cd4ff0576080d574b1aa603a9 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Feb 25 09:49:55 2025 +0100 CAMEL-21788: DelegatingCallback invokes completion in wrong order. --- .../org/apache/camel/component/kafka/KafkaProducer.java | 3 ++- .../kafka/producer/support/DelegatingCallback.java | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java index d3deb114633..06ecf4510c5 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java @@ -510,7 +510,8 @@ public class KafkaProducer extends DefaultAsyncProducer { KafkaProducerMetadataCallBack metadataCallBack = new KafkaProducerMetadataCallBack( key, configuration.isRecordMetadata()); - DelegatingCallback delegatingCallback = new DelegatingCallback(cb, metadataCallBack); + // make sure to cb is last in the order here + DelegatingCallback delegatingCallback = new DelegatingCallback(metadataCallBack, cb); kafkaProducer.send(record, delegatingCallback); } else { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java index 80a6c86703e..c316cf6fd67 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/producer/support/DelegatingCallback.java @@ -18,9 +18,12 @@ package org.apache.camel.component.kafka.producer.support; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class DelegatingCallback implements Callback { + private static final Logger LOG = LoggerFactory.getLogger(DelegatingCallback.class); private final Callback callback1; private final Callback callback2; @@ -31,7 +34,17 @@ public final class DelegatingCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - callback1.onCompletion(metadata, exception); - callback2.onCompletion(metadata, exception); + try { + callback1.onCompletion(metadata, exception); + } catch (Exception e) { + // ensure every callback is invoked + LOG.warn("Error invoking 1st onCompletion. This exception is ignored.", e); + } + try { + callback2.onCompletion(metadata, exception); + } catch (Exception e) { + // ensure every callback is invoked + LOG.warn("Error invoking 2nd onCompletion. This exception is ignored.", e); + } } }
