This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f634e80d5 [fix][client] Avoid allocating unused buffer when 
receiving chunk message (#18511)
66f634e80d5 is described below

commit 66f634e80d5111625e7b12abb7511befd5495ba5
Author: Zike Yang <[email protected]>
AuthorDate: Fri Nov 18 16:28:40 2022 +0800

    [fix][client] Avoid allocating unused buffer when receiving chunk message 
(#18511)
    
    ### Motivation
    
    Currently, if the consumer receives duplicate messages that both have
    chunk ID 0 for the same chunked message, a buffer is allocated for the
    duplicate but never used, which may leak buffer memory.
    
    ### Modifications
    
    * Allocate the byte buffer only when the chunk with chunk ID 0 is not a
      duplicate, i.e. when no chunked-message context exists yet for its UUID.
    
    Signed-off-by: Zike Yang <[email protected]>
---
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 49e8fff37ed..9e3a038eb1f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1431,20 +1431,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             );
         }
 
-        if (msgMetadata.getChunkId() == 0) {
-            ByteBuf chunkedMsgBuffer = 
PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
-                    msgMetadata.getTotalChunkMsgSize());
-            int totalChunks = msgMetadata.getNumChunksFromMsg();
-            chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
-                    (key) -> ChunkedMessageCtx.get(totalChunks, 
chunkedMsgBuffer));
+        ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.get(msgMetadata.getUuid());
+
+        if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
             pendingChunkedMessageCount++;
             if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > 
maxPendingChunkedMessage) {
                 removeOldestPendingChunkedMessage();
             }
+            int totalChunks = msgMetadata.getNumChunksFromMsg();
+            ByteBuf chunkedMsgBuffer = 
PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
+                    msgMetadata.getTotalChunkMsgSize());
+            chunkedMsgCtx = 
chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
+                    (key) -> ChunkedMessageCtx.get(totalChunks, 
chunkedMsgBuffer));
             pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
         }
 
-        ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.get(msgMetadata.getUuid());
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != 
(chunkedMsgCtx.lastChunkedMessageId + 1)) {

Reply via email to