This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 75df6c1b75 HDDS-9843. Ozone client high memory (heap) utilization (#6153)
75df6c1b75 is described below

commit 75df6c1b752ccc5db642251673bdf0cc9029a2f9
Author: Duong Nguyen <[email protected]>
AuthorDate: Fri Feb 9 10:02:45 2024 -0800

    HDDS-9843. Ozone client high memory (heap) utilization (#6153)
    
    Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
---
 .../apache/hadoop/hdds/scm/storage/BufferPool.java |  1 +
 .../apache/hadoop/hdds/utils/db/CodecBuffer.java   |  9 +++++++-
 .../apache/hadoop/ozone/common/ChunkBuffer.java    | 10 +++++++--
 .../common/ChunkBufferImplWithByteBuffer.java      | 14 ++++++++++++
 .../ozone/common/IncrementalChunkBuffer.java       | 19 +++++++++++++---
 .../hadoop/ozone/common/TestChunkBuffer.java       | 23 +++++++++++++++++---
 .../cli/container/upgrade/TestUpgradeManager.java  | 25 ++++++++++++++++------
 .../hadoop/hdds/scm/storage/TestCommitWatcher.java |  4 ++++
 8 files changed, 90 insertions(+), 15 deletions(-)
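
In short, ChunkBuffer.allocate(capacity) now returns a buffer backed by a
pooled direct CodecBuffer instead of an on-heap ByteBuffer, and ChunkBuffer
extends UncheckedAutoCloseable so the memory can be returned to the pool
deterministically. A minimal sketch of the resulting usage pattern, based on
the APIs visible in this diff (the helper method and its byte[] input are
illustrative, not part of the commit):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.ozone.common.ChunkBuffer;

    static void writeOneChunk(byte[] data) {
      // allocate(...) may now hand back pooled direct memory, so the buffer
      // must be closed; try-with-resources guarantees release on all paths.
      try (ChunkBuffer chunk = ChunkBuffer.allocate(data.length)) {
        chunk.put(ByteBuffer.wrap(data));
        // ... pass the chunk to the write path here ...
      } // close() releases the underlying CodecBuffer back to the pool
    }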

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index 274b977ef6..b68b56f67c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -111,6 +111,7 @@ public class BufferPool {
   }
 
   public void clearBufferPool() {
+    bufferList.forEach(ChunkBuffer::close);
     bufferList.clear();
     currentBufferIndex = -1;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index 64e494a5af..1ac293b301 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -28,6 +28,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getStackTrace;
  * A buffer used by {@link Codec}
  * for supporting RocksDB direct {@link ByteBuffer} APIs.
  */
-public class CodecBuffer implements AutoCloseable {
+public class CodecBuffer implements UncheckedAutoCloseable {
   public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
 
   /** To create {@link CodecBuffer} instances. */
@@ -340,6 +341,12 @@ public class CodecBuffer implements AutoCloseable {
     return buf.readableBytes();
   }
 
+  /** @return a writable {@link ByteBuffer}. */
+  public ByteBuffer asWritableByteBuffer() {
+    assertRefCnt(1);
+    return buf.nioBuffer(0, buf.maxCapacity());
+  }
+
   /** @return a readonly {@link ByteBuffer} view of this buffer. */
   public ByteBuffer asReadOnlyByteBuffer() {
     assertRefCnt(1);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 3948b5f04f..058934c2f2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -27,10 +27,12 @@ import java.util.function.Supplier;
 
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /** Buffer for a block chunk. */
-public interface ChunkBuffer {
+public interface ChunkBuffer extends UncheckedAutoCloseable {
 
   /** Similar to {@link ByteBuffer#allocate(int)}. */
   static ChunkBuffer allocate(int capacity) {
@@ -49,7 +51,8 @@ public interface ChunkBuffer {
     if (increment > 0 && increment < capacity) {
       return new IncrementalChunkBuffer(capacity, increment, false);
     }
-    return new ChunkBufferImplWithByteBuffer(ByteBuffer.allocate(capacity));
+    CodecBuffer codecBuffer = CodecBuffer.allocateDirect(capacity);
+    return new ChunkBufferImplWithByteBuffer(codecBuffer.asWritableByteBuffer(), codecBuffer);
   }
 
   /** Wrap the given {@link ByteBuffer} as a {@link ChunkBuffer}. */
@@ -86,6 +89,9 @@ public interface ChunkBuffer {
   /** Similar to {@link ByteBuffer#clear()}. */
   ChunkBuffer clear();
 
+  default void close() {
+  }
+
   /** Similar to {@link ByteBuffer#put(ByteBuffer)}. */
   ChunkBuffer put(ByteBuffer b);
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 0cf49681cb..fe2ee5fa8a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -28,13 +28,27 @@ import java.util.Objects;
 import java.util.function.Function;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 
 /** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
 final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   private final ByteBuffer buffer;
+  private final UncheckedAutoCloseable underlying;
 
   ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
+    this(buffer, null);
+  }
+
+  ChunkBufferImplWithByteBuffer(ByteBuffer buffer, UncheckedAutoCloseable underlying) {
     this.buffer = Objects.requireNonNull(buffer, "buffer == null");
+    this.underlying = underlying;
+  }
+
+  @Override
+  public void close() {
+    if (underlying != null) {
+      underlying.close();
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 5a63c09f12..dda4fae0d2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -47,6 +48,8 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
   private final int limitIndex;
   /** Buffer list to be allocated incrementally. */
   private final List<ByteBuffer> buffers;
+  /** The underlying buffers. */
+  private final List<CodecBuffer> underlying;
   /** Is this a duplicated buffer? (for debug only) */
   private final boolean isDuplicated;
   /** The index of the first non-full buffer. */
@@ -58,11 +61,18 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
     this.limit = limit;
     this.increment = increment;
     this.limitIndex = limit / increment;
-    this.buffers = new ArrayList<>(
-        limitIndex + (limit % increment == 0 ? 0 : 1));
+    int size = limitIndex + (limit % increment == 0 ? 0 : 1);
+    this.buffers = new ArrayList<>(size);
+    this.underlying = isDuplicated ? Collections.emptyList() : new ArrayList<>(size);
     this.isDuplicated = isDuplicated;
   }
 
+  @Override
+  public void close() {
+    underlying.forEach(CodecBuffer::release);
+    underlying.clear();
+  }
+
   /** @return the capacity for the buffer at the given index. */
   private int getBufferCapacityAtIndex(int i) {
     Preconditions.checkArgument(i >= 0);
@@ -99,6 +109,7 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
 
   /** @return the i-th buffer. It may allocate buffers. */
   private ByteBuffer getAndAllocateAtIndex(int index) {
+    Preconditions.checkState(!isDuplicated, "Duplicated buffer is readonly.");
     Preconditions.checkArgument(index >= 0);
     // never allocate over limit
     if (limit % increment == 0) {
@@ -115,7 +126,9 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
     // allocate upto the given index
     ByteBuffer b = null;
     for (; i <= index; i++) {
-      b = ByteBuffer.allocate(getBufferCapacityAtIndex(i));
+      final CodecBuffer c = CodecBuffer.allocateDirect(getBufferCapacityAtIndex(i));
+      underlying.add(c);
+      b = c.asWritableByteBuffer();
       buffers.add(b);
     }
     return b;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index 3d6d38f3d3..b5212825e5 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -29,7 +29,11 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.hdds.utils.MockGatheringChannel;
 
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -46,6 +50,16 @@ public class TestChunkBuffer {
     return ThreadLocalRandom.current().nextInt(n);
   }
 
+  @BeforeAll
+  public static void beforeAll() {
+    CodecBuffer.enableLeakDetection();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   @Test
   @Timeout(1)
   void testImplWithByteBuffer() throws IOException {
@@ -59,7 +73,9 @@ public class TestChunkBuffer {
   private static void runTestImplWithByteBuffer(int n) throws IOException {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, 0, ChunkBuffer.allocate(n));
+    try (ChunkBuffer c = ChunkBuffer.allocate(n)) {
+      runTestImpl(expected, 0, c);
+    }
   }
 
   @Test
@@ -78,8 +94,9 @@ public class TestChunkBuffer {
   private static void runTestIncrementalChunkBuffer(int increment, int n) throws IOException {
     final byte[] expected = new byte[n];
     ThreadLocalRandom.current().nextBytes(expected);
-    runTestImpl(expected, increment,
-        new IncrementalChunkBuffer(n, increment, false));
+    try (IncrementalChunkBuffer c = new IncrementalChunkBuffer(n, increment, false)) {
+      runTestImpl(expected, increment, c);
+    }
   }
 
   @Test
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
index 3be931c132..b3c15a46f7 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/upgrade/TestUpgradeManager.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -44,6 +46,8 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -135,6 +139,16 @@ public class TestUpgradeManager {
     chunkManager = new FilePerBlockStrategy(true, blockManager, null);
   }
 
+  @BeforeAll
+  public static void beforeClass() {
+    CodecBuffer.enableLeakDetection();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CodecTestUtil.gc();
+  }
+
   @Test
   public void testUpgrade() throws IOException {
     int num = 2;
@@ -187,7 +201,7 @@ public class TestUpgradeManager {
   private void putChunksInBlock(int numOfChunksPerBlock, int i,
                                 List<ContainerProtos.ChunkInfo> chunks,
                                 KeyValueContainer container, BlockID blockID) {
-    long chunkLength = 100;
+    final long chunkLength = 100;
     try {
       for (int k = 0; k < numOfChunksPerBlock; k++) {
         final String chunkName = String.format("%d_chunk_%d_block_%d",
@@ -199,11 +213,10 @@ public class TestUpgradeManager {
                 .setChecksumData(Checksum.getNoChecksumDataProto()).build();
         chunks.add(info);
         ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
-        final ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
-        chunkManager
-            .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+        try (ChunkBuffer chunkData = ChunkBuffer.allocate((int) chunkLength)) {
+          chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
+          chunkManager.writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+        }
       }
     } catch (IOException ex) {
       LOG.warn("Putting chunks in blocks was not successful for BlockID: "
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index 1363dc2269..2b13daaca2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -237,6 +237,8 @@ public class TestCommitWatcher {
         assertThat(watcher.getFutureMap()).isEmpty();
         assertThat(watcher.getCommitIndexMap()).isEmpty();
       }
+    } finally {
+      bufferPool.clearBufferPool();
     }
   }
 
@@ -330,6 +332,8 @@ public class TestCommitWatcher {
           assertThat(watcher.getCommitIndexMap()).isEmpty();
         }
       }
+    } finally {
+      bufferPool.clearBufferPool();
     }
   }
 }
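
A note on the test-side pattern above: both updated test classes enable
CodecBuffer leak detection once per class and force a GC after each test, so
any ChunkBuffer that is allocated but never close()d is reported as a leak.
Sketched here with the same JUnit 5 hooks the diff adds (the surrounding test
class is assumed):

    import org.apache.hadoop.hdds.utils.db.CodecBuffer;
    import org.apache.hadoop.hdds.utils.db.CodecTestUtil;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeAll;

    @BeforeAll
    public static void beforeAll() {
      // Start tracking CodecBuffer allocations for the whole test class.
      CodecBuffer.enableLeakDetection();
    }

    @AfterEach
    public void after() throws Exception {
      // Trigger GC so buffers that were dropped without close() get flagged.
      CodecTestUtil.gc();
    }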


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
