This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66f634e80d5 [fix][client] Avoid allocating unused buffer when
receiving chunk message (#18511)
66f634e80d5 is described below
commit 66f634e80d5111625e7b12abb7511befd5495ba5
Author: Zike Yang <[email protected]>
AuthorDate: Fri Nov 18 16:28:40 2022 +0800
[fix][client] Avoid allocating unused buffer when receiving chunk message
(#18511)
### Motivation
Currently, if there are duplicated messages whose chunk id is both 0, then
it may result in allocating an unused buffer and may lead to the buffer memory
leak.
### Modifications
* Only allocate the bytebuffer when there is no duplicated chunk message
with chunk id 0.
Signed-off-by: Zike Yang <[email protected]>
---
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 49e8fff37ed..9e3a038eb1f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1431,20 +1431,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
);
}
- if (msgMetadata.getChunkId() == 0) {
- ByteBuf chunkedMsgBuffer =
PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
- msgMetadata.getTotalChunkMsgSize());
- int totalChunks = msgMetadata.getNumChunksFromMsg();
- chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
- (key) -> ChunkedMessageCtx.get(totalChunks,
chunkedMsgBuffer));
+ ChunkedMessageCtx chunkedMsgCtx =
chunkedMessagesMap.get(msgMetadata.getUuid());
+
+ if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
pendingChunkedMessageCount++;
if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount >
maxPendingChunkedMessage) {
removeOldestPendingChunkedMessage();
}
+ int totalChunks = msgMetadata.getNumChunksFromMsg();
+ ByteBuf chunkedMsgBuffer =
PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
+ msgMetadata.getTotalChunkMsgSize());
+ chunkedMsgCtx =
chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
+ (key) -> ChunkedMessageCtx.get(totalChunks,
chunkedMsgBuffer));
pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
}
- ChunkedMessageCtx chunkedMsgCtx =
chunkedMessagesMap.get(msgMetadata.getUuid());
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
|| msgMetadata.getChunkId() !=
(chunkedMsgCtx.lastChunkedMessageId + 1)) {