This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c9f767e83d HDDS-8741. Use adjustable capacity CodecBuffer to get from 
DB. (#4813)
c9f767e83d is described below

commit c9f767e83dfeca76c2c665973aee390e45141a69
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Jun 3 02:55:55 2023 +0800

    HDDS-8741. Use adjustable capacity CodecBuffer to get from DB. (#4813)
---
 .../apache/hadoop/hdds/utils/db/CodecBuffer.java   | 76 ++++++++++++++++---
 .../apache/hadoop/hdds/utils/db/TypedTable.java    | 87 ++++++++++++++--------
 2 files changed, 121 insertions(+), 42 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index f73cd389fd..93f2ed716d 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -46,14 +46,45 @@ public final class CodecBuffer implements AutoCloseable {
   private static final ByteBufAllocator POOL
       = PooledByteBufAllocator.DEFAULT;
 
-  /** Allocate a direct buffer. */
-  public static CodecBuffer allocateDirect(int exactSize) {
-    return new CodecBuffer(POOL.directBuffer(exactSize, exactSize));
+  
+  /**
+   * Allocate a direct buffer.
+   * When the given capacity is non-negative, allocate a buffer by setting
+   * the initial capacity and the maximum capacity to the given capacity.
+   * When the given capacity is negative, allocate a buffer
+   * by setting only the initial capacity to the absolute value of it
+   * and then the buffer's capacity can be increased if necessary.
+   */
+  public static CodecBuffer allocateDirect(int capacity) {
+    final ByteBuf buf;
+    if (capacity >= 0) {
+      // allocate exact size
+      buf = POOL.directBuffer(capacity, capacity);
+    } else {
+      // allocate a resizable buffer
+      buf = POOL.directBuffer(-capacity);
+    }
+    return new CodecBuffer(buf);
   }
 
-  /** Allocate a heap buffer. */
-  public static CodecBuffer allocateHeap(int exactSize) {
-    return new CodecBuffer(POOL.heapBuffer(exactSize, exactSize));
+  /**
+   * Allocate a heap buffer.
+   * When the given capacity is non-negative, allocate a buffer by setting
+   * the initial capacity and the maximum capacity to the given capacity.
+   * When the given capacity is negative, allocate a buffer
+   * by setting only the initial capacity to the absolute value of it
+   * and then the buffer's capacity can be increased if necessary.
+   */
+  public static CodecBuffer allocateHeap(int capacity) {
+    final ByteBuf buf;
+    if (capacity >= 0) {
+      // allocate exact size
+      buf = POOL.heapBuffer(capacity, capacity);
+    } else {
+      // allocate a resizable buffer
+      buf = POOL.heapBuffer(-capacity);
+    }
+    return new CodecBuffer(buf);
   }
 
   /** Wrap the given array. */
@@ -121,6 +152,31 @@ public final class CodecBuffer implements AutoCloseable {
     return released;
   }
 
+  /** Clear this buffer. */
+  public void clear() {
+    buf.clear();
+  }
+
+  /**
+   * Set the capacity of this buffer.
+   *
+   * @return true iff it has successfully changed the capacity.
+   */
+  public boolean setCapacity(int newCapacity) {
+    if (newCapacity < 0) {
+      throw new IllegalArgumentException(
+          "newCapacity = " + newCapacity + " < 0");
+    }
+    LOG.debug("setCapacity: {} -> {}, max={}",
+        buf.capacity(), newCapacity, buf.maxCapacity());
+    if (newCapacity <= buf.maxCapacity()) {
+      final ByteBuf returned = buf.capacity(newCapacity);
+      Preconditions.assertSame(buf, returned, "buf");
+      return true;
+    }
+    return false;
+  }
+
   /** @return the number of bytes can be read. */
   public int readableBytes() {
     return buf.readableBytes();
@@ -239,11 +295,11 @@ public final class CodecBuffer implements AutoCloseable {
    *
    * @param source put bytes to a {@link ByteBuffer}.
    * @return the return value from the source function.
-   * @throws IOException in case the source throws an {@link IOException}.
+   * @param <E> The {@link Exception} type may be thrown by the given source.
+   * @throws E in case the source throws it.
    */
-  Integer putFromSource(
-      CheckedFunction<ByteBuffer, Integer, IOException> source)
-      throws IOException {
+  <E extends Exception> Integer putFromSource(
+      CheckedFunction<ByteBuffer, Integer, E> source) throws E {
     assertRefCnt(1);
     final int i = buf.writerIndex();
     final int writable = buf.writableBytes();
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 068722b8ad..4c9a65d07c 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.utils.db;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -38,6 +39,8 @@ import 
org.apache.hadoop.hdds.utils.db.cache.PartialTableCache;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.function.CheckedBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -311,30 +314,27 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, 
VALUE> {
     }
   }
 
-  private VALUE getFromTableCodecBuffer(KEY key) throws IOException {
-    try (CodecBuffer inKey = keyCodec.toDirectCodecBuffer(key)) {
-      for (; ;) {
-        final int allocated = bufferSize.get();
-        try (CodecBuffer outValue = CodecBuffer.allocateDirect(allocated)) {
-          final Integer required = outValue.putFromSource(
-              buffer -> rawTable.get(inKey.asReadOnlyByteBuffer(), buffer));
-          if (required == null) {
-            // key not found
-            return null;
-          } else if (required <= allocated) {
-            // buffer size is big enough
-            return valueCodec.fromCodecBuffer(outValue);
-          }
-          // buffer size too small, retry
-          increaseBufferSize(required);
-        }
-      }
-    }
+  /**
+   * Use {@link RDBTable#get(ByteBuffer, ByteBuffer)}
+   * to get a value mapped to the given key.
+   *
+   * @param key the buffer containing the key.
+   * @param outValue the buffer to write the output value.
+   *                 When the buffer capacity is smaller than the value size,
+   *                 partial value may be written.
+   * @return null if the key is not found;
+   *         otherwise, return the size of the value.
+   * @throws IOException in case is an error reading from the db.
+   */
+  private Integer getFromTable(CodecBuffer key, CodecBuffer outValue)
+      throws IOException {
+    return outValue.putFromSource(
+        buffer -> rawTable.get(key.asReadOnlyByteBuffer(), buffer));
   }
 
   private VALUE getFromTable(KEY key) throws IOException {
     if (supportCodecBuffer) {
-      return getFromTableCodecBuffer(key);
+      return getFromTable(key, this::getFromTable);
     } else {
       final byte[] keyBytes = encodeKey(key);
       byte[] valueBytes = rawTable.get(keyBytes);
@@ -342,35 +342,58 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, 
VALUE> {
     }
   }
 
-  private Integer getFromTableIfExist(CodecBuffer key,
-      CodecBuffer outValue) throws IOException {
+  /**
+   * Similar to {@link #getFromTable(CodecBuffer, CodecBuffer)} except that
+   * this method use {@link RDBTable#getIfExist(ByteBuffer, ByteBuffer)}.
+   */
+  private Integer getFromTableIfExist(CodecBuffer key, CodecBuffer outValue)
+      throws IOException {
     return outValue.putFromSource(
         buffer -> rawTable.getIfExist(key.asReadOnlyByteBuffer(), buffer));
   }
 
-  private VALUE getFromTableIfExistCodecBuffer(KEY key) throws IOException {
+  private VALUE getFromTable(KEY key,
+      CheckedBiFunction<CodecBuffer, CodecBuffer, Integer, IOException> get)
+      throws IOException {
     try (CodecBuffer inKey = keyCodec.toDirectCodecBuffer(key)) {
       for (; ;) {
-        final int allocated = bufferSize.get();
-        try (CodecBuffer outValue = CodecBuffer.allocateDirect(allocated)) {
-          final Integer required = getFromTableIfExist(inKey, outValue);
+        final Integer required;
+        final int initial = -bufferSize.get(); // allocate a resizable buffer
+        try (CodecBuffer outValue = CodecBuffer.allocateDirect(initial)) {
+          required = get.apply(inKey, outValue);
           if (required == null) {
             // key not found
             return null;
-          } else if (required >= 0 && required <= allocated) {
-            // buffer size is big enough
-            return valueCodec.fromCodecBuffer(outValue);
+          } else if (required < 0) {
+            throw new IllegalStateException("required = " + required + " < 0");
+          }
+
+          for (; ;) {
+            if (required == outValue.readableBytes()) {
+              // buffer size is big enough
+              return valueCodec.fromCodecBuffer(outValue);
+            }
+            // buffer size too small, try increasing the capacity.
+            if (!outValue.setCapacity(required)) {
+              break;
+            }
+
+            // retry with the new capacity
+            outValue.clear();
+            final int retried = get.apply(inKey, outValue);
+            Preconditions.assertSame(required.intValue(), retried, "required");
           }
-          // buffer size too small, retry
-          increaseBufferSize(required);
         }
+
+        // buffer size too small, reallocate a new buffer.
+        increaseBufferSize(required);
       }
     }
   }
 
   private VALUE getFromTableIfExist(KEY key) throws IOException {
     if (supportCodecBuffer) {
-      return getFromTableIfExistCodecBuffer(key);
+      return getFromTable(key, this::getFromTableIfExist);
     } else {
       final byte[] keyBytes = encodeKey(key);
       final byte[] valueBytes = rawTable.getIfExist(keyBytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to