This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fdfe00a3c27 [fix][client] Cannot access message data inside 
ProducerInterceptor#onSendAcknowledgement (#23791)
fdfe00a3c27 is described below

commit fdfe00a3c270cc7bf630224957d9fa86958d9e6e
Author: Yike Xiao <[email protected]>
AuthorDate: Tue Dec 31 02:25:09 2024 +0800

    [fix][client] Cannot access message data inside 
ProducerInterceptor#onSendAcknowledgement (#23791)
---
 .../apache/pulsar/client/api/InterceptorsTest.java | 44 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 28 +++++++-------
 2 files changed, 59 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index f71cdc55141..68d082adbb6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.client.api;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -216,6 +218,48 @@ public class InterceptorsTest extends ProducerConsumerBase 
{
         producer.close();
     }
 
+    @Test
+    public void testProducerInterceptorAccessMessageData() throws 
PulsarClientException {
+        List<String> messageDataInBeforeSend = 
Collections.synchronizedList(new ArrayList<>());
+        List<String> messageDataOnSendAcknowledgement = 
Collections.synchronizedList(new ArrayList<>());
+        ProducerInterceptor<String> interceptor = new ProducerInterceptor<>() {
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, 
Message<String> message) {
+                messageDataInBeforeSend.add(new String(message.getData()));
+                return message;
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, 
Message<String> message, MessageId msgId,
+                                              Throwable exception) {
+                messageDataOnSendAcknowledgement.add(new 
String(message.getData()));
+            }
+        };
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .intercept(interceptor)
+                .create();
+
+        final String messageValue = UUID.randomUUID().toString();
+        try {
+            producer.newMessage().value(messageValue).send();
+        } catch (Exception ignore) {
+        }
+        Assert.assertEquals(messageDataInBeforeSend.size(), 1,
+                "Message data should be available in beforeSend");
+        Assert.assertEquals(messageDataInBeforeSend.get(0), messageValue,
+                "Message data should be available in beforeSend");
+        Assert.assertEquals(messageDataOnSendAcknowledgement.size(), 1,
+                "Message data should be available in onSendAcknowledgement");
+        Assert.assertEquals(messageDataOnSendAcknowledgement.get(0), 
messageValue,
+                "Message data should be available in onSendAcknowledgement");
+    }
+
     @Test
     public void testConsumerInterceptorWithErrors() throws 
PulsarClientException {
         ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 54d337925dc..64b706cc5fa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -436,20 +436,22 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             if (payload == null) {
                 log.error("[{}] [{}] Payload is null when calling 
onSendComplete, which is not expected.",
                         topic, producerName);
-            } else {
-                ReferenceCountUtil.safeRelease(payload);
             }
-            if (e != null) {
-                latencyHistogram.recordFailure(latencyNanos);
-                stats.incrementSendFailed();
-                onSendAcknowledgement(msg, null, e);
-                sendCallback.getFuture().completeExceptionally(e);
-            } else {
-                latencyHistogram.recordSuccess(latencyNanos);
-                publishedBytesCounter.add(msgSize);
-                stats.incrementNumAcksReceived(latencyNanos);
-                onSendAcknowledgement(msg, msg.getMessageId(), null);
-                sendCallback.getFuture().complete(msg.getMessageId());
+            try {
+                if (e != null) {
+                    latencyHistogram.recordFailure(latencyNanos);
+                    stats.incrementSendFailed();
+                    onSendAcknowledgement(msg, null, e);
+                    sendCallback.getFuture().completeExceptionally(e);
+                } else {
+                    latencyHistogram.recordSuccess(latencyNanos);
+                    publishedBytesCounter.add(msgSize);
+                    stats.incrementNumAcksReceived(latencyNanos);
+                    onSendAcknowledgement(msg, msg.getMessageId(), null);
+                    sendCallback.getFuture().complete(msg.getMessageId());
+                }
+            } finally {
+                ReferenceCountUtil.safeRelease(payload);
             }
         }
 

Reply via email to