swamirishi commented on code in PR #8912: URL: https://github.com/apache/ozone/pull/8912#discussion_r2341343197
########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java: ########## @@ -233,71 +235,146 @@ public String next() { } } - private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> { + /** + * A wrapper class that holds an iterator and its current value for heap operations. + */ + private static class HeapEntryWithFileIdx<T extends Comparable<T>> + implements Comparable<HeapEntryWithFileIdx<T>>, Closeable { + private final ClosableIterator<T> iterator; + private T currentKey; + + HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) { + this.iterator = iterator; + advance(); + } + + @Override + public void close() { + iterator.close(); + } + + boolean advance() { + if (iterator.hasNext()) { + currentKey = iterator.next(); + return true; + } else { + currentKey = null; + return false; + } + } + + T getCurrentKey() { + return currentKey; + } + + @Override + public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) { + if (this.currentKey == null && other.currentKey == null) { + return 0; + } + if (this.currentKey == null) { + return 1; + } + if (other.currentKey == null) { + return -1; + } + return this.currentKey.compareTo(other.currentKey); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } - private final Iterator<String> fileNameIterator; + HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj; + return compareTo(other) == 0; + } - private String currentFile; - private ClosableIterator<T> currentFileIterator; + @Override + public int hashCode() { + return Objects.hash(iterator, currentKey); + } + } - private MultipleSstFileIterator(Collection<String> files) { - this.fileNameIterator = files.iterator(); + /** + * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files. + * It uses a PriorityQueue to merge keys from all files in sorted order. + * Each file's iterator is wrapped in a HeapEntryWithFileIdx object, + * which ensures stable ordering for identical keys by considering the file index. + * @param <T> + */ + private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> { + private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap; + + private MultipleSstFileIterator(Collection<String> sstFiles) { + this.minHeap = new PriorityQueue<>(); init(); + initMinHeap(sstFiles); } protected abstract void init(); protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException; - @Override - public boolean hasNext() { + private void initMinHeap(Collection<String> files) { try { - do { - if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) { - return true; + int fileIndex = 0; + for (String file : files) { + ClosableIterator<T> iterator = getKeyIteratorForFile(file); + HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, fileIndex++); + + if (entry.getCurrentKey() != null) { + minHeap.offer(entry); + } else { + // No valid entries, close the iterator + entry.close(); } - } while (moveToNextFile()); + } } catch (IOException | RocksDBException e) { - // TODO: [Snapshot] This exception has to be handled by the caller. - // We have to do better exception handling. - throw new RuntimeException(e); + // Clean up any opened iterators + close(); + throw new RuntimeException("Failed to initialize SST file iterators", e); } - return false; } @Override - public T next() { - if (hasNext()) { - return currentFileIterator.next(); - } - throw new NoSuchElementException("No more elements found."); + public boolean hasNext() { + return !minHeap.isEmpty(); } @Override - public void close() throws UncheckedIOException { - try { - closeCurrentFile(); - } catch (IOException e) { - throw new UncheckedIOException(e); + public T next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements found."); } - } - private boolean moveToNextFile() throws IOException, RocksDBException { - if (fileNameIterator.hasNext()) { - closeCurrentFile(); - currentFile = fileNameIterator.next(); - this.currentFileIterator = getKeyIteratorForFile(currentFile); - return true; + assert minHeap.peek() != null; + // Get current key from heap + T currentKey = minHeap.peek().getCurrentKey(); Review Comment: ```suggestion HeapEntry currentKey = minHeap.peek(); ``` ########## hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java: ########## @@ -233,71 +235,146 @@ public String next() { } } - private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> { + /** + * A wrapper class that holds an iterator and its current value for heap operations. + */ + private static class HeapEntryWithFileIdx<T extends Comparable<T>> + implements Comparable<HeapEntryWithFileIdx<T>>, Closeable { + private final ClosableIterator<T> iterator; + private T currentKey; + + HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) { + this.iterator = iterator; + advance(); + } + + @Override + public void close() { + iterator.close(); + } + + boolean advance() { + if (iterator.hasNext()) { + currentKey = iterator.next(); + return true; + } else { + currentKey = null; + return false; + } + } + + T getCurrentKey() { + return currentKey; + } + + @Override + public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) { + if (this.currentKey == null && other.currentKey == null) { + return 0; + } + if (this.currentKey == null) { + return 1; + } + if (other.currentKey == null) { + return -1; + } + return this.currentKey.compareTo(other.currentKey); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } - private final Iterator<String> fileNameIterator; + HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj; + return compareTo(other) == 0; + } - private String currentFile; - private ClosableIterator<T> currentFileIterator; + @Override + public int hashCode() { + return Objects.hash(iterator, currentKey); + } + } - private MultipleSstFileIterator(Collection<String> files) { - this.fileNameIterator = files.iterator(); + /** + * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files. + * It uses a PriorityQueue to merge keys from all files in sorted order. + * Each file's iterator is wrapped in a HeapEntryWithFileIdx object, + * which ensures stable ordering for identical keys by considering the file index. + * @param <T> + */ + private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> { + private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap; + + private MultipleSstFileIterator(Collection<String> sstFiles) { + this.minHeap = new PriorityQueue<>(); init(); + initMinHeap(sstFiles); } protected abstract void init(); protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException; - @Override - public boolean hasNext() { + private void initMinHeap(Collection<String> files) { try { - do { - if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) { - return true; + int fileIndex = 0; + for (String file : files) { + ClosableIterator<T> iterator = getKeyIteratorForFile(file); + HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, fileIndex++); + + if (entry.getCurrentKey() != null) { + minHeap.offer(entry); + } else { + // No valid entries, close the iterator + entry.close(); } - } while (moveToNextFile()); + } } catch (IOException | RocksDBException e) { - // TODO: [Snapshot] This exception has to be handled by the caller. - // We have to do better exception handling. - throw new RuntimeException(e); + // Clean up any opened iterators + close(); + throw new RuntimeException("Failed to initialize SST file iterators", e); } - return false; } @Override - public T next() { - if (hasNext()) { - return currentFileIterator.next(); - } - throw new NoSuchElementException("No more elements found."); + public boolean hasNext() { + return !minHeap.isEmpty(); } @Override - public void close() throws UncheckedIOException { - try { - closeCurrentFile(); - } catch (IOException e) { - throw new UncheckedIOException(e); + public T next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements found."); } - } - private boolean moveToNextFile() throws IOException, RocksDBException { - if (fileNameIterator.hasNext()) { - closeCurrentFile(); - currentFile = fileNameIterator.next(); - this.currentFileIterator = getKeyIteratorForFile(currentFile); - return true; + assert minHeap.peek() != null; + // Get current key from heap + T currentKey = minHeap.peek().getCurrentKey(); Review Comment: ```suggestion HeapEntry<T> currentKey = minHeap.peek(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org