jojochuang commented on code in PR #9409:
URL: https://github.com/apache/ozone/pull/9409#discussion_r2608430243
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java:
##########
@@ -17,12 +17,13 @@
package org.apache.hadoop.ozone.util;
+import java.io.Closeable;
import java.util.Iterator;
/**
* An {@link Iterator} that may hold resources until it is closed.
*/
-public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
+public interface ClosableIterator<E> extends Iterator<E>, Closeable {
Review Comment:
This change is acceptable. Closeable is a subinterface of AutoCloseable, and the
only difference is that close() throws IOException instead of Exception. This is
not a public API.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from
multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged
output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator}
and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> &
Closeable, V>
+ implements ClosableIterator<V> {
+ private final PriorityQueue<HeapEntry<K>> minHeap;
+ private final Map<Integer, K> keys;
+ private final List<I> iterators;
+ private boolean initialized;
+ private final Comparator<K> comparator;
+
+ public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator)
{
+ this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+ keys = new HashMap<>(numberOfIterators);
+ iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I)
null).collect(Collectors.toList());
+ this.initialized = false;
+ this.comparator = comparator;
+ }
+
+ protected abstract I getIterator(int idx) throws RocksDBException,
IOException;
+
+ private boolean initHeap() throws IOException, RocksDBException {
+ if (initialized) {
+ return false;
+ }
+ initialized = true;
+ int count = 0;
+ try {
+ for (int idx = 0; idx < iterators.size(); idx++) {
+ I itr = getIterator(idx);
+ iterators.set(idx, itr);
+ HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
+ if (entry.getCurrentKey() != null) {
+ minHeap.add(entry);
+ count++;
+ } else {
+ // No valid entries, close the iterator.
+ closeItrAtIndex(idx);
+ }
+ }
+ } catch (IOException | RocksDBException e) {
+ close();
+ throw e;
+ }
+ return count > 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return !minHeap.isEmpty() || initHeap();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (RocksDBException e) {
+ throw new UncheckedIOException(new RocksDatabaseException("Error while
initializing iterator ", e));
+ }
+ }
+
+ protected abstract V merge(Map<Integer, K> keysToMerge);
+
+ @Override
+ public V next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements found.");
+ }
+
+ assert minHeap.peek() != null;
+ // Get current key from heap
+ K currentKey = minHeap.peek().getCurrentKey();
+ // Clear the keys list by setting all entries to null.
+ keys.clear();
+ // Advance all entries with the same key (from different files)
+ while (!minHeap.isEmpty() &&
Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
+ HeapEntry<K> entry = minHeap.poll();
+ int idx = entry.index;
+ // Set the key for the current entry in the keys list.
+ keys.put(idx, entry.getCurrentKey());
+ if (entry.advance()) {
+ minHeap.offer(entry);
+ } else {
+ // Iterator is exhausted, close it to prevent resource leak
+ try {
+ closeItrAtIndex(idx);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+ return merge(keys);
+ }
+
+ private void closeItrAtIndex(int idx) throws IOException {
+ if (iterators.get(idx) != null) {
+ iterators.get(idx).close();
+ iterators.set(idx, null);
+ }
+ }
+
+ @Override
+ public void close() {
+ IOException exception = null;
+ for (int idx = 0; idx < iterators.size(); idx++) {
+ try {
+ closeItrAtIndex(idx);
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw new UncheckedIOException(exception);
+ }
+ }
+
+ /**
+ * A wrapper class that holds an iterator and its current value for heap
operations.
+ */
+ private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> {
Review Comment:
this is moved from SstFileSetReader.HeapEntry, with index and comparator.
index is not used.
comparator is customizable.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from
multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged
output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator}
and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> &
Closeable, V>
+ implements ClosableIterator<V> {
+ private final PriorityQueue<HeapEntry<K>> minHeap;
+ private final Map<Integer, K> keys;
+ private final List<I> iterators;
+ private boolean initialized;
+ private final Comparator<K> comparator;
+
+ public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator)
{
+ this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+ keys = new HashMap<>(numberOfIterators);
+ iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I)
null).collect(Collectors.toList());
+ this.initialized = false;
+ this.comparator = comparator;
+ }
+
+ protected abstract I getIterator(int idx) throws RocksDBException,
IOException;
+
+ private boolean initHeap() throws IOException, RocksDBException {
Review Comment:
moved from SstFileSetReader.MultipleSstFileIterator.initMinHeap()
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from
multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged
output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator}
and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> &
Closeable, V>
Review Comment:
Encapsulates several methods from SstFileSetReader.MultipleSstFileIterator,
to be used by DBStore.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from
multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged
output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator}
and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> &
Closeable, V>
+ implements ClosableIterator<V> {
+ private final PriorityQueue<HeapEntry<K>> minHeap;
+ private final Map<Integer, K> keys;
+ private final List<I> iterators;
+ private boolean initialized;
+ private final Comparator<K> comparator;
+
+ public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator)
{
+ this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+ keys = new HashMap<>(numberOfIterators);
+ iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I)
null).collect(Collectors.toList());
+ this.initialized = false;
+ this.comparator = comparator;
+ }
+
+ protected abstract I getIterator(int idx) throws RocksDBException,
IOException;
+
+ private boolean initHeap() throws IOException, RocksDBException {
+ if (initialized) {
+ return false;
+ }
+ initialized = true;
+ int count = 0;
+ try {
+ for (int idx = 0; idx < iterators.size(); idx++) {
+ I itr = getIterator(idx);
+ iterators.set(idx, itr);
+ HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
+ if (entry.getCurrentKey() != null) {
+ minHeap.add(entry);
+ count++;
+ } else {
+ // No valid entries, close the iterator.
+ closeItrAtIndex(idx);
+ }
+ }
+ } catch (IOException | RocksDBException e) {
+ close();
+ throw e;
+ }
+ return count > 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return !minHeap.isEmpty() || initHeap();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (RocksDBException e) {
+ throw new UncheckedIOException(new RocksDatabaseException("Error while
initializing iterator ", e));
+ }
+ }
+
+ protected abstract V merge(Map<Integer, K> keysToMerge);
+
+ @Override
+ public V next() {
Review Comment:
moved from SstFileSetReader.MultipleSstFileIterator.next(), except that it
also manages the keys map.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from
multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged
output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator}
and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> &
Closeable, V>
+ implements ClosableIterator<V> {
+ private final PriorityQueue<HeapEntry<K>> minHeap;
+ private final Map<Integer, K> keys;
+ private final List<I> iterators;
+ private boolean initialized;
+ private final Comparator<K> comparator;
+
+ public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator)
{
+ this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+ keys = new HashMap<>(numberOfIterators);
+ iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I)
null).collect(Collectors.toList());
+ this.initialized = false;
+ this.comparator = comparator;
+ }
+
+ protected abstract I getIterator(int idx) throws RocksDBException,
IOException;
+
+ private boolean initHeap() throws IOException, RocksDBException {
+ if (initialized) {
+ return false;
+ }
+ initialized = true;
+ int count = 0;
+ try {
+ for (int idx = 0; idx < iterators.size(); idx++) {
+ I itr = getIterator(idx);
+ iterators.set(idx, itr);
+ HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
+ if (entry.getCurrentKey() != null) {
+ minHeap.add(entry);
+ count++;
+ } else {
+ // No valid entries, close the iterator.
+ closeItrAtIndex(idx);
+ }
+ }
+ } catch (IOException | RocksDBException e) {
+ close();
+ throw e;
+ }
+ return count > 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return !minHeap.isEmpty() || initHeap();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ } catch (RocksDBException e) {
+ throw new UncheckedIOException(new RocksDatabaseException("Error while
initializing iterator ", e));
+ }
+ }
+
+ protected abstract V merge(Map<Integer, K> keysToMerge);
+
+ @Override
+ public V next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements found.");
+ }
+
+ assert minHeap.peek() != null;
+ // Get current key from heap
+ K currentKey = minHeap.peek().getCurrentKey();
+ // Clear the keys list by setting all entries to null.
+ keys.clear();
+ // Advance all entries with the same key (from different files)
+ while (!minHeap.isEmpty() &&
Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
+ HeapEntry<K> entry = minHeap.poll();
+ int idx = entry.index;
+ // Set the key for the current entry in the keys list.
+ keys.put(idx, entry.getCurrentKey());
+ if (entry.advance()) {
+ minHeap.offer(entry);
+ } else {
+ // Iterator is exhausted, close it to prevent resource leak
+ try {
+ closeItrAtIndex(idx);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+ return merge(keys);
+ }
+
+ private void closeItrAtIndex(int idx) throws IOException {
+ if (iterators.get(idx) != null) {
+ iterators.get(idx).close();
+ iterators.set(idx, null);
+ }
+ }
+
+ @Override
+ public void close() {
+ IOException exception = null;
+ for (int idx = 0; idx < iterators.size(); idx++) {
+ try {
+ closeItrAtIndex(idx);
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ if (exception != null) {
+ throw new UncheckedIOException(exception);
+ }
+ }
+
+ /**
+ * A wrapper class that holds an iterator and its current value for heap
operations.
+ */
+ private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> {
+ private final int index;
Review Comment:
index is not used.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -228,137 +227,36 @@ public String next() {
}
}
- /**
- * A wrapper class that holds an iterator and its current value for heap
operations.
- */
- private static class HeapEntry<T extends Comparable<T>>
- implements Comparable<HeapEntry<T>>, Closeable {
- private final ClosableIterator<T> iterator;
- private T currentKey;
-
- HeapEntry(ClosableIterator<T> iterator) {
- this.iterator = iterator;
- advance();
- }
-
- @Override
- public void close() {
- iterator.close();
- }
-
- boolean advance() {
- if (iterator.hasNext()) {
- currentKey = iterator.next();
- return true;
- } else {
- currentKey = null;
- return false;
- }
- }
-
- T getCurrentKey() {
- return currentKey;
- }
-
- @Override
- public int compareTo(@Nonnull HeapEntry<T> other) {
- return Comparator.comparing(HeapEntry<T>::getCurrentKey).compare(this,
other);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
-
- HeapEntry<T> other = (HeapEntry<T>) obj;
- return this.compareTo(other) == 0;
- }
-
- @Override
- public int hashCode() {
- return currentKey.hashCode();
- }
- }
-
/**
* The MultipleSstFileIterator class is an abstract base for iterating over
multiple SST files.
* It uses a PriorityQueue to merge keys from all files in sorted order.
* Each file's iterator is wrapped in a HeapEntryWithFileIdx object,
* which ensures stable ordering for identical keys by considering the file
index.
* @param <T>
*/
- private abstract static class MultipleSstFileIterator<T extends
Comparable<T>> implements ClosableIterator<T> {
- private final PriorityQueue<HeapEntry<T>> minHeap;
+ private abstract static class MultipleSstFileIterator<T extends
Comparable<T>>
+ extends MinHeapMergeIterator<T, ClosableIterator<T>, T> {
+ private final List<Path> sstFiles;
private MultipleSstFileIterator(Collection<Path> sstFiles) {
- this.minHeap = new PriorityQueue<>();
+ super(sstFiles.size(), Comparable::compareTo);
init();
- initMinHeap(sstFiles);
+ this.sstFiles =
sstFiles.stream().map(Path::toAbsolutePath).collect(Collectors.toList());
}
protected abstract void init();
protected abstract ClosableIterator<T> getKeyIteratorForFile(String file)
throws RocksDBException, IOException;
- private void initMinHeap(Collection<Path> files) {
- try {
- for (Path file : files) {
- ClosableIterator<T> iterator =
getKeyIteratorForFile(file.toAbsolutePath().toString());
- HeapEntry<T> entry = new HeapEntry<>(iterator);
-
- if (entry.getCurrentKey() != null) {
- minHeap.offer(entry);
- } else {
- // No valid entries, close the iterator
- entry.close();
- }
- }
- } catch (IOException | RocksDBException e) {
- // Clean up any opened iterators
- close();
- throw new RuntimeException("Failed to initialize SST file iterators",
e);
- }
- }
-
@Override
- public boolean hasNext() {
- return !minHeap.isEmpty();
+ protected ClosableIterator<T> getIterator(int idx) throws
RocksDBException, IOException {
+ return getKeyIteratorForFile(sstFiles.get(idx).toString());
}
@Override
- public T next() {
- if (!hasNext()) {
- throw new NoSuchElementException("No more elements found.");
- }
-
- assert minHeap.peek() != null;
- // Get current key from heap
- T currentKey = minHeap.peek().getCurrentKey();
-
- // Advance all entries with the same key (from different files)
- while (!minHeap.isEmpty() &&
Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
- HeapEntry<T> entry = minHeap.poll();
- if (entry.advance()) {
- minHeap.offer(entry);
- } else {
- // Iterator is exhausted, close it to prevent resource leak
- entry.close();
- }
- }
-
- return currentKey;
- }
-
- @Override
- public void close() {
- while (!minHeap.isEmpty()) {
- minHeap.poll().close();
- }
+ protected T merge(Map<Integer, T> keys) {
Review Comment:
This is equivalent to the original code. Time complexity is O(1).
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from
multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged
output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator}
and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> &
Closeable, V>
+ implements ClosableIterator<V> {
+ private final PriorityQueue<HeapEntry<K>> minHeap;
+ private final Map<Integer, K> keys;
+ private final List<I> iterators;
+ private boolean initialized;
+ private final Comparator<K> comparator;
+
+ public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator)
{
+ this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+ keys = new HashMap<>(numberOfIterators);
+ iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I)
null).collect(Collectors.toList());
+ this.initialized = false;
+ this.comparator = comparator;
+ }
+
+ protected abstract I getIterator(int idx) throws RocksDBException,
IOException;
+
+ private boolean initHeap() throws IOException, RocksDBException {
Review Comment:
moved from SstFileSetReader.MultipleSstFileIterator.initMinHeap()
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]