This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 75f5a28f37 HDDS-8542. In RDBTable, add a put method using RocksDB ByteBuffer APIs. (#4666)
75f5a28f37 is described below

commit 75f5a28f376516e7252e750e86a10c2f9d874d5a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat May 6 19:00:36 2023 -0700

    HDDS-8542. In RDBTable, add a put method using RocksDB ByteBuffer APIs. (#4666)
---
 .../java/org/apache/hadoop/hdds/StringUtils.java   |  26 ++--
 .../org/apache/hadoop/hdds/utils/db/Codec.java     |  62 ++++++++-
 .../apache/hadoop/hdds/utils/db/CodecBuffer.java   | 151 +++++++++++++++++++++
 .../container/keyvalue/TestKeyValueContainer.java  |   7 +
 .../hadoop/hdds/utils/db/ByteArrayCodec.java       |   2 +-
 .../apache/hadoop/hdds/utils/db/IntegerCodec.java  |  27 +++-
 .../org/apache/hadoop/hdds/utils/db/LongCodec.java |  19 ++-
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   2 +-
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |   5 +
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  15 ++
 .../apache/hadoop/hdds/utils/db/StringCodec.java   |  23 +++-
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |  18 ++-
 .../hadoop/hdds/utils/TestRDBSnapshotProvider.java |   2 +
 .../org/apache/hadoop/hdds/utils/db/TestCodec.java | 130 ++++++++++++++++++
 .../apache/hadoop/hdds/utils/db/TestRDBStore.java  |   1 +
 .../hdds/utils/db/TestTypedRDBTableStore.java      |   1 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   2 +
 17 files changed, 456 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
index ed8f410d0e..4f95839f05 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/StringUtils.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hdds;
 
-import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.utils.SignalLogger;
 import org.apache.hadoop.hdds.utils.VersionInfo;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 
 /**
@@ -40,9 +42,7 @@ public final class StringUtils {
   private StringUtils() {
   }
 
-  // Using the charset canonical name for String/byte[] conversions is much
-  // more efficient due to use of cached encoders/decoders.
-  private static final String UTF8_CSN = StandardCharsets.UTF_8.name();
+  private static final Charset UTF8 = StandardCharsets.UTF_8;
 
   /**
    * Priority of the StringUtils shutdown hook.
@@ -59,12 +59,11 @@ public final class StringUtils {
    * @return The decoded string
    */
   public static String bytes2String(byte[] bytes, int offset, int length) {
-    try {
-      return new String(bytes, offset, length, UTF8_CSN);
-    } catch (UnsupportedEncodingException e) {
-      // should never happen!
-      throw new IllegalArgumentException("UTF8 encoding is not supported", e);
-    }
+    return new String(bytes, offset, length, UTF8);
+  }
+
+  public static String bytes2String(ByteBuffer bytes) {
+    return Unpooled.wrappedBuffer(bytes.asReadOnlyBuffer()).toString(UTF8);
   }
 
   /**
@@ -82,12 +81,7 @@ public final class StringUtils {
    * Converts a string to a byte array using UTF8 encoding.
    */
   public static byte[] string2Bytes(String str) {
-    try {
-      return str.getBytes(UTF8_CSN);
-    } catch (UnsupportedEncodingException e) {
-      // should never happen!
-      throw new IllegalArgumentException("UTF8 decoding is not supported", e);
-    }
+    return str.getBytes(UTF8);
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
index 099a83b92b..1d4ad01f8b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java
@@ -18,15 +18,62 @@
  */
 package org.apache.hadoop.hdds.utils.db;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
+import java.util.function.IntFunction;
 
 /**
- * Codec interface to marshall/unmarshall data to/from a byte[] based
- * key/value store.
+ * Codec interface to serialize/deserialize objects to/from bytes.
+ * A codec implementation must support the byte[] methods
+ * and may optionally support the {@link CodecBuffer} methods.
  *
- * @param <T> Unserialized type
+ * @param <T> The object type.
  */
 public interface Codec<T> {
+  /**
+   * Does this {@link Codec} support the {@link CodecBuffer} methods?
+   * If this method returns true, this class must implement both
+   * {@link #toCodecBuffer(Object, IntFunction)} and
+   * {@link #fromCodecBuffer(CodecBuffer)}.
+   *
+   * @return true iff this class supports the {@link CodecBuffer} methods.
+   */
+  default boolean supportCodecBuffer() {
+    return false;
+  }
+
+  /**
+   * Serialize the given object to bytes.
+   *
+   * @param object The object to be serialized.
+   * @param allocator To allocate a buffer.
+   * @return a buffer storing the serialized bytes.
+   */
+  default CodecBuffer toCodecBuffer(@Nonnull T object,
+      IntFunction<CodecBuffer> allocator) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Serialize the given object to bytes.
+   *
+   * @param object The object to be serialized.
+   * @return a direct buffer storing the serialized bytes.
+   */
+  default CodecBuffer toDirectCodecBuffer(@Nonnull T object)
+      throws IOException {
+    return toCodecBuffer(object, CodecBuffer::allocateDirect);
+  }
+
+  /**
+   * Deserialize an object from the given buffer.
+   *
+   * @param buffer Storing the serialized bytes of an object.
+   * @return the deserialized object.
+   */
+  default T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws IOException {
+    throw new UnsupportedOperationException();
+  }
 
   /**
    * Convert object to raw persisted format.
@@ -42,8 +89,13 @@ public interface Codec<T> {
   T fromPersistedFormat(byte[] rawData) throws IOException;
 
   /**
-   * Copy Object from the provided object, and returns a new object.
-   * @param object
+   * Copy the given object.
+   * When the given object is immutable,
+   * the implementation of this method may safely return the given object.
+   *
+   * @param object The object to be copied.
+   * @return a copy of the given object.  When the given object is immutable,
+   *         the returned object can possibly be the same as the given object.
    */
   T copyObject(T object);
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
new file mode 100644
index 0000000000..4a082ad2b6
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A buffer used by {@link Codec}
+ * for supporting RocksDB direct {@link ByteBuffer} APIs.
+ */
+public final class CodecBuffer implements AutoCloseable {
+  public static final Logger LOG = LoggerFactory.getLogger(CodecBuffer.class);
+
+  private static final ByteBufAllocator POOL
+      = PooledByteBufAllocator.DEFAULT;
+
+  /** Allocate a direct buffer. */
+  public static CodecBuffer allocateDirect(int exactSize) {
+    return new CodecBuffer(POOL.directBuffer(exactSize, exactSize));
+  }
+
+  /** Allocate a heap buffer. */
+  public static CodecBuffer allocateHeap(int exactSize) {
+    return new CodecBuffer(POOL.heapBuffer(exactSize, exactSize));
+  }
+
+  /** Wrap the given array. */
+  public static CodecBuffer wrap(byte[] array) {
+    return new CodecBuffer(Unpooled.wrappedBuffer(array));
+  }
+
+  private static final AtomicInteger LEAK_COUNT = new AtomicInteger();
+
+  /** Assert the number of leak detected is zero. */
+  public static void assertNoLeaks() {
+    final long leak = LEAK_COUNT.get();
+    if (leak > 0) {
+      throw new AssertionError("Found " + leak + " leaked objects, check logs");
+    }
+  }
+
+  private final ByteBuf buf;
+  private final AtomicBoolean released = new AtomicBoolean();
+
+  private CodecBuffer(ByteBuf buf) {
+    this.buf = buf;
+    assertRefCnt(1);
+  }
+
+  private void assertRefCnt(int expected) {
+    Preconditions.assertSame(expected, buf.refCnt(), "refCnt");
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    // leak detection
+    final int capacity = buf.capacity();
+    if (!released.get() && capacity > 0) {
+      final int refCnt = buf.refCnt();
+      if (refCnt > 0) {
+        final int leak = LEAK_COUNT.incrementAndGet();
+        LOG.warn("LEAK {}: {}, refCnt={}, capacity={}",
+            leak, this, refCnt, capacity);
+        buf.release(refCnt);
+      }
+    }
+    super.finalize();
+  }
+
+  @Override
+  public void close() {
+    release();
+  }
+
+  /** Release this buffer and return it back to the pool. */
+  public void release() {
+    final boolean set = released.compareAndSet(false, true);
+    Preconditions.assertTrue(set, () -> "Already released: " + this);
+    if (buf.release()) {
+      assertRefCnt(0);
+    } else {
+      // A zero capacity buffer, possibly singleton, may not be releasable.
+      Preconditions.assertSame(0, buf.capacity(), "capacity");
+    }
+  }
+
+  /** @return a readonly {@link ByteBuffer} view of this buffer. */
+  public ByteBuffer asReadOnlyByteBuffer() {
+    assertRefCnt(1);
+    Preconditions.assertTrue(buf.nioBufferCount() > 0);
+    return buf.nioBuffer().asReadOnlyBuffer();
+  }
+
+  /**
+   * Similar to {@link ByteBuffer#putInt(int)}.
+   *
+   * @return this object.
+   */
+  public CodecBuffer putInt(int n) {
+    assertRefCnt(1);
+    buf.writeInt(n);
+    return this;
+  }
+
+  /**
+   * Similar to {@link ByteBuffer#putLong(long)}.
+   *
+   * @return this object.
+   */
+  public CodecBuffer putLong(long n) {
+    assertRefCnt(1);
+    buf.writeLong(n);
+    return this;
+  }
+
+  /**
+   * Similar to {@link ByteBuffer#put(byte[])}.
+   *
+   * @return this object.
+   */
+  public CodecBuffer put(byte[] array) {
+    assertRefCnt(1);
+    buf.writeBytes(array);
+    return this;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 70bc715182..b385c15c8a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.hadoop.hdds.utils.db.DBProfile;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
@@ -54,6 +55,7 @@ import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 
 import org.assertj.core.api.Fail;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
@@ -159,6 +161,11 @@ public class TestKeyValueContainer {
     keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF);
   }
 
+  @After
+  public void after() {
+    CodecBuffer.assertNoLeaks();
+  }
+
   @Test
   public void testCreateContainer() throws Exception {
     createContainer();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
index a462d7c20a..293775641c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ByteArrayCodec.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.utils.db;
 /**
  * Codec to convert byte array to/from byte array.
  */
-public class ByteArrayCodec implements Codec<byte[]> {
+public final class ByteArrayCodec implements Codec<byte[]> {
 
   @Override
   public byte[] toPersistedFormat(byte[] bytes) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
index c75fd2e808..eeea286d61 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java
@@ -18,21 +18,38 @@
 
 package org.apache.hadoop.hdds.utils.db;
 
-import java.io.IOException;
-
 import com.google.common.primitives.Ints;
 
+import javax.annotation.Nonnull;
+import java.util.function.IntFunction;
+
 /**
  * Codec to convert Integer to/from byte array.
  */
-public class IntegerCodec implements Codec<Integer> {
+public final class IntegerCodec implements Codec<Integer> {
+  @Override
+  public boolean supportCodecBuffer() {
+    return true;
+  }
+
+  @Override
+  public CodecBuffer toCodecBuffer(@Nonnull Integer object,
+      IntFunction<CodecBuffer> allocator) {
+    return allocator.apply(Integer.BYTES).putInt(object);
+  }
+
+  @Override
+  public Integer fromCodecBuffer(@Nonnull CodecBuffer buffer) {
+    return buffer.asReadOnlyByteBuffer().getInt();
+  }
+
   @Override
-  public byte[] toPersistedFormat(Integer object) throws IOException {
+  public byte[] toPersistedFormat(Integer object) {
     return Ints.toByteArray(object);
   }
 
   @Override
-  public Integer fromPersistedFormat(byte[] rawData) throws IOException {
+  public Integer fromPersistedFormat(byte[] rawData) {
     return Ints.fromByteArray(rawData);
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
index f3b8ca7007..a7e848281e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java
@@ -20,11 +20,28 @@ package org.apache.hadoop.hdds.utils.db;
 
 import com.google.common.primitives.Longs;
 
+import javax.annotation.Nonnull;
+import java.util.function.IntFunction;
 
 /**
  * Codec to convert Long to/from byte array.
  */
-public class LongCodec implements Codec<Long> {
+public final class LongCodec implements Codec<Long> {
+  @Override
+  public boolean supportCodecBuffer() {
+    return true;
+  }
+
+  @Override
+  public CodecBuffer toCodecBuffer(@Nonnull Long object,
+      IntFunction<CodecBuffer> allocator) {
+    return allocator.apply(Long.BYTES).putLong(object);
+  }
+
+  @Override
+  public Long fromCodecBuffer(@Nonnull CodecBuffer buffer) {
+    return buffer.asReadOnlyByteBuffer().getLong();
+  }
 
   @Override
   public byte[] toPersistedFormat(Long object) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index ca2b6743d6..e4692a8ae9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -255,7 +255,7 @@ public class RDBStore implements DBStore {
   }
 
   @Override
-  public Table<byte[], byte[]> getTable(String name) throws IOException {
+  public RDBTable getTable(String name) throws IOException {
     final ColumnFamily handle = db.getColumnFamily(name);
     if (handle == null) {
       throw new IOException("No such table in this DB. TableName : " + name);
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index d70d007bfb..9808f7182f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.utils.db;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,6 +66,10 @@ class RDBTable implements Table<byte[], byte[]> {
     return family;
   }
 
+  public void put(ByteBuffer key, ByteBuffer value) throws IOException {
+    db.put(family, key, value);
+  }
+
   @Override
   public void put(byte[] key, byte[] value) throws IOException {
     db.put(family, key, value);
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index cc44361908..4c48ed0c7b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -445,6 +446,20 @@ public final class RocksDatabase {
     }
   }
 
+  public void put(ColumnFamily family, ByteBuffer key, ByteBuffer value)
+      throws IOException {
+    assertClose();
+    try {
+      counter.incrementAndGet();
+      db.get().put(family.getHandle(), writeOptions, key, value);
+    } catch (RocksDBException e) {
+      closeOnError(e, true);
+      throw toIOException(this, "put " + bytes2String(key), e);
+    } finally {
+      counter.decrementAndGet();
+    }
+  }
+
   public void flush() throws IOException {
     assertClose();
     try (ManagedFlushOptions options = new ManagedFlushOptions()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
index dc432ab9f1..9b5bcb0237 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodec.java
@@ -18,17 +18,34 @@
  */
 package org.apache.hadoop.hdds.utils.db;
 
-import java.io.IOException;
+import java.util.function.IntFunction;
+import javax.annotation.Nonnull;
 
 import org.apache.hadoop.hdds.StringUtils;
 
 /**
  * Codec to convert String to/from byte array.
  */
-public class StringCodec implements Codec<String> {
+public final class StringCodec implements Codec<String> {
+  @Override
+  public boolean supportCodecBuffer() {
+    return true;
+  }
+
+  @Override
+  public CodecBuffer toCodecBuffer(@Nonnull String object,
+      IntFunction<CodecBuffer> allocator) {
+    final byte[] array = toPersistedFormat(object);
+    return allocator.apply(array.length).put(array);
+  }
+
+  @Override
+  public String fromCodecBuffer(@Nonnull CodecBuffer buffer) {
+    return StringUtils.bytes2String(buffer.asReadOnlyByteBuffer());
+  }
 
   @Override
-  public byte[] toPersistedFormat(String object) throws IOException {
+  public byte[] toPersistedFormat(String object) {
     if (object != null) {
       return StringUtils.string2Bytes(object);
     } else {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 4eba908e67..2287a30b32 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -49,7 +49,7 @@ import static org.apache.hadoop.hdds.utils.db.cache.CacheResult.CacheStatus.NOT_
  */
 public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
-  private final Table<byte[], byte[]> rawTable;
+  private final RDBTable rawTable;
 
   private final CodecRegistry codecRegistry;
 
@@ -69,8 +69,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
    * @param keyType
    * @param valueType
    */
-  public TypedTable(
-      Table<byte[], byte[]> rawTable,
+  public TypedTable(RDBTable rawTable,
       CodecRegistry codecRegistry, Class<KEY> keyType,
       Class<VALUE> valueType) throws IOException {
     this(rawTable, codecRegistry, keyType, valueType,
@@ -86,8 +85,7 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
    * @param cacheType
    * @throws IOException
    */
-  public TypedTable(
-      Table<byte[], byte[]> rawTable,
+  public TypedTable(RDBTable rawTable,
       CodecRegistry codecRegistry, Class<KEY> keyType,
       Class<VALUE> valueType,
       CacheType cacheType) throws IOException {
@@ -119,6 +117,16 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
 
   @Override
   public void put(KEY key, VALUE value) throws IOException {
+    final Codec<KEY> keyCodec = codecRegistry.getCodec(key);
+    final Codec<VALUE> valueCodec = codecRegistry.getCodec(value);
+    if (keyCodec.supportCodecBuffer() && valueCodec.supportCodecBuffer()) {
+      try (CodecBuffer k = keyCodec.toDirectCodecBuffer(key);
+           CodecBuffer v = valueCodec.toDirectCodecBuffer(value)) {
+        rawTable.put(k.asReadOnlyByteBuffer(), v.asReadOnlyByteBuffer());
+      }
+      return;
+    }
+
     byte[] keyData = codecRegistry.asRawData(key);
     byte[] valueData = codecRegistry.asRawData(value);
     rawTable.put(keyData, valueData);
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
index 48f7745dba..54fb72645a 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.utils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -132,6 +133,7 @@ public class TestRDBSnapshotProvider {
     if (testDir.exists()) {
       FileUtil.fullyDelete(testDir);
     }
+    CodecBuffer.assertNoLeaks();
   }
 
   @Test
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
new file mode 100644
index 0000000000..448eafe184
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test {@link Codec} implementations.
+ */
+public final class TestCodec {
+  static final Logger LOG = LoggerFactory.getLogger(TestCodec.class);
+  static final int NUM_LOOPS = 10;
+
+  /** Force gc to check leakage. */
+  static void gc() throws InterruptedException {
+    // use WeakReference to detect gc
+    Object obj = new Object();
+    final WeakReference<Object> weakRef = new WeakReference<>(obj);
+    obj = null;
+
+    // loop until gc has completed.
+    for (int i = 0; weakRef.get() != null; i++) {
+      LOG.info("gc {}", i);
+      System.gc();
+      Thread.sleep(100);
+    }
+    CodecBuffer.assertNoLeaks();
+  }
+
+  @Test
+  public void testIntegerCodec() throws Exception {
+    final IntegerCodec codec = new IntegerCodec();
+    runTest(codec, 0, Integer.BYTES);
+    runTest(codec, 1, Integer.BYTES);
+    runTest(codec, -1, Integer.BYTES);
+    runTest(codec, Integer.MAX_VALUE, Integer.BYTES);
+    runTest(codec, Integer.MIN_VALUE, Integer.BYTES);
+
+    for (int i = 0; i < NUM_LOOPS; i++) {
+      final int original = ThreadLocalRandom.current().nextInt();
+      runTest(codec, original, Integer.BYTES);
+    }
+    gc();
+  }
+
+  @Test
+  public void testLongCodec() throws Exception {
+    final LongCodec codec = new LongCodec();
+    runTest(codec, 0L, Long.BYTES);
+    runTest(codec, 1L, Long.BYTES);
+    runTest(codec, -1L, Long.BYTES);
+    runTest(codec, Long.MAX_VALUE, Long.BYTES);
+    runTest(codec, Long.MIN_VALUE, Long.BYTES);
+
+    for (int i = 0; i < NUM_LOOPS; i++) {
+      final long original = ThreadLocalRandom.current().nextLong();
+      runTest(codec, original, Long.BYTES);
+    }
+    gc();
+  }
+
+  @Test
+  public void testStringCodec() throws Exception {
+    final StringCodec codec = new StringCodec();
+    runTest(codec, "", 0);
+
+    for (int i = 0; i < NUM_LOOPS; i++) {
+      final String original = "test" + ThreadLocalRandom.current().nextLong();
+      runTest(codec, original, original.length());
+    }
+    gc();
+  }
+
+  static <T> void runTest(Codec<T> codec, T original,
+      Integer serializedSize) throws Exception {
+    Assertions.assertTrue(codec.supportCodecBuffer());
+
+    // serialize to byte[]
+    final byte[] array = codec.toPersistedFormat(original);
+    if (serializedSize != null) {
+      Assertions.assertEquals(serializedSize, array.length);
+    }
+    // deserialize from byte[]
+    final T fromArray = codec.fromPersistedFormat(array);
+    Assertions.assertEquals(original, fromArray);
+
+    // serialize to CodecBuffer
+    final CodecBuffer codecBuffer = codec.toCodecBuffer(
+        original, CodecBuffer::allocateHeap);
+    final ByteBuffer byteBuffer = codecBuffer.asReadOnlyByteBuffer();
+    Assertions.assertEquals(array.length, byteBuffer.remaining());
+    for (int i = 0; i < array.length; i++) {
+      // assert exact content
+      Assertions.assertEquals(array[i], byteBuffer.get(i));
+    }
+
+    // deserialize from CodecBuffer
+    final T fromBuffer = codec.fromCodecBuffer(codecBuffer);
+    codecBuffer.release();
+    Assertions.assertEquals(original, fromBuffer);
+
+    // deserialize from wrapped buffer
+    final CodecBuffer wrapped = CodecBuffer.wrap(array);
+    final T fromWrappedArray = codec.fromCodecBuffer(wrapped);
+    wrapped.release();
+    Assertions.assertEquals(original, fromWrappedArray);
+  }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index a939531471..7831e03efd 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -97,6 +97,7 @@ public class TestRDBStore {
     if (rdbStore != null) {
       rdbStore.close();
     }
+    CodecBuffer.assertNoLeaks();
   }
 
   public void insertRandomData(RDBStore dbStore, int familyIndex)
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
index ad1c03b868..cc249a57b7 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java
@@ -91,6 +91,7 @@ public class TestTypedRDBTableStore {
     if (rdbStore != null) {
       rdbStore.close();
     }
+    CodecBuffer.assertNoLeaks();
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 5c63f98a15..8c325e40e7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectMetrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -464,6 +465,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       DefaultMetricsSystem.shutdown();
 
       ManagedRocksObjectMetrics.INSTANCE.assertNoLeaks();
+      CodecBuffer.assertNoLeaks();
     } catch (IOException e) {
       LOG.error("Exception while shutting down the cluster.", e);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to