This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1d44ac69e4f063f56c3a5fd2bebe2ce62e025ac4 Author: Yike Xiao <[email protected]> AuthorDate: Tue Dec 31 02:25:09 2024 +0800 [fix][client] Cannot access message data inside ProducerInterceptor#onSendAcknowledgement (#23791) (cherry picked from commit fdfe00a3c270cc7bf630224957d9fa86958d9e6e) --- .../apache/pulsar/client/api/InterceptorsTest.java | 44 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 28 +++++++------- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index f71cdc55141..68d082adbb6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -21,10 +21,12 @@ package org.apache.pulsar.client.api; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -216,6 +218,48 @@ public class InterceptorsTest extends ProducerConsumerBase { producer.close(); } + @Test + public void testProducerInterceptorAccessMessageData() throws PulsarClientException { + List<String> messageDataInBeforeSend = Collections.synchronizedList(new ArrayList<>()); + List<String> messageDataOnSendAcknowledgement = Collections.synchronizedList(new ArrayList<>()); + ProducerInterceptor<String> interceptor = new ProducerInterceptor<>() { + @Override + public void close() { + } + + @Override + public Message<String> beforeSend(Producer<String> producer, Message<String> message) { + messageDataInBeforeSend.add(new String(message.getData())); + return message; + } + + @Override + public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, + Throwable exception) { + messageDataOnSendAcknowledgement.add(new String(message.getData())); + } + }; + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .intercept(interceptor) + .create(); + + final String messageValue = UUID.randomUUID().toString(); + try { + producer.newMessage().value(messageValue).send(); + } catch (Exception ignore) { + } + Assert.assertEquals(messageDataInBeforeSend.size(), 1, + "Message data should be available in beforeSend"); + Assert.assertEquals(messageDataInBeforeSend.get(0), messageValue, + "Message data should be available in beforeSend"); + Assert.assertEquals(messageDataOnSendAcknowledgement.size(), 1, + "Message data should be available in onSendAcknowledgement"); + Assert.assertEquals(messageDataOnSendAcknowledgement.get(0), messageValue, + "Message data should be available in onSendAcknowledgement"); + } + @Test public void testConsumerInterceptorWithErrors() throws PulsarClientException { ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 54d337925dc..64b706cc5fa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -436,20 +436,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (payload == null) { log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", topic, producerName); - } else { - ReferenceCountUtil.safeRelease(payload); } - if (e != null) { - latencyHistogram.recordFailure(latencyNanos); - stats.incrementSendFailed(); - onSendAcknowledgement(msg, null, e); - sendCallback.getFuture().completeExceptionally(e); - } else { - latencyHistogram.recordSuccess(latencyNanos); - publishedBytesCounter.add(msgSize); - stats.incrementNumAcksReceived(latencyNanos); - onSendAcknowledgement(msg, msg.getMessageId(), null); - sendCallback.getFuture().complete(msg.getMessageId()); + try { + if (e != null) { + latencyHistogram.recordFailure(latencyNanos); + stats.incrementSendFailed(); + onSendAcknowledgement(msg, null, e); + sendCallback.getFuture().completeExceptionally(e); + } else { + latencyHistogram.recordSuccess(latencyNanos); + publishedBytesCounter.add(msgSize); + stats.incrementNumAcksReceived(latencyNanos); + onSendAcknowledgement(msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); + } + } finally { + ReferenceCountUtil.safeRelease(payload); } }
