This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c9f767e83d HDDS-8741. Use adjustable capacity CodecBuffer to get from
DB. (#4813)
c9f767e83d is described below
commit c9f767e83dfeca76c2c665973aee390e45141a69
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Jun 3 02:55:55 2023 +0800
HDDS-8741. Use adjustable capacity CodecBuffer to get from DB. (#4813)
---
.../apache/hadoop/hdds/utils/db/CodecBuffer.java | 76 ++++++++++++++++---
.../apache/hadoop/hdds/utils/db/TypedTable.java | 87 ++++++++++++++--------
2 files changed, 121 insertions(+), 42 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
index f73cd389fd..93f2ed716d 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -46,14 +46,45 @@ public final class CodecBuffer implements AutoCloseable {
private static final ByteBufAllocator POOL
= PooledByteBufAllocator.DEFAULT;
- /** Allocate a direct buffer. */
- public static CodecBuffer allocateDirect(int exactSize) {
- return new CodecBuffer(POOL.directBuffer(exactSize, exactSize));
+
+ /**
+ * Allocate a direct buffer.
+ * When the given capacity is non-negative, allocate a buffer by setting
+ * the initial capacity and the maximum capacity to the given capacity.
+ * When the given capacity is negative, allocate a buffer
+ * by setting only the initial capacity to the absolute value of it
+ * and then the buffer's capacity can be increased if necessary.
+ */
+ public static CodecBuffer allocateDirect(int capacity) {
+ final ByteBuf buf;
+ if (capacity >= 0) {
+ // allocate exact size
+ buf = POOL.directBuffer(capacity, capacity);
+ } else {
+ // allocate a resizable buffer
+ buf = POOL.directBuffer(-capacity);
+ }
+ return new CodecBuffer(buf);
}
- /** Allocate a heap buffer. */
- public static CodecBuffer allocateHeap(int exactSize) {
- return new CodecBuffer(POOL.heapBuffer(exactSize, exactSize));
+ /**
+ * Allocate a heap buffer.
+ * When the given capacity is non-negative, allocate a buffer by setting
+ * the initial capacity and the maximum capacity to the given capacity.
+ * When the given capacity is negative, allocate a buffer
+ * by setting only the initial capacity to the absolute value of it
+ * and then the buffer's capacity can be increased if necessary.
+ */
+ public static CodecBuffer allocateHeap(int capacity) {
+ final ByteBuf buf;
+ if (capacity >= 0) {
+ // allocate exact size
+ buf = POOL.heapBuffer(capacity, capacity);
+ } else {
+ // allocate a resizable buffer
+ buf = POOL.heapBuffer(-capacity);
+ }
+ return new CodecBuffer(buf);
}
/** Wrap the given array. */
@@ -121,6 +152,31 @@ public final class CodecBuffer implements AutoCloseable {
return released;
}
+ /** Clear this buffer. */
+ public void clear() {
+ buf.clear();
+ }
+
+ /**
+ * Set the capacity of this buffer.
+ *
+ * @return true iff it has successfully changed the capacity.
+ */
+ public boolean setCapacity(int newCapacity) {
+ if (newCapacity < 0) {
+ throw new IllegalArgumentException(
+ "newCapacity = " + newCapacity + " < 0");
+ }
+ LOG.debug("setCapacity: {} -> {}, max={}",
+ buf.capacity(), newCapacity, buf.maxCapacity());
+ if (newCapacity <= buf.maxCapacity()) {
+ final ByteBuf returned = buf.capacity(newCapacity);
+ Preconditions.assertSame(buf, returned, "buf");
+ return true;
+ }
+ return false;
+ }
+
/** @return the number of bytes can be read. */
public int readableBytes() {
return buf.readableBytes();
@@ -239,11 +295,11 @@ public final class CodecBuffer implements AutoCloseable {
*
* @param source put bytes to a {@link ByteBuffer}.
* @return the return value from the source function.
- * @throws IOException in case the source throws an {@link IOException}.
+ * @param <E> The {@link Exception} type that the given source may throw.
+ * @throws E in case the source throws it.
*/
- Integer putFromSource(
- CheckedFunction<ByteBuffer, Integer, IOException> source)
- throws IOException {
+ <E extends Exception> Integer putFromSource(
+ CheckedFunction<ByteBuffer, Integer, E> source) throws E {
assertRefCnt(1);
final int i = buf.writerIndex();
final int writable = buf.writableBytes();
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 068722b8ad..4c9a65d07c 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.utils.db;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -38,6 +39,8 @@ import
org.apache.hadoop.hdds.utils.db.cache.PartialTableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -311,30 +314,27 @@ public class TypedTable<KEY, VALUE> implements Table<KEY,
VALUE> {
}
}
- private VALUE getFromTableCodecBuffer(KEY key) throws IOException {
- try (CodecBuffer inKey = keyCodec.toDirectCodecBuffer(key)) {
- for (; ;) {
- final int allocated = bufferSize.get();
- try (CodecBuffer outValue = CodecBuffer.allocateDirect(allocated)) {
- final Integer required = outValue.putFromSource(
- buffer -> rawTable.get(inKey.asReadOnlyByteBuffer(), buffer));
- if (required == null) {
- // key not found
- return null;
- } else if (required <= allocated) {
- // buffer size is big enough
- return valueCodec.fromCodecBuffer(outValue);
- }
- // buffer size too small, retry
- increaseBufferSize(required);
- }
- }
- }
+ /**
+ * Use {@link RDBTable#get(ByteBuffer, ByteBuffer)}
+ * to get a value mapped to the given key.
+ *
+ * @param key the buffer containing the key.
+ * @param outValue the buffer to write the output value.
+ * When the buffer capacity is smaller than the value size,
+ * a partial value may be written.
+ * @return null if the key is not found;
+ * otherwise, return the size of the value.
+ * @throws IOException in case there is an error reading from the db.
+ */
+ private Integer getFromTable(CodecBuffer key, CodecBuffer outValue)
+ throws IOException {
+ return outValue.putFromSource(
+ buffer -> rawTable.get(key.asReadOnlyByteBuffer(), buffer));
}
private VALUE getFromTable(KEY key) throws IOException {
if (supportCodecBuffer) {
- return getFromTableCodecBuffer(key);
+ return getFromTable(key, this::getFromTable);
} else {
final byte[] keyBytes = encodeKey(key);
byte[] valueBytes = rawTable.get(keyBytes);
@@ -342,35 +342,58 @@ public class TypedTable<KEY, VALUE> implements Table<KEY,
VALUE> {
}
}
- private Integer getFromTableIfExist(CodecBuffer key,
- CodecBuffer outValue) throws IOException {
+ /**
+ * Similar to {@link #getFromTable(CodecBuffer, CodecBuffer)} except that
+ * this method uses {@link RDBTable#getIfExist(ByteBuffer, ByteBuffer)}.
+ */
+ private Integer getFromTableIfExist(CodecBuffer key, CodecBuffer outValue)
+ throws IOException {
return outValue.putFromSource(
buffer -> rawTable.getIfExist(key.asReadOnlyByteBuffer(), buffer));
}
- private VALUE getFromTableIfExistCodecBuffer(KEY key) throws IOException {
+ private VALUE getFromTable(KEY key,
+ CheckedBiFunction<CodecBuffer, CodecBuffer, Integer, IOException> get)
+ throws IOException {
try (CodecBuffer inKey = keyCodec.toDirectCodecBuffer(key)) {
for (; ;) {
- final int allocated = bufferSize.get();
- try (CodecBuffer outValue = CodecBuffer.allocateDirect(allocated)) {
- final Integer required = getFromTableIfExist(inKey, outValue);
+ final Integer required;
+ final int initial = -bufferSize.get(); // allocate a resizable buffer
+ try (CodecBuffer outValue = CodecBuffer.allocateDirect(initial)) {
+ required = get.apply(inKey, outValue);
if (required == null) {
// key not found
return null;
- } else if (required >= 0 && required <= allocated) {
- // buffer size is big enough
- return valueCodec.fromCodecBuffer(outValue);
+ } else if (required < 0) {
+ throw new IllegalStateException("required = " + required + " < 0");
+ }
+
+ for (; ;) {
+ if (required == outValue.readableBytes()) {
+ // buffer size is big enough
+ return valueCodec.fromCodecBuffer(outValue);
+ }
+ // buffer size too small, try increasing the capacity.
+ if (!outValue.setCapacity(required)) {
+ break;
+ }
+
+ // retry with the new capacity
+ outValue.clear();
+ final int retried = get.apply(inKey, outValue);
+ Preconditions.assertSame(required.intValue(), retried, "required");
}
- // buffer size too small, retry
- increaseBufferSize(required);
}
+
+ // buffer size too small, reallocate a new buffer.
+ increaseBufferSize(required);
}
}
}
private VALUE getFromTableIfExist(KEY key) throws IOException {
if (supportCodecBuffer) {
- return getFromTableIfExistCodecBuffer(key);
+ return getFromTable(key, this::getFromTableIfExist);
} else {
final byte[] keyBytes = encodeKey(key);
final byte[] valueBytes = rawTable.getIfExist(keyBytes);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]