Copilot commented on code in PR #9409:
URL: https://github.com/apache/ozone/pull/9409#discussion_r2587085514
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java: ##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdb.util;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.rocksdb.RocksDBException;
+
+/**
+ * An abstract class that provides functionality to merge elements from multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <I> the type of iterators being used, must extend {@link Iterator} and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> & Closeable, V>
+    implements ClosableIterator<V> {
+  private final PriorityQueue<HeapEntry<K>> minHeap;
+  private final Map<Integer, K> keys;
+  private final List<I> iterators;
+  private boolean initialized;
+  private final Comparator<K> comparator;
+
+  public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator) {
+    this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+    keys = new HashMap<>(numberOfIterators);
+    iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) null).collect(Collectors.toList());
+    this.initialized = false;
+    this.comparator = comparator;
+  }
+
+  protected abstract I getIterator(int idx) throws RocksDBException, IOException;
+
+  private boolean initHeap() throws IOException, RocksDBException {
+    if (initialized) {
+      return false;
+    }
+    initialized = true;
+    int count = 0;
+    try {
+      for (int idx = 0; idx < iterators.size(); idx++) {
+        I itr = getIterator(idx);
+        iterators.set(idx, itr);
+        HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
+        if (entry.getCurrentKey() != null) {
+          minHeap.add(entry);
+          count++;
+        } else {
+          // No valid entries, close the iterator.
+          closeItrAtIndex(idx);
+        }
+      }
+    } catch (IOException | RocksDBException e) {
+      close();
+      throw e;
+    }
+    return count > 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      return !minHeap.isEmpty() || initHeap();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } catch (RocksDBException e) {
+      throw new UncheckedIOException(new RocksDatabaseException("Error while initializing iterator ", e));
+    }
+  }
+
+  protected abstract V merge(Map<Integer, K> keysToMerge);
+
+  @Override
+  public V next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more elements found.");
+    }
+
+    assert minHeap.peek() != null;
+    // Get current key from heap
+    K currentKey = minHeap.peek().getCurrentKey();
+    // Clear the keys list by setting all entries to null.

Review Comment:
   The comment on line 113 says "Clear the keys list by setting all entries to null", but the code actually calls `keys.clear()`, which removes all entries rather than setting them to null. The comment should be updated to: "Clear the keys map from the previous iteration."

```suggestion
    // Clear the keys map from the previous iteration.
```
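For context on how the new abstract class is meant to be driven, here is a minimal usage sketch; the `CloseableListIterator` wrapper and the surrounding class and method names are illustrative only, not part of the PR:

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.ozone.rocksdb.util.MinHeapMergeIterator;

public class MinHeapMergeIteratorSketch {

  /** Wraps a plain in-memory iterator so it satisfies the {@code Iterator<K> & Closeable} bound. */
  static final class CloseableListIterator<T> implements Iterator<T>, Closeable {
    private final Iterator<T> delegate;

    CloseableListIterator(Iterator<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public T next() {
      return delegate.next();
    }

    @Override
    public void close() {
      // Nothing to release for an in-memory list.
    }
  }

  public static void main(String[] args) {
    List<List<String>> sources = Arrays.asList(
        Arrays.asList("a", "c", "e"),
        Arrays.asList("b", "c", "d"));

    MinHeapMergeIterator<String, CloseableListIterator<String>, String> merged =
        new MinHeapMergeIterator<String, CloseableListIterator<String>, String>(
            sources.size(), Comparator.naturalOrder()) {
          @Override
          protected CloseableListIterator<String> getIterator(int idx) {
            return new CloseableListIterator<>(sources.get(idx).iterator());
          }

          @Override
          protected String merge(Map<Integer, String> keysToMerge) {
            // Maps source index -> key for every source currently positioned at the smallest key.
            return keysToMerge.toString();
          }
        };

    // Prints one merged entry per distinct key, e.g. {0=a}, {1=b}, {0=c, 1=c}, {1=d}, {0=e}.
    while (merged.hasNext()) {
      System.out.println(merged.next());
    }
    merged.close();
  }
}
```

The `DBStore#getMergeIterator` hunk below follows the same pattern, with `Table.KeyValueIterator` instances as the per-source iterators.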
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java: ##########
@@ -170,4 +181,39 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
   boolean isClosed();
   String getSnapshotsParentDir();
+
+  /**
+   * Creates an iterator that merges multiple tables into a single iterator,
+   * grouping values with the same key across the tables.
+   *
+   * @param <KEY> the type of keys for the tables
+   * @param keyComparator the comparator used to compare keys from different tables
+   * @param prefix the prefix used to filter entries of each table
+   * @param table one or more tables to merge
+   * @return a closable iterator over merged key-value pairs, where each key corresponds
+   *         to a collection of values from the tables
+   */
+  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
+      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+    List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
+    KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
+    Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
+    return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
+        KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
+      @Override
+      protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
+        return table[idx].iterator(prefix);
+      }
+
+      @Override
+      protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
+        KEY key = keysToMerge.values().stream().findAny()
+            .orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
+        for (int i = 0; i < tableValues.size(); i++) {
+          tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
+        }
+        return newKeyValue(key, tableValues);

Review Comment:
   The `tableValues` list is reused and mutated on every call to `merge()`. This means all returned `Collection<Object>` values from the iterator share the same underlying list, and subsequent calls to `next()` will modify the contents of previously returned collections. This can lead to subtle bugs if callers retain references to these collections.
   Consider creating a new list for each merge result: `return newKeyValue(key, new ArrayList<>(tableValues));`

```suggestion
        return newKeyValue(key, new java.util.ArrayList<>(tableValues));
```
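To make the hazard concrete, here is a small self-contained sketch (plain Java with hypothetical names, not Ozone code) showing how a reused backing list silently rewrites earlier results, and how a defensive copy avoids it:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SharedListPitfall {

  // One backing list reused for every result, mirroring the reused tableValues list above.
  private static final List<Object> REUSED = new ArrayList<>();
  static {
    REUSED.add(null);
    REUSED.add(null);
  }

  /** Mimics merge(): writes this call's values into the same backing list every time. */
  static Map.Entry<String, List<Object>> merge(String key, Object v0, Object v1) {
    REUSED.set(0, v0);
    REUSED.set(1, v1);
    return new SimpleEntry<>(key, REUSED);                     // every result aliases REUSED
    // return new SimpleEntry<>(key, new ArrayList<>(REUSED)); // defensive copy avoids the aliasing
  }

  public static void main(String[] args) {
    Map.Entry<String, List<Object>> first = merge("k1", "a", "b");
    Map.Entry<String, List<Object>> second = merge("k2", "c", "d");
    // Prints "[c, d] [c, d]": the first result was silently rewritten by the second merge.
    System.out.println(first.getValue() + " " + second.getValue());
  }
}
```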
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java: ##########
+/**
+ * An abstract class that provides functionality to merge elements from multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, must be {@link Comparable}

Review Comment:
   The class javadoc states that K "must be {@link Comparable}" but this is not accurate. The type parameter K doesn't have a Comparable bound, and the class accepts any type with a Comparator. The documentation should be updated to reflect that K needs a Comparator provided via the constructor.

```suggestion
 * @param <K> the type of keys being merged; a {@link Comparator} for K must be provided via the constructor
```
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java: ##########
+  private void closeItrAtIndex(int idx) throws IOException {
+    if (iterators.get(idx) != null) {
+      iterators.get(idx).close();
+      iterators.set(idx, null);
+    }
+  }
+

Review Comment:
   This method overrides [ClosableIterator<V>.close](1); it is advisable to add an Override annotation.

```suggestion
  @Override
```

########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java: ##########
@@ -228,137 +227,36 @@ public String next() {
     }
   }
 
-  /**
-   * A wrapper class that holds an iterator and its current value for heap operations.
-   */
-  private static class HeapEntry<T extends Comparable<T>>
-      implements Comparable<HeapEntry<T>>, Closeable {
-    private final ClosableIterator<T> iterator;
-    private T currentKey;
-
-    HeapEntry(ClosableIterator<T> iterator) {
-      this.iterator = iterator;
-      advance();
-    }
-
-    @Override
-    public void close() {
-      iterator.close();
-    }
-
-    boolean advance() {
-      if (iterator.hasNext()) {
-        currentKey = iterator.next();
-        return true;
-      } else {
-        currentKey = null;
-        return false;
-      }
-    }
-
-    T getCurrentKey() {
-      return currentKey;
-    }
-
-    @Override
-    public int compareTo(@Nonnull HeapEntry<T> other) {
-      return Comparator.comparing(HeapEntry<T>::getCurrentKey).compare(this, other);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null || getClass() != obj.getClass()) {
-        return false;
-      }
-
-      HeapEntry<T> other = (HeapEntry<T>) obj;
-      return this.compareTo(other) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-      return currentKey.hashCode();
-    }
-  }
-
   /**
    * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files.
    * It uses a PriorityQueue to merge keys from all files in sorted order.
    * Each file's iterator is wrapped in a HeapEntryWithFileIdx object,
    * which ensures stable ordering for identical keys by considering the file index.
    * @param <T>

Review Comment:
   The comment still mentions "HeapEntryWithFileIdx" and "stable ordering for identical keys by considering the file index", but the current implementation no longer maintains stable ordering by file index. The HeapEntry class was moved to MinHeapMergeIterator and doesn't use file index for comparison. This comment should be updated to reflect the current behavior.
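One possible rewording of that javadoc, assuming the class now delegates the merge to MinHeapMergeIterator and makes no ordering promise between files (illustrative, not a committed change):

```java
  /**
   * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files.
   * It delegates to MinHeapMergeIterator to return keys from all files in sorted order.
   * Identical keys from different files are not returned in any guaranteed per-file order.
   * @param <T>
   */
```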
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java: ##########
+  public void close() {
+    IOException exception = null;
+    for (int idx = 0; idx < iterators.size(); idx++) {
+      try {
+        closeItrAtIndex(idx);
+      } catch (IOException e) {
+        exception = e;
+      }
+    }
+    if (exception != null) {
+      throw new UncheckedIOException(exception);
+    }
+  }

Review Comment:
   The close() method doesn't clear the minHeap after closing all iterators. While the iterators themselves are closed and set to null in the list, HeapEntry objects remain in the minHeap, preventing proper garbage collection and potentially causing issues if the iterator is accidentally used after close().
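A sketch of one way to address this, written against the code quoted above (illustrative, not the PR's code):

```java
  @Override
  public void close() {
    IOException exception = null;
    for (int idx = 0; idx < iterators.size(); idx++) {
      try {
        closeItrAtIndex(idx);
      } catch (IOException e) {
        exception = e;
      }
    }
    // Drop the remaining HeapEntry references so they can be garbage collected and so a
    // stray hasNext()/next() after close() cannot observe stale entries.
    minHeap.clear();
    if (exception != null) {
      throw new UncheckedIOException(exception);
    }
  }
```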
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java: ##########
+  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
+      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {

Review Comment:
   The varargs parameter `table` is not validated for null or empty. If an empty array or null is passed, this will cause issues: an empty array will create an iterator that never returns elements but won't fail gracefully, and null will cause a NullPointerException. Consider adding validation at the start of the method.

```suggestion
      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
    if (table == null || table.length == 0) {
      throw new IllegalArgumentException("At least one table must be provided to getMergeIterator.");
    }
```
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/MinHeapMergeIterator.java: ##########
+  public void close() {
+    IOException exception = null;
+    for (int idx = 0; idx < iterators.size(); idx++) {
+      try {
+        closeItrAtIndex(idx);
+      } catch (IOException e) {
+        exception = e;
+      }
+    }
+    if (exception != null) {
+      throw new UncheckedIOException(exception);

Review Comment:
   The close() method only preserves the last IOException that occurred during cleanup. If multiple iterators throw exceptions during close(), only the last one is thrown, potentially hiding other important errors. Consider collecting all exceptions or using a suppressed exception pattern to preserve all error information.

```suggestion
    IOException firstException = null;
    for (int idx = 0; idx < iterators.size(); idx++) {
      try {
        closeItrAtIndex(idx);
      } catch (IOException e) {
        if (firstException == null) {
          firstException = e;
        } else {
          firstException.addSuppressed(e);
        }
      }
    }
    if (firstException != null) {
      throw new UncheckedIOException(firstException);
```
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java: ##########
+  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
+      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+    List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
+    KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
+    Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
+    return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
+        KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {

Review Comment:
   The constructor creates a PriorityQueue with size `Math.max(numberOfIterators, 1)` but then creates a MinHeapMergeIterator with `table.length + 1` on line 202. This results in a heap size one larger than the number of tables, which is wasteful and inconsistent. It also means initHeap() will iterate up to that extra index and call getIterator(table.length), which indexes past the end of the `table` array. The size should be `table.length`, not `table.length + 1`.

```suggestion
        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
