This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac94296d248 [improve][broker] Follow up #4196 use `PulsarByteBufAllocator` handle OOM (#20837)
ac94296d248 is described below
commit ac94296d2488b2b13d76fa2b1bfa71e9e3cfbb45
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jul 20 23:20:44 2023 +0800
[improve][broker] Follow up #4196 use `PulsarByteBufAllocator` handle OOM (#20837)
---
.../main/java/org/apache/pulsar/common/protocol/Markers.java | 12 ++++++------
.../filesystem/impl/FileStoreBackedReadHandleImpl.java | 4 ++--
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 50b036f99ee..2291aee781f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -19,13 +19,13 @@
package org.apache.pulsar.common.protocol;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -109,7 +109,7 @@ public class Markers {
.clear()
.setSnapshotId(snapshotId)
.setSourceCluster(sourceCluster);
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(req.getSerializedSize());
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(req.getSerializedSize());
try {
req.writeTo(payload);
@@ -138,7 +138,7 @@ public class Markers {
.setLedgerId(ledgerId)
.setEntryId(entryId);
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(response.getSerializedSize());
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(response.getSerializedSize());
try {
response.writeTo(payload);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE,
Optional.of(replyToCluster),
@@ -172,7 +172,7 @@ public class Markers {
});
int size = snapshot.getSerializedSize();
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(size);
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(size);
try {
snapshot.writeTo(payload);
return newMessage(MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT,
Optional.of(sourceCluster), payload);
@@ -201,7 +201,7 @@ public class Markers {
.setMessageId().copyFrom(msgId);
});
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(update.getSerializedSize());
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(update.getSerializedSize());
try {
update.writeTo(payload);
@@ -258,7 +258,7 @@ public class Markers {
.setTxnidMostBits(txnMostBits)
.setTxnidLeastBits(txnLeastBits);
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
+ ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(0);
try {
return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c,
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 506fbb8de68..49b2071f5db 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import static org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +133,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
int length = value.getLength();
long entryId = key.get();
if (entryId == nextExpectedId) {
- ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
buf.writeBytes(value.copyBytes());
entriesToRead--;