This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e13865cec7f [fix] [client] fix memory leak if enabled pooled messages
(#19585)
e13865cec7f is described below
commit e13865cec7f2bc5fb8cbae49b92cfef2e93c515a
Author: fengyubiao <[email protected]>
AuthorDate: Sun Mar 5 10:08:30 2023 +0800
[fix] [client] fix memory leak if enabled pooled messages (#19585)
---
.../client/impl/BrokerClientIntegrationTest.java | 7 ++++-
.../apache/pulsar/client/impl/ConsumerBase.java | 3 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 12 ++++++++
.../collections/GrowableArrayBlockingQueue.java | 33 ++++++++++++++++++++++
4 files changed, 52 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c29978c5f52..716dd1019f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -947,9 +947,14 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
ByteBuf payload = ((MessageImpl) msg).getPayload();
assertNotEquals(payload.refCnt(), 0);
consumer.redeliverUnacknowledgedMessages();
- assertEquals(payload.refCnt(), 0);
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(consumer.incomingMessages.size() >= 100);
+ });
consumer.close();
producer.close();
+ admin.topics().delete(topic, false);
+ assertEquals(consumer.incomingMessages.size(), 0);
+ assertEquals(payload.refCnt(), 0);
}
/**
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6988adcccf1..75d3b2edf6e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@@ -82,7 +81,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
- final BlockingQueue<Message<T>> incomingMessages;
+ final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]>
unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives;
protected final int maxReceiverQueueSize;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 08a6bb15807..18abb5a52c4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1075,6 +1075,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
negativeAcksTracker.close();
stats.getStatTimeout().ifPresent(Timeout::cancel);
+ if (poolMessages) {
+ releasePooledMessagesAndStopAcceptNew();
+ }
+ }
+
+ /**
+ * If enabled pooled messages, we should release the messages after
closing consumer and stop accept the new
+ * messages.
+ */
+ private void releasePooledMessagesAndStopAcceptNew() {
+ incomingMessages.terminate(message -> message.release());
+ clearIncomingMessages();
}
void activeConsumerChanged(boolean isActive) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
index 7825f9a0562..467a455ed8b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
/**
* This implements a {@link BlockingQueue} backed by an array with no fixed
capacity.
@@ -53,6 +54,10 @@ public class GrowableArrayBlockingQueue<T> extends
AbstractQueue<T> implements B
.newUpdater(GrowableArrayBlockingQueue.class, "size");
private volatile int size = 0;
+ private volatile boolean terminated = false;
+
+ private volatile Consumer<T> itemAfterTerminatedHandler;
+
public GrowableArrayBlockingQueue() {
this(64);
}
@@ -132,6 +137,13 @@ public class GrowableArrayBlockingQueue<T> extends
AbstractQueue<T> implements B
boolean wasEmpty = false;
try {
+ if (terminated){
+ if (itemAfterTerminatedHandler != null) {
+ itemAfterTerminatedHandler.accept(e);
+ }
+ return;
+ }
+
if (SIZE_UPDATER.get(this) == data.length) {
expandArray();
}
@@ -401,6 +413,27 @@ public class GrowableArrayBlockingQueue<T> extends
AbstractQueue<T> implements B
return sb.toString();
}
+ /**
+ * Make the queue not accept new items. if there are still new data trying
to enter the queue, it will be handed
+ * by {@param itemAfterTerminatedHandler}.
+ */
+ public void terminate(@Nullable Consumer<T> itemAfterTerminatedHandler) {
+ // After wait for the in-flight item enqueue, it means the operation
of terminate is finished.
+ long stamp = tailLock.writeLock();
+ try {
+ terminated = true;
+ if (itemAfterTerminatedHandler != null) {
+ this.itemAfterTerminatedHandler = itemAfterTerminatedHandler;
+ }
+ } finally {
+ tailLock.unlockWrite(stamp);
+ }
+ }
+
+ public boolean isTerminated() {
+ return terminated;
+ }
+
@SuppressWarnings("unchecked")
private void expandArray() {
// We already hold the tailLock