This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5edf3a965  [Feature] Optimize SortMergeReader: use loser tree to 
reduce comparisons (#833)
5edf3a965 is described below

commit 5edf3a9658e6bb927b5c53cc16dbaa713541c3e9
Author: liming.1018 <[email protected]>
AuthorDate: Thu Apr 27 17:18:30 2023 +0800

     [Feature] Optimize SortMergeReader: use loser tree to reduce comparisons 
(#833)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  36 +++
 .../apache/paimon/mergetree/MergeTreeReaders.java  |  13 +-
 .../compact/ChangelogMergeTreeRewriter.java        |  13 +-
 .../FullChangelogMergeTreeCompactRewriter.java     |   6 +-
 .../compact/LookupMergeTreeCompactRewriter.java    |   6 +-
 .../apache/paimon/mergetree/compact/LoserTree.java | 354 +++++++++++++++++++++
 .../compact/MergeTreeCompactRewriter.java          |  13 +-
 .../paimon/mergetree/compact/SortMergeReader.java  | 179 +----------
 .../compact/SortMergeReaderWithLoserTree.java      | 102 ++++++
 ...Reader.java => SortMergeReaderWithMinHeap.java} |  14 +-
 .../paimon/operation/KeyValueFileStoreRead.java    |   8 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  16 +-
 .../{MergeTreeTest.java => MergeTreeTestBase.java} |  31 +-
 .../compact/CombiningRecordReaderTestBase.java     |  20 +-
 .../mergetree/compact/ConcatRecordReaderTest.java  |   9 +-
 .../paimon/mergetree/compact/LoserTreeTest.java    |  81 +++++
 .../mergetree/compact/SortMergeReaderTestBase.java |  57 ++--
 18 files changed, 739 insertions(+), 225 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 1931791e7..16138d13b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,6 +188,12 @@
             <td><p>Enum</p></td>
             <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>sort-engine</h5></td>
+            <td style="word-wrap: break-word;">loser-tree</td>
+            <td><p>Enum</p></td>
+            <td>Specify the sort engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway 
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared 
with heapsort, loser-tree has fewer comparisons and is more 
efficient.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>num-levels</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index e034402eb..dc2e310c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -186,6 +186,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to ignore delete records in 
partial-update mode.");
 
+    public static final ConfigOption<SortEngine> SORT_ENGINE =
+            key("sort-engine")
+                    .enumType(SortEngine.class)
+                    .defaultValue(SortEngine.LOSER_TREE)
+                    .withDescription("Specify the sort engine for table with 
primary key.");
+
     @Immutable
     public static final ConfigOption<WriteMode> WRITE_MODE =
             key("write-mode")
@@ -706,6 +712,10 @@ public class CoreOptions implements Serializable {
         return options.get(MERGE_ENGINE);
     }
 
+    public SortEngine sortEngine() {
+        return options.get(SORT_ENGINE);
+    }
+
     public long splitTargetSize() {
         return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
     }
@@ -1191,4 +1201,30 @@ public class CoreOptions implements Serializable {
         }
         return immutableKeys;
     }
+
+    /** Specifies the sort engine for table with primary key. */
+    public enum SortEngine implements DescribedEnum {
+        MIN_HEAP("min-heap", "Use min-heap for multiway sorting."),
+        LOSER_TREE(
+                "loser-tree",
+                "Use loser-tree for multiway sorting. Compared with heapsort, 
loser-tree has fewer comparisons and is more efficient.");
+
+        private final String value;
+        private final String description;
+
+        SortEngine(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 624aca091..ed2468056 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -45,7 +46,8 @@ public class MergeTreeReaders {
             boolean dropDelete,
             KeyValueFileReaderFactory readerFactory,
             Comparator<InternalRow> userKeyComparator,
-            MergeFunction<KeyValue> mergeFunction)
+            MergeFunction<KeyValue> mergeFunction,
+            SortEngine sortEngine)
             throws IOException {
         List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new 
ArrayList<>();
         for (List<SortedRun> section : sections) {
@@ -55,7 +57,8 @@ public class MergeTreeReaders {
                                     section,
                                     readerFactory,
                                     userKeyComparator,
-                                    new 
ReducerMergeFunctionWrapper(mergeFunction)));
+                                    new 
ReducerMergeFunctionWrapper(mergeFunction),
+                                    sortEngine));
         }
         RecordReader<KeyValue> reader = ConcatRecordReader.create(readers);
         if (dropDelete) {
@@ -68,13 +71,15 @@ public class MergeTreeReaders {
             List<SortedRun> section,
             KeyValueFileReaderFactory readerFactory,
             Comparator<InternalRow> userKeyComparator,
-            MergeFunctionWrapper<KeyValue> mergeFunctionWrapper)
+            MergeFunctionWrapper<KeyValue> mergeFunctionWrapper,
+            SortEngine sortEngine)
             throws IOException {
         List<RecordReader<KeyValue>> readers = readerForSection(section, 
readerFactory);
         if (readers.size() == 1) {
             return readers.get(0);
         } else {
-            return new SortMergeReader<>(readers, userKeyComparator, 
mergeFunctionWrapper);
+            return SortMergeReader.createSortMergeReader(
+                    readers, userKeyComparator, mergeFunctionWrapper, 
sortEngine);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 4e704db5b..fbfa81d61 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
@@ -42,8 +43,9 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
-            MergeFunctionFactory<KeyValue> mfFactory) {
-        super(readerFactory, writerFactory, keyComparator, mfFactory);
+            MergeFunctionFactory<KeyValue> mfFactory,
+            SortEngine sortEngine) {
+        super(readerFactory, writerFactory, keyComparator, mfFactory, 
sortEngine);
     }
 
     protected abstract boolean rewriteChangelog(
@@ -71,8 +73,11 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
                     () -> {
                         List<RecordReader<KeyValue>> runReaders =
                                 MergeTreeReaders.readerForSection(section, 
readerFactory);
-                        return new SortMergeReader<>(
-                                runReaders, keyComparator, 
createMergeWrapper(outputLevel));
+                        return SortMergeReader.createSortMergeReader(
+                                runReaders,
+                                keyComparator,
+                                createMergeWrapper(outputLevel),
+                                sortEngine);
                     });
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 207f1c258..9e9d8b0a7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -40,8 +41,9 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
-            MergeFunctionFactory<KeyValue> mfFactory) {
-        super(readerFactory, writerFactory, keyComparator, mfFactory);
+            MergeFunctionFactory<KeyValue> mfFactory,
+            SortEngine sortEngine) {
+        super(readerFactory, writerFactory, keyComparator, mfFactory, 
sortEngine);
         this.maxLevel = maxLevel;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index c83f0ca4d..e799fe73d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
@@ -44,8 +45,9 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
-            MergeFunctionFactory<KeyValue> mfFactory) {
-        super(readerFactory, writerFactory, keyComparator, mfFactory);
+            MergeFunctionFactory<KeyValue> mfFactory,
+            SortEngine sortEngine) {
+        super(readerFactory, writerFactory, keyComparator, mfFactory, 
sortEngine);
         this.lookupLevels = lookupLevels;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java
new file mode 100644
index 000000000..b35be96f9
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.ExceptionUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A variant of the loser tree. In the LSM-Tree architecture, there will be 
duplicate Keys in
+ * multiple {@link RecordReader}, and these Keys need to be merged. In the 
loser tree, we return in
+ * the order of the Keys, but because the returned objects may be reused in 
the {@link RecordReader}
+ * or the {@link MergeFunction}, for a single {@link RecordReader}, we cannot 
get the next Key
+ * immediately after returning a Key, and we need to wait until the same Key 
in all {@link
+ * RecordReader} is returned before proceeding to the next Key.
+ *
+ * <p>The process of building the loser tree is the same as a regular loser 
tree. The difference is
+ * that in the process of adjusting the tree, we need to record the index of 
the same key and the
+ * state of the winner/loser for subsequent quick adjustment of the position 
of the winner.
+ *
+ * <p>Detailed design can refer to 
https://cwiki.apache.org/confluence/x/9Ak0Dw.
+ */
+public class LoserTree<T> implements Closeable {
+    private final int[] tree;
+    private final int size;
+    private final List<LeafIterator<T>> leaves;
+
+    /**
+     * if comparator.compare('a', 'b') > 0, then 'a' is the winner. In the 
following implementation,
+     * we always let 'a' represent the parent node.
+     */
+    private final Comparator<T> firstComparator;
+
+    /** same as firstComparator, but mainly used to compare sequenceNumber. */
+    private final Comparator<T> secondComparator;
+
+    private boolean initialized;
+
+    public LoserTree(
+            List<RecordReader<T>> nextBatchReaders,
+            Comparator<T> firstComparator,
+            Comparator<T> secondComparator) {
+        this.size = nextBatchReaders.size();
+        this.leaves = new ArrayList<>(size);
+        this.tree = new int[size];
+        // if e1 and e2 are both null, it doesn't matter who becomes the new 
winner. But if
+        // firstComparator returns 0, it means that secondComparator must be 
used to compare again.
+        this.firstComparator =
+                (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 : 
firstComparator.compare(e1, e2));
+        this.secondComparator =
+                (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 : 
secondComparator.compare(e1, e2));
+        this.initialized = false;
+
+        for (RecordReader<T> reader : nextBatchReaders) {
+            LeafIterator<T> iterator = new LeafIterator<>(reader);
+            this.leaves.add(iterator);
+        }
+    }
+
+    /** Initialize the loser tree in the same way as the regular loser tree. */
+    public void initializeIfNeeded() throws IOException {
+        if (!initialized) {
+            Arrays.fill(tree, -1);
+            for (int i = size - 1; i >= 0; i--) {
+                leaves.get(i).advanceIfAvailable();
+                adjust(i);
+            }
+            initialized = true;
+        }
+    }
+
+    /** Adjust the Key that needs to be returned in the next round. */
+    public void adjustForNextLoop() throws IOException {
+        LeafIterator<T> winner = leaves.get(tree[0]);
+        while (winner.state == State.WINNER_POPPED) {
+            winner.advanceIfAvailable();
+            adjust(tree[0]);
+            winner = leaves.get(tree[0]);
+        }
+    }
+
+    /** Pop the current winner and update its state to {@link 
State#WINNER_POPPED}. */
+    public T popWinner() {
+        LeafIterator<T> winner = leaves.get(tree[0]);
+        if (winner.state == State.WINNER_POPPED) {
+            // if the winner has already been popped, it means that all 
records with the
+            // same key have been processed.
+            return null;
+        }
+        T result = winner.pop();
+        adjust(tree[0]);
+        return result;
+    }
+
+    /** Peek the current winner, mainly for key comparisons. */
+    public T peekWinner() {
+        return leaves.get(tree[0]).state != State.WINNER_POPPED ? 
leaves.get(tree[0]).peek() : null;
+    }
+
+    /**
+     * Adjust the winner from bottom to top. Using different {@link State}, we 
can quickly compare
+     * whether all the current same keys have been processed.
+     */
+    private void adjust(int winner) {
+        for (int parent = (winner + this.size) / 2; parent > 0 && winner >= 0; 
parent /= 2) {
+            LeafIterator<T> winnerNode = leaves.get(winner);
+            LeafIterator<T> parentNode;
+            if (this.tree[parent] == -1) {
+                // initialize the tree.
+                winnerNode.state = State.LOSER_WITH_NEW_KEY;
+            } else {
+                parentNode = leaves.get(this.tree[parent]);
+                switch (winnerNode.state) {
+                    case WINNER_WITH_NEW_KEY:
+                        adjustWithNewWinnerKey(parent, parentNode, winnerNode);
+                        break;
+                    case WINNER_WITH_SAME_KEY:
+                        adjustWithSameWinnerKey(parent, parentNode, 
winnerNode);
+                        break;
+                    case WINNER_POPPED:
+                        if (winnerNode.firstSameKeyIndex < 0) {
+                            // fast path: no other leaf with the same key is 
left to process in the
+                            // current tree, so the popped winner stays on top.
+                            parent = -1;
+                        } else {
+                            // fast path. Directly exchange positions with the 
same key that has not
+                            // yet been processed, no need to compare level by 
level.
+                            parent = winnerNode.firstSameKeyIndex;
+                            parentNode = leaves.get(this.tree[parent]);
+                            winnerNode.state = State.LOSER_POPPED;
+                            parentNode.state = State.WINNER_WITH_SAME_KEY;
+                        }
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "unknown state for " + 
winnerNode.state.name());
+                }
+            }
+
+            // if the winner loses, exchange nodes.
+            if (!winnerNode.state.isWinner()) {
+                int tmp = winner;
+                winner = this.tree[parent];
+                this.tree[parent] = tmp;
+            }
+        }
+        this.tree[0] = winner;
+    }
+
+    /** The winner node has the same userKey as the global winner. */
+    private void adjustWithSameWinnerKey(
+            int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode) 
{
+        switch (parentNode.state) {
+            case LOSER_WITH_SAME_KEY:
+                // the key of the previous loser is the same as the key of the 
current winner,
+                // only the sequence needs to be compared.
+                T parentKey = parentNode.peek();
+                T childKey = winnerNode.peek();
+                int secondResult = secondComparator.compare(parentKey, 
childKey);
+                if (secondResult > 0) {
+                    parentNode.state = State.WINNER_WITH_SAME_KEY;
+                    winnerNode.state = State.LOSER_WITH_SAME_KEY;
+                    parentNode.setFirstSameKeyIndex(index);
+                } else {
+                    winnerNode.setFirstSameKeyIndex(index);
+                }
+                return;
+            case LOSER_WITH_NEW_KEY:
+            case LOSER_POPPED:
+                return;
+            default:
+                throw new UnsupportedOperationException(
+                        "unknown state for " + parentNode.state.name());
+        }
+    }
+
+    /**
+     * The userKey of the new local winner node is different from that of the 
previous global
+     * winner.
+     */
+    private void adjustWithNewWinnerKey(
+            int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode) 
{
+        switch (parentNode.state) {
+            case LOSER_WITH_NEW_KEY:
+                // when the new winner is also a new key, it needs to be 
compared.
+                T parentKey = parentNode.peek();
+                T childKey = winnerNode.peek();
+                int firstResult = firstComparator.compare(parentKey, childKey);
+                if (firstResult == 0) {
+                    // if the compared keys are the same, we need to update 
the state of the node
+                    // and record the index of the same key for the winner.
+                    int secondResult = secondComparator.compare(parentKey, 
childKey);
+                    if (secondResult < 0) {
+                        parentNode.state = State.LOSER_WITH_SAME_KEY;
+                        winnerNode.setFirstSameKeyIndex(index);
+                    } else {
+                        winnerNode.state = State.LOSER_WITH_SAME_KEY;
+                        parentNode.state = State.WINNER_WITH_NEW_KEY;
+                        parentNode.setFirstSameKeyIndex(index);
+                    }
+                } else if (firstResult > 0) {
+                    // the two keys are completely different and just need to 
update the state.
+                    parentNode.state = State.WINNER_WITH_NEW_KEY;
+                    winnerNode.state = State.LOSER_WITH_NEW_KEY;
+                }
+                return;
+            case LOSER_WITH_SAME_KEY:
+                // A node in the WINNER_WITH_NEW_KEY state cannot encounter a 
node in the
+                // LOSER_WITH_SAME_KEY state.
+                throw new RuntimeException(
+                        "This is a bug. Please file an issue. A node in the 
WINNER_WITH_NEW_KEY "
+                                + "state cannot encounter a node in the 
LOSER_WITH_SAME_KEY state.");
+            case LOSER_POPPED:
+                // this case will only happen during adjustForNextLoop.
+                parentNode.state = State.WINNER_POPPED;
+                parentNode.firstSameKeyIndex = -1;
+                winnerNode.state = State.LOSER_WITH_NEW_KEY;
+                return;
+            default:
+                throw new UnsupportedOperationException(
+                        "unknown state for " + parentNode.state.name());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException exception = null;
+        for (LeafIterator<T> iterator : leaves) {
+            try {
+                iterator.close();
+            } catch (IOException e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    /** Leaf node, used to manage {@link RecordReader}. */
+    private static class LeafIterator<T> implements Closeable {
+        /** The reader that reads the batches of records. */
+        private final RecordReader<T> reader;
+
+        /** The iterator used by the current batch. */
+        private RecordReader.RecordIterator<T> iterator;
+
+        /** The current minimum kv. */
+        private T kv;
+
+        /** Mark whether the visit is complete. */
+        private boolean endOfInput;
+
+        /** The index of the first same key that wins. */
+        private int firstSameKeyIndex;
+
+        /** The state of the current node. */
+        private State state;
+
+        private LeafIterator(RecordReader<T> reader) {
+            this.reader = reader;
+            this.endOfInput = false;
+            this.firstSameKeyIndex = -1;
+            this.state = State.WINNER_WITH_NEW_KEY;
+        }
+
+        public T peek() {
+            return kv;
+        }
+
+        public T pop() {
+            this.state = State.WINNER_POPPED;
+            return kv;
+        }
+
+        public void setFirstSameKeyIndex(int index) {
+            if (firstSameKeyIndex == -1) {
+                firstSameKeyIndex = index;
+            }
+        }
+
+        /** Advances to the next kv if any, otherwise sets kv to null. */
+        public void advanceIfAvailable() throws IOException {
+            this.firstSameKeyIndex = -1;
+            this.state = State.WINNER_WITH_NEW_KEY;
+            if (iterator == null || (kv = iterator.next()) == null) {
+                while (!endOfInput) {
+                    if (iterator != null) {
+                        iterator.releaseBatch();
+                    }
+
+                    iterator = reader.readBatch();
+                    if (iterator == null) {
+                        endOfInput = true;
+                        kv = null;
+                        reader.close();
+                    } else if ((kv = iterator.next()) != null) {
+                        break;
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (this.iterator != null) {
+                this.iterator.releaseBatch();
+            }
+            this.reader.close();
+        }
+    }
+
+    /** The state of the node in the loser tree. */
+    private enum State {
+        LOSER_WITH_NEW_KEY(false),
+        LOSER_WITH_SAME_KEY(false),
+        LOSER_POPPED(false),
+        WINNER_WITH_NEW_KEY(true),
+        WINNER_WITH_SAME_KEY(true),
+        WINNER_POPPED(true);
+
+        private final boolean winner;
+
+        State(boolean winner) {
+            this.winner = winner;
+        }
+
+        public boolean isWinner() {
+            return winner;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 4adc1ce03..a4691bcb4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
@@ -40,16 +41,19 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     protected final KeyValueFileWriterFactory writerFactory;
     protected final Comparator<InternalRow> keyComparator;
     protected final MergeFunctionFactory<KeyValue> mfFactory;
+    protected final SortEngine sortEngine;
 
     public MergeTreeCompactRewriter(
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
-            MergeFunctionFactory<KeyValue> mfFactory) {
+            MergeFunctionFactory<KeyValue> mfFactory,
+            SortEngine sortEngine) {
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.keyComparator = keyComparator;
         this.mfFactory = mfFactory;
+        this.sortEngine = sortEngine;
     }
 
     @Override
@@ -64,7 +68,12 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
                 writerFactory.createRollingMergeTreeFileWriter(outputLevel);
         RecordReader<KeyValue> sectionsReader =
                 MergeTreeReaders.readerForMergeTree(
-                        sections, dropDelete, readerFactory, keyComparator, 
mfFactory.create());
+                        sections,
+                        dropDelete,
+                        readerFactory,
+                        keyComparator,
+                        mfFactory.create(),
+                        sortEngine);
         writer.write(new RecordReaderIterator<>(sectionsReader));
         writer.close();
         return new CompactResult(extractFilesFromSections(sections), 
writer.result());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
index 897f8d83a..d545efd35 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
@@ -18,18 +18,13 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.Preconditions;
 
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.PriorityQueue;
 
 /**
  * This reader is to read a list of {@link RecordReader}, which is already 
sorted by key and
@@ -38,168 +33,22 @@ import java.util.PriorityQueue;
  *
  * <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not 
contain the same key.
  */
-public class SortMergeReader<T> implements RecordReader<T> {
-
-    private final List<RecordReader<KeyValue>> nextBatchReaders;
-    private final Comparator<InternalRow> userKeyComparator;
-    private final MergeFunctionWrapper<T> mergeFunctionWrapper;
+public interface SortMergeReader<T> extends RecordReader<T> {
 
-    private final PriorityQueue<Element> minHeap;
-    private final List<Element> polled;
-
-    public SortMergeReader(
+    static <T> SortMergeReader<T> createSortMergeReader(
             List<RecordReader<KeyValue>> readers,
             Comparator<InternalRow> userKeyComparator,
-            MergeFunctionWrapper<T> mergeFunctionWrapper) {
-        this.nextBatchReaders = new ArrayList<>(readers);
-        this.userKeyComparator = userKeyComparator;
-        this.mergeFunctionWrapper = mergeFunctionWrapper;
-
-        this.minHeap =
-                new PriorityQueue<>(
-                        (e1, e2) -> {
-                            int result = 
userKeyComparator.compare(e1.kv.key(), e2.kv.key());
-                            if (result != 0) {
-                                return result;
-                            }
-                            return Long.compare(e1.kv.sequenceNumber(), 
e2.kv.sequenceNumber());
-                        });
-        this.polled = new ArrayList<>();
-    }
-
-    @Nullable
-    @Override
-    public RecordIterator<T> readBatch() throws IOException {
-        for (RecordReader<KeyValue> reader : nextBatchReaders) {
-            while (true) {
-                RecordIterator<KeyValue> iterator = reader.readBatch();
-                if (iterator == null) {
-                    // no more batches, permanently remove this reader
-                    reader.close();
-                    break;
-                }
-                KeyValue kv = iterator.next();
-                if (kv == null) {
-                    // empty iterator, clean up and try next batch
-                    iterator.releaseBatch();
-                } else {
-                    // found next kv
-                    minHeap.offer(new Element(kv, iterator, reader));
-                    break;
-                }
-            }
-        }
-        nextBatchReaders.clear();
-
-        return minHeap.isEmpty() ? null : new SortMergeIterator();
-    }
-
-    @Override
-    public void close() throws IOException {
-        for (RecordReader<KeyValue> reader : nextBatchReaders) {
-            reader.close();
-        }
-        for (Element element : minHeap) {
-            element.iterator.releaseBatch();
-            element.reader.close();
-        }
-        for (Element element : polled) {
-            element.iterator.releaseBatch();
-            element.reader.close();
-        }
-    }
-
-    /** The iterator iterates on {@link SortMergeReader}. */
-    private class SortMergeIterator implements RecordIterator<T> {
-
-        private boolean released = false;
-
-        @Override
-        public T next() throws IOException {
-            while (true) {
-                boolean hasMore = nextImpl();
-                if (!hasMore) {
-                    return null;
-                }
-                T result = mergeFunctionWrapper.getResult();
-                if (result != null) {
-                    return result;
-                }
-            }
-        }
-
-        private boolean nextImpl() throws IOException {
-            Preconditions.checkState(
-                    !released, "SortMergeIterator#advanceNext is called after 
release");
-            Preconditions.checkState(
-                    nextBatchReaders.isEmpty(),
-                    "SortMergeIterator#advanceNext is called even if the last 
call returns null. "
-                            + "This is a bug.");
-
-            // add previously polled elements back to priority queue
-            for (Element element : polled) {
-                if (element.update()) {
-                    // still kvs left, add back to priority queue
-                    minHeap.offer(element);
-                } else {
-                    // reach end of batch, clean up
-                    element.iterator.releaseBatch();
-                    nextBatchReaders.add(element.reader);
-                }
-            }
-            polled.clear();
-
-            // there are readers reaching end of batch, so we end current batch
-            if (!nextBatchReaders.isEmpty()) {
-                return false;
-            }
-
-            mergeFunctionWrapper.reset();
-            InternalRow key =
-                    Preconditions.checkNotNull(minHeap.peek(), "Min heap is 
empty. This is a bug.")
-                            .kv
-                            .key();
-
-            // fetch all elements with the same key
-            // note that the same iterator should not produce the same keys, 
so this code is correct
-            while (!minHeap.isEmpty()) {
-                Element element = minHeap.peek();
-                if (userKeyComparator.compare(key, element.kv.key()) != 0) {
-                    break;
-                }
-                minHeap.poll();
-                mergeFunctionWrapper.add(element.kv);
-                polled.add(element);
-            }
-            return true;
-        }
-
-        @Override
-        public void releaseBatch() {
-            released = true;
-        }
-    }
-
-    private static class Element {
-        private KeyValue kv;
-        private final RecordIterator<KeyValue> iterator;
-        private final RecordReader<KeyValue> reader;
-
-        private Element(
-                KeyValue kv, RecordIterator<KeyValue> iterator, 
RecordReader<KeyValue> reader) {
-            this.kv = kv;
-            this.iterator = iterator;
-            this.reader = reader;
-        }
-
-        // IMPORTANT: Must not call this for elements still in priority queue!
-        private boolean update() throws IOException {
-            KeyValue nextKv = iterator.next();
-            if (nextKv == null) {
-                return false;
-            }
-            kv = nextKv;
-            return true;
+            MergeFunctionWrapper<T> mergeFunctionWrapper,
+            SortEngine sortEngine) {
+        switch (sortEngine) {
+            case MIN_HEAP:
+                return new SortMergeReaderWithMinHeap<>(
+                        readers, userKeyComparator, mergeFunctionWrapper);
+            case LOSER_TREE:
+                return new SortMergeReaderWithLoserTree<>(
+                        readers, userKeyComparator, mergeFunctionWrapper);
+            default:
+                throw new UnsupportedOperationException("Unsupported sort 
engine: " + sortEngine);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
new file mode 100644
index 000000000..f7a20ee67
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * {@link SortMergeReader} implemented with a {@link LoserTree}.
+ *
+ * <p>All input readers are handed to the loser tree; records popped from the tree are fed to the
+ * {@link MergeFunctionWrapper} and the merged result is returned to the caller.
+ */
+public class SortMergeReaderWithLoserTree<T> implements SortMergeReader<T> {
+
+    private final MergeFunctionWrapper<T> mergeFunctionWrapper;
+    private final LoserTree<KeyValue> loserTree;
+
+    /**
+     * Creates the reader.
+     *
+     * @param readers input readers to merge
+     * @param userKeyComparator comparator of the user keys
+     * @param mergeFunctionWrapper receives the popped records and produces the merged result
+     */
+    public SortMergeReaderWithLoserTree(
+            List<RecordReader<KeyValue>> readers,
+            Comparator<InternalRow> userKeyComparator,
+            MergeFunctionWrapper<T> mergeFunctionWrapper) {
+        this.mergeFunctionWrapper = mergeFunctionWrapper;
+        // Both comparators are deliberately passed with swapped arguments (e2 before e1).
+        // NOTE(review): presumably LoserTree treats the greater element as the winner, so the
+        // reversal yields ascending key / sequence order - confirm against LoserTree.
+        this.loserTree =
+                new LoserTree<>(
+                        readers,
+                        (e1, e2) -> userKeyComparator.compare(e2.key(), e1.key()),
+                        (e1, e2) -> Long.compare(e2.sequenceNumber(), e1.sequenceNumber()));
+    }
+
+    /**
+     * Compared with heapsort, {@link LoserTree} will only produce one batch.
+     *
+     * @return an iterator over the merged records, or {@code null} if the tree has no winner
+     */
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        loserTree.initializeIfNeeded();
+        return loserTree.peekWinner() == null ? null : new SortMergeIterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        loserTree.close();
+    }
+
+    /** The iterator iterates on {@link SortMergeReaderWithLoserTree}. */
+    private class SortMergeIterator implements RecordIterator<T> {
+
+        private boolean released = false;
+
+        /**
+         * Returns the next merged result, or {@code null} when the loser tree has no more
+         * records.
+         *
+         * @throws IllegalStateException if called after {@link #releaseBatch()}
+         */
+        @Nullable
+        @Override
+        public T next() throws IOException {
+            // Fail fast: reject the call before the tree is advanced or any record is consumed.
+            Preconditions.checkState(
+                    !released, "SortMergeIterator#next is called after release");
+
+            while (true) {
+                loserTree.adjustForNextLoop();
+                KeyValue winner = loserTree.popWinner();
+                if (winner == null) {
+                    return null;
+                }
+                mergeFunctionWrapper.reset();
+                mergeFunctionWrapper.add(winner);
+
+                // A null merge result produces no output; continue with the next loop.
+                T result = merge();
+                if (result != null) {
+                    return result;
+                }
+            }
+        }
+
+        /**
+         * Adds every winner the tree still exposes in the current loop to the merge function and
+         * returns the merged result.
+         */
+        private T merge() {
+            // NOTE(review): presumably peekWinner() returns null once the next record belongs
+            // to a different key - confirm against LoserTree.
+            while (loserTree.peekWinner() != null) {
+                mergeFunctionWrapper.add(loserTree.popWinner());
+            }
+            return mergeFunctionWrapper.getResult();
+        }
+
+        @Override
+        public void releaseBatch() {
+            released = true;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
similarity index 93%
copy from 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
copy to 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
index 897f8d83a..adca53fdb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
@@ -31,14 +31,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
 
-/**
- * This reader is to read a list of {@link RecordReader}, which is already 
sorted by key and
- * sequence number, and perform a sort merge algorithm. {@link KeyValue}s with 
the same key will
- * also be combined during sort merging.
- *
- * <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not 
contain the same key.
- */
-public class SortMergeReader<T> implements RecordReader<T> {
+/** {@link SortMergeReader} implemented with min-heap. */
+public class SortMergeReaderWithMinHeap<T> implements SortMergeReader<T> {
 
     private final List<RecordReader<KeyValue>> nextBatchReaders;
     private final Comparator<InternalRow> userKeyComparator;
@@ -47,7 +41,7 @@ public class SortMergeReader<T> implements RecordReader<T> {
     private final PriorityQueue<Element> minHeap;
     private final List<Element> polled;
 
-    public SortMergeReader(
+    public SortMergeReaderWithMinHeap(
             List<RecordReader<KeyValue>> readers,
             Comparator<InternalRow> userKeyComparator,
             MergeFunctionWrapper<T> mergeFunctionWrapper) {
@@ -109,7 +103,7 @@ public class SortMergeReader<T> implements RecordReader<T> {
         }
     }
 
-    /** The iterator iterates on {@link SortMergeReader}. */
+    /** The iterator iterates on {@link SortMergeReaderWithMinHeap}. */
     private class SortMergeIterator implements RecordIterator<T> {
 
         private boolean released = false;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 3a056fa8a..8512afb5f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.InternalRow;
@@ -33,6 +34,7 @@ import org.apache.paimon.mergetree.compact.IntervalPartition;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -53,6 +55,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.SORT_ENGINE;
 import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
 import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
@@ -65,6 +68,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
     private final Comparator<InternalRow> keyComparator;
     private final MergeFunctionFactory<KeyValue> mfFactory;
     private final boolean valueCountMode;
+    private final SortEngine sortEngine;
 
     @Nullable private int[][] keyProjectedFields;
 
@@ -99,6 +103,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         this.keyComparator = keyComparator;
         this.mfFactory = mfFactory;
         this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
+        this.sortEngine = 
Options.fromMap(tableSchema.options()).get(SORT_ENGINE);
     }
 
     public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -196,7 +201,8 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                                                 ? overlappedSectionFactory
                                                 : nonOverlappedSectionFactory,
                                         keyComparator,
-                                        mergeFuncWrapper));
+                                        mergeFuncWrapper,
+                                        sortEngine));
             }
             DropDeleteReader reader =
                     new 
DropDeleteReader(ConcatRecordReader.create(sectionReaders));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 72af8fb57..b346168a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -210,14 +210,24 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         readerFactory,
                         writerFactory,
                         keyComparator,
-                        mfFactory);
+                        mfFactory,
+                        options.sortEngine());
             case LOOKUP:
                 LookupLevels lookupLevels = createLookupLevels(levels, 
readerFactory);
                 return new LookupMergeTreeCompactRewriter(
-                        lookupLevels, readerFactory, writerFactory, 
keyComparator, mfFactory);
+                        lookupLevels,
+                        readerFactory,
+                        writerFactory,
+                        keyComparator,
+                        mfFactory,
+                        options.sortEngine());
             default:
                 return new MergeTreeCompactRewriter(
-                        readerFactory, writerFactory, keyComparator, 
mfFactory);
+                        readerFactory,
+                        writerFactory,
+                        keyComparator,
+                        mfFactory,
+                        options.sortEngine());
         }
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
similarity index 95%
rename from 
paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 3a2ce21cc..5538928e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.BinaryRow;
@@ -85,7 +86,7 @@ import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link MergeTreeReaders} and {@link MergeTreeWriter}. */
-public class MergeTreeTest {
+public abstract class MergeTreeTestBase {
 
     @TempDir java.nio.file.Path tempDir;
     private static ExecutorService service;
@@ -99,12 +100,14 @@ public class MergeTreeTest {
     private KeyValueFileWriterFactory writerFactory;
     private KeyValueFileWriterFactory compactWriterFactory;
     private RecordWriter<KeyValue> writer;
+    private SortEngine sortEngine;
 
     @BeforeEach
     public void beforeEach() throws IOException {
         path = new Path(tempDir.toString());
         pathFactory = new FileStorePathFactory(path);
         comparator = Comparator.comparingInt(o -> o.getInt(0));
+        sortEngine = getSortEngine();
         recreateMergeTree(1024 * 1024);
         Path bucketDir = 
writerFactory.pathFactory().toPath("ignore").getParent();
         LocalFileIO.create().mkdirs(bucketDir);
@@ -433,7 +436,8 @@ public class MergeTreeTest {
                         dropDelete,
                         readerFactory,
                         comparator,
-                        DeduplicateMergeFunction.factory().create());
+                        DeduplicateMergeFunction.factory().create(),
+                        sortEngine);
         List<TestRecord> records = new ArrayList<>();
         try (RecordReaderIterator<KeyValue> iterator = new 
RecordReaderIterator<>(reader)) {
             while (iterator.hasNext()) {
@@ -462,6 +466,8 @@ public class MergeTreeTest {
         return records;
     }
 
+    protected abstract SortEngine getSortEngine();
+
     private class TestRewriter extends AbstractCompactRewriter {
 
         @Override
@@ -476,7 +482,8 @@ public class MergeTreeTest {
                             dropDelete,
                             compactReaderFactory,
                             comparator,
-                            DeduplicateMergeFunction.factory().create());
+                            DeduplicateMergeFunction.factory().create(),
+                            sortEngine);
             writer.write(new RecordReaderIterator<>(sectionsReader));
             writer.close();
             return new CompactResult(extractFilesFromSections(sections), 
writer.result());
@@ -512,4 +519,22 @@ public class MergeTreeTest {
             return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v + 
'}';
         }
     }
+
+    /**
+     * Concrete {@link MergeTreeTestBase} whose {@link #getSortEngine()} returns {@link
+     * SortEngine#LOSER_TREE}.
+     */
+    public static class MergeTreeTestWithLoserTree extends MergeTreeTestBase {
+
+        // NOTE(review): Maven Surefire's default excludes skip nested classes ("**/*$*");
+        // confirm that the build actually discovers and runs this nested test class.
+        @Override
+        protected SortEngine getSortEngine() {
+            return SortEngine.LOSER_TREE;
+        }
+    }
+
+    /**
+     * Concrete {@link MergeTreeTestBase} whose {@link #getSortEngine()} returns {@link
+     * SortEngine#MIN_HEAP}.
+     */
+    public static class MergeTreeTestWithMinHeap extends MergeTreeTestBase {
+
+        // NOTE(review): Maven Surefire's default excludes skip nested classes ("**/*$*");
+        // confirm that the build actually discovers and runs this nested test class.
+        @Override
+        protected SortEngine getSortEngine() {
+            return SortEngine.MIN_HEAP;
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
index 0012c54aa..cddb0cbde 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
@@ -18,13 +18,15 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.utils.ReusingTestData;
 import org.apache.paimon.utils.TestReusingRecordReader;
 
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,11 +49,14 @@ public abstract class CombiningRecordReaderTestBase {
     protected abstract List<ReusingTestData> getExpected(List<ReusingTestData> 
input);
 
     protected abstract RecordReader<KeyValue> createRecordReader(
-            List<TestReusingRecordReader> readers);
+            List<TestReusingRecordReader> readers, SortEngine sortEngine);
 
-    @RepeatedTest(100)
-    public void testRandom() throws IOException {
-        runTest(generateRandomData());
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testRandom(SortEngine sortEngine) throws IOException {
+        for (int i = 0; i < 100; i++) {
+            runTest(generateRandomData(), sortEngine);
+        }
     }
 
     protected List<List<ReusingTestData>> parseData(String... stringsData) {
@@ -74,7 +79,8 @@ public abstract class CombiningRecordReaderTestBase {
         return readersData;
     }
 
-    protected void runTest(List<List<ReusingTestData>> readersData) throws 
IOException {
+    protected void runTest(List<List<ReusingTestData>> readersData, SortEngine 
sortEngine)
+            throws IOException {
         Iterator<ReusingTestData> expectedIterator =
                 getExpected(
                                 readersData.stream()
@@ -85,7 +91,7 @@ public abstract class CombiningRecordReaderTestBase {
         for (List<ReusingTestData> readerData : readersData) {
             readers.add(new TestReusingRecordReader(readerData));
         }
-        RecordReader<KeyValue> recordReader = createRecordReader(readers);
+        RecordReader<KeyValue> recordReader = createRecordReader(readers, 
sortEngine);
 
         RecordReader.RecordIterator<KeyValue> batch;
         while ((batch = recordReader.readBatch()) != null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
index 5afaecc2d..6b4a85c0c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.utils.ReusingTestData;
@@ -43,7 +45,8 @@ public class ConcatRecordReaderTest extends 
CombiningRecordReaderTestBase {
     }
 
     @Override
-    protected RecordReader<KeyValue> 
createRecordReader(List<TestReusingRecordReader> readers) {
+    protected RecordReader<KeyValue> createRecordReader(
+            List<TestReusingRecordReader> readers, SortEngine sortEngine) {
         return new ConcatRecordReader(
                 readers.stream()
                         .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r)
@@ -65,4 +68,8 @@ public class ConcatRecordReaderTest extends 
CombiningRecordReaderTestBase {
                         "",
                         " 12, 60, +, 1200 |  14, 70, -, 1400 |  16, 80, +, 
1600 |  18, 90, -, 1800"));
     }
+
+    /**
+     * Convenience overload for the tests in this class: delegates to the two-argument {@code
+     * runTest} with the default value of {@link CoreOptions#SORT_ENGINE}.
+     */
+    private void runTest(List<List<ReusingTestData>> readersData) throws 
IOException {
+        runTest(readersData, CoreOptions.SORT_ENGINE.defaultValue());
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LoserTreeTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LoserTreeTest.java
new file mode 100644
index 000000000..d1547ed02
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LoserTreeTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.ReusingTestData;
+import org.apache.paimon.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link LoserTree}.
+ *
+ * <p>Generated test data is split into individually sorted readers; the records popped from the
+ * tree are compared, in order, against the sorted data.
+ */
+public class LoserTreeTest {
+    private static final Comparator<KeyValue> KEY_COMPARATOR =
+            Comparator.comparingInt(o -> o.key().getInt(0));
+    private static final Comparator<KeyValue> SEQUENCE_COMPARATOR =
+            Comparator.comparingLong(KeyValue::sequenceNumber);
+
+    /** Builds between 1 and 20 sorted readers and checks the order of the popped winners. */
+    @RepeatedTest(100)
+    public void testLoserTreeIsOrdered() throws IOException {
+        List<ReusingTestData> reusingTestData = ReusingTestData.generateData(1000, false);
+        List<RecordReader<KeyValue>> sortedTestReaders = new ArrayList<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numberReaders = random.nextInt(20) + 1;
+        int lowerBound = 0, upperBound = reusingTestData.size();
+        for (int i = 0; i < numberReaders; i++) {
+            // nextInt(origin, bound) excludes the bound, so a slice may be empty.
+            int subUpperBound = random.nextInt(lowerBound, upperBound);
+            List<ReusingTestData> subReusingTestData =
+                    reusingTestData.subList(lowerBound, subUpperBound);
+            Collections.sort(subReusingTestData);
+            sortedTestReaders.add(new TestReusingRecordReader(subReusingTestData));
+            lowerBound = subUpperBound;
+        }
+        // NOTE(review): because the bound is exclusive, the elements from the final lowerBound
+        // up to upperBound are never handed to any reader, yet the expected data below covers
+        // the whole list - confirm this is intended.
+        Collections.sort(reusingTestData);
+        checkLoserTree(sortedTestReaders, reusingTestData);
+    }
+
+    /**
+     * Pops winners from a {@link LoserTree} built on the given readers and compares each of them
+     * with the next element of the expected data.
+     */
+    private void checkLoserTree(
+            List<RecordReader<KeyValue>> sortedTestReaders, List<ReusingTestData> expectedData)
+            throws IOException {
+        try (LoserTree<KeyValue> loserTree =
+                new LoserTree<>(sortedTestReaders, KEY_COMPARATOR, SEQUENCE_COMPARATOR)) {
+            Iterator<ReusingTestData> expectedIterator = expectedData.iterator();
+            do {
+                loserTree.adjustForNextLoop();
+                for (KeyValue winner = loserTree.popWinner();
+                        winner != null;
+                        winner = loserTree.popWinner()) {
+                    // assertThat(...) alone verifies nothing; the terminal isTrue() is what
+                    // actually performs the check.
+                    assertThat(expectedIterator.hasNext()).isTrue();
+                    expectedIterator.next().assertEquals(winner);
+                }
+            } while (loserTree.peekWinner() != null && expectedIterator.hasNext());
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index d0200c6ab..e19b8113e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -18,12 +18,15 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.utils.ReusingTestData;
 import org.apache.paimon.utils.TestReusingRecordReader;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,43 +38,53 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
     protected abstract MergeFunction<KeyValue> createMergeFunction();
 
     @Override
-    protected RecordReader<KeyValue> 
createRecordReader(List<TestReusingRecordReader> readers) {
-        return new SortMergeReader<>(
+    protected RecordReader<KeyValue> createRecordReader(
+            List<TestReusingRecordReader> readers, CoreOptions.SortEngine 
sortEngine) {
+        return SortMergeReader.createSortMergeReader(
                 new ArrayList<>(readers),
                 KEY_COMPARATOR,
-                new ReducerMergeFunctionWrapper(createMergeFunction()));
+                new ReducerMergeFunctionWrapper(createMergeFunction()),
+                sortEngine);
     }
 
-    @Test
-    public void testEmpty() throws IOException {
-        runTest(parseData(""));
-        runTest(parseData("", "", ""));
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testEmpty(SortEngine sortEngine) throws IOException {
+        runTest(parseData(""), sortEngine);
+        runTest(parseData("", "", ""), sortEngine);
     }
 
-    @Test
-    public void testAlternateKeys() throws IOException {
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testAlternateKeys(SortEngine sortEngine) throws IOException {
         runTest(
                 parseData(
                         "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, +, 200 | 7, 4, +, 
600 | 9, 20, +, 400",
                         "0, 5, +, 0",
                         "0, 10, +, 0",
                         "",
-                        "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +, 
800"));
+                        "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +, 
800"),
+                sortEngine);
     }
 
-    @Test
-    public void testDuplicateKeys() throws IOException {
-        runTest(parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5, 
+, 300"));
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testDuplicateKeys(SortEngine sortEngine) throws IOException {
+        runTest(
+                parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5, 
+, 300"),
+                sortEngine);
     }
 
-    @Test
-    public void testLongTailRecords() throws IOException {
+    @ParameterizedTest
+    @EnumSource(SortEngine.class)
+    public void testLongTailRecords(SortEngine sortEngine) throws IOException {
         runTest(
                 parseData(
                         "1, 1, +, 100 | 2, 500, +, 200",
                         "1, 3, +, 100 | 3, 4, +, 300 | 5, 501, +, 500 | 7, 
503, +, 700 | "
                                 + "8, 504, +, 800 | 9, 505, +, 900 | 10, 506, 
+, 1000 | "
-                                + "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 
509, +, 1300"));
+                                + "11, 507, +, 1100 | 12, 508, +, 1200 | 13, 
509, +, 1300"),
+                sortEngine);
     }
 
     /** Tests for {@link SortMergeReader} with {@link 
DeduplicateMergeFunction}. */
@@ -111,8 +124,9 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
             return new ValueCountMergeFunction();
         }
 
-        @Test
-        public void testCancelingRecords() throws IOException {
+        @ParameterizedTest
+        @EnumSource(SortEngine.class)
+        public void testCancelingRecords(SortEngine sortEngine) throws 
IOException {
             runTest(
                     parseData(
                             "1, 1, +, 100 | 3, 5, +, -300 | 5, 300, +, 300",
@@ -120,8 +134,9 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
                             "1, 4, +, -200 | 3, 3, +, 300",
                             "5, 100, +, -200 | 7, 123, +, -500",
                             "7, 321, +, 200",
-                            "7, 456, +, 300"));
-            runTest(parseData("1, 2, +, 100", "1, 1, +, -100"));
+                            "7, 456, +, 300"),
+                    sortEngine);
+            runTest(parseData("1, 2, +, 100", "1, 1, +, -100"), sortEngine);
         }
     }
 }

Reply via email to