This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 75f5a28f37 HDDS-8542. In RDBTable, add a put method using RocksDB
ByteBuffer APIs. (#4666)
75f5a28f37 is described below
commit 75f5a28f376516e7252e750e86a10c2f9d874d5a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat May 6 19:00:36 2023 -0700
HDDS-8542. In RDBTable, add a put method using RocksDB ByteBuffer APIs.
(#4666)
---
.../java/org/apache/hadoop/hdds/StringUtils.java | 26 ++--
.../org/apache/hadoop/hdds/utils/db/Codec.java | 62 ++++++++-
.../apache/hadoop/hdds/utils/db/CodecBuffer.java | 151 +++++++++++++++++++++
.../container/keyvalue/TestKeyValueContainer.java | 7 +
.../hadoop/hdds/utils/db/ByteArrayCodec.java | 2 +-
.../apache/hadoop/hdds/utils/db/IntegerCodec.java | 27 +++-
.../org/apache/hadoop/hdds/utils/db/LongCodec.java | 19 ++-
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 2 +-
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 5 +
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 15 ++
.../apache/hadoop/hdds/utils/db/StringCodec.java | 23 +++-
.../apache/hadoop/hdds/utils/db/TypedTable.java | 18 ++-
.../hadoop/hdds/utils/TestRDBSnapshotProvider.java | 2 +
.../org/apache/hadoop/hdds/utils/db/TestCodec.java | 130 ++++++++++++++++++
.../apache/hadoop/hdds/utils/db/TestRDBStore.java | 1 +
.../hdds/utils/db/TestTypedRDBTableStore.java | 1 +
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 2 +
17 files changed, 456 insertions(+), 37 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
index ed8f410d0e..4f95839f05 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hdds;
-import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.utils.SignalLogger;
import org.apache.hadoop.hdds.utils.VersionInfo;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.slf4j.Logger;
/**
@@ -40,9 +42,7 @@ public final class StringUtils {
private StringUtils() {
}
- // Using the charset canonical name for String/byte[] conversions is much
- // more efficient due to use of cached encoders/decoders.
- private static final String UTF8_CSN = StandardCharsets.UTF_8.name();
+ private static final Charset UTF8 = StandardCharsets.UTF_8;
/**
* Priority of the StringUtils shutdown hook.
@@ -59,12 +59,11 @@ public final class StringUtils {
* @return The decoded string
*/
public static String bytes2String(byte[] bytes, int offset, int length) {
- try {
- return new String(bytes, offset, length, UTF8_CSN);
- } catch (UnsupportedEncodingException e) {
- // should never happen!
- throw new IllegalArgumentException("UTF8 encoding is not supported", e);
- }
+ return new String(bytes, offset, length, UTF8);
+ }
+
+ public static String bytes2String(ByteBuffer bytes) {
+ return Unpooled.wrappedBuffer(bytes.asReadOnlyBuffer()).toString(UTF8);
}
/**
@@ -82,12 +81,7 @@ public final class StringUtils {
* Converts a string to a byte array using UTF8 encoding.
*/
public static byte[] string2Bytes(String str) {
- try {
- return str.getBytes(UTF8_CSN);
- } catch (UnsupportedEncodingException e) {
- // should never happen!
- throw new IllegalArgumentException("UTF8 decoding is not supported", e);
- }
+ return str.getBytes(UTF8);
}
/**
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
index 099a83b92b..1d4ad01f8b 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
@@ -18,15 +18,62 @@
*/
package org.apache.hadoop.hdds.utils.db;
+import javax.annotation.Nonnull;
import java.io.IOException;
+import java.util.function.IntFunction;
/**
- * Codec interface to marshall/unmarshall data to/from a byte[] based
- * key/value store.
+ * Codec interface to serialize/deserialize objects to/from bytes.
+ * A codec implementation must support the byte[] methods
+ * and may optionally support the {@link CodecBuffer} methods.
*
- * @param <T> Unserialized type
+ * @param <T> The object type.
*/
public interface Codec<T> {
+ /**
+ * Does this {@link Codec} support the {@link CodecBuffer} methods?
+ * If this method returns true, this class must implement both
+ * {@link #toCodecBuffer(Object, IntFunction)} and
+ * {@link #fromCodecBuffer(CodecBuffer)}.
+ *
+ * @return ture iff this class supports the {@link CodecBuffer} methods.
+ */
+ default boolean supportCodecBuffer() {
+ return false;
+ }
+
+ /**
+ * Serialize the given object to bytes.
+ *
+ * @param object The object to be serialized.
+ * @param allocator To allocate a buffer.
+ * @return a buffer storing the serialized bytes.
+ */
+ default CodecBuffer toCodecBuffer(@Nonnull T object,
+ IntFunction<CodecBuffer> allocator) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Serialize the given object to bytes.
+ *
+ * @param object The object to be serialized.
+ * @return a direct buffer storing the serialized bytes.
+ */
+ default CodecBuffer toDirectCodecBuffer(@Nonnull T object)
+ throws IOException {
+ return toCodecBuffer(object, CodecBuffer::allocateDirect);
+ }
+
+ /**
+ * Deserialize an object from the given buffer.
+ *
+ * @param buffer Storing the serialized bytes of an object.
+ * @return the deserialized object.
+ */
+ default T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
/**
* Convert object to raw persisted format.
@@ -42,8 +89,13 @@ public interface Codec<T> {
T fromPersistedFormat(byte[] rawData) throws IOException;
/**
- * Copy Object from the provided object, and returns a new object.
- * @param object
+ * Copy the given object.
+ * When the given object is immutable,
+ * the implementation of this method may safely return the given object.
+ *
+ * @param object The object to be copied.
+ * @return a copy of the given object. When the given object is immutable,
+ * the returned object can possibly be the same as the given object.
*/
T copyObject(T object);
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
new file mode 100644
index 0000000000..4a082ad2b6
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A buffer used by {@link Codec}
+ * for supporting RocksDB direct {@link ByteBuffer} APIs.
+ */
+public final class CodecBuffer implements AutoCloseable {
+ public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
+
+ private static final ByteBufAllocator POOL
+ = PooledByteBufAllocator.DEFAULT;
+
+ /** Allocate a direct buffer. */
+ public static CodecBuffer allocateDirect(int exactSize) {
+ return new CodecBuffer(POOL.directBuffer(exactSize, exactSize));
+ }
+
+ /** Allocate a heap buffer. */
+ public static CodecBuffer allocateHeap(int exactSize) {
+ return new CodecBuffer(POOL.heapBuffer(exactSize, exactSize));
+ }
+
+ /** Wrap the given array. */
+ public static CodecBuffer wrap(byte[] array) {
+ return new CodecBuffer(Unpooled.wrappedBuffer(array));
+ }
+
+ private static final AtomicInteger LEAK_COUNT = new AtomicInteger();
+
+ /** Assert the number of leak detected is zero. */
+ public static void assertNoLeaks() {
+ final long leak = LEAK_COUNT.get();
+ if (leak > 0) {
+ throw new AssertionError("Found " + leak + " leaked objects, check
logs");
+ }
+ }
+
+ private final ByteBuf buf;
+ private final AtomicBoolean released = new AtomicBoolean();
+
+ private CodecBuffer(ByteBuf buf) {
+ this.buf = buf;
+ assertRefCnt(1);
+ }
+
+ private void assertRefCnt(int expected) {
+ Preconditions.assertSame(expected, buf.refCnt(), "refCnt");
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ // leak detection
+ final int capacity = buf.capacity();
+ if (!released.get() && capacity > 0) {
+ final int refCnt = buf.refCnt();
+ if (refCnt > 0) {
+ final int leak = LEAK_COUNT.incrementAndGet();
+ LOG.warn("LEAK {}: {}, refCnt={}, capacity={}",
+ leak, this, refCnt, capacity);
+ buf.release(refCnt);
+ }
+ }
+ super.finalize();
+ }
+
+ @Override
+ public void close() {
+ release();
+ }
+
+ /** Release this buffer and return it back to the pool. */
+ public void release() {
+ final boolean set = released.compareAndSet(false, true);
+ Preconditions.assertTrue(set, () -> "Already released: " + this);
+ if (buf.release()) {
+ assertRefCnt(0);
+ } else {
+ // A zero capacity buffer, possibly singleton, may not be able released.
+ Preconditions.assertSame(0, buf.capacity(), "capacity");
+ }
+ }
+
+ /** @return a readonly {@link ByteBuffer} view of this buffer. */
+ public ByteBuffer asReadOnlyByteBuffer() {
+ assertRefCnt(1);
+ Preconditions.assertTrue(buf.nioBufferCount() > 0);
+ return buf.nioBuffer().asReadOnlyBuffer();
+ }
+
+ /**
+ * Similar to {@link ByteBuffer#putInt(int)}.
+ *
+ * @return this object.
+ */
+ public CodecBuffer putInt(int n) {
+ assertRefCnt(1);
+ buf.writeInt(n);
+ return this;
+ }
+
+ /**
+ * Similar to {@link ByteBuffer#putLong(long)}.
+ *
+ * @return this object.
+ */
+ public CodecBuffer putLong(long n) {
+ assertRefCnt(1);
+ buf.writeLong(n);
+ return this;
+ }
+
+ /**
+ * Similar to {@link ByteBuffer#put(byte[])}.
+ *
+ * @return this object.
+ */
+ public CodecBuffer put(byte[] array) {
+ assertRefCnt(1);
+ buf.writeBytes(array);
+ return this;
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 70bc715182..b385c15c8a 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -27,6 +27,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.DBProfile;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
@@ -54,6 +55,7 @@ import org.apache.ozone.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.assertj.core.api.Fail;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
@@ -159,6 +161,11 @@ public class TestKeyValueContainer {
keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
}
+ @After
+ public void after() {
+ CodecBuffer.assertNoLeaks();
+ }
+
@Test
public void testCreateContainer() throws Exception {
createContainer();
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
index a462d7c20a..293775641c 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.utils.db;
/**
* Codec to convert byte array to/from byte array.
*/
-public class ByteArrayCodec implements Codec<byte[]> {
+public final class ByteArrayCodec implements Codec<byte[]> {
@Override
public byte[] toPersistedFormat(byte[] bytes) {
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
index c75fd2e808..eeea286d61 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
@@ -18,21 +18,38 @@
package org.apache.hadoop.hdds.utils.db;
-import java.io.IOException;
-
import com.google.common.primitives.Ints;
+import javax.annotation.Nonnull;
+import java.util.function.IntFunction;
+
/**
* Codec to convert Integer to/from byte array.
*/
-public class IntegerCodec implements Codec<Integer> {
+public final class IntegerCodec implements Codec<Integer> {
+ @Override
+ public boolean supportCodecBuffer() {
+ return true;
+ }
+
+ @Override
+ public CodecBuffer toCodecBuffer(@Nonnull Integer object,
+ IntFunction<CodecBuffer> allocator) {
+ return allocator.apply(Integer.BYTES).putInt(object);
+ }
+
+ @Override
+ public Integer fromCodecBuffer(@Nonnull CodecBuffer buffer) {
+ return buffer.asReadOnlyByteBuffer().getInt();
+ }
+
@Override
- public byte[] toPersistedFormat(Integer object) throws IOException {
+ public byte[] toPersistedFormat(Integer object) {
return Ints.toByteArray(object);
}
@Override
- public Integer fromPersistedFormat(byte[] rawData) throws IOException {
+ public Integer fromPersistedFormat(byte[] rawData) {
return Ints.fromByteArray(rawData);
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
index f3b8ca7007..a7e848281e 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
@@ -20,11 +20,28 @@ package org.apache.hadoop.hdds.utils.db;
import com.google.common.primitives.Longs;
+import javax.annotation.Nonnull;
+import java.util.function.IntFunction;
/**
* Codec to convert Long to/from byte array.
*/
-public class LongCodec implements Codec<Long> {
+public final class LongCodec implements Codec<Long> {
+ @Override
+ public boolean supportCodecBuffer() {
+ return true;
+ }
+
+ @Override
+ public CodecBuffer toCodecBuffer(@Nonnull Long object,
+ IntFunction<CodecBuffer> allocator) {
+ return allocator.apply(Long.BYTES).putLong(object);
+ }
+
+ @Override
+ public Long fromCodecBuffer(@Nonnull CodecBuffer buffer) {
+ return buffer.asReadOnlyByteBuffer().getLong();
+ }
@Override
public byte[] toPersistedFormat(Long object) {
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index ca2b6743d6..e4692a8ae9 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -255,7 +255,7 @@ public class RDBStore implements DBStore {
}
@Override
- public Table<byte[], byte[]> getTable(String name) throws IOException {
+ public RDBTable getTable(String name) throws IOException {
final ColumnFamily handle = db.getColumnFamily(name);
if (handle == null) {
throw new IOException("No such table in this DB. TableName : " + name);
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index d70d007bfb..9808f7182f 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.utils.db;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
@@ -65,6 +66,10 @@ class RDBTable implements Table<byte[], byte[]> {
return family;
}
+ public void put(ByteBuffer key, ByteBuffer value) throws IOException {
+ db.put(family, key, value);
+ }
+
@Override
public void put(byte[] key, byte[] value) throws IOException {
db.put(family, key, value);
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index cc44361908..4c48ed0c7b 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -445,6 +446,20 @@ public final class RocksDatabase {
}
}
+ public void put(ColumnFamily family, ByteBuffer key, ByteBuffer value)
+ throws IOException {
+ assertClose();
+ try {
+ counter.incrementAndGet();
+ db.get().put(family.getHandle(), writeOptions, key, value);
+ } catch (RocksDBException e) {
+ closeOnError(e, true);
+ throw toIOException(this, "put " + bytes2String(key), e);
+ } finally {
+ counter.decrementAndGet();
+ }
+ }
+
public void flush() throws IOException {
assertClose();
try (ManagedFlushOptions options = new ManagedFlushOptions()) {
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
index dc432ab9f1..9b5bcb0237 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
@@ -18,17 +18,34 @@
*/
package org.apache.hadoop.hdds.utils.db;
-import java.io.IOException;
+import java.util.function.IntFunction;
+import javax.annotation.Nonnull;
import org.apache.hadoop.hdds.StringUtils;
/**
* Codec to convert String to/from byte array.
*/
-public class StringCodec implements Codec<String> {
+public final class StringCodec implements Codec<String> {
+ @Override
+ public boolean supportCodecBuffer() {
+ return true;
+ }
+
+ @Override
+ public CodecBuffer toCodecBuffer(@Nonnull String object,
+ IntFunction<CodecBuffer> allocator) {
+ final byte[] array = toPersistedFormat(object);
+ return allocator.apply(array.length).put(array);
+ }
+
+ @Override
+ public String fromCodecBuffer(@Nonnull CodecBuffer buffer) {
+ return StringUtils.bytes2String(buffer.asReadOnlyByteBuffer());
+ }
@Override
- public byte[] toPersistedFormat(String object) throws IOException {
+ public byte[] toPersistedFormat(String object) {
if (object != null) {
return StringUtils.string2Bytes(object);
} else {
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 4eba908e67..2287a30b32 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -49,7 +49,7 @@ import static
org.apache.hadoop.hdds.utils.db.cache.CacheResult.CacheStatus.NOT_
*/
public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
- private final Table<byte[], byte[]> rawTable;
+ private final RDBTable rawTable;
private final CodecRegistry codecRegistry;
@@ -69,8 +69,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY,
VALUE> {
* @param keyType
* @param valueType
*/
- public TypedTable(
- Table<byte[], byte[]> rawTable,
+ public TypedTable(RDBTable rawTable,
CodecRegistry codecRegistry, Class<KEY> keyType,
Class<VALUE> valueType) throws IOException {
this(rawTable, codecRegistry, keyType, valueType,
@@ -86,8 +85,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY,
VALUE> {
* @param cacheType
* @throws IOException
*/
- public TypedTable(
- Table<byte[], byte[]> rawTable,
+ public TypedTable(RDBTable rawTable,
CodecRegistry codecRegistry, Class<KEY> keyType,
Class<VALUE> valueType,
CacheType cacheType) throws IOException {
@@ -119,6 +117,16 @@ public class TypedTable<KEY, VALUE> implements Table<KEY,
VALUE> {
@Override
public void put(KEY key, VALUE value) throws IOException {
+ final Codec<KEY> keyCodec = codecRegistry.getCodec(key);
+ final Codec<VALUE> valueCodec = codecRegistry.getCodec(value);
+ if (keyCodec.supportCodecBuffer() && valueCodec.supportCodecBuffer()) {
+ try (CodecBuffer k = keyCodec.toDirectCodecBuffer(key);
+ CodecBuffer v = valueCodec.toDirectCodecBuffer(value)) {
+ rawTable.put(k.asReadOnlyByteBuffer(), v.asReadOnlyByteBuffer());
+ }
+ return;
+ }
+
byte[] keyData = codecRegistry.asRawData(key);
byte[] valueData = codecRegistry.asRawData(value);
rawTable.put(keyData, valueData);
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
index 48f7745dba..54fb72645a 100644
---
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.utils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -132,6 +133,7 @@ public class TestRDBSnapshotProvider {
if (testDir.exists()) {
FileUtil.fullyDelete(testDir);
}
+ CodecBuffer.assertNoLeaks();
}
@Test
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
new file mode 100644
index 0000000000..448eafe184
--- /dev/null
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test {@link Codec} implementations.
+ */
+public final class TestCodec {
+ static final Logger LOG = LoggerFactory.getLogger(TestCodec.class);
+ static final int NUM_LOOPS = 10;
+
+ /** Force gc to check leakage. */
+ static void gc() throws InterruptedException {
+ // use WeakReference to detect gc
+ Object obj = new Object();
+ final WeakReference<Object> weakRef = new WeakReference<>(obj);
+ obj = null;
+
+ // loop until gc has completed.
+ for (int i = 0; weakRef.get() != null; i++) {
+ LOG.info("gc {}", i);
+ System.gc();
+ Thread.sleep(100);
+ }
+ CodecBuffer.assertNoLeaks();
+ }
+
+ @Test
+ public void testIntegerCodec() throws Exception {
+ final IntegerCodec codec = new IntegerCodec();
+ runTest(codec, 0, Integer.BYTES);
+ runTest(codec, 1, Integer.BYTES);
+ runTest(codec, -1, Integer.BYTES);
+ runTest(codec, Integer.MAX_VALUE, Integer.BYTES);
+ runTest(codec, Integer.MIN_VALUE, Integer.BYTES);
+
+ for (int i = 0; i < NUM_LOOPS; i++) {
+ final int original = ThreadLocalRandom.current().nextInt();
+ runTest(codec, original, Integer.BYTES);
+ }
+ gc();
+ }
+
+ @Test
+ public void testLongCodec() throws Exception {
+ final LongCodec codec = new LongCodec();
+ runTest(codec, 0L, Long.BYTES);
+ runTest(codec, 1L, Long.BYTES);
+ runTest(codec, -1L, Long.BYTES);
+ runTest(codec, Long.MAX_VALUE, Long.BYTES);
+ runTest(codec, Long.MIN_VALUE, Long.BYTES);
+
+ for (int i = 0; i < NUM_LOOPS; i++) {
+ final long original = ThreadLocalRandom.current().nextLong();
+ runTest(codec, original, Long.BYTES);
+ }
+ gc();
+ }
+
+ @Test
+ public void testStringCodec() throws Exception {
+ final StringCodec codec = new StringCodec();
+ runTest(codec, "", 0);
+
+ for (int i = 0; i < NUM_LOOPS; i++) {
+ final String original = "test" + ThreadLocalRandom.current().nextLong();
+ runTest(codec, original, original.length());
+ }
+ gc();
+ }
+
+ static <T> void runTest(Codec<T> codec, T original,
+ Integer serializedSize) throws Exception {
+ Assertions.assertTrue(codec.supportCodecBuffer());
+
+ // serialize to byte[]
+ final byte[] array = codec.toPersistedFormat(original);
+ if (serializedSize != null) {
+ Assertions.assertEquals(serializedSize, array.length);
+ }
+ // deserialize from byte[]
+ final T fromArray = codec.fromPersistedFormat(array);
+ Assertions.assertEquals(original, fromArray);
+
+ // serialize to CodecBuffer
+ final CodecBuffer codecBuffer = codec.toCodecBuffer(
+ original, CodecBuffer::allocateHeap);
+ final ByteBuffer byteBuffer = codecBuffer.asReadOnlyByteBuffer();
+ Assertions.assertEquals(array.length, byteBuffer.remaining());
+ for (int i = 0; i < array.length; i++) {
+ // assert exact content
+ Assertions.assertEquals(array[i], byteBuffer.get(i));
+ }
+
+ // deserialize from CodecBuffer
+ final T fromBuffer = codec.fromCodecBuffer(codecBuffer);
+ codecBuffer.release();
+ Assertions.assertEquals(original, fromBuffer);
+
+ // deserialize from wrapped buffer
+ final CodecBuffer wrapped = CodecBuffer.wrap(array);
+ final T fromWrappedArray = codec.fromCodecBuffer(wrapped);
+ wrapped.release();
+ Assertions.assertEquals(original, fromWrappedArray);
+ }
+}
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index a939531471..7831e03efd 100644
---
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -97,6 +97,7 @@ public class TestRDBStore {
if (rdbStore != null) {
rdbStore.close();
}
+ CodecBuffer.assertNoLeaks();
}
public void insertRandomData(RDBStore dbStore, int familyIndex)
diff --git
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
index ad1c03b868..cc249a57b7 100644
---
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
+++
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
@@ -91,6 +91,7 @@ public class TestTypedRDBTableStore {
if (rdbStore != null) {
rdbStore.close();
}
+ CodecBuffer.assertNoLeaks();
}
@Test
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 5c63f98a15..8c325e40e7 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectMetrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -464,6 +465,7 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
DefaultMetricsSystem.shutdown();
ManagedRocksObjectMetrics.INSTANCE.assertNoLeaks();
+ CodecBuffer.assertNoLeaks();
} catch (IOException e) {
LOG.error("Exception while shutting down the cluster.", e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]