This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac94296d248 [improve][broker] Follow up #4196 use 
`PulsarByteBufAllocator` handle OOM (#20837)
ac94296d248 is described below

commit ac94296d2488b2b13d76fa2b1bfa71e9e3cfbb45
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jul 20 23:20:44 2023 +0800

    [improve][broker] Follow up #4196 use `PulsarByteBufAllocator` handle OOM 
(#20837)
---
 .../main/java/org/apache/pulsar/common/protocol/Markers.java | 12 ++++++------
 .../filesystem/impl/FileStoreBackedReadHandleImpl.java       |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 50b036f99ee..2291aee781f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -19,13 +19,13 @@
 package org.apache.pulsar.common.protocol;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 import lombok.SneakyThrows;
 import lombok.experimental.UtilityClass;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -109,7 +109,7 @@ public class Markers {
                 .clear()
                 .setSnapshotId(snapshotId)
                 .setSourceCluster(sourceCluster);
-        ByteBuf payload = 
PooledByteBufAllocator.DEFAULT.buffer(req.getSerializedSize());
+        ByteBuf payload = 
PulsarByteBufAllocator.DEFAULT.buffer(req.getSerializedSize());
 
         try {
             req.writeTo(payload);
@@ -138,7 +138,7 @@ public class Markers {
                 .setLedgerId(ledgerId)
                 .setEntryId(entryId);
 
-        ByteBuf payload = 
PooledByteBufAllocator.DEFAULT.buffer(response.getSerializedSize());
+        ByteBuf payload = 
PulsarByteBufAllocator.DEFAULT.buffer(response.getSerializedSize());
         try {
             response.writeTo(payload);
             return 
newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE, 
Optional.of(replyToCluster),
@@ -172,7 +172,7 @@ public class Markers {
         });
 
         int size = snapshot.getSerializedSize();
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
+        ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(size);
         try {
             snapshot.writeTo(payload);
             return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT, 
Optional.of(sourceCluster), payload);
@@ -201,7 +201,7 @@ public class Markers {
                     .setMessageId().copyFrom(msgId);
         });
 
-        ByteBuf payload = 
PooledByteBufAllocator.DEFAULT.buffer(update.getSerializedSize());
+        ByteBuf payload = 
PulsarByteBufAllocator.DEFAULT.buffer(update.getSerializedSize());
 
         try {
             update.writeTo(payload);
@@ -258,7 +258,7 @@ public class Markers {
                 .setTxnidMostBits(txnMostBits)
                 .setTxnidLeastBits(txnLeastBits);
 
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
+        ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(0);
 
         try {
             return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c,
diff --git 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 506fbb8de68..49b2071f5db 100644
--- 
a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ 
b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 
 import static 
org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,7 +133,7 @@ public class FileStoreBackedReadHandleImpl implements 
ReadHandle {
                     int length = value.getLength();
                     long entryId = key.get();
                     if (entryId == nextExpectedId) {
-                        ByteBuf buf = 
PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                        ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                         entries.add(LedgerEntryImpl.create(ledgerId, entryId, 
length, buf));
                         buf.writeBytes(value.copyBytes());
                         entriesToRead--;

Reply via email to