This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 46cacffd882 [fix][broker] Fix direct memory leak in RawReaderImpl (#18928)
46cacffd882 is described below

commit 46cacffd882259712271d6d01ba3c10d3355b112
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 20 22:37:33 2022 +0200

    [fix][broker] Fix direct memory leak in RawReaderImpl (#18928)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   | 60 +++++++++++++++++-----
 1 file changed, 47 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 255df22580b..9a1c972b2cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -166,6 +166,32 @@ public class RawReaderImpl implements RawReader {
             }
         }
 
+        @Override
+        protected CompletableFuture<Void> failPendingReceive() {
+            if (internalPinnedExecutor.isShutdown()) {
+                failPendingRawReceives();
+                return CompletableFuture.completedFuture(null);
+            } else {
+                CompletableFuture<Void> future = new CompletableFuture<>();
+                internalPinnedExecutor.execute(() -> {
+                    try {
+                        failPendingRawReceives();
+                    } finally {
+                        future.complete(null);
+                    }
+                });
+                return future;
+            }
+        }
+
+        private void failPendingRawReceives() {
+            List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
+            while (!pendingRawReceives.isEmpty()) {
+                toError.add(pendingRawReceives.remove());
+            }
+            toError.forEach((f) -> f.cancel(false));
+        }
+
         CompletableFuture<RawMessage> receiveRawAsync() {
             CompletableFuture<RawMessage> result = new CompletableFuture<>();
             pendingRawReceives.add(result);
@@ -174,19 +200,22 @@ public class RawReaderImpl implements RawReader {
         }
 
         private void reset() {
-            List<CompletableFuture<RawMessage>> toError = new ArrayList<>();
-            synchronized (this) {
-                while (!pendingRawReceives.isEmpty()) {
-                    toError.add(pendingRawReceives.remove());
-                }
-                RawMessageAndCnx m = incomingRawMessages.poll();
-                while (m != null) {
-                    m.msg.close();
-                    m = incomingRawMessages.poll();
-                }
-                incomingRawMessages.clear();
+            failPendingRawReceives();
+            clearIncomingRawMessages();
+        }
+
+        private void clearIncomingRawMessages() {
+            RawMessageAndCnx m = incomingRawMessages.poll();
+            while (m != null) {
+                m.msg.close();
+                m = incomingRawMessages.poll();
             }
-            toError.forEach((f) -> f.cancel(false));
+        }
+
+        @Override
+        protected void clearIncomingMessages() {
+            super.clearIncomingMessages();
+            clearIncomingRawMessages();
         }
 
         @Override
@@ -203,12 +232,17 @@ public class RawReaderImpl implements RawReader {
 
         @Override
         public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> closeFuture = super.closeAsync();
             reset();
-            return super.closeAsync();
+            return closeFuture;
         }
 
         @Override
         void messageReceived(CommandMessage commandMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
+            State state = getState();
+            if (state == State.Closing || state == State.Closed) {
+                return;
+            }
             MessageIdData messageId = commandMessage.getMessageId();
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,

Reply via email to