This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 75df6c1b75 HDDS-9843. Ozone client high memory (heap) utilization (#6153)
75df6c1b75 is described below
commit 75df6c1b752ccc5db642251673bdf0cc9029a2f9
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri Feb 9 10:02:45 2024 -0800
HDDS-9843. Ozone client high memory (heap) utilization (#6153)
Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
---
.../apache/hadoop/hdds/scm/storage/BufferPool.java | 1 +
.../apache/hadoop/hdds/utils/db/CodecBuffer.java | 9 +++++++-
.../apache/hadoop/ozone/common/ChunkBuffer.java | 10 +++++++--
.../common/ChunkBufferImplWithByteBuffer.java | 14 ++++++++++++
.../ozone/common/IncrementalChunkBuffer.java | 19 +++++++++++++---
.../hadoop/ozone/common/TestChunkBuffer.java | 23 +++++++++++++++++---
.../cli/container/upgrade/TestUpgradeManager.java | 25 ++++++++++++++++------
.../hadoop/hdds/scm/storage/TestCommitWatcher.java | 4 ++++
8 files changed, 90 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index 274b977ef6..b68b56f67c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -111,6 +111,7 @@ public class BufferPool {
}
public void clearBufferPool() {
+ bufferList.forEach(ChunkBuffer::close);
bufferList.clear();
currentBufferIndex = -1;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index 64e494a5af..1ac293b301 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -28,6 +28,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getStackTrace;
* A buffer used by {@link Codec}
* for supporting RocksDB direct {@link ByteBuffer} APIs.
*/
-public class CodecBuffer implements AutoCloseable {
+public class CodecBuffer implements UncheckedAutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
/** To create {@link CodecBuffer} instances. */
@@ -340,6 +341,12 @@ public class CodecBuffer implements AutoCloseable {
return buf.readableBytes();
}
+ /** @return a writable {@link ByteBuffer}. */
+ public ByteBuffer asWritableByteBuffer() {
+ assertRefCnt(1);
+ return buf.nioBuffer(0, buf.maxCapacity());
+ }
+
/** @return a readonly {@link ByteBuffer} view of this buffer. */
public ByteBuffer asReadOnlyByteBuffer() {
assertRefCnt(1);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 3948b5f04f..058934c2f2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -27,10 +27,12 @@ import java.util.function.Supplier;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
/** Buffer for a block chunk. */
-public interface ChunkBuffer {
+public interface ChunkBuffer extends UncheckedAutoCloseable {
/** Similar to {@link ByteBuffer#allocate(int)}. */
static ChunkBuffer allocate(int capacity) {
@@ -49,7 +51,8 @@ public interface ChunkBuffer {
if (increment > 0 && increment < capacity) {
return new IncrementalChunkBuffer(capacity, increment, false);
}
- return new ChunkBufferImplWithByteBuffer(ByteBuffer.allocate(capacity));
+ CodecBuffer codecBuffer = CodecBuffer.allocateDirect(capacity);
+ return new ChunkBufferImplWithByteBuffer(codecBuffer.asWritableByteBuffer(), codecBuffer);
}
/** Wrap the given {@link ByteBuffer} as a {@link ChunkBuffer}. */
@@ -86,6 +89,9 @@ public interface ChunkBuffer {
/** Similar to {@link ByteBuffer#clear()}. */
ChunkBuffer clear();
+ default void close() {
+ }
+
/** Similar to {@link ByteBuffer#put(ByteBuffer)}. */
ChunkBuffer put(ByteBuffer b);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 0cf49681cb..fe2ee5fa8a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -28,13 +28,27 @@ import java.util.Objects;
import java.util.function.Function;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
/** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
private final ByteBuffer buffer;
+ private final UncheckedAutoCloseable underlying;
ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
+ this(buffer, null);
+ }
+
+ ChunkBufferImplWithByteBuffer(ByteBuffer buffer, UncheckedAutoCloseable underlying) {
this.buffer = Objects.requireNonNull(buffer, "buffer == null");
+ this.underlying = underlying;
+ }
+
+ @Override
+ public void close() {
+ if (underlying != null) {
+ underlying.close();
+ }
}
@Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 5a63c09f12..dda4fae0d2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.common;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
@@ -47,6 +48,8 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
private final int limitIndex;
/** Buffer list to be allocated incrementally. */
private final List<ByteBuffer> buffers;
+ /** The underlying buffers. */
+ private final List<CodecBuffer> underlying;
/** Is this a duplicated buffer? (for debug only) */
private final boolean isDuplicated;
/** The index of the first non-full buffer. */
@@ -58,11 +61,18 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
this.limit = limit;
this.increment = increment;
this.limitIndex = limit / increment;
- this.buffers = new ArrayList<>(
- limitIndex + (limit % increment == 0 ? 0 : 1));
+ int size = limitIndex + (limit % increment == 0 ? 0 : 1);
+ this.buffers = new ArrayList<>(size);
+ this.underlying = isDuplicated ? Collections.emptyList() : new ArrayList<>(size);
this.isDuplicated = isDuplicated;
}
+ @Override
+ public void close() {
+ underlying.forEach(CodecBuffer::release);
+ underlying.clear();
+ }
+
/** @return the capacity for the buffer at the given index. */
private int getBufferCapacityAtIndex(int i) {
Preconditions.checkArgument(i >= 0);
@@ -99,6 +109,7 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
/** @return the i-th buffer. It may allocate buffers. */
private ByteBuffer getAndAllocateAtIndex(int index) {
+ Preconditions.checkState(!isDuplicated, "Duplicated buffer is readonly.");
Preconditions.checkArgument(index >= 0);
// never allocate over limit
if (limit % increment == 0) {
@@ -115,7 +126,9 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
// allocate upto the given index
ByteBuffer b = null;
for (; i <= index; i++) {
- b = ByteBuffer.allocate(getBufferCapacityAtIndex(i));
+ final CodecBuffer c = CodecBuffer.allocateDirect(getBufferCapacityAtIndex(i));
+ underlying.add(c);
+ b = c.asWritableByteBuffer();
buffers.add(b);
}
return b;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index 3d6d38f3d3..b5212825e5 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -29,7 +29,11 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hdds.utils.MockGatheringChannel;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -46,6 +50,16 @@ public class TestChunkBuffer {
return ThreadLocalRandom.current().nextInt(n);
}
+ @BeforeAll
+ public static void beforeAll() {
+ CodecBuffer.enableLeakDetection();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ CodecTestUtil.gc();
+ }
+
@Test
@Timeout(1)
void testImplWithByteBuffer() throws IOException {
@@ -59,7 +73,9 @@ public class TestChunkBuffer {
private static void runTestImplWithByteBuffer(int n) throws IOException {
final byte[] expected = new byte[n];
ThreadLocalRandom.current().nextBytes(expected);
- runTestImpl(expected, 0, ChunkBuffer.allocate(n));
+ try (ChunkBuffer c = ChunkBuffer.allocate(n)) {
+ runTestImpl(expected, 0, c);
+ }
}
@Test
@@ -78,8 +94,9 @@ public class TestChunkBuffer {
private static void runTestIncrementalChunkBuffer(int increment, int n)
throws IOException {
final byte[] expected = new byte[n];
ThreadLocalRandom.current().nextBytes(expected);
- runTestImpl(expected, increment,
- new IncrementalChunkBuffer(n, increment, false));
+ try (IncrementalChunkBuffer c = new IncrementalChunkBuffer(n, increment, false)) {
+ runTestImpl(expected, increment, c);
+ }
}
@Test
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
index 3be931c132..b3c15a46f7 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
@@ -44,6 +46,8 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -135,6 +139,16 @@ public class TestUpgradeManager {
chunkManager = new FilePerBlockStrategy(true, blockManager, null);
}
+ @BeforeAll
+ public static void beforeClass() {
+ CodecBuffer.enableLeakDetection();
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ CodecTestUtil.gc();
+ }
+
@Test
public void testUpgrade() throws IOException {
int num = 2;
@@ -187,7 +201,7 @@ public class TestUpgradeManager {
private void putChunksInBlock(int numOfChunksPerBlock, int i,
List<ContainerProtos.ChunkInfo> chunks,
KeyValueContainer container, BlockID blockID) {
- long chunkLength = 100;
+ final long chunkLength = 100;
try {
for (int k = 0; k < numOfChunksPerBlock; k++) {
final String chunkName = String.format("%d_chunk_%d_block_%d",
@@ -199,11 +213,10 @@ public class TestUpgradeManager {
.setChecksumData(Checksum.getNoChecksumDataProto()).build();
chunks.add(info);
ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
- final ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength);
- chunkManager
- .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
- chunkManager
- .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+ try (ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength)) {
+ chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
+ chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+ }
}
} catch (IOException ex) {
LOG.warn("Putting chunks in blocks was not successful for BlockID: "
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index 1363dc2269..2b13daaca2 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -237,6 +237,8 @@ public class TestCommitWatcher {
assertThat(watcher.getFutureMap()).isEmpty();
assertThat(watcher.getCommitIndexMap()).isEmpty();
}
+ } finally {
+ bufferPool.clearBufferPool();
}
}
@@ -330,6 +332,8 @@ public class TestCommitWatcher {
assertThat(watcher.getCommitIndexMap()).isEmpty();
}
}
+ } finally {
+ bufferPool.clearBufferPool();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]