This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fa7fffb4c6 HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE (#9552)
9fa7fffb4c6 is described below

commit 9fa7fffb4c6245caaad305655a73501bfb5f6bb6
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jan 9 20:38:39 2026 -0500

    HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE (#9552)
---
 hadoop-hdds/framework/pom.xml                      |   6 +
 .../hadoop/hdds/utils/db/CodecBufferCodec.java     |   6 +-
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   |  45 -----
 .../hadoop/hdds/utils/db/RDBBatchOperation.java    | 156 +++++++--------
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |   8 +
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  16 +-
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |  14 +-
 .../hdds/utils/db/TestRDBBatchOperation.java       | 213 +++++++++++++++++++++
 hadoop-hdds/managed-rocksdb/pom.xml                |  22 ++-
 .../TrackingUtilManagedWriteBatchForTesting.java   | 195 +++++++++++++++++++
 pom.xml                                            |   6 +
 11 files changed, 529 insertions(+), 158 deletions(-)
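
In short, the byte[]-based put/delete paths in RDBBatchOperation are replaced by CodecBuffer-backed ops: the byte[] overloads now wrap their arguments in direct CodecBuffers, so every cached operation is applied to the RocksDB write batch through the ByteBuffer API. A minimal sketch of the pattern, condensed from the RDBBatchOperation hunks below (all names are taken from the diff; the surrounding class and cache plumbing are elided):

    // Shared codec that wraps byte[] data in direct (off-heap) CodecBuffers.
    private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);

    public void put(ColumnFamily family, byte[] key, byte[] value) {
      // The wrapped buffers are owned by the cached op and released when the op is closed.
      opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key),
          DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value));
    }

    public void delete(ColumnFamily family, byte[] key) {
      opCache.delete(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key));
    }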

diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index ea7f08edd82..27654930c41 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -307,6 +307,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-managed-rocksdb</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-test-utils</artifactId>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
index 9d2944fab66..416cc8bb9c1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
@@ -39,12 +39,12 @@
  */
 public final class CodecBufferCodec implements Codec<CodecBuffer> {
 
-  private static final Codec<CodecBuffer> DIRECT_INSTANCE = new CodecBufferCodec(true);
-  private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
+  private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true);
+  private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
 
   private final CodecBuffer.Allocator allocator;
 
-  public static Codec<CodecBuffer> get(boolean direct) {
+  public static CodecBufferCodec get(boolean direct) {
     return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index f83f9b3d10a..a269ebc56b9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -17,23 +17,13 @@
 
 package org.apache.hadoop.hdds.utils.db;
 
-import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;
-
 import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
-import org.apache.hadoop.ozone.util.ClosableIterator;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 
@@ -180,39 +170,4 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
   boolean isClosed();
 
   String getSnapshotsParentDir();
-
-  /**
-   * Creates an iterator that merges multiple tables into a single iterator,
-   * grouping values with the same key across the tables.
-   *
-   * @param <KEY> the type of keys for the tables
-   * @param keyComparator the comparator used to compare keys from different tables
-   * @param prefix the prefix used to filter entries of each table
-   * @param table one or more tables to merge
-   * @return a closable iterator over merged key-value pairs, where each key corresponds
-   *         to a collection of values from the tables
-   */
-  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
-      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
-    List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
-    KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
-    Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
-    return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
-        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
-      @Override
-      protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
-        return table[idx].iterator(prefix);
-      }
-
-      @Override
-      protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
-        KEY key = keysToMerge.values().stream().findAny()
-            .orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
-        for (int i = 0; i < tableValues.size(); i++) {
-          tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
-        }
-        return newKeyValue(key, tableValues);
-      }
-    };
-  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index de181ae0c8d..513c732d302 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
   static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);
 
   private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+  private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);
 
   private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();
 
@@ -136,76 +137,91 @@ public void close() {
   }
 
   private abstract static class Op implements Closeable {
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+
+    abstract int totalLength();
+
+    boolean closeImpl() {
+      return closed.compareAndSet(false, true);
+    }
+
+    @Override
+    public final void close() {
+      closeImpl();
+    }
+  }
+
+  private abstract static class SingleKeyOp extends Op {
+    private final CodecBuffer keyBuffer;
     private final Bytes keyBytes;
 
-    private Op(Bytes keyBytes) {
-      this.keyBytes = keyBytes;
+    private SingleKeyOp(CodecBuffer keyBuffer) {
+      this.keyBuffer = Objects.requireNonNull(keyBuffer);
+      this.keyBytes = Bytes.newBytes(keyBuffer);
     }
 
-    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+    CodecBuffer getKeyBuffer() {
+      return keyBuffer;
+    }
 
-    abstract int keyLen();
+    Bytes getKeyBytes() {
+      return keyBytes;
+    }
+
+    int keyLen() {
+      return getKeyBuffer().readableBytes();
+    }
 
     int valLen() {
       return 0;
     }
 
+    @Override
     int totalLength() {
       return keyLen() + valLen();
     }
 
     @Override
-    public void close() {
-      if (keyBytes != null) {
-        keyBytes.close();
+    boolean closeImpl() {
+      if (super.closeImpl()) {
+        IOUtils.close(LOG, keyBuffer, keyBytes);
+        return true;
       }
+      return false;
     }
   }
 
   /**
    * Delete operation to be applied to a {@link ColumnFamily} batch.
    */
-  private static final class DeleteOp extends Op {
-    private final byte[] key;
+  private static final class DeleteOp extends SingleKeyOp {
 
-    private DeleteOp(byte[] key, Bytes keyBytes) {
-      super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
-      this.key = Objects.requireNonNull(key, "key == null");
+    private DeleteOp(CodecBuffer key) {
+      super(key);
     }
 
     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchDelete(batch, this.key);
-    }
-
-    @Override
-    public int keyLen() {
-      return key.length;
+      family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer());
     }
   }
 
   /**
    * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
    */
-  private final class PutOp extends Op {
-    private final CodecBuffer key;
+  private final class PutOp extends SingleKeyOp {
     private final CodecBuffer value;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
-      super(keyBytes);
-      this.key = key;
-      this.value = value;
+    private PutOp(CodecBuffer key, CodecBuffer value) {
+      super(Objects.requireNonNull(key, "key == null"));
+      this.value = Objects.requireNonNull(value, "value == null");
     }
 
     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
-    }
-
-    @Override
-    public int keyLen() {
-      return key.readableBytes();
+      family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
     }
 
     @Override
@@ -214,41 +230,12 @@ public int valLen() {
     }
 
     @Override
-    public void close() {
-      if (closed.compareAndSet(false, true)) {
-        key.release();
-        value.release();
+    boolean closeImpl() {
+      if (super.closeImpl()) {
+        IOUtils.close(LOG, value);
+        return true;
       }
-      super.close();
-    }
-  }
-
-  /**
-   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
-   */
-  private static final class ByteArrayPutOp extends Op {
-    private final byte[] key;
-    private final byte[] value;
-
-    private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
-      super(keyBytes);
-      this.key = Objects.requireNonNull(key, "key == null");
-      this.value = Objects.requireNonNull(value, "value == null");
-    }
-
-    @Override
-    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key, value);
-    }
-
-    @Override
-    public int keyLen() {
-      return key.length;
-    }
-
-    @Override
-    public int valLen() {
-      return value.length;
+      return false;
     }
   }
 
@@ -271,7 +258,7 @@ private class FamilyCache {
       * It supports operations such as additions and deletions while maintaining the ability to overwrite
        * existing entries when necessary.
        */
-      private final Map<Bytes, Op> ops = new HashMap<>();
+      private final Map<Bytes, SingleKeyOp> ops = new HashMap<>();
       private boolean isCommit;
 
       private long batchSize;
@@ -312,7 +299,7 @@ void clear() {
       }
 
       private void deleteIfExist(Bytes key) {
-        final Op previous = ops.remove(key);
+        final SingleKeyOp previous = ops.remove(key);
         if (previous != null) {
           previous.close();
           discardedSize += previous.totalLength();
@@ -322,8 +309,9 @@ private void deleteIfExist(Bytes key) {
         }
       }
 
-      void overwriteIfExists(Bytes key, Op op) {
+      void overwriteIfExists(SingleKeyOp op) {
         Preconditions.checkState(!isCommit, "%s is already committed.", this);
+        Bytes key = op.getKeyBytes();
         deleteIfExist(key);
         batchSize += op.totalLength();
         Op overwritten = ops.put(key, op);
@@ -336,21 +324,12 @@ void overwriteIfExists(Bytes key, Op op) {
 
       void put(CodecBuffer key, CodecBuffer value) {
         putCount++;
-        // always release the key with the value
-        Bytes keyBytes = Bytes.newBytes(key);
-        overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
-      }
-
-      void put(byte[] key, byte[] value) {
-        putCount++;
-        Bytes keyBytes = new Bytes(key);
-        overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
+        overwriteIfExists(new PutOp(key, value));
       }
 
-      void delete(byte[] key) {
+      void delete(CodecBuffer key) {
         delCount++;
-        Bytes keyBytes = new Bytes(key);
-        overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
+        overwriteIfExists(new DeleteOp(key));
       }
 
       String putString(int keySize, int valueSize) {
@@ -378,14 +357,8 @@ void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) {
           .put(key, value);
     }
 
-    void put(ColumnFamily f, byte[] key, byte[] value) {
-      name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f))
-          .put(key, value);
-    }
-
-    void delete(ColumnFamily family, byte[] key) {
-      name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
-          .delete(key);
+    void delete(ColumnFamily family, CodecBuffer key) {
+      name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
     }
 
     /** Prepare batch write for the entire cache. */
@@ -461,6 +434,10 @@ public void close() {
   }
 
   public void delete(ColumnFamily family, byte[] key) {
+    opCache.delete(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key));
+  }
+
+  public void delete(ColumnFamily family, CodecBuffer key) {
     opCache.delete(family, key);
   }
 
@@ -469,6 +446,7 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
   }
 
   public void put(ColumnFamily family, byte[] key, byte[] value) {
-    opCache.put(family, key, value);
+    opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key),
+        DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value));
   }
 }
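
Since ops are cached per column family and keyed on the key bytes, a later put or delete of the same key supersedes the earlier op before anything reaches the RocksDB write batch; the superseded op is closed and its buffers released. A sketch of the resulting behaviour, condensed from the new unit test further below (mock and column-family setup elided):

    Codec<String> codec = StringCodec.get();
    try (RDBBatchOperation batch = RDBBatchOperation.newAtomicOperation(writeBatch)) {
      batch.put(family, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
      batch.put(family, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
      batch.delete(family, codec.toDirectCodecBuffer("key05"));
      // Only the second put of "key01" and the delete of "key05" are applied at commit time.
      batch.commit(db);
    }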
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index f732735cbe3..2aef5daa3c9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce
     db.deleteRange(family, beginKey, endKey);
   }
 
+  void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
+    if (batch instanceof RDBBatchOperation) {
+      ((RDBBatchOperation) batch).delete(family, key);
+    } else {
+      throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
+    }
+  }
+
   @Override
   public void deleteWithBatch(BatchOperation batch, byte[] key) {
     if (batch instanceof RDBBatchOperation) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 659954a861b..5aff9351804 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() {
       return handle;
     }
 
-    public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
+    public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
         throws RocksDatabaseException {
       try (UncheckedAutoCloseable ignored = acquire()) {
         writeBatch.delete(getHandle(), key);
@@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
       }
     }
 
-    public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
-        throws RocksDatabaseException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("batchPut array key {}", bytes2String(key));
-        LOG.debug("batchPut array value {}", bytes2String(value));
-      }
-
-      try (UncheckedAutoCloseable ignored = acquire()) {
-        writeBatch.put(getHandle(), key, value);
-      } catch (RocksDBException e) {
-        throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e);
-      }
-    }
-
     public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
         ByteBuffer value) throws RocksDatabaseException {
       if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 8000d48c618..59e924529ce 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {
 
   @Override
   public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
-    rawTable.deleteWithBatch(batch, encodeKey(key));
+    if (supportCodecBuffer) {
+      CodecBuffer keyBuffer = null;
+      try {
+        keyBuffer = keyCodec.toDirectCodecBuffer(key);
+        // The buffers will be released after commit.
+        rawTable.deleteWithBatch(batch, keyBuffer);
+      } catch (Exception e) {
+        IOUtils.closeQuietly(keyBuffer);
+        throw e;
+      }
+    } else {
+      rawTable.deleteWithBatch(batch, encodeKey(key));
+    }
   }
 
   @Override
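
On the typed layer, a batched delete now encodes the key straight into a direct CodecBuffer when the table's codecs support it; the buffer is handed to the batch and released after commit, or immediately if enqueueing fails, as the hunk above shows. From a caller's perspective nothing changes; roughly (a sketch assuming a Table<String, String> and a DBStore, following the APIs exercised by the new test below):

    try (BatchOperation batch = dbStore.initBatchOperation()) {
      table.putWithBatch(batch, "key01", "value01");
      table.deleteWithBatch(batch, "key10");
      dbStore.commitBatchOperation(batch);
    }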
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
new file mode 100644
index 00000000000..aee70feaceb
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting.OpType;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting.Operation;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * The TestRDBBatchOperation class provides test cases to validate the functionality of RDB batch operations
+ * in a RocksDB-based backend. It verifies the correct behavior of write operations using batch processing
+ * and ensures the integrity of operations like put and delete when performed in batch mode.
+ */
+public class TestRDBBatchOperation {
+
+  static {
+    ManagedRocksObjectUtils.loadRocksDBLibrary();
+  }
+
+  @TempDir
+  private Path tempDir;
+
+  private static Operation getOperation(String key, String value, OpType opType) {
+    return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType);
+  }
+
+  @Test
+  public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException {
+    try (TrackingUtilManagedWriteBatchForTesting writeBatch = new TrackingUtilManagedWriteBatchForTesting();
+         RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) {
+      ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
+      RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2));
+        return null;
+      }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class));
+
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1));
+        return null;
+      }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class));
+
+      when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
+      when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test"));
+      when(columnFamily.getName()).thenReturn("test");
+      Codec<String> codec = StringCodec.get();
+      // OP1: This should be skipped in favor of OP9.
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
+      // OP2
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02"));
+      // OP3: This should be skipped in favor of OP4.
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03"));
+      // OP4
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04"));
+      // OP5
+      batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
+      // OP6
+      batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
+      // OP7
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04"));
+      // OP8
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05"));
+      // OP9
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
+
+
+      RocksDatabase db = Mockito.mock(RocksDatabase.class);
+      doNothing().when(db).batchWrite(any());
+      batchOperation.commit(db);
+      Set<Operation> expectedOps = ImmutableSet.of(
+          getOperation("key01", "value011", OpType.PUT_DIRECT),
+          getOperation("key02", "value02", OpType.PUT_DIRECT),
+          getOperation("key03", "value04", OpType.PUT_DIRECT),
+          getOperation("key05", null, OpType.DELETE_DIRECT),
+          getOperation("key10", null, OpType.DELETE_DIRECT),
+          getOperation("key04", "value04", OpType.PUT_DIRECT),
+          getOperation("key06", "value05", OpType.PUT_DIRECT));
+      assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet());
+      assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test")));
+    }
+  }
+
+  private DBStore getDBStore(OzoneConfiguration conf, String name, String tableName) throws RocksDatabaseException {
+    return DBStoreBuilder.newBuilder(conf)
+        .setName(name).setPath(tempDir).addTable(tableName).build();
+  }
+
+  private void performPut(Table<String, String> withBatchTable, BatchOperation batchOperation,
+      Table<String, String> withoutBatchTable, String key) throws RocksDatabaseException, CodecException {
+    String value = getRandomString();
+    withBatchTable.putWithBatch(batchOperation, key, value);
+    withoutBatchTable.put(key, value);
+  }
+
+  private void performDelete(Table<String, String> withBatchTable, BatchOperation batchOperation,
+      Table<String, String> withoutBatchTable, String key) throws RocksDatabaseException, CodecException {
+    withBatchTable.deleteWithBatch(batchOperation, key);
+    withoutBatchTable.delete(key);
+  }
+
+  private String getRandomString() {
+    int length = ThreadLocalRandom.current().nextInt(1, 1024);
+    return RandomStringUtils.insecure().next(length);
+  }
+
+  private void performOpWithRandomKey(CheckedConsumer<String, IOException> op, Set<String> keySet,
+      List<String> keyList) throws IOException {
+    String key = getRandomString();
+    op.accept(key);
+    if (!keySet.contains(key)) {
+      keyList.add(key);
+      keySet.add(key);
+    }
+  }
+
+  private void performOpWithRandomPreExistingKey(CheckedConsumer<String, IOException> op, List<String> keyList)
+      throws IOException {
+    int randomIndex = ThreadLocalRandom.current().nextInt(0, keyList.size());
+    op.accept(keyList.get(randomIndex));
+  }
+
+  @Test
+  public void testRDBBatchOperationWithRDB() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String tableName = "test";
+    try (DBStore dbStoreWithBatch = getDBStore(conf, "WithBatch.db", tableName);
+         DBStore dbStoreWithoutBatch = getDBStore(conf, "WithoutBatch.db", tableName)) {
+      try (BatchOperation batchOperation = dbStoreWithBatch.initBatchOperation()) {
+        Table<String, String> withBatchTable = dbStoreWithBatch.getTable(tableName,
+            StringCodec.get(), StringCodec.get());
+        Table<String, String> withoutBatchTable = dbStoreWithoutBatch.getTable(tableName,
+            StringCodec.get(), StringCodec.get());
+        List<String> keyList = new ArrayList<>();
+        Set<String> keySet = new HashSet<>();
+        List<CheckedConsumer<String, IOException>> ops = Arrays.asList(
+            (key) -> performPut(withBatchTable, batchOperation, withoutBatchTable, key),
+            (key) -> performDelete(withBatchTable, batchOperation, withoutBatchTable, key));
+        for (int i = 0; i < 30000; i++) {
+          CheckedConsumer<String, IOException> op = ops.get(ThreadLocalRandom.current().nextInt(ops.size()));
+          boolean performWithPreExistingKey = ThreadLocalRandom.current().nextBoolean();
+          if (performWithPreExistingKey && !keyList.isEmpty()) {
+            performOpWithRandomPreExistingKey(op, keyList);
+          } else {
+            performOpWithRandomKey(op, keySet, keyList);
+          }
+        }
+        dbStoreWithBatch.commitBatchOperation(batchOperation);
+      }
+      Table<CodecBuffer, CodecBuffer> withBatchTable = dbStoreWithBatch.getTable(tableName,
+          CodecBufferCodec.get(true), CodecBufferCodec.get(true));
+      Table<CodecBuffer, CodecBuffer> withoutBatchTable = dbStoreWithoutBatch.getTable(tableName,
+          CodecBufferCodec.get(true), CodecBufferCodec.get(true));
+      try (Table.KeyValueIterator<CodecBuffer, CodecBuffer> itr1 = withBatchTable.iterator();
+           Table.KeyValueIterator<CodecBuffer, CodecBuffer> itr2 = withoutBatchTable.iterator();) {
+        while (itr1.hasNext() || itr2.hasNext()) {
+          assertEquals(itr1.hasNext(), itr2.hasNext(), "Expected same number of entries");
+          KeyValue<CodecBuffer, CodecBuffer> kv1 = itr1.next();
+          KeyValue<CodecBuffer, CodecBuffer> kv2 = itr2.next();
+          assertEquals(kv1.getKey().asReadOnlyByteBuffer(), kv2.getKey().asReadOnlyByteBuffer(),
+              "Expected same keys");
+          assertEquals(kv1.getValue().asReadOnlyByteBuffer(), kv2.getValue().asReadOnlyByteBuffer(),
+              "Expected same values");
+        }
+      }
+    }
+  }
+}
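
The new test can typically be run on its own with the standard surefire filter, e.g. (exact invocation may vary with the local build setup):

    mvn -pl hadoop-hdds/framework -am test -Dtest=TestRDBBatchOperation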
diff --git a/hadoop-hdds/managed-rocksdb/pom.xml b/hadoop-hdds/managed-rocksdb/pom.xml
index 5e6976500f9..1a1fb3a82be 100644
--- a/hadoop-hdds/managed-rocksdb/pom.xml
+++ b/hadoop-hdds/managed-rocksdb/pom.xml
@@ -25,11 +25,6 @@
   <name>Apache Ozone HDDS Managed RocksDB</name>
   <description>Apache Ozone Managed RocksDB library</description>
 
-  <properties>
-    <!-- no tests in this module so far -->
-    <maven.test.skip>true</maven.test.skip>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>com.google.guava</groupId>
@@ -63,6 +58,11 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -74,6 +74,18 @@
           <proc>none</proc>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java
new file mode 100644
index 00000000000..1c9241c1d9f
--- /dev/null
+++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * The TrackingUtilManagedWriteBatch class extends ManagedWriteBatch to provide functionality
+ * for tracking operations in a managed write batch context. Operations such as put, delete,
+ * merge, and delete range are managed and tracked, along with their corresponding operation types.
+ *
+ * This class supports direct and indirect operation types, delineated in the OpType enumeration.
+ * Direct operations are created using ByteBuffers while indirect operations are created using
+ * byte arrays.
+ */
+public class TrackingUtilManagedWriteBatchForTesting extends ManagedWriteBatch {
+
+  private final Map<String, List<Operation>> operations = new HashMap<>();
+
+  /**
+   * The OpType enumeration defines the different types of operations performed in a batch.
+   */
+  public enum OpType {
+    PUT_DIRECT,
+    DELETE_DIRECT,
+    MERGE_DIRECT,
+    DELETE_RANGE_INDIRECT,
+    PUT_NON_DIRECT,
+    DELETE_NON_DIRECT,
+    MERGE_NON_DIRECT,
+  }
+
+  /**
+   * The Operation class represents an individual operation to be performed in the context of
+   * a batch operation, such as a database write, delete, or merge. Each operation is characterized
+   * by a key, value, and an operation type (OpType).
+   *
+   * Operations can be of different types, as defined in the OpType enumeration, which include
+   * actions such as put, delete, merge, and delete range, either direct or indirect.
+   */
+  public static class Operation {
+    private final byte[] key;
+    private final byte[] value;
+    private final OpType opType;
+
+    public Operation(byte[] key, byte[] value, OpType opType) {
+      this.key = Arrays.copyOf(key, key.length);
+      this.value = value == null ? null : Arrays.copyOf(value, value.length);
+      this.opType = opType;
+    }
+
+    public Operation(byte[] key, OpType opType) {
+      this(key, null, opType);
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+      if (!(o instanceof Operation)) {
+        return false;
+      }
+
+      Operation operation = (Operation) o;
+      return Arrays.equals(key, operation.key) && Arrays.equals(value, operation.value) &&
+          opType == operation.opType;
+    }
+
+    @Override
+    public final int hashCode() {
+      return Arrays.hashCode(key) + Arrays.hashCode(value) + opType.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "Operation{" +
+          "key=" + bytes2String(key) +
+          ", value=" + (value == null ? null : bytes2String(value)) +
+          ", opType=" + opType +
+          '}';
+    }
+  }
+
+  public Map<String, List<Operation>> getOperations() {
+    return operations;
+  }
+
+  public TrackingUtilManagedWriteBatchForTesting() {
+    super();
+  }
+
+  private byte[] convert(ByteBuffer buffer) {
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.get(bytes);
+    return bytes;
+  }
+
+  @Override
+  public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+        .add(new Operation(key, OpType.DELETE_NON_DIRECT));
+  }
+
+  @Override
+  public void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+        .add(new Operation(convert(key), OpType.DELETE_DIRECT));
+  }
+
+  @Override
+  public void delete(byte[] key) throws RocksDBException {
+    operations.computeIfAbsent("", k -> new ArrayList<>()).add(new 
Operation(key, OpType.DELETE_NON_DIRECT));
+  }
+
+  @Override
+  public void delete(ByteBuffer key) throws RocksDBException {
+    operations.computeIfAbsent("", k -> new ArrayList<>())
+        .add(new Operation(convert(key), OpType.DELETE_DIRECT));
+  }
+
+  @Override
+  public void deleteRange(byte[] beginKey, byte[] endKey) {
+    operations.computeIfAbsent("", k -> new ArrayList<>())
+        .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT));
+  }
+
+  @Override
+  public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
+      throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+        .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT));
+  }
+
+  @Override
+  public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+        .add(new Operation(key, value, OpType.MERGE_NON_DIRECT));
+  }
+
+  @Override
+  public void merge(byte[] key, byte[] value) {
+    operations.computeIfAbsent("", k -> new ArrayList<>())
+        .add(new Operation(key, value, OpType.MERGE_NON_DIRECT));
+  }
+
+  @Override
+  public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+        .add(new Operation(key, value, OpType.PUT_NON_DIRECT));
+  }
+
+  @Override
+  public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+        .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT));
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) throws RocksDBException {
+    operations.computeIfAbsent("", k -> new ArrayList<>()).add(new 
Operation(key, value, OpType.PUT_NON_DIRECT));
+  }
+
+  @Override
+  public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
+    operations.computeIfAbsent("", k -> new ArrayList<>())
+        .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT));
+  }
+
+  @Override
+  public void close() {
+    super.close();
+  }
+}
diff --git a/pom.xml b/pom.xml
index 64192a0f072..4bcaadf6acc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1069,6 +1069,12 @@
         <artifactId>hdds-managed-rocksdb</artifactId>
         <version>${hdds.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.ozone</groupId>
+        <artifactId>hdds-managed-rocksdb</artifactId>
+        <version>${hdds.version}</version>
+        <type>test-jar</type>
+      </dependency>
       <dependency>
         <groupId>org.apache.ozone</groupId>
         <artifactId>hdds-rocks-native</artifactId>

