This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e13865cec7f [fix] [client] fix memory leak if enabled pooled messages 
(#19585)
e13865cec7f is described below

commit e13865cec7f2bc5fb8cbae49b92cfef2e93c515a
Author: fengyubiao <[email protected]>
AuthorDate: Sun Mar 5 10:08:30 2023 +0800

    [fix] [client] fix memory leak if enabled pooled messages (#19585)
---
 .../client/impl/BrokerClientIntegrationTest.java   |  7 ++++-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 12 ++++++++
 .../collections/GrowableArrayBlockingQueue.java    | 33 ++++++++++++++++++++++
 4 files changed, 52 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c29978c5f52..716dd1019f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -947,9 +947,14 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         ByteBuf payload = ((MessageImpl) msg).getPayload();
         assertNotEquals(payload.refCnt(), 0);
         consumer.redeliverUnacknowledgedMessages();
-        assertEquals(payload.refCnt(), 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(consumer.incomingMessages.size() >= 100);
+        });
         consumer.close();
         producer.close();
+        admin.topics().delete(topic, false);
+        assertEquals(consumer.incomingMessages.size(), 0);
+        assertEquals(payload.refCnt(), 0);
     }
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6988adcccf1..75d3b2edf6e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -82,7 +81,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final ExecutorService externalPinnedExecutor;
     protected final ExecutorService internalPinnedExecutor;
     protected UnAckedMessageTracker unAckedMessageTracker;
-    final BlockingQueue<Message<T>> incomingMessages;
+    final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> 
unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
     protected final int maxReceiverQueueSize;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 08a6bb15807..18abb5a52c4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1075,6 +1075,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
         negativeAcksTracker.close();
         stats.getStatTimeout().ifPresent(Timeout::cancel);
+        if (poolMessages) {
+            releasePooledMessagesAndStopAcceptNew();
+        }
+    }
+
+    /**
+     * If enabled pooled messages, we should release the messages after 
closing consumer and stop accept the new
+     * messages.
+     */
+    private void releasePooledMessagesAndStopAcceptNew() {
+        incomingMessages.terminate(message -> message.release());
+        clearIncomingMessages();
     }
 
     void activeConsumerChanged(boolean isActive) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index 7825f9a0562..467a455ed8b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 
 /**
  * This implements a {@link BlockingQueue} backed by an array with no fixed 
capacity.
@@ -53,6 +54,10 @@ public class GrowableArrayBlockingQueue<T> extends 
AbstractQueue<T> implements B
             .newUpdater(GrowableArrayBlockingQueue.class, "size");
     private volatile int size = 0;
 
+    private volatile boolean terminated = false;
+
+    private volatile Consumer<T> itemAfterTerminatedHandler;
+
     public GrowableArrayBlockingQueue() {
         this(64);
     }
@@ -132,6 +137,13 @@ public class GrowableArrayBlockingQueue<T> extends 
AbstractQueue<T> implements B
         boolean wasEmpty = false;
 
         try {
+            if (terminated){
+                if (itemAfterTerminatedHandler != null) {
+                    itemAfterTerminatedHandler.accept(e);
+                }
+                return;
+            }
+
             if (SIZE_UPDATER.get(this) == data.length) {
                 expandArray();
             }
@@ -401,6 +413,27 @@ public class GrowableArrayBlockingQueue<T> extends 
AbstractQueue<T> implements B
         return sb.toString();
     }
 
+    /**
+     * Make the queue not accept new items. if there are still new data trying 
to enter the queue, it will be handed
+     * by {@param itemAfterTerminatedHandler}.
+     */
+    public void terminate(@Nullable Consumer<T> itemAfterTerminatedHandler) {
+        // After wait for the in-flight item enqueue, it means the operation 
of terminate is finished.
+        long stamp = tailLock.writeLock();
+        try {
+            terminated = true;
+            if (itemAfterTerminatedHandler != null) {
+                this.itemAfterTerminatedHandler = itemAfterTerminatedHandler;
+            }
+        } finally {
+            tailLock.unlockWrite(stamp);
+        }
+    }
+
+    public boolean isTerminated() {
+        return terminated;
+    }
+
     @SuppressWarnings("unchecked")
     private void expandArray() {
         // We already hold the tailLock

Reply via email to