This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aa4fef22a9c [fix][broker] Fix direct memory leak by delayed index
OutOfDirectMemory (#20823)
aa4fef22a9c is described below
commit aa4fef22a9cae56e09a9d695b81fc85aa595d96b
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jul 20 21:07:43 2023 +0800
[fix][broker] Fix direct memory leak by delayed index OutOfDirectMemory
(#20823)
---
.../pulsar/common/util/collections/SegmentedLongArray.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
index 0b3520983b2..c551895c51a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
@@ -19,11 +19,11 @@
package org.apache.pulsar.common.util.collections;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Getter;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@NotThreadSafe
public class SegmentedLongArray implements AutoCloseable {
@@ -44,14 +44,14 @@ public class SegmentedLongArray implements AutoCloseable {
// Add first segment
int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd
* SIZE_OF_LONG);
+ ByteBuf buffer = PulsarByteBufAllocator.DEFAULT.directBuffer(sizeToAdd
* SIZE_OF_LONG);
buffer.writerIndex(sizeToAdd * SIZE_OF_LONG);
buffers.add(buffer);
remainingToAdd -= sizeToAdd;
// Add the remaining segments, all at full segment size, if necessary
while (remainingToAdd > 0) {
- buffer =
PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
+ buffer =
PulsarByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
buffer.writerIndex(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
buffers.add(buffer);
remainingToAdd -= MAX_SEGMENT_SIZE;
@@ -83,7 +83,7 @@ public class SegmentedLongArray implements AutoCloseable {
} else {
// Let's add 1 more buffer to the list
int bufferSize = MAX_SEGMENT_SIZE * SIZE_OF_LONG;
- ByteBuf buffer =
PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
+ ByteBuf buffer =
PulsarByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
buffer.writerIndex(bufferSize);
buffers.add(buffer);
capacity += MAX_SEGMENT_SIZE;
@@ -107,7 +107,7 @@ public class SegmentedLongArray implements AutoCloseable {
// We should also reduce the capacity of the first buffer
capacity -= sizeToReduce;
ByteBuf oldBuffer = buffers.get(0);
- ByteBuf newBuffer =
PooledByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG);
+ ByteBuf newBuffer =
PulsarByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG);
oldBuffer.getBytes(0, newBuffer, (int) capacity * SIZE_OF_LONG);
oldBuffer.release();
buffers.set(0, newBuffer);