This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
     new 9fa7fffb4c6 HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE (#9552)
9fa7fffb4c6 is described below
commit 9fa7fffb4c6245caaad305655a73501bfb5f6bb6
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jan 9 20:38:39 2026 -0500
    HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE (#9552)
---
hadoop-hdds/framework/pom.xml | 6 +
.../hadoop/hdds/utils/db/CodecBufferCodec.java | 6 +-
.../org/apache/hadoop/hdds/utils/db/DBStore.java | 45 -----
.../hadoop/hdds/utils/db/RDBBatchOperation.java | 156 +++++++--------
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 8 +
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 16 +-
.../apache/hadoop/hdds/utils/db/TypedTable.java | 14 +-
.../hdds/utils/db/TestRDBBatchOperation.java | 213 +++++++++++++++++++++
hadoop-hdds/managed-rocksdb/pom.xml | 22 ++-
.../TrackingUtilManagedWriteBatchForTesting.java | 195 +++++++++++++++++++
pom.xml | 6 +
11 files changed, 529 insertions(+), 158 deletions(-)
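
For illustration, a minimal sketch of the call pattern this change leaves in place, mirroring the new TestRDBBatchOperation further down; `db` and `columnFamily` are assumed to be an already-open RocksDatabase and ColumnFamily, and the byte[] overloads are now converted to direct CodecBuffers at the boundary:

    // Sketch only: both overloads end up as CodecBuffer-backed ops in the batch cache.
    Codec<String> codec = StringCodec.get();
    try (ManagedWriteBatch writeBatch = new ManagedWriteBatch();
         RDBBatchOperation batch = RDBBatchOperation.newAtomicOperation(writeBatch)) {
      // byte[] overload: converted internally via CodecBufferCodec.get(true).fromPersistedFormat(...)
      batch.put(columnFamily, codec.toPersistedFormat("key1"), codec.toPersistedFormat("value1"));
      // CodecBuffer overload: the buffers are released when the op is closed after commit
      batch.put(columnFamily, codec.toDirectCodecBuffer("key2"), codec.toDirectCodecBuffer("value2"));
      batch.delete(columnFamily, codec.toPersistedFormat("key1"));
      batch.commit(db);
    }
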
diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index ea7f08edd82..27654930c41 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -307,6 +307,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-managed-rocksdb</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-test-utils</artifactId>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
index 9d2944fab66..416cc8bb9c1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java
@@ -39,12 +39,12 @@
*/
public final class CodecBufferCodec implements Codec<CodecBuffer> {
-  private static final Codec<CodecBuffer> DIRECT_INSTANCE = new CodecBufferCodec(true);
-  private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
+  private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true);
+  private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
private final CodecBuffer.Allocator allocator;
- public static Codec<CodecBuffer> get(boolean direct) {
+ public static CodecBufferCodec get(boolean direct) {
return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index f83f9b3d10a..a269ebc56b9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -17,23 +17,13 @@
package org.apache.hadoop.hdds.utils.db;
-import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;
-
import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
-import org.apache.hadoop.ozone.util.ClosableIterator;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -180,39 +170,4 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
boolean isClosed();
String getSnapshotsParentDir();
-
- /**
- * Creates an iterator that merges multiple tables into a single iterator,
- * grouping values with the same key across the tables.
- *
- * @param <KEY> the type of keys for the tables
-   * @param keyComparator the comparator used to compare keys from different tables
-   * @param prefix the prefix used to filter entries of each table
-   * @param table one or more tables to merge
-   * @return a closable iterator over merged key-value pairs, where each key corresponds
-   *         to a collection of values from the tables
-   */
-  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
-      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
-    List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
-    KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
-    Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
-    return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
-        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
-      @Override
-      protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
-        return table[idx].iterator(prefix);
-      }
-
-      @Override
-      protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
-        KEY key = keysToMerge.values().stream().findAny()
-            .orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
-        for (int i = 0; i < tableValues.size(); i++) {
-          tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
- }
- return newKeyValue(key, tableValues);
- }
- };
- }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
index de181ae0c8d..513c732d302 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);
private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+  private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);
private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();
@@ -136,76 +137,91 @@ public void close() {
}
private abstract static class Op implements Closeable {
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+
+ abstract int totalLength();
+
+ boolean closeImpl() {
+ return closed.compareAndSet(false, true);
+ }
+
+ @Override
+ public final void close() {
+ closeImpl();
+ }
+ }
+
+ private abstract static class SingleKeyOp extends Op {
+ private final CodecBuffer keyBuffer;
private final Bytes keyBytes;
- private Op(Bytes keyBytes) {
- this.keyBytes = keyBytes;
+ private SingleKeyOp(CodecBuffer keyBuffer) {
+ this.keyBuffer = Objects.requireNonNull(keyBuffer);
+ this.keyBytes = Bytes.newBytes(keyBuffer);
}
-    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+ CodecBuffer getKeyBuffer() {
+ return keyBuffer;
+ }
- abstract int keyLen();
+ Bytes getKeyBytes() {
+ return keyBytes;
+ }
+
+ int keyLen() {
+ return getKeyBuffer().readableBytes();
+ }
int valLen() {
return 0;
}
+ @Override
int totalLength() {
return keyLen() + valLen();
}
@Override
- public void close() {
- if (keyBytes != null) {
- keyBytes.close();
+ boolean closeImpl() {
+ if (super.closeImpl()) {
+ IOUtils.close(LOG, keyBuffer, keyBytes);
+ return true;
}
+ return false;
}
}
/**
* Delete operation to be applied to a {@link ColumnFamily} batch.
*/
- private static final class DeleteOp extends Op {
- private final byte[] key;
+ private static final class DeleteOp extends SingleKeyOp {
- private DeleteOp(byte[] key, Bytes keyBytes) {
- super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
- this.key = Objects.requireNonNull(key, "key == null");
+ private DeleteOp(CodecBuffer key) {
+ super(key);
}
@Override
    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchDelete(batch, this.key);
- }
-
- @Override
- public int keyLen() {
- return key.length;
+ family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer());
}
}
/**
   * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
*/
- private final class PutOp extends Op {
- private final CodecBuffer key;
+ private final class PutOp extends SingleKeyOp {
private final CodecBuffer value;
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
- super(keyBytes);
- this.key = key;
- this.value = value;
+ private PutOp(CodecBuffer key, CodecBuffer value) {
+ super(Objects.requireNonNull(key, "key == null"));
+ this.value = Objects.requireNonNull(value, "value == null");
}
@Override
    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
- }
-
- @Override
- public int keyLen() {
- return key.readableBytes();
+      family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
}
@Override
@@ -214,41 +230,12 @@ public int valLen() {
}
@Override
- public void close() {
- if (closed.compareAndSet(false, true)) {
- key.release();
- value.release();
+ boolean closeImpl() {
+ if (super.closeImpl()) {
+ IOUtils.close(LOG, value);
+ return true;
}
- super.close();
- }
- }
-
- /**
-   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
- */
- private static final class ByteArrayPutOp extends Op {
- private final byte[] key;
- private final byte[] value;
-
- private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
- super(keyBytes);
- this.key = Objects.requireNonNull(key, "key == null");
- this.value = Objects.requireNonNull(value, "value == null");
- }
-
- @Override
-    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
- family.batchPut(batch, key, value);
- }
-
- @Override
- public int keyLen() {
- return key.length;
- }
-
- @Override
- public int valLen() {
- return value.length;
+ return false;
}
}
@@ -271,7 +258,7 @@ private class FamilyCache {
   * It supports operations such as additions and deletions while maintaining the ability to overwrite
* existing entries when necessary.
*/
- private final Map<Bytes, Op> ops = new HashMap<>();
+ private final Map<Bytes, SingleKeyOp> ops = new HashMap<>();
private boolean isCommit;
private long batchSize;
@@ -312,7 +299,7 @@ void clear() {
}
private void deleteIfExist(Bytes key) {
- final Op previous = ops.remove(key);
+ final SingleKeyOp previous = ops.remove(key);
if (previous != null) {
previous.close();
discardedSize += previous.totalLength();
@@ -322,8 +309,9 @@ private void deleteIfExist(Bytes key) {
}
}
- void overwriteIfExists(Bytes key, Op op) {
+ void overwriteIfExists(SingleKeyOp op) {
Preconditions.checkState(!isCommit, "%s is already committed.", this);
+ Bytes key = op.getKeyBytes();
deleteIfExist(key);
batchSize += op.totalLength();
Op overwritten = ops.put(key, op);
@@ -336,21 +324,12 @@ void overwriteIfExists(Bytes key, Op op) {
void put(CodecBuffer key, CodecBuffer value) {
putCount++;
- // always release the key with the value
- Bytes keyBytes = Bytes.newBytes(key);
- overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
- }
-
- void put(byte[] key, byte[] value) {
- putCount++;
- Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
+ overwriteIfExists(new PutOp(key, value));
}
- void delete(byte[] key) {
+ void delete(CodecBuffer key) {
delCount++;
- Bytes keyBytes = new Bytes(key);
- overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
+ overwriteIfExists(new DeleteOp(key));
}
String putString(int keySize, int valueSize) {
@@ -378,14 +357,8 @@ void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) {
.put(key, value);
}
- void put(ColumnFamily f, byte[] key, byte[] value) {
- name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f))
- .put(key, value);
- }
-
- void delete(ColumnFamily family, byte[] key) {
-    name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
-        .delete(key);
+  void delete(ColumnFamily family, CodecBuffer key) {
+    name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
}
/** Prepare batch write for the entire cache. */
@@ -461,6 +434,10 @@ public void close() {
}
public void delete(ColumnFamily family, byte[] key) {
+ opCache.delete(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key));
+ }
+
+ public void delete(ColumnFamily family, CodecBuffer key) {
opCache.delete(family, key);
}
@@ -469,6 +446,7 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
}
public void put(ColumnFamily family, byte[] key, byte[] value) {
- opCache.put(family, key, value);
+ opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key),
+ DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value));
}
}
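
For illustration, the overwrite semantics this refactor preserves: FamilyCache still keys its op map by the key bytes, so a later put or delete on the same key closes the superseded op (releasing its CodecBuffers) and the last write wins at commit time, as exercised by OP1/OP9 and OP3/OP4 in the new test. Reusing the `batch`, `columnFamily` and `codec` from the earlier sketch:

    // Last write wins: the first PUT for key01 is closed and replaced,
    // so the RocksDB write batch only ever sees key01 -> value011.
    batch.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
    batch.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
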
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index f732735cbe3..2aef5daa3c9 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce
db.deleteRange(family, beginKey, endKey);
}
+ void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
+ if (batch instanceof RDBBatchOperation) {
+ ((RDBBatchOperation) batch).delete(family, key);
+ } else {
+      throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
+ }
+ }
+
@Override
public void deleteWithBatch(BatchOperation batch, byte[] key) {
if (batch instanceof RDBBatchOperation) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 659954a861b..5aff9351804 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() {
return handle;
}
- public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
+ public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.delete(getHandle(), key);
@@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
}
}
-  public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
- throws RocksDatabaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("batchPut array key {}", bytes2String(key));
- LOG.debug("batchPut array value {}", bytes2String(value));
- }
-
- try (UncheckedAutoCloseable ignored = acquire()) {
- writeBatch.put(getHandle(), key, value);
- } catch (RocksDBException e) {
-      throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e);
- }
- }
-
public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
ByteBuffer value) throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 8000d48c618..59e924529ce 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {
@Override
  public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
- rawTable.deleteWithBatch(batch, encodeKey(key));
+ if (supportCodecBuffer) {
+ CodecBuffer keyBuffer = null;
+ try {
+ keyBuffer = keyCodec.toDirectCodecBuffer(key);
+ // The buffers will be released after commit.
+ rawTable.deleteWithBatch(batch, keyBuffer);
+ } catch (Exception e) {
+ IOUtils.closeQuietly(keyBuffer);
+ throw e;
+ }
+ } else {
+ rawTable.deleteWithBatch(batch, encodeKey(key));
+ }
}
@Override
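
For illustration, the caller-side pattern is unchanged; with a CodecBuffer-capable key codec the key is no longer serialized to a byte[] on this path, and the direct buffer is released only after the batch commits (or immediately if enqueueing fails). A minimal sketch, assuming `dbStore` is an open DBStore, `table` a TypedTable obtained from it, and "someKey" a placeholder key:

    try (BatchOperation batch = dbStore.initBatchOperation()) {
      // No byte[] round-trip for the key when the codec supports CodecBuffer.
      table.deleteWithBatch(batch, "someKey");
      dbStore.commitBatchOperation(batch);
    }
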
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
new file mode 100644
index 00000000000..aee70feaceb
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting.OpType;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting.Operation;
+import org.apache.ratis.util.function.CheckedConsumer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * The TestRDBBatchOperation class provides test cases to validate the functionality of RDB batch operations
+ * in a RocksDB-based backend. It verifies the correct behavior of write operations using batch processing
+ * and ensures the integrity of operations like put and delete when performed in batch mode.
+ */
+public class TestRDBBatchOperation {
+
+ static {
+ ManagedRocksObjectUtils.loadRocksDBLibrary();
+ }
+
+ @TempDir
+ private Path tempDir;
+
+  private static Operation getOperation(String key, String value, OpType opType) {
+    return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType);
+ }
+
+ @Test
+  public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException {
+    try (TrackingUtilManagedWriteBatchForTesting writeBatch = new TrackingUtilManagedWriteBatchForTesting();
+         RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) {
+      ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
+      RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
+      doAnswer((i) -> {
+        ((ManagedWriteBatch)i.getArgument(0))
+            .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2));
+        return null;
+      }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class));
+
+ doAnswer((i) -> {
+ ((ManagedWriteBatch)i.getArgument(0))
+ .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1));
+ return null;
+      }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class));
+
+ when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
+ when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test"));
+ when(columnFamily.getName()).thenReturn("test");
+ Codec<String> codec = StringCodec.get();
+ // OP1: This should be skipped in favor of OP9.
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
+      // OP2
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02"));
+      // OP3: This should be skipped in favor of OP4.
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03"));
+      // OP4
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04"));
+ // OP5
+ batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
+ // OP6
+ batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
+ // OP7
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04"));
+      // OP8
+      batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05"));
+      //OP9
+      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
+
+
+ RocksDatabase db = Mockito.mock(RocksDatabase.class);
+ doNothing().when(db).batchWrite(any());
+ batchOperation.commit(db);
+ Set<Operation> expectedOps = ImmutableSet.of(
+ getOperation("key01", "value011", OpType.PUT_DIRECT),
+ getOperation("key02", "value02", OpType.PUT_DIRECT),
+ getOperation("key03", "value04", OpType.PUT_DIRECT),
+ getOperation("key05", null, OpType.DELETE_DIRECT),
+ getOperation("key10", null, OpType.DELETE_DIRECT),
+ getOperation("key04", "value04", OpType.PUT_DIRECT),
+ getOperation("key06", "value05", OpType.PUT_DIRECT));
+      assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet());
+      assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test")));
+ }
+ }
+
+  private DBStore getDBStore(OzoneConfiguration conf, String name, String tableName) throws RocksDatabaseException {
+ return DBStoreBuilder.newBuilder(conf)
+ .setName(name).setPath(tempDir).addTable(tableName).build();
+ }
+
+  private void performPut(Table<String, String> withBatchTable, BatchOperation batchOperation,
+      Table<String, String> withoutBatchTable, String key) throws RocksDatabaseException, CodecException {
+ String value = getRandomString();
+ withBatchTable.putWithBatch(batchOperation, key, value);
+ withoutBatchTable.put(key, value);
+ }
+
+  private void performDelete(Table<String, String> withBatchTable, BatchOperation batchOperation,
+      Table<String, String> withoutBatchTable, String key) throws RocksDatabaseException, CodecException {
+ withBatchTable.deleteWithBatch(batchOperation, key);
+ withoutBatchTable.delete(key);
+ }
+
+ private String getRandomString() {
+ int length = ThreadLocalRandom.current().nextInt(1, 1024);
+ return RandomStringUtils.insecure().next(length);
+ }
+
+  private void performOpWithRandomKey(CheckedConsumer<String, IOException> op, Set<String> keySet,
+ List<String> keyList) throws IOException {
+ String key = getRandomString();
+ op.accept(key);
+ if (!keySet.contains(key)) {
+ keyList.add(key);
+ keySet.add(key);
+ }
+ }
+
+  private void performOpWithRandomPreExistingKey(CheckedConsumer<String, IOException> op, List<String> keyList)
+ throws IOException {
+ int randomIndex = ThreadLocalRandom.current().nextInt(0, keyList.size());
+ op.accept(keyList.get(randomIndex));
+ }
+
+ @Test
+ public void testRDBBatchOperationWithRDB() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ String tableName = "test";
+    try (DBStore dbStoreWithBatch = getDBStore(conf, "WithBatch.db", tableName);
+        DBStore dbStoreWithoutBatch = getDBStore(conf, "WithoutBatch.db", tableName)) {
+      try (BatchOperation batchOperation = dbStoreWithBatch.initBatchOperation()) {
+        Table<String, String> withBatchTable = dbStoreWithBatch.getTable(tableName,
+            StringCodec.get(), StringCodec.get());
+        Table<String, String> withoutBatchTable = dbStoreWithoutBatch.getTable(tableName,
+            StringCodec.get(), StringCodec.get());
+ List<String> keyList = new ArrayList<>();
+ Set<String> keySet = new HashSet<>();
+ List<CheckedConsumer<String, IOException>> ops = Arrays.asList(
+            (key) -> performPut(withBatchTable, batchOperation, withoutBatchTable, key),
+            (key) -> performDelete(withBatchTable, batchOperation, withoutBatchTable, key));
+        for (int i = 0; i < 30000; i++) {
+          CheckedConsumer<String, IOException> op = ops.get(ThreadLocalRandom.current().nextInt(ops.size()));
+          boolean performWithPreExistingKey = ThreadLocalRandom.current().nextBoolean();
+ if (performWithPreExistingKey && !keyList.isEmpty()) {
+ performOpWithRandomPreExistingKey(op, keyList);
+ } else {
+ performOpWithRandomKey(op, keySet, keyList);
+ }
+ }
+ dbStoreWithBatch.commitBatchOperation(batchOperation);
+ }
+      Table<CodecBuffer, CodecBuffer> withBatchTable = dbStoreWithBatch.getTable(tableName,
+          CodecBufferCodec.get(true), CodecBufferCodec.get(true));
+      Table<CodecBuffer, CodecBuffer> withoutBatchTable = dbStoreWithoutBatch.getTable(tableName,
+          CodecBufferCodec.get(true), CodecBufferCodec.get(true));
+      try (Table.KeyValueIterator<CodecBuffer, CodecBuffer> itr1 = withBatchTable.iterator();
+           Table.KeyValueIterator<CodecBuffer, CodecBuffer> itr2 = withoutBatchTable.iterator();) {
+        while (itr1.hasNext() || itr2.hasNext()) {
+          assertEquals(itr1.hasNext(), itr2.hasNext(), "Expected same number of entries");
+          KeyValue<CodecBuffer, CodecBuffer> kv1 = itr1.next();
+          KeyValue<CodecBuffer, CodecBuffer> kv2 = itr2.next();
+          assertEquals(kv1.getKey().asReadOnlyByteBuffer(), kv2.getKey().asReadOnlyByteBuffer(),
+              "Expected same keys");
+          assertEquals(kv1.getValue().asReadOnlyByteBuffer(), kv2.getValue().asReadOnlyByteBuffer(),
+              "Expected same values");
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-hdds/managed-rocksdb/pom.xml b/hadoop-hdds/managed-rocksdb/pom.xml
index 5e6976500f9..1a1fb3a82be 100644
--- a/hadoop-hdds/managed-rocksdb/pom.xml
+++ b/hadoop-hdds/managed-rocksdb/pom.xml
@@ -25,11 +25,6 @@
<name>Apache Ozone HDDS Managed RocksDB</name>
<description>Apache Ozone Managed RocksDB library</description>
- <properties>
- <!-- no tests in this module so far -->
- <maven.test.skip>true</maven.test.skip>
- </properties>
-
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
@@ -63,6 +58,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -74,6 +74,18 @@
<proc>none</proc>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-jar</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java
new file mode 100644
index 00000000000..1c9241c1d9f
--- /dev/null
+++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db.managed;
+
+import static org.apache.hadoop.hdds.StringUtils.bytes2String;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * The TrackingUtilManagedWriteBatch class extends ManagedWriteBatch to provide functionality
+ * for tracking operations in a managed write batch context. Operations such as put, delete,
+ * merge, and delete range are managed and tracked, along with their corresponding operation types.
+ *
+ * This class supports direct and indirect operation types, delineated in the OpType enumeration.
+ * Direct operations are created using ByteBuffers while indirect operations are created using
+ * byte arrays.
+ */
+public class TrackingUtilManagedWriteBatchForTesting extends ManagedWriteBatch {
+
+ private final Map<String, List<Operation>> operations = new HashMap<>();
+
+ /**
+   * The OpType enumeration defines the different types of operations performed in a batch.
+ */
+ public enum OpType {
+ PUT_DIRECT,
+ DELETE_DIRECT,
+ MERGE_DIRECT,
+ DELETE_RANGE_INDIRECT,
+ PUT_NON_DIRECT,
+ DELETE_NON_DIRECT,
+ MERGE_NON_DIRECT,
+ }
+
+ /**
+   * The Operation class represents an individual operation to be performed in the context of
+   * a batch operation, such as a database write, delete, or merge. Each operation is characterized
+   * by a key, value, and an operation type (OpType).
+   *
+   * Operations can be of different types, as defined in the OpType enumeration, which include
+   * actions such as put, delete, merge, and delete range, either direct or indirect.
+ */
+ public static class Operation {
+ private final byte[] key;
+ private final byte[] value;
+ private final OpType opType;
+
+ public Operation(byte[] key, byte[] value, OpType opType) {
+ this.key = Arrays.copyOf(key, key.length);
+ this.value = value == null ? null : Arrays.copyOf(value, value.length);
+ this.opType = opType;
+ }
+
+ public Operation(byte[] key, OpType opType) {
+ this(key, null, opType);
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (!(o instanceof Operation)) {
+ return false;
+ }
+
+ Operation operation = (Operation) o;
+      return Arrays.equals(key, operation.key) && Arrays.equals(value, operation.value) &&
+ opType == operation.opType;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Arrays.hashCode(key) + Arrays.hashCode(value) + opType.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "Operation{" +
+ "key=" + bytes2String(key) +
+ ", value=" + (value == null ? null : bytes2String(value)) +
+ ", opType=" + opType +
+ '}';
+ }
+ }
+
+ public Map<String, List<Operation>> getOperations() {
+ return operations;
+ }
+
+ public TrackingUtilManagedWriteBatchForTesting() {
+ super();
+ }
+
+ private byte[] convert(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
+ }
+
+ @Override
+  public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(key, OpType.DELETE_NON_DIRECT));
+ }
+
+ @Override
+  public void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(convert(key), OpType.DELETE_DIRECT));
+ }
+
+ @Override
+ public void delete(byte[] key) throws RocksDBException {
+    operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, OpType.DELETE_NON_DIRECT));
+ }
+
+ @Override
+ public void delete(ByteBuffer key) throws RocksDBException {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(convert(key), OpType.DELETE_DIRECT));
+ }
+
+ @Override
+ public void deleteRange(byte[] beginKey, byte[] endKey) {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT));
+ }
+
+ @Override
+  public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
+      throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT));
+ }
+
+ @Override
+  public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(key, value, OpType.MERGE_NON_DIRECT));
+ }
+
+ @Override
+ public void merge(byte[] key, byte[] value) {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(key, value, OpType.MERGE_NON_DIRECT));
+ }
+
+ @Override
+  public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(key, value, OpType.PUT_NON_DIRECT));
+ }
+
+ @Override
+  public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
+    operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>())
+ .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT));
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value) throws RocksDBException {
+    operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, value, OpType.PUT_NON_DIRECT));
+ }
+
+ @Override
+ public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
+ operations.computeIfAbsent("", k -> new ArrayList<>())
+ .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT));
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 64192a0f072..4bcaadf6acc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1069,6 +1069,12 @@
<artifactId>hdds-managed-rocksdb</artifactId>
<version>${hdds.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ozone</groupId>
+ <artifactId>hdds-managed-rocksdb</artifactId>
+ <version>${hdds.version}</version>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-rocks-native</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]