This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push: new e51c43c049 KAFKA-12841: Remove an additional call of onAcknowledgement (#12064) e51c43c049 is described below commit e51c43c049e8ef7d37ff25ed78dfe265938c086e Author: Philip Nee <philip...@gmail.com> AuthorDate: Mon Apr 25 15:59:45 2022 -0700 KAFKA-12841: Remove an additional call of onAcknowledgement (#12064) The bug was introduced in #11689 that an additional onAcknowledgement was made using the InterceptorCallback class. This is undesirable since onSendError will attempt to call onAcknowledgement once more. Reviewers: Jun Rao <jun...@gmail.com> --- .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 9 ++++----- .../org/apache/kafka/clients/producer/KafkaProducerTest.java | 11 +++++++++-- .../java/org/apache/kafka/test/MockProducerInterceptor.java | 2 ++ 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index da749c2f12..4cc9c1521a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1014,11 +1014,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> { tp = ProducerInterceptors.extractTopicPartition(record); } - Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); - - // The onCompletion callback does expect a non-null metadata, but one will be created inside - // the interceptor's onCompletion implementation before the user's callback is invoked. - interceptCallback.onCompletion(null, e); + if (callback != null) { + RecordMetadata nullMetadata = new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + callback.onCompletion(nullMetadata, e); + } this.errors.record(); this.interceptors.onSendError(record, tp, e); if (transactionManager != null) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index bc91340a7b..ce01620803 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1901,10 +1901,13 @@ public class KafkaProducerTest { } @Test - public void testCallbackHandlesError() throws Exception { + public void testCallbackAndInterceptorHandleError() throws Exception { Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); + configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName()); + configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "something"); + Time time = new MockTime(); ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE); @@ -1912,8 +1915,11 @@ public class KafkaProducerTest { String invalidTopicName = "topic abc"; // Invalid topic name due to space + ProducerInterceptors<String, String> producerInterceptors = + new ProducerInterceptors<>(Arrays.asList(new MockProducerInterceptor())); + try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(), - producerMetadata, client, null, time)) { + producerMetadata, client, producerInterceptors, time)) { ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); // Here's the important piece of the test. Let's make sure that the RecordMetadata we get @@ -1938,6 +1944,7 @@ public class KafkaProducerTest { }; producer.send(record, callBack); + assertEquals(1, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue()); } } diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java index 133ff567d4..eedc3bdaec 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java +++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java @@ -35,6 +35,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0); public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0); public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0); + public static final AtomicInteger ON_ACKNOWLEDGEMENT_COUNT = new AtomicInteger(0); public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>(); public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id"); public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT = new AtomicReference<>(NO_CLUSTER_ID); @@ -69,6 +70,7 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + ON_ACKNOWLEDGEMENT_COUNT.incrementAndGet(); // This will ensure that we get the cluster metadata when onAcknowledgement is called for the first time // as subsequent compareAndSet operations will fail. CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());