codelipenghui commented on code in PR #22393:
URL: https://github.com/apache/pulsar/pull/22393#discussion_r1547722641
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -418,16 +419,14 @@ public void sendComplete(Exception e) {
stats.incrementNumAcksReceived(latencyNanos);
}
} finally {
- interceptorMessage.getDataBuffer().release();
+ ReferenceCountUtil.safeRelease(payloadInCurrentMsg);
Review Comment:
I tried a patch on my laptop. It looks like we also missed the stats and
metrics update in handling the batch messages.
```diff
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4c106d39e7..c872a4c79c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -92,18 +93,10 @@ import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.ClientBuilderImpl;
-import org.apache.pulsar.client.impl.ConsumerBase;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.PartitionedProducerImpl;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
-import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.*;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -4692,4 +4685,81 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
consumer.close();
admin.topics().delete(topic, false);
}
+
+ @DataProvider(name = "enableBatchSend")
+ public Object[][] enableBatchSend() {
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "enableBatchSend")
+ public void testPublishWithCreateMessageManually(boolean
enableBatchSend) throws Exception {
+ // Create an interceptor to verify the ref count of Message.payload
is as expected.
+ AtomicBoolean payloadWasReleasedWhenIntercept = new
AtomicBoolean(false);
+ ProducerInterceptor interceptor = new ProducerInterceptor(){
+
+ @Override
+ public void close() {
+
+ }
+ @Override
+ public Message beforeSend(Producer producer, Message message) {
+ MessageImpl msgImpl = (MessageImpl) message;
+ log.info("payload.refCnf before send: {}",
msgImpl.getDataBuffer().refCnt());
+ if (msgImpl.getDataBuffer().refCnt() < 1) {
+ payloadWasReleasedWhenIntercept.set(true);
+ }
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer producer, Message
message, MessageId msgId,
+ Throwable exception) {
+ MessageImpl msgImpl = (MessageImpl) message;
+ log.info("payload.refCnf on send acknowledgement: {}",
msgImpl.getDataBuffer().refCnt());
+ if (msgImpl.getDataBuffer().refCnt() < 1) {
+ payloadWasReleasedWhenIntercept.set(true);
+ }
+ }
+ };
+
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+ admin.topics().createNonPartitionedTopic(topic);
+ ProducerBase producerBase = (ProducerBase)
pulsarClient.newProducer().topic(topic).intercept(interceptor)
+ .enableBatching(enableBatchSend).create();
+ final int messages = 10;
+ ByteBuf[] payloads = new ByteBuf[messages];
+ for (int i = 0; i < messages; i++) {
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
+ payload.writeByte(1);
+ payloads[i] = payload;
+ log.info("payload.refCnf[{}] 1st: {}", i, payload.refCnt());
+ payload.retain();
+ log.info("payload.refCnf[{}] 2nd: {}", i, payload.refCnt());
+ }
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setUncompressedSize(1);
+
+ // Publish message.
+ // Note: "ProducerBase.sendAsync" is not equals to
"Producer.sendAsync".
+ for (int i = 0; i < messages; i++) {
+ MessageImpl<byte[]> message = MessageImpl.create(topic, null,
messageMetadata, payloads[i], Optional.empty(),
+ null, Schema.BYTES, 0, true, 0);
+ CompletableFuture<MessageId> res =
producerBase.sendAsync(message).thenAccept(ignore_ -> message.release());
+ if (i == messages - 1) {
+ res.join();
+ }
+ }
+
+ // Assert payload's refCnf.
+ for (int i = 0; i < messages; i++) {
+ log.info("payload.refCnf[{}] 3rd: {}", i, payloads[i].refCnt());
+ assertEquals(payloads[i].refCnt(), 1);
+ payloads[i].release();
+ }
+ producerBase.close();
+ admin.topics().delete(topic, false);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index dbd3aae426..c8cd7dd143 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -400,48 +400,39 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
@Override
public void sendComplete(Exception e) {
+ onSendComplete(interceptorMessage, future, e);
+ while (nextCallback != null) {
+ SendCallback sendCallback = nextCallback;
+ MessageImpl<?> msg = nextMsg;
+ onSendComplete(msg, sendCallback.getFuture(), e);
+ nextMsg = nextCallback.getNextMessage();
+ nextCallback = nextCallback.getNextSendCallback();
+ }
+ }
+
+ private void onSendComplete(MessageImpl<?> msg,
CompletableFuture<MessageId> callback, Exception e) {
long latencyNanos = System.nanoTime() - createdAt;
pendingMessagesUpDownCounter.decrement();
pendingBytesUpDownCounter.subtract(msgSize);
-
- try {
- if (e != null) {
- latencyHistogram.recordFailure(latencyNanos);
- stats.incrementSendFailed();
- onSendAcknowledgement(interceptorMessage, null, e);
- future.completeExceptionally(e);
- } else {
- latencyHistogram.recordSuccess(latencyNanos);
- publishedBytesCounter.add(msgSize);
- onSendAcknowledgement(interceptorMessage,
interceptorMessage.getMessageId(), null);
- future.complete(interceptorMessage.getMessageId());
- stats.incrementNumAcksReceived(latencyNanos);
+ if (e != null) {
+ latencyHistogram.recordFailure(latencyNanos);
+ stats.incrementSendFailed();
+ try {
+ onSendAcknowledgement(msg, null, e);
+ } finally {
+ msg.getDataBuffer().release();
}
- } finally {
- interceptorMessage.getDataBuffer().release();
- }
-
- while (nextCallback != null) {
- SendCallback sendCallback = nextCallback;
- MessageImpl<?> msg = nextMsg;
- // Retain the buffer used by interceptors callback to
get message. Buffer will release after
- // complete interceptors.
+ callback.completeExceptionally(e);
+ } else {
+ latencyHistogram.recordSuccess(latencyNanos);
+ publishedBytesCounter.add(msgSize);
+ stats.incrementNumAcksReceived(latencyNanos);
try {
- msg.getDataBuffer().retain();
- if (e != null) {
- stats.incrementSendFailed();
- onSendAcknowledgement(msg, null, e);
-
sendCallback.getFuture().completeExceptionally(e);
- } else {
- onSendAcknowledgement(msg, msg.getMessageId(),
null);
-
sendCallback.getFuture().complete(msg.getMessageId());
-
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
- }
- nextMsg = nextCallback.getNextMessage();
- nextCallback = nextCallback.getNextSendCallback();
+ onSendAcknowledgement(msg, msg.getMessageId(),
null);
} finally {
msg.getDataBuffer().release();
}
+ callback.complete(msg.getMessageId());
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]