This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 90994bb75f7 HDDS-14330. MinHeapMergeIterator should use key comparator
while popping out entries from the heap (#9576)
90994bb75f7 is described below
commit 90994bb75f771f2023f9697430678ff05e3cc957
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jan 2 09:03:57 2026 -0500
HDDS-14330. MinHeapMergeIterator should use key comparator while popping
out entries from the heap (#9576)
---
.../org/apache/hadoop/hdds/utils/db/DBStore.java | 2 +-
.../hadoop/hdds/utils/db/MinHeapMergeIterator.java | 4 +-
.../hdds/utils/db/TestMinHeapMergeIterator.java | 73 ++++++++++++----------
3 files changed, 43 insertions(+), 36 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 298260efdf9..f83f9b3d10a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -198,7 +198,7 @@ default <KEY> ClosableIterator<KeyValue<KEY,
Collection<Object>>> getMergeIterat
KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
Comparator<KeyValue<KEY, Object>> comparator =
Comparator.comparing(KeyValue::getKey, keyComparator);
return new MinHeapMergeIterator<KeyValue<KEY, Object>,
Table.KeyValueIterator<KEY, Object>,
- KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
+ KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
@Override
protected Table.KeyValueIterator<KEY, Object> getIterator(int idx)
throws IOException {
return table[idx].iterator(prefix);
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
index 8326b9e9ca1..1536619d51c 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
@@ -55,7 +55,7 @@ public MinHeapMergeIterator(int numberOfIterators,
Comparator<K> comparator) {
keys = new HashMap<>(numberOfIterators);
iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I)
null).collect(Collectors.toList());
this.initialized = false;
- this.comparator = comparator;
+ this.comparator = Objects.requireNonNull(comparator, "comparator cannot be null");
}
protected abstract I getIterator(int idx) throws IOException;
@@ -109,7 +109,7 @@ public V next() {
// Clear the keys list by setting all entries to null.
keys.clear();
// Advance all entries with the same key (from different files)
- while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
+ while (!minHeap.isEmpty() && comparator.compare(minHeap.peek().getCurrentKey(), currentKey) == 0) {
HeapEntry<K> entry = minHeap.poll();
int idx = entry.index;
// Set the key for the current entry in the keys list.
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java
index 1ac177c646b..d79fab7796c 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java
@@ -26,10 +26,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.UnsignedBytes;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -38,6 +40,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.hadoop.hdds.StringUtils;
import org.junit.jupiter.api.Test;
/**
@@ -45,7 +48,7 @@
*/
class TestMinHeapMergeIterator {
- private static final Comparator<String> STRING_COMPARATOR =
String::compareTo;
+ private static final Comparator<byte[]> BYTE_COMPARATOR =
UnsignedBytes.lexicographicalComparator();
/**
* A closeable iterator which tracks close() calls.
@@ -87,8 +90,8 @@ private static final class MergeResult {
private final String key;
private final Set<Integer> sources;
- private MergeResult(String key, Set<Integer> sources) {
- this.key = key;
+ private MergeResult(byte[] key, Set<Integer> sources) {
+ this.key = StringUtils.bytes2String(key);
this.sources = sources;
}
@@ -104,17 +107,17 @@ Set<Integer> getSources() {
/**
* Concrete implementation for tests.
*/
- private static final class TestIterator extends MinHeapMergeIterator<String,
- TrackingCloseableIterator<String>, MergeResult> {
+ private static final class TestIterator extends MinHeapMergeIterator<byte[],
+ TrackingCloseableIterator<byte[]>, MergeResult> {
- private final List<TrackingCloseableIterator<String>> itrs;
+ private final List<TrackingCloseableIterator<byte[]>> itrs;
private final List<MergeResult> merged = new ArrayList<>();
private IOException ioExceptionAtIndex;
private int exceptionIndex = -1;
- private TestIterator(List<TrackingCloseableIterator<String>> itrs) {
- super(itrs.size(), STRING_COMPARATOR);
+ private TestIterator(List<TrackingCloseableIterator<byte[]>> itrs) {
+ super(itrs.size(), BYTE_COMPARATOR);
this.itrs = itrs;
}
@@ -125,7 +128,7 @@ private TestIterator withGetIteratorIOException(int index,
IOException ex) {
}
@Override
- protected TrackingCloseableIterator<String> getIterator(int idx)
+ protected TrackingCloseableIterator<byte[]> getIterator(int idx)
throws IOException {
if (idx == exceptionIndex) {
if (ioExceptionAtIndex != null) {
@@ -136,9 +139,9 @@ protected TrackingCloseableIterator<String> getIterator(int
idx)
}
@Override
- protected MergeResult merge(Map<Integer, String> keysToMerge) {
+ protected MergeResult merge(Map<Integer, byte[]> keysToMerge) {
// All values in keysToMerge are expected to be equal (same key across
iterators).
- String key = keysToMerge.values().iterator().next();
+ byte[] key = keysToMerge.values().iterator().next();
MergeResult r = new MergeResult(key, new
HashSet<>(keysToMerge.keySet()));
merged.add(r);
return r;
@@ -149,14 +152,18 @@ List<MergeResult> getMerged() {
}
}
+ private ImmutableList<byte[]> toBytesList(String... keys) {
+ return
Arrays.stream(keys).map(StringUtils::string2Bytes).collect(ImmutableList.toImmutableList());
+ }
+
@Test
void testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() {
- TrackingCloseableIterator<String> itr0 =
- new TrackingCloseableIterator<>(ImmutableList.of("a", "c", "e", "g"));
- TrackingCloseableIterator<String> itr1 =
- new TrackingCloseableIterator<>(ImmutableList.of("b", "c", "d", "g",
"h"));
- TrackingCloseableIterator<String> itr2 =
- new TrackingCloseableIterator<>(ImmutableList.of("c", "e", "f", "h"));
+ TrackingCloseableIterator<byte[]> itr0 =
+ new TrackingCloseableIterator<>(toBytesList("a", "c", "e", "g"));
+ TrackingCloseableIterator<byte[]> itr1 =
+ new TrackingCloseableIterator<>(toBytesList("b", "c", "d", "g", "h"));
+ TrackingCloseableIterator<byte[]> itr2 =
+ new TrackingCloseableIterator<>(toBytesList("c", "e", "f", "h"));
List<String> keys = new ArrayList<>();
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1,
itr2))) {
@@ -195,10 +202,10 @@ void
testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() {
@Test
void testInitClosesEmptyIterators() {
- TrackingCloseableIterator<String> empty =
+ TrackingCloseableIterator<byte[]> empty =
new TrackingCloseableIterator<>(Collections.emptyList());
- TrackingCloseableIterator<String> nonEmpty =
- new TrackingCloseableIterator<>(ImmutableList.of("a"));
+ TrackingCloseableIterator<byte[]> nonEmpty =
+ new TrackingCloseableIterator<>(toBytesList("a"));
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty,
nonEmpty))) {
assertTrue(mergeItr.hasNext()); // triggers init
@@ -212,10 +219,10 @@ void testInitClosesEmptyIterators() {
@Test
void testCloseClosesAllIterators() {
- TrackingCloseableIterator<String> itr0 =
- new TrackingCloseableIterator<>(ImmutableList.of("a", "c"));
- TrackingCloseableIterator<String> itr1 =
- new TrackingCloseableIterator<>(ImmutableList.of("b", "d"));
+ TrackingCloseableIterator<byte[]> itr0 =
+ new TrackingCloseableIterator<>(toBytesList("a", "c"));
+ TrackingCloseableIterator<byte[]> itr1 =
+ new TrackingCloseableIterator<>(toBytesList("b", "d"));
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0,
itr1))) {
assertTrue(mergeItr.hasNext()); // triggers init
@@ -233,10 +240,10 @@ void testCloseClosesAllIterators() {
@Test
void testHasNextWrapsIOExceptionFromGetIterator() {
IOException expected = new IOException("boom");
- TrackingCloseableIterator<String> itr0 =
- new TrackingCloseableIterator<>(ImmutableList.of("a"));
- TrackingCloseableIterator<String> itr1 =
- new TrackingCloseableIterator<>(ImmutableList.of("b"));
+ TrackingCloseableIterator<byte[]> itr0 =
+ new TrackingCloseableIterator<>(toBytesList("a"));
+ TrackingCloseableIterator<byte[]> itr1 =
+ new TrackingCloseableIterator<>(toBytesList("b"));
TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1));
mergeItr.withGetIteratorIOException(1, expected);
try (TestIterator ignored = mergeItr) {
@@ -253,10 +260,10 @@ void testHasNextWrapsIOExceptionFromGetIterator() {
@Test
void
testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators()
throws Exception {
- TrackingCloseableIterator<String> itr0 =
- new TrackingCloseableIterator<>(ImmutableList.of("a", "b"));
- TrackingCloseableIterator<String> itr1 =
- new TrackingCloseableIterator<>(ImmutableList.of("c"));
+ TrackingCloseableIterator<byte[]> itr0 =
+ new TrackingCloseableIterator<>(toBytesList("a", "b"));
+ TrackingCloseableIterator<byte[]> itr1 =
+ new TrackingCloseableIterator<>(toBytesList("c"));
RocksDatabaseException rdbEx = new RocksDatabaseException("rocks");
TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1));
mergeItr.withGetIteratorIOException(1, rdbEx);
@@ -275,7 +282,7 @@ void
testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators() t
@Test
void testNextWhenEmptyThrowsNoSuchElement() {
- TrackingCloseableIterator<String> empty =
+ TrackingCloseableIterator<byte[]> empty =
new TrackingCloseableIterator<>(Collections.emptyList());
try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty))) {
assertFalse(mergeItr.hasNext());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]