codelipenghui commented on code in PR #22393:
URL: https://github.com/apache/pulsar/pull/22393#discussion_r1547722641


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -418,16 +419,14 @@ public void sendComplete(Exception e) {
                         stats.incrementNumAcksReceived(latencyNanos);
                     }
                 } finally {
-                    interceptorMessage.getDataBuffer().release();
+                    ReferenceCountUtil.safeRelease(payloadInCurrentMsg);

Review Comment:
   I tried a patch on my laptop. It looks like we also missed the stats and 
metrics updates when handling the batch messages.
   
   ```diff
   diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
   index 4c106d39e7..c872a4c79c 100644
   --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
   +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
   @@ -69,6 +69,7 @@ import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
   +import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;
   @@ -92,18 +93,10 @@ import org.apache.pulsar.broker.service.ServerCnx;
    import org.apache.pulsar.broker.service.persistent.PersistentTopic;
    import org.apache.pulsar.client.admin.PulsarAdminException;
    import org.apache.pulsar.client.api.schema.GenericRecord;
   -import org.apache.pulsar.client.impl.ClientBuilderImpl;
   -import org.apache.pulsar.client.impl.ConsumerBase;
   -import org.apache.pulsar.client.impl.ConsumerImpl;
   -import org.apache.pulsar.client.impl.MessageIdImpl;
   -import org.apache.pulsar.client.impl.MessageImpl;
   -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
   -import org.apache.pulsar.client.impl.PartitionedProducerImpl;
   -import org.apache.pulsar.client.impl.ProducerImpl;
   -import org.apache.pulsar.client.impl.TopicMessageImpl;
   -import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
   +import org.apache.pulsar.client.impl.*;
    import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
    import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
   +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
    import org.apache.pulsar.common.api.EncryptionContext;
    import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
    import org.apache.pulsar.common.api.proto.MessageMetadata;
   @@ -4692,4 +4685,81 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
            consumer.close();
            admin.topics().delete(topic, false);
        }
   +
   +    @DataProvider(name = "enableBatchSend")
   +    public Object[][] enableBatchSend() {
   +        return new Object[][]{
   +                {true},
   +                {false}
   +        };
   +    }
   +
   +    @Test(dataProvider = "enableBatchSend")
   +    public void testPublishWithCreateMessageManually(boolean 
enableBatchSend) throws Exception {
   +        // Create an interceptor to verify the ref count of Message.payload 
is as expected.
   +        AtomicBoolean payloadWasReleasedWhenIntercept = new 
AtomicBoolean(false);
   +        ProducerInterceptor interceptor = new ProducerInterceptor(){
   +
   +            @Override
   +            public void close() {
   +
   +            }
   +            @Override
   +            public Message beforeSend(Producer producer, Message message) {
   +                MessageImpl msgImpl = (MessageImpl) message;
   +                log.info("payload.refCnf before send: {}", 
msgImpl.getDataBuffer().refCnt());
   +                if (msgImpl.getDataBuffer().refCnt() < 1) {
   +                    payloadWasReleasedWhenIntercept.set(true);
   +                }
   +                return message;
   +            }
   +
   +            @Override
   +            public void onSendAcknowledgement(Producer producer, Message 
message, MessageId msgId,
   +                                              Throwable exception) {
   +                MessageImpl msgImpl = (MessageImpl) message;
   +                log.info("payload.refCnf on send acknowledgement: {}", 
msgImpl.getDataBuffer().refCnt());
   +                if (msgImpl.getDataBuffer().refCnt() < 1) {
   +                    payloadWasReleasedWhenIntercept.set(true);
   +                }
   +            }
   +        };
   +
   +        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
   +        admin.topics().createNonPartitionedTopic(topic);
   +        ProducerBase producerBase = (ProducerBase) 
pulsarClient.newProducer().topic(topic).intercept(interceptor)
   +                .enableBatching(enableBatchSend).create();
   +        final int messages = 10;
   +        ByteBuf[] payloads = new ByteBuf[messages];
   +        for (int i = 0; i < messages; i++) {
   +            ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
   +            payload.writeByte(1);
   +            payloads[i] = payload;
   +            log.info("payload.refCnf[{}] 1st: {}", i, payload.refCnt());
   +            payload.retain();
   +            log.info("payload.refCnf[{}] 2nd: {}", i, payload.refCnt());
   +        }
   +        MessageMetadata messageMetadata = new MessageMetadata();
   +        messageMetadata.setUncompressedSize(1);
   +
   +        // Publish message.
   +        // Note: "ProducerBase.sendAsync" is not equals to 
"Producer.sendAsync".
   +        for (int i = 0; i < messages; i++) {
   +            MessageImpl<byte[]> message = MessageImpl.create(topic, null, 
messageMetadata, payloads[i], Optional.empty(),
   +                    null, Schema.BYTES, 0, true, 0);
   +            CompletableFuture<MessageId> res = 
producerBase.sendAsync(message).thenAccept(ignore_ -> message.release());
   +            if (i == messages - 1) {
   +                res.join();
   +            }
   +        }
   +
   +        // Assert payload's refCnf.
   +        for (int i = 0; i < messages; i++) {
   +            log.info("payload.refCnf[{}] 3rd: {}", i, payloads[i].refCnt());
   +            assertEquals(payloads[i].refCnt(), 1);
   +            payloads[i].release();
   +        }
   +        producerBase.close();
   +        admin.topics().delete(topic, false);
   +    }
    }
   diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
   index dbd3aae426..c8cd7dd143 100644
   --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
   +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
   @@ -400,48 +400,39 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
    
                @Override
                public void sendComplete(Exception e) {
   +                onSendComplete(interceptorMessage, future, e);
   +                while (nextCallback != null) {
   +                    SendCallback sendCallback = nextCallback;
   +                    MessageImpl<?> msg = nextMsg;
   +                    onSendComplete(msg, sendCallback.getFuture(), e);
   +                    nextMsg = nextCallback.getNextMessage();
   +                    nextCallback = nextCallback.getNextSendCallback();
   +                }
   +            }
   +
   +            private void onSendComplete(MessageImpl<?> msg, 
CompletableFuture<MessageId> callback, Exception e) {
                    long latencyNanos = System.nanoTime() - createdAt;
                    pendingMessagesUpDownCounter.decrement();
                    pendingBytesUpDownCounter.subtract(msgSize);
   -
   -                try {
   -                    if (e != null) {
   -                        latencyHistogram.recordFailure(latencyNanos);
   -                        stats.incrementSendFailed();
   -                        onSendAcknowledgement(interceptorMessage, null, e);
   -                        future.completeExceptionally(e);
   -                    } else {
   -                        latencyHistogram.recordSuccess(latencyNanos);
   -                        publishedBytesCounter.add(msgSize);
   -                        onSendAcknowledgement(interceptorMessage, 
interceptorMessage.getMessageId(), null);
   -                        future.complete(interceptorMessage.getMessageId());
   -                        stats.incrementNumAcksReceived(latencyNanos);
   +                if (e != null) {
   +                    latencyHistogram.recordFailure(latencyNanos);
   +                    stats.incrementSendFailed();
   +                    try {
   +                        onSendAcknowledgement(msg, null, e);
   +                    } finally {
   +                        msg.getDataBuffer().release();
                        }
   -                } finally {
   -                    interceptorMessage.getDataBuffer().release();
   -                }
   -
   -                while (nextCallback != null) {
   -                    SendCallback sendCallback = nextCallback;
   -                    MessageImpl<?> msg = nextMsg;
   -                    // Retain the buffer used by interceptors callback to 
get message. Buffer will release after
   -                    // complete interceptors.
   +                    callback.completeExceptionally(e);
   +                } else {
   +                    latencyHistogram.recordSuccess(latencyNanos);
   +                    publishedBytesCounter.add(msgSize);
   +                    stats.incrementNumAcksReceived(latencyNanos);
                        try {
   -                        msg.getDataBuffer().retain();
   -                        if (e != null) {
   -                            stats.incrementSendFailed();
   -                            onSendAcknowledgement(msg, null, e);
   -                            
sendCallback.getFuture().completeExceptionally(e);
   -                        } else {
   -                            onSendAcknowledgement(msg, msg.getMessageId(), 
null);
   -                            
sendCallback.getFuture().complete(msg.getMessageId());
   -                            
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
   -                        }
   -                        nextMsg = nextCallback.getNextMessage();
   -                        nextCallback = nextCallback.getNextSendCallback();
   +                        onSendAcknowledgement(msg, msg.getMessageId(), 
null);
                        } finally {
                            msg.getDataBuffer().release();
                        }
   +                    callback.complete(msg.getMessageId());
                    }
                }
    
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to