This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fdfe00a3c27 [fix][client] Cannot access message data inside
ProducerInterceptor#onSendAcknowledgement (#23791)
fdfe00a3c27 is described below
commit fdfe00a3c270cc7bf630224957d9fa86958d9e6e
Author: Yike Xiao <[email protected]>
AuthorDate: Tue Dec 31 02:25:09 2024 +0800
[fix][client] Cannot access message data inside
ProducerInterceptor#onSendAcknowledgement (#23791)
---
.../apache/pulsar/client/api/InterceptorsTest.java | 44 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 28 +++++++-------
2 files changed, 59 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index f71cdc55141..68d082adbb6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.client.api;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -216,6 +218,48 @@ public class InterceptorsTest extends ProducerConsumerBase
{
producer.close();
}
+ @Test
+ public void testProducerInterceptorAccessMessageData() throws
PulsarClientException {
+ List<String> messageDataInBeforeSend =
Collections.synchronizedList(new ArrayList<>());
+ List<String> messageDataOnSendAcknowledgement =
Collections.synchronizedList(new ArrayList<>());
+ ProducerInterceptor<String> interceptor = new ProducerInterceptor<>() {
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Message<String> beforeSend(Producer<String> producer,
Message<String> message) {
+ messageDataInBeforeSend.add(new String(message.getData()));
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<String> producer,
Message<String> message, MessageId msgId,
+ Throwable exception) {
+ messageDataOnSendAcknowledgement.add(new
String(message.getData()));
+ }
+ };
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .intercept(interceptor)
+ .create();
+
+ final String messageValue = UUID.randomUUID().toString();
+ try {
+ producer.newMessage().value(messageValue).send();
+ } catch (Exception ignore) {
+ }
+ Assert.assertEquals(messageDataInBeforeSend.size(), 1,
+ "Message data should be available in beforeSend");
+ Assert.assertEquals(messageDataInBeforeSend.get(0), messageValue,
+ "Message data should be available in beforeSend");
+ Assert.assertEquals(messageDataOnSendAcknowledgement.size(), 1,
+ "Message data should be available in onSendAcknowledgement");
+ Assert.assertEquals(messageDataOnSendAcknowledgement.get(0),
messageValue,
+ "Message data should be available in onSendAcknowledgement");
+ }
+
@Test
public void testConsumerInterceptorWithErrors() throws
PulsarClientException {
ConsumerInterceptor<String> interceptor = new
ConsumerInterceptor<String>() {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 54d337925dc..64b706cc5fa 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -436,20 +436,22 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if (payload == null) {
log.error("[{}] [{}] Payload is null when calling
onSendComplete, which is not expected.",
topic, producerName);
- } else {
- ReferenceCountUtil.safeRelease(payload);
}
- if (e != null) {
- latencyHistogram.recordFailure(latencyNanos);
- stats.incrementSendFailed();
- onSendAcknowledgement(msg, null, e);
- sendCallback.getFuture().completeExceptionally(e);
- } else {
- latencyHistogram.recordSuccess(latencyNanos);
- publishedBytesCounter.add(msgSize);
- stats.incrementNumAcksReceived(latencyNanos);
- onSendAcknowledgement(msg, msg.getMessageId(), null);
- sendCallback.getFuture().complete(msg.getMessageId());
+ try {
+ if (e != null) {
+ latencyHistogram.recordFailure(latencyNanos);
+ stats.incrementSendFailed();
+ onSendAcknowledgement(msg, null, e);
+ sendCallback.getFuture().completeExceptionally(e);
+ } else {
+ latencyHistogram.recordSuccess(latencyNanos);
+ publishedBytesCounter.add(msgSize);
+ stats.incrementNumAcksReceived(latencyNanos);
+ onSendAcknowledgement(msg, msg.getMessageId(), null);
+ sendCallback.getFuture().complete(msg.getMessageId());
+ }
+ } finally {
+ ReferenceCountUtil.safeRelease(payload);
}
}