This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3e1eb0d136f [fix][client] Fix client side memory leak when call 
MessageImpl.create and fix imprecise client-side metrics: 
pendingMessagesUpDownCounter, pendingBytesUpDownCounter, latencyHistogram 
(#22393)
3e1eb0d136f is described below

commit 3e1eb0d136f4500b7cfa1f74ea3836c7116ef82e
Author: fengyubiao <[email protected]>
AuthorDate: Sun Apr 7 09:28:04 2024 +0800

    [fix][client] Fix client side memory leak when call MessageImpl.create and 
fix imprecise client-side metrics: pendingMessagesUpDownCounter, 
pendingBytesUpDownCounter, latencyHistogram (#22393)
    
    (cherry picked from commit 2469b97b7e4de10fec64cc7ff1f4f46a410ad125)
---
 .../client/api/SimpleProducerConsumerTest.java     | 144 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 127 +++++++++---------
 2 files changed, 211 insertions(+), 60 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4c106d39e7a..7552b84a1c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -51,6 +51,7 @@ import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -69,6 +70,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -92,6 +94,7 @@ import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -99,11 +102,13 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -4692,4 +4697,143 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.close();
         admin.topics().delete(topic, false);
     }
+
+    @DataProvider(name = "enableBatchSend")
+    public Object[][] enableBatchSend() {
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "enableBatchSend")
+    public void testPublishWithCreateMessageManually(boolean enableBatchSend) 
throws Exception {
+        final int messageCount = 10;
+        final List<MessageImpl> messageArrayBeforeSend = 
Collections.synchronizedList(new ArrayList<>());
+        final List<MessageImpl> messageArrayOnSendAcknowledgement = 
Collections.synchronizedList(new ArrayList<>());
+        // Create an interceptor to verify the ref count of Message.payload is 
as expected.
+        AtomicBoolean payloadWasReleasedWhenIntercept = new 
AtomicBoolean(false);
+        ProducerInterceptor interceptor = new ProducerInterceptor(){
+
+            @Override
+            public void close() {
+
+            }
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                MessageImpl msgImpl = (MessageImpl) message;
+                log.info("payload.refCnf before send: {}", 
msgImpl.getDataBuffer().refCnt());
+                if (msgImpl.getDataBuffer().refCnt() < 1) {
+                    payloadWasReleasedWhenIntercept.set(true);
+                }
+                messageArrayBeforeSend.add(msgImpl);
+                return message;
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message 
message, MessageId msgId,
+                                              Throwable exception) {
+                MessageImpl msgImpl = (MessageImpl) message;
+                log.info("payload.refCnf on send acknowledgement: {}", 
msgImpl.getDataBuffer().refCnt());
+                if (msgImpl.getDataBuffer().refCnt() < 1) {
+                    payloadWasReleasedWhenIntercept.set(true);
+                }
+                messageArrayOnSendAcknowledgement.add(msgImpl);
+            }
+        };
+
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        ProducerBase producerBase = (ProducerBase) 
pulsarClient.newProducer().topic(topic).intercept(interceptor)
+                .enableBatching(enableBatchSend).create();
+
+        // Publish message.
+        // Note: "ProducerBase.sendAsync" is not equals to 
"Producer.sendAsync".
+        final MessageImpl[] messageArraySent = new MessageImpl[messageCount];
+        final ByteBuf[] payloads = new ByteBuf[messageCount];
+        List<CompletableFuture<MessageId>> sendFutureList = new ArrayList<>();
+        List<CompletableFuture> releaseFutureList = new ArrayList<>();
+        for (int i = 0; i < messageCount; i++) {
+            // Create message payload, refCnf = 1 now.
+            ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
+            payloads[i] = payload;
+            log.info("payload_{}.refCnf 1st: {}", i,  payload.refCnt());
+            payload.writeByte(i);
+            // refCnf = 2 now.
+            payload.retain();
+            log.info("payload_{}.refCnf 2nd: {}", i,  payload.refCnt());
+            MessageMetadata messageMetadata = new MessageMetadata();
+            messageMetadata.setUncompressedSize(1);
+            MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, 
messageMetadata, payload, Optional.empty(),
+                    null, Schema.BYTES, 0, true, 0);
+            messageArraySent[i] = message1;
+            // Release ByteBuf the first time, refCnf = 1 now.
+            CompletableFuture<MessageId> future = 
producerBase.sendAsync(message1);
+            sendFutureList.add(future);
+            final int indexForLog = i;
+            future.whenComplete((v, ex) -> {
+                message1.release();
+                log.info("payload_{}.refCnf 3rd after_complete_refCnf: {}, ex: 
{}", indexForLog, payload.refCnt(),
+                        ex == null ? "null" : ex.getMessage());
+            });
+        }
+        sendFutureList.get(messageCount - 1).join();
+
+        // Left 2 seconds to wait the code in the finally-block, which is 
using to avoid this test to be flaky.
+        Thread.sleep(1000 * 2);
+
+        // Verify: payload's refCnf.
+        for (int i = 0; i < messageCount; i++) {
+            log.info("payload_{}.refCnf 4th: {}", i, payloads[i].refCnt());
+            assertEquals(payloads[i].refCnt(), 1);
+        }
+
+        // Verify: the messages has not been released when calling interceptor.
+        assertFalse(payloadWasReleasedWhenIntercept.get());
+
+        // Verify: the order of send complete event.
+        MessageIdImpl messageIdPreviousOne = null;
+        for (int i = 0; i < messageCount; i++) {
+            MessageIdImpl messageId = (MessageIdImpl) 
sendFutureList.get(i).get();
+            if (messageIdPreviousOne != null) {
+                assertTrue(compareMessageIds(messageIdPreviousOne, messageId) 
> 0);
+            }
+            messageIdPreviousOne = messageId;
+        }
+
+        // Verify: the order of interceptor events.
+        for (int i = 0; i < messageCount; i++) {
+            assertTrue(messageArraySent[i] == messageArrayBeforeSend.get(i));
+            assertTrue(messageArraySent[i] == 
messageArrayOnSendAcknowledgement.get(i));
+        }
+
+        // cleanup.
+        for (int i = 0; i < messageCount; i++) {
+            payloads[i].release();
+        }
+        producerBase.close();
+        admin.topics().delete(topic, false);
+    }
+
+    private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl 
messageId2) {
+        if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
+            return -1;
+        }
+        if (messageId2.getLedgerId() > messageId1.getLedgerId()) {
+            return 1;
+        }
+        if (messageId2.getEntryId() < messageId1.getEntryId()) {
+            return -1;
+        }
+        if (messageId2.getEntryId() > messageId1.getEntryId()) {
+            return 1;
+        }
+        if (messageId2 instanceof BatchMessageIdImpl && messageId1 instanceof 
BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId1 = (BatchMessageIdImpl) 
messageId1;
+            BatchMessageIdImpl batchMessageId2 = (BatchMessageIdImpl) 
messageId2;
+            return batchMessageId2.getBatchIndex() - 
batchMessageId1.getBatchIndex();
+        } else {
+            return 0;
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 880185f7a97..04dfb01eeaa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -337,73 +337,80 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         if (interceptors != null) {
             interceptorMessage.getProperties();
         }
-        sendAsync(interceptorMessage, new SendCallback() {
-            SendCallback nextCallback = null;
-            MessageImpl<?> nextMsg = null;
-            long createdAt = System.nanoTime();
 
-            @Override
-            public CompletableFuture<MessageId> getFuture() {
-                return future;
-            }
+        int msgSize = interceptorMessage.getDataBuffer().readableBytes();
+        sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, 
interceptorMessage, msgSize));
+        return future;
+    }
 
-            @Override
-            public SendCallback getNextSendCallback() {
-                return nextCallback;
-            }
+    private class DefaultSendMessageCallback implements SendCallback {
 
-            @Override
-            public MessageImpl<?> getNextMessage() {
-                return nextMsg;
-            }
+        CompletableFuture<MessageId> sendFuture;
+        MessageImpl<?> currentMsg;
+        int msgSize;
+        long createdAt = System.nanoTime();
+        SendCallback nextCallback = null;
+        MessageImpl<?> nextMsg = null;
 
-            @Override
-            public void sendComplete(Exception e) {
-                try {
-                    if (e != null) {
-                        stats.incrementSendFailed();
-                        onSendAcknowledgement(interceptorMessage, null, e);
-                        future.completeExceptionally(e);
-                    } else {
-                        onSendAcknowledgement(interceptorMessage, 
interceptorMessage.getMessageId(), null);
-                        future.complete(interceptorMessage.getMessageId());
-                        stats.incrementNumAcksReceived(System.nanoTime() - 
createdAt);
-                    }
-                } finally {
-                    interceptorMessage.getDataBuffer().release();
-                }
+        DefaultSendMessageCallback(CompletableFuture<MessageId> sendFuture, 
MessageImpl<?> currentMsg, int msgSize) {
+            this.sendFuture = sendFuture;
+            this.currentMsg = currentMsg;
+            this.msgSize = msgSize;
+        }
 
-                while (nextCallback != null) {
-                    SendCallback sendCallback = nextCallback;
-                    MessageImpl<?> msg = nextMsg;
-                    // Retain the buffer used by interceptors callback to get 
message. Buffer will release after
-                    // complete interceptors.
-                    try {
-                        msg.getDataBuffer().retain();
-                        if (e != null) {
-                            stats.incrementSendFailed();
-                            onSendAcknowledgement(msg, null, e);
-                            sendCallback.getFuture().completeExceptionally(e);
-                        } else {
-                            onSendAcknowledgement(msg, msg.getMessageId(), 
null);
-                            
sendCallback.getFuture().complete(msg.getMessageId());
-                            stats.incrementNumAcksReceived(System.nanoTime() - 
createdAt);
-                        }
-                        nextMsg = nextCallback.getNextMessage();
-                        nextCallback = nextCallback.getNextSendCallback();
-                    } finally {
-                        msg.getDataBuffer().release();
-                    }
-                }
-            }
+        @Override
+        public CompletableFuture<MessageId> getFuture() {
+            return sendFuture;
+        }
 
-            @Override
-            public void addCallback(MessageImpl<?> msg, SendCallback scb) {
-                nextMsg = msg;
-                nextCallback = scb;
+        @Override
+        public SendCallback getNextSendCallback() {
+            return nextCallback;
+        }
+
+        @Override
+        public MessageImpl<?> getNextMessage() {
+            return nextMsg;
+        }
+
+        @Override
+        public void sendComplete(Exception e) {
+            SendCallback loopingCallback = this;
+            MessageImpl<?> loopingMsg = currentMsg;
+            while (loopingCallback != null) {
+                onSendComplete(e, loopingCallback, loopingMsg);
+                loopingMsg = loopingCallback.getNextMessage();
+                loopingCallback = loopingCallback.getNextSendCallback();
+            }
+        }
+
+        private void onSendComplete(Exception e, SendCallback sendCallback, 
MessageImpl<?> msg) {
+            long createdAt = (sendCallback instanceof 
ProducerImpl.DefaultSendMessageCallback)
+                    ? ((DefaultSendMessageCallback) sendCallback).createdAt : 
this.createdAt;
+            long latencyNanos = System.nanoTime() - createdAt;
+            ByteBuf payload = msg.getDataBuffer();
+            if (payload == null) {
+                log.error("[{}] [{}] Payload is null when calling 
onSendComplete, which is not expected.",
+                        topic, producerName);
+            } else {
+                ReferenceCountUtil.safeRelease(payload);
             }
-        });
-        return future;
+            if (e != null) {
+                stats.incrementSendFailed();
+                onSendAcknowledgement(msg, null, e);
+                sendCallback.getFuture().completeExceptionally(e);
+            } else {
+                stats.incrementNumAcksReceived(latencyNanos);
+                onSendAcknowledgement(msg, msg.getMessageId(), null);
+                sendCallback.getFuture().complete(msg.getMessageId());
+            }
+        }
+
+        @Override
+        public void addCallback(MessageImpl<?> msg, SendCallback scb) {
+            nextMsg = msg;
+            nextCallback = scb;
+        }
     }
 
     @Override

Reply via email to