swamirishi commented on code in PR #8912:
URL: https://github.com/apache/ozone/pull/8912#discussion_r2341343197


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,71 +235,146 @@ public String next() {
     }
   }
 
-  private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap operations.
+   */
+  private static class HeapEntryWithFileIdx<T extends Comparable<T>>
+      implements Comparable<HeapEntryWithFileIdx<T>>, Closeable {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+
+    HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) {
+      this.iterator = iterator;
+      advance();
+    }
+
+    @Override
+    public void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+      return this.currentKey.compareTo(other.currentKey);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
 
-    private final Iterator<String> fileNameIterator;
+      HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj;
+      return compareTo(other) == 0;
+    }
 
-    private String currentFile;
-    private ClosableIterator<T> currentFileIterator;
+    @Override
+    public int hashCode() {
+      return Objects.hash(iterator, currentKey);
+    }
+  }
 
-    private MultipleSstFileIterator(Collection<String> files) {
-      this.fileNameIterator = files.iterator();
+  /**
+   * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files.
+   * It uses a PriorityQueue to merge keys from all files in sorted order.
+   * Each file's iterator is wrapped in a HeapEntryWithFileIdx object,
+   * which ensures stable ordering for identical keys by considering the file index.
+   * @param <T>
+   */
+  private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> {
+    private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap;
+
+    private MultipleSstFileIterator(Collection<String> sstFiles) {
+      this.minHeap = new PriorityQueue<>();
       init();
+      initMinHeap(sstFiles);
     }
 
     protected abstract void init();
 
     protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException;
 
-    @Override
-    public boolean hasNext() {
+    private void initMinHeap(Collection<String> files) {
       try {
-        do {
-          if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) {
-            return true;
+        int fileIndex = 0;
+        for (String file : files) {
+          ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+          HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, fileIndex++);
+
+          if (entry.getCurrentKey() != null) {
+            minHeap.offer(entry);
+          } else {
+            // No valid entries, close the iterator
+            entry.close();
           }
-        } while (moveToNextFile());
+        }
       } catch (IOException | RocksDBException e) {
-        // TODO: [Snapshot] This exception has to be handled by the caller.
-        //  We have to do better exception handling.
-        throw new RuntimeException(e);
+        // Clean up any opened iterators
+        close();
+        throw new RuntimeException("Failed to initialize SST file iterators", e);
       }
-      return false;
     }
 
     @Override
-    public T next() {
-      if (hasNext()) {
-        return currentFileIterator.next();
-      }
-      throw new NoSuchElementException("No more elements found.");
+    public boolean hasNext() {
+      return !minHeap.isEmpty();
     }
 
     @Override
-    public void close() throws UncheckedIOException {
-      try {
-        closeCurrentFile();
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more elements found.");
       }
-    }
 
-    private boolean moveToNextFile() throws IOException, RocksDBException {
-      if (fileNameIterator.hasNext()) {
-        closeCurrentFile();
-        currentFile = fileNameIterator.next();
-        this.currentFileIterator = getKeyIteratorForFile(currentFile);
-        return true;
+      assert minHeap.peek() != null;
+      // Get current key from heap
+      T currentKey = minHeap.peek().getCurrentKey();

Review Comment:
   ```suggestion
         HeapEntry currentKey = minHeap.peek();
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java:
##########
@@ -233,71 +235,146 @@ public String next() {
     }
   }
 
-  private abstract static class MultipleSstFileIterator<T> implements ClosableIterator<T> {
+  /**
+   * A wrapper class that holds an iterator and its current value for heap operations.
+   */
+  private static class HeapEntryWithFileIdx<T extends Comparable<T>>
+      implements Comparable<HeapEntryWithFileIdx<T>>, Closeable {
+    private final ClosableIterator<T> iterator;
+    private T currentKey;
+
+    HeapEntryWithFileIdx(ClosableIterator<T> iterator, int fileIndex) {
+      this.iterator = iterator;
+      advance();
+    }
+
+    @Override
+    public void close() {
+      iterator.close();
+    }
+
+    boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntryWithFileIdx<T> other) {
+      if (this.currentKey == null && other.currentKey == null) {
+        return 0;
+      }
+      if (this.currentKey == null) {
+        return 1;
+      }
+      if (other.currentKey == null) {
+        return -1;
+      }
+      return this.currentKey.compareTo(other.currentKey);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
 
-    private final Iterator<String> fileNameIterator;
+      HeapEntryWithFileIdx<T> other = (HeapEntryWithFileIdx<T>) obj;
+      return compareTo(other) == 0;
+    }
 
-    private String currentFile;
-    private ClosableIterator<T> currentFileIterator;
+    @Override
+    public int hashCode() {
+      return Objects.hash(iterator, currentKey);
+    }
+  }
 
-    private MultipleSstFileIterator(Collection<String> files) {
-      this.fileNameIterator = files.iterator();
+  /**
+   * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files.
+   * It uses a PriorityQueue to merge keys from all files in sorted order.
+   * Each file's iterator is wrapped in a HeapEntryWithFileIdx object,
+   * which ensures stable ordering for identical keys by considering the file index.
+   * @param <T>
+   */
+  private abstract static class MultipleSstFileIterator<T extends Comparable<T>> implements ClosableIterator<T> {
+    private final PriorityQueue<HeapEntryWithFileIdx<T>> minHeap;
+
+    private MultipleSstFileIterator(Collection<String> sstFiles) {
+      this.minHeap = new PriorityQueue<>();
       init();
+      initMinHeap(sstFiles);
     }
 
     protected abstract void init();
 
     protected abstract ClosableIterator<T> getKeyIteratorForFile(String file) throws RocksDBException, IOException;
 
-    @Override
-    public boolean hasNext() {
+    private void initMinHeap(Collection<String> files) {
       try {
-        do {
-          if (Objects.nonNull(currentFileIterator) && currentFileIterator.hasNext()) {
-            return true;
+        int fileIndex = 0;
+        for (String file : files) {
+          ClosableIterator<T> iterator = getKeyIteratorForFile(file);
+          HeapEntryWithFileIdx<T> entry = new HeapEntryWithFileIdx<>(iterator, fileIndex++);
+
+          if (entry.getCurrentKey() != null) {
+            minHeap.offer(entry);
+          } else {
+            // No valid entries, close the iterator
+            entry.close();
           }
-        } while (moveToNextFile());
+        }
       } catch (IOException | RocksDBException e) {
-        // TODO: [Snapshot] This exception has to be handled by the caller.
-        //  We have to do better exception handling.
-        throw new RuntimeException(e);
+        // Clean up any opened iterators
+        close();
+        throw new RuntimeException("Failed to initialize SST file iterators", e);
       }
-      return false;
     }
 
     @Override
-    public T next() {
-      if (hasNext()) {
-        return currentFileIterator.next();
-      }
-      throw new NoSuchElementException("No more elements found.");
+    public boolean hasNext() {
+      return !minHeap.isEmpty();
     }
 
     @Override
-    public void close() throws UncheckedIOException {
-      try {
-        closeCurrentFile();
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
+    public T next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more elements found.");
       }
-    }
 
-    private boolean moveToNextFile() throws IOException, RocksDBException {
-      if (fileNameIterator.hasNext()) {
-        closeCurrentFile();
-        currentFile = fileNameIterator.next();
-        this.currentFileIterator = getKeyIteratorForFile(currentFile);
-        return true;
+      assert minHeap.peek() != null;
+      // Get current key from heap
+      T currentKey = minHeap.peek().getCurrentKey();

Review Comment:
   ```suggestion
         HeapEntry<T> currentKey = minHeap.peek();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For additional commands, e-mail: issues-h...@ozone.apache.org

Reply via email to