This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5edf3a965 [Feature] Optimize SortMergeReader: use loser tree to
reduce comparisons (#833)
5edf3a965 is described below
commit 5edf3a9658e6bb927b5c53cc16dbaa713541c3e9
Author: liming.1018 <[email protected]>
AuthorDate: Thu Apr 27 17:18:30 2023 +0800
[Feature] Optimize SortMergeReader: use loser tree to reduce comparisons
(#833)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 36 +++
.../apache/paimon/mergetree/MergeTreeReaders.java | 13 +-
.../compact/ChangelogMergeTreeRewriter.java | 13 +-
.../FullChangelogMergeTreeCompactRewriter.java | 6 +-
.../compact/LookupMergeTreeCompactRewriter.java | 6 +-
.../apache/paimon/mergetree/compact/LoserTree.java | 354 +++++++++++++++++++++
.../compact/MergeTreeCompactRewriter.java | 13 +-
.../paimon/mergetree/compact/SortMergeReader.java | 179 +----------
.../compact/SortMergeReaderWithLoserTree.java | 102 ++++++
...Reader.java => SortMergeReaderWithMinHeap.java} | 14 +-
.../paimon/operation/KeyValueFileStoreRead.java | 8 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 16 +-
.../{MergeTreeTest.java => MergeTreeTestBase.java} | 31 +-
.../compact/CombiningRecordReaderTestBase.java | 20 +-
.../mergetree/compact/ConcatRecordReaderTest.java | 9 +-
.../paimon/mergetree/compact/LoserTreeTest.java | 81 +++++
.../mergetree/compact/SortMergeReaderTestBase.java | 57 ++--
18 files changed, 739 insertions(+), 225 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 1931791e7..16138d13b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,6 +188,12 @@
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>sort-engine</h5></td>
+ <td style="word-wrap: break-word;">loser-tree</td>
+ <td><p>Enum</p></td>
+ <td>Specify the sort engine for table with primary key.<br /><br
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared
with heapsort, loser-tree has fewer comparisons and is more
efficient.</li></ul></td>
+ </tr>
<tr>
<td><h5>num-levels</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index e034402eb..dc2e310c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -186,6 +186,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to ignore delete records in
partial-update mode.");
+ public static final ConfigOption<SortEngine> SORT_ENGINE =
+ key("sort-engine")
+ .enumType(SortEngine.class)
+ .defaultValue(SortEngine.LOSER_TREE)
+ .withDescription("Specify the sort engine for table with
primary key.");
+
@Immutable
public static final ConfigOption<WriteMode> WRITE_MODE =
key("write-mode")
@@ -706,6 +712,10 @@ public class CoreOptions implements Serializable {
return options.get(MERGE_ENGINE);
}
+ public SortEngine sortEngine() {
+ return options.get(SORT_ENGINE);
+ }
+
public long splitTargetSize() {
return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
}
@@ -1191,4 +1201,30 @@ public class CoreOptions implements Serializable {
}
return immutableKeys;
}
+
+ /** Specifies the sort engine for tables with primary keys. */
+ public enum SortEngine implements DescribedEnum {
+ MIN_HEAP("min-heap", "Use min-heap for multiway sorting."),
+ LOSER_TREE(
+ "loser-tree",
+ "Use loser-tree for multiway sorting. Compared with heapsort,
loser-tree has fewer comparisons and is more efficient.");
+
+ private final String value;
+ private final String description;
+
+ SortEngine(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 624aca091..ed2468056 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
@@ -45,7 +46,8 @@ public class MergeTreeReaders {
boolean dropDelete,
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
- MergeFunction<KeyValue> mergeFunction)
+ MergeFunction<KeyValue> mergeFunction,
+ SortEngine sortEngine)
throws IOException {
List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new
ArrayList<>();
for (List<SortedRun> section : sections) {
@@ -55,7 +57,8 @@ public class MergeTreeReaders {
section,
readerFactory,
userKeyComparator,
- new
ReducerMergeFunctionWrapper(mergeFunction)));
+ new
ReducerMergeFunctionWrapper(mergeFunction),
+ sortEngine));
}
RecordReader<KeyValue> reader = ConcatRecordReader.create(readers);
if (dropDelete) {
@@ -68,13 +71,15 @@ public class MergeTreeReaders {
List<SortedRun> section,
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
- MergeFunctionWrapper<KeyValue> mergeFunctionWrapper)
+ MergeFunctionWrapper<KeyValue> mergeFunctionWrapper,
+ SortEngine sortEngine)
throws IOException {
List<RecordReader<KeyValue>> readers = readerForSection(section,
readerFactory);
if (readers.size() == 1) {
return readers.get(0);
} else {
- return new SortMergeReader<>(readers, userKeyComparator,
mergeFunctionWrapper);
+ return SortMergeReader.createSortMergeReader(
+ readers, userKeyComparator, mergeFunctionWrapper,
sortEngine);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 4e704db5b..fbfa81d61 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
@@ -42,8 +43,9 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
- MergeFunctionFactory<KeyValue> mfFactory) {
- super(readerFactory, writerFactory, keyComparator, mfFactory);
+ MergeFunctionFactory<KeyValue> mfFactory,
+ SortEngine sortEngine) {
+ super(readerFactory, writerFactory, keyComparator, mfFactory,
sortEngine);
}
protected abstract boolean rewriteChangelog(
@@ -71,8 +73,11 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
() -> {
List<RecordReader<KeyValue>> runReaders =
MergeTreeReaders.readerForSection(section,
readerFactory);
- return new SortMergeReader<>(
- runReaders, keyComparator,
createMergeWrapper(outputLevel));
+ return SortMergeReader.createSortMergeReader(
+ runReaders,
+ keyComparator,
+ createMergeWrapper(outputLevel),
+ sortEngine);
});
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 207f1c258..9e9d8b0a7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
@@ -40,8 +41,9 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
- MergeFunctionFactory<KeyValue> mfFactory) {
- super(readerFactory, writerFactory, keyComparator, mfFactory);
+ MergeFunctionFactory<KeyValue> mfFactory,
+ SortEngine sortEngine) {
+ super(readerFactory, writerFactory, keyComparator, mfFactory,
sortEngine);
this.maxLevel = maxLevel;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index c83f0ca4d..e799fe73d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
@@ -44,8 +45,9 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
- MergeFunctionFactory<KeyValue> mfFactory) {
- super(readerFactory, writerFactory, keyComparator, mfFactory);
+ MergeFunctionFactory<KeyValue> mfFactory,
+ SortEngine sortEngine) {
+ super(readerFactory, writerFactory, keyComparator, mfFactory,
sortEngine);
this.lookupLevels = lookupLevels;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java
new file mode 100644
index 000000000..b35be96f9
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LoserTree.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.ExceptionUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A variant of the loser tree. In the LSM-Tree architecture, there will be
duplicate Keys in
+ * multiple {@link RecordReader}s, and these Keys need to be merged. In the loser tree, we return in
+ * the order of the Keys, but because the returned objects may be reused in
the {@link RecordReader}
+ * or the {@link MergeFunction}, for a single {@link RecordReader}, we cannot
get the next Key
+ * immediately after returning a Key, and we need to wait until the same Key
in all {@link
+ * RecordReader} is returned before proceeding to the next Key.
+ *
+ * <p>The process of building the loser tree is the same as a regular loser
tree. The difference is
+ * that in the process of adjusting the tree, we need to record the index of
the same key and the
+ * state of the winner/loser for subsequent quick adjustment of the position
of the winner.
+ *
+ * <p>Detailed design can refer to
https://cwiki.apache.org/confluence/x/9Ak0Dw.
+ */
+public class LoserTree<T> implements Closeable {
+ private final int[] tree;
+ private final int size;
+ private final List<LeafIterator<T>> leaves;
+
+ /**
+ * if comparator.compare('a', 'b') > 0, then 'a' is the winner. In the
following implementation,
+ * we always let 'a' represent the parent node.
+ */
+ private final Comparator<T> firstComparator;
+
+ /** Same convention as firstComparator; used to break ties when firstComparator returns 0 (e.g. by sequenceNumber). */
+ private final Comparator<T> secondComparator;
+
+ private boolean initialized;
+
+ public LoserTree(
+ List<RecordReader<T>> nextBatchReaders,
+ Comparator<T> firstComparator,
+ Comparator<T> secondComparator) {
+ this.size = nextBatchReaders.size();
+ this.leaves = new ArrayList<>(size);
+ this.tree = new int[size];
+ // if e1 and e2 are both null, it doesn't matter who becomes the new
winner. But if
+ // firstComparator returns 0, it means that secondComparator must be
used to compare again.
+ this.firstComparator =
+ (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 :
firstComparator.compare(e1, e2));
+ this.secondComparator =
+ (e1, e2) -> e1 == null ? -1 : (e2 == null ? 1 :
secondComparator.compare(e1, e2));
+ this.initialized = false;
+
+ for (RecordReader<T> reader : nextBatchReaders) {
+ LeafIterator<T> iterator = new LeafIterator<>(reader);
+ this.leaves.add(iterator);
+ }
+ }
+
+ /** Initialize the loser tree in the same way as the regular loser tree. */
+ public void initializeIfNeeded() throws IOException {
+ if (!initialized) {
+ Arrays.fill(tree, -1);
+ for (int i = size - 1; i >= 0; i--) {
+ leaves.get(i).advanceIfAvailable();
+ adjust(i);
+ }
+ initialized = true;
+ }
+ }
+
+ /** Adjust the Key that needs to be returned in the next round. */
+ public void adjustForNextLoop() throws IOException {
+ LeafIterator<T> winner = leaves.get(tree[0]);
+ while (winner.state == State.WINNER_POPPED) {
+ winner.advanceIfAvailable();
+ adjust(tree[0]);
+ winner = leaves.get(tree[0]);
+ }
+ }
+
+ /** Pop the current winner and update its state to {@link
State#WINNER_POPPED}. */
+ public T popWinner() {
+ LeafIterator<T> winner = leaves.get(tree[0]);
+ if (winner.state == State.WINNER_POPPED) {
+ // if the winner has already been popped, it means that all the
same key has been
+ // processed.
+ return null;
+ }
+ T result = winner.pop();
+ adjust(tree[0]);
+ return result;
+ }
+
+ /** Peek the current winner, mainly for key comparisons. */
+ public T peekWinner() {
+ return leaves.get(tree[0]).state != State.WINNER_POPPED ?
leaves.get(tree[0]).peek() : null;
+ }
+
+ /**
+ * Adjust the winner from bottom to top. Using different {@link State}, we
can quickly compare
+ * whether all the current same keys have been processed.
+ */
+ private void adjust(int winner) {
+ for (int parent = (winner + this.size) / 2; parent > 0 && winner >= 0;
parent /= 2) {
+ LeafIterator<T> winnerNode = leaves.get(winner);
+ LeafIterator<T> parentNode;
+ if (this.tree[parent] == -1) {
+ // initialize the tree.
+ winnerNode.state = State.LOSER_WITH_NEW_KEY;
+ } else {
+ parentNode = leaves.get(this.tree[parent]);
+ switch (winnerNode.state) {
+ case WINNER_WITH_NEW_KEY:
+ adjustWithNewWinnerKey(parent, parentNode, winnerNode);
+ break;
+ case WINNER_WITH_SAME_KEY:
+ adjustWithSameWinnerKey(parent, parentNode,
winnerNode);
+ break;
+ case WINNER_POPPED:
+ if (winnerNode.firstSameKeyIndex < 0) {
+ // fast path, which means that the same key is not
yet processed in the
+ // current tree.
+ parent = -1;
+ } else {
+ // fast path. Directly exchange positions with the
same key that has not
+ // yet been processed, no need to compare level by
level.
+ parent = winnerNode.firstSameKeyIndex;
+ parentNode = leaves.get(this.tree[parent]);
+ winnerNode.state = State.LOSER_POPPED;
+ parentNode.state = State.WINNER_WITH_SAME_KEY;
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "unknown state for " +
winnerNode.state.name());
+ }
+ }
+
+ // if the winner loses, exchange nodes.
+ if (!winnerNode.state.isWinner()) {
+ int tmp = winner;
+ winner = this.tree[parent];
+ this.tree[parent] = tmp;
+ }
+ }
+ this.tree[0] = winner;
+ }
+
+ /** Adjusts the tree when the winner node has the same userKey as the global winner. */
+ private void adjustWithSameWinnerKey(
+ int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode)
{
+ switch (parentNode.state) {
+ case LOSER_WITH_SAME_KEY:
+ // the key of the previous loser is the same as the key of the
current winner,
+ // only the sequence needs to be compared.
+ T parentKey = parentNode.peek();
+ T childKey = winnerNode.peek();
+ int secondResult = secondComparator.compare(parentKey,
childKey);
+ if (secondResult > 0) {
+ parentNode.state = State.WINNER_WITH_SAME_KEY;
+ winnerNode.state = State.LOSER_WITH_SAME_KEY;
+ parentNode.setFirstSameKeyIndex(index);
+ } else {
+ winnerNode.setFirstSameKeyIndex(index);
+ }
+ return;
+ case LOSER_WITH_NEW_KEY:
+ case LOSER_POPPED:
+ return;
+ default:
+ throw new UnsupportedOperationException(
+ "unknown state for " + parentNode.state.name());
+ }
+ }
+
+ /**
+ * Adjusts the tree when the userKey of the new local winner node differs from that of the previous global winner.
+ */
+ private void adjustWithNewWinnerKey(
+ int index, LeafIterator<T> parentNode, LeafIterator<T> winnerNode)
{
+ switch (parentNode.state) {
+ case LOSER_WITH_NEW_KEY:
+ // when the new winner is also a new key, it needs to be
compared.
+ T parentKey = parentNode.peek();
+ T childKey = winnerNode.peek();
+ int firstResult = firstComparator.compare(parentKey, childKey);
+ if (firstResult == 0) {
+ // if the compared keys are the same, we need to update
the state of the node
+ // and record the index of the same key for the winner.
+ int secondResult = secondComparator.compare(parentKey,
childKey);
+ if (secondResult < 0) {
+ parentNode.state = State.LOSER_WITH_SAME_KEY;
+ winnerNode.setFirstSameKeyIndex(index);
+ } else {
+ winnerNode.state = State.LOSER_WITH_SAME_KEY;
+ parentNode.state = State.WINNER_WITH_NEW_KEY;
+ parentNode.setFirstSameKeyIndex(index);
+ }
+ } else if (firstResult > 0) {
+ // the two keys are completely different and just need to
update the state.
+ parentNode.state = State.WINNER_WITH_NEW_KEY;
+ winnerNode.state = State.LOSER_WITH_NEW_KEY;
+ }
+ return;
+ case LOSER_WITH_SAME_KEY:
+ // A node in the WINNER_WITH_NEW_KEY state cannot encounter a
node in the
+ // LOSER_WITH_SAME_KEY state.
+ throw new RuntimeException(
+ "This is a bug. Please file an issue. A node in the
WINNER_WITH_NEW_KEY "
+ + "state cannot encounter a node in the
LOSER_WITH_SAME_KEY state.");
+ case LOSER_POPPED:
+ // this case will only happen during adjustForNextLoop.
+ parentNode.state = State.WINNER_POPPED;
+ parentNode.firstSameKeyIndex = -1;
+ winnerNode.state = State.LOSER_WITH_NEW_KEY;
+ return;
+ default:
+ throw new UnsupportedOperationException(
+ "unknown state for " + parentNode.state.name());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException exception = null;
+ for (LeafIterator<T> iterator : leaves) {
+ try {
+ iterator.close();
+ } catch (IOException e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+
+ /** Leaf node, used to manage {@link RecordReader}. */
+ private static class LeafIterator<T> implements Closeable {
+ /** The reader that reads the batches of records. */
+ private final RecordReader<T> reader;
+
+ /** The iterator used by the current batch. */
+ private RecordReader.RecordIterator<T> iterator;
+
+ /** The current minimum kv. */
+ private T kv;
+
+ /** Whether the reader has no more batches (readBatch returned null). */
+ private boolean endOfInput;
+
+ /** Position in {@code tree} of the first same-key loser recorded for this node, or -1 if none. */
+ private int firstSameKeyIndex;
+
+ /** The state of the current node. */
+ private State state;
+
+ private LeafIterator(RecordReader<T> reader) {
+ this.reader = reader;
+ this.endOfInput = false;
+ this.firstSameKeyIndex = -1;
+ this.state = State.WINNER_WITH_NEW_KEY;
+ }
+
+ public T peek() {
+ return kv;
+ }
+
+ public T pop() {
+ this.state = State.WINNER_POPPED;
+ return kv;
+ }
+
+ public void setFirstSameKeyIndex(int index) {
+ if (firstSameKeyIndex == -1) {
+ firstSameKeyIndex = index;
+ }
+ }
+
+ /** Resets the node state and advances to the next kv; kv becomes null once the reader is exhausted. */
+ public void advanceIfAvailable() throws IOException {
+ this.firstSameKeyIndex = -1;
+ this.state = State.WINNER_WITH_NEW_KEY;
+ if (iterator == null || (kv = iterator.next()) == null) {
+ while (!endOfInput) {
+ if (iterator != null) {
+ iterator.releaseBatch();
+ }
+
+ iterator = reader.readBatch();
+ if (iterator == null) {
+ endOfInput = true;
+ kv = null;
+ reader.close();
+ } else if ((kv = iterator.next()) != null) {
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.iterator != null) {
+ this.iterator.releaseBatch();
+ }
+ this.reader.close();
+ }
+ }
+
+ /** The state of the node in the loser tree. */
+ private enum State {
+ LOSER_WITH_NEW_KEY(false),
+ LOSER_WITH_SAME_KEY(false),
+ LOSER_POPPED(false),
+ WINNER_WITH_NEW_KEY(true),
+ WINNER_WITH_SAME_KEY(true),
+ WINNER_POPPED(true);
+
+ private final boolean winner;
+
+ State(boolean winner) {
+ this.winner = winner;
+ }
+
+ public boolean isWinner() {
+ return winner;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 4adc1ce03..a4691bcb4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
@@ -40,16 +41,19 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
protected final KeyValueFileWriterFactory writerFactory;
protected final Comparator<InternalRow> keyComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
+ protected final SortEngine sortEngine;
public MergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
- MergeFunctionFactory<KeyValue> mfFactory) {
+ MergeFunctionFactory<KeyValue> mfFactory,
+ SortEngine sortEngine) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
+ this.sortEngine = sortEngine;
}
@Override
@@ -64,7 +68,12 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
writerFactory.createRollingMergeTreeFileWriter(outputLevel);
RecordReader<KeyValue> sectionsReader =
MergeTreeReaders.readerForMergeTree(
- sections, dropDelete, readerFactory, keyComparator,
mfFactory.create());
+ sections,
+ dropDelete,
+ readerFactory,
+ keyComparator,
+ mfFactory.create(),
+ sortEngine);
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
return new CompactResult(extractFilesFromSections(sections),
writer.result());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
index 897f8d83a..d545efd35 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
@@ -18,18 +18,13 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.Preconditions;
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.PriorityQueue;
/**
* This reader is to read a list of {@link RecordReader}, which is already
sorted by key and
@@ -38,168 +33,22 @@ import java.util.PriorityQueue;
*
* <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not
contain the same key.
*/
-public class SortMergeReader<T> implements RecordReader<T> {
-
- private final List<RecordReader<KeyValue>> nextBatchReaders;
- private final Comparator<InternalRow> userKeyComparator;
- private final MergeFunctionWrapper<T> mergeFunctionWrapper;
+public interface SortMergeReader<T> extends RecordReader<T> {
- private final PriorityQueue<Element> minHeap;
- private final List<Element> polled;
-
- public SortMergeReader(
+ static <T> SortMergeReader<T> createSortMergeReader(
List<RecordReader<KeyValue>> readers,
Comparator<InternalRow> userKeyComparator,
- MergeFunctionWrapper<T> mergeFunctionWrapper) {
- this.nextBatchReaders = new ArrayList<>(readers);
- this.userKeyComparator = userKeyComparator;
- this.mergeFunctionWrapper = mergeFunctionWrapper;
-
- this.minHeap =
- new PriorityQueue<>(
- (e1, e2) -> {
- int result =
userKeyComparator.compare(e1.kv.key(), e2.kv.key());
- if (result != 0) {
- return result;
- }
- return Long.compare(e1.kv.sequenceNumber(),
e2.kv.sequenceNumber());
- });
- this.polled = new ArrayList<>();
- }
-
- @Nullable
- @Override
- public RecordIterator<T> readBatch() throws IOException {
- for (RecordReader<KeyValue> reader : nextBatchReaders) {
- while (true) {
- RecordIterator<KeyValue> iterator = reader.readBatch();
- if (iterator == null) {
- // no more batches, permanently remove this reader
- reader.close();
- break;
- }
- KeyValue kv = iterator.next();
- if (kv == null) {
- // empty iterator, clean up and try next batch
- iterator.releaseBatch();
- } else {
- // found next kv
- minHeap.offer(new Element(kv, iterator, reader));
- break;
- }
- }
- }
- nextBatchReaders.clear();
-
- return minHeap.isEmpty() ? null : new SortMergeIterator();
- }
-
- @Override
- public void close() throws IOException {
- for (RecordReader<KeyValue> reader : nextBatchReaders) {
- reader.close();
- }
- for (Element element : minHeap) {
- element.iterator.releaseBatch();
- element.reader.close();
- }
- for (Element element : polled) {
- element.iterator.releaseBatch();
- element.reader.close();
- }
- }
-
- /** The iterator iterates on {@link SortMergeReader}. */
- private class SortMergeIterator implements RecordIterator<T> {
-
- private boolean released = false;
-
- @Override
- public T next() throws IOException {
- while (true) {
- boolean hasMore = nextImpl();
- if (!hasMore) {
- return null;
- }
- T result = mergeFunctionWrapper.getResult();
- if (result != null) {
- return result;
- }
- }
- }
-
- private boolean nextImpl() throws IOException {
- Preconditions.checkState(
- !released, "SortMergeIterator#advanceNext is called after
release");
- Preconditions.checkState(
- nextBatchReaders.isEmpty(),
- "SortMergeIterator#advanceNext is called even if the last
call returns null. "
- + "This is a bug.");
-
- // add previously polled elements back to priority queue
- for (Element element : polled) {
- if (element.update()) {
- // still kvs left, add back to priority queue
- minHeap.offer(element);
- } else {
- // reach end of batch, clean up
- element.iterator.releaseBatch();
- nextBatchReaders.add(element.reader);
- }
- }
- polled.clear();
-
- // there are readers reaching end of batch, so we end current batch
- if (!nextBatchReaders.isEmpty()) {
- return false;
- }
-
- mergeFunctionWrapper.reset();
- InternalRow key =
- Preconditions.checkNotNull(minHeap.peek(), "Min heap is
empty. This is a bug.")
- .kv
- .key();
-
- // fetch all elements with the same key
- // note that the same iterator should not produce the same keys,
so this code is correct
- while (!minHeap.isEmpty()) {
- Element element = minHeap.peek();
- if (userKeyComparator.compare(key, element.kv.key()) != 0) {
- break;
- }
- minHeap.poll();
- mergeFunctionWrapper.add(element.kv);
- polled.add(element);
- }
- return true;
- }
-
- @Override
- public void releaseBatch() {
- released = true;
- }
- }
-
- private static class Element {
- private KeyValue kv;
- private final RecordIterator<KeyValue> iterator;
- private final RecordReader<KeyValue> reader;
-
- private Element(
- KeyValue kv, RecordIterator<KeyValue> iterator,
RecordReader<KeyValue> reader) {
- this.kv = kv;
- this.iterator = iterator;
- this.reader = reader;
- }
-
- // IMPORTANT: Must not call this for elements still in priority queue!
- private boolean update() throws IOException {
- KeyValue nextKv = iterator.next();
- if (nextKv == null) {
- return false;
- }
- kv = nextKv;
- return true;
+ MergeFunctionWrapper<T> mergeFunctionWrapper,
+ SortEngine sortEngine) {
+ switch (sortEngine) {
+ case MIN_HEAP:
+ return new SortMergeReaderWithMinHeap<>(
+ readers, userKeyComparator, mergeFunctionWrapper);
+ case LOSER_TREE:
+ return new SortMergeReaderWithLoserTree<>(
+ readers, userKeyComparator, mergeFunctionWrapper);
+ default:
+ throw new UnsupportedOperationException("Unsupported sort
engine: " + sortEngine);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
new file mode 100644
index 000000000..f7a20ee67
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithLoserTree.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * {@link SortMergeReader} implemented with loser-tree.
+ *
+ * <p>All readers are merged in a single batch; records sharing the same user key are combined by
+ * the {@link MergeFunctionWrapper}.
+ */
+public class SortMergeReaderWithLoserTree<T> implements SortMergeReader<T> {
+
+    private final MergeFunctionWrapper<T> mergeFunctionWrapper;
+    private final LoserTree<KeyValue> loserTree;
+
+    /**
+     * Creates a reader that sort-merges the given readers.
+     *
+     * @param readers readers whose records are already sorted by key and sequence number
+     * @param userKeyComparator comparator on the user key
+     * @param mergeFunctionWrapper combines all records that share the same user key
+     */
+    public SortMergeReaderWithLoserTree(
+            List<RecordReader<KeyValue>> readers,
+            Comparator<InternalRow> userKeyComparator,
+            MergeFunctionWrapper<T> mergeFunctionWrapper) {
+        this.mergeFunctionWrapper = mergeFunctionWrapper;
+        // Both comparators are given swapped arguments (e2 before e1).
+        // NOTE(review): presumably LoserTree treats the "greater" element as the winner, so the
+        // swap lets the smallest key (then smallest sequence number) win; confirm in LoserTree.
+        this.loserTree =
+                new LoserTree<>(
+                        readers,
+                        (e1, e2) -> userKeyComparator.compare(e2.key(), e1.key()),
+                        (e1, e2) -> Long.compare(e2.sequenceNumber(), e1.sequenceNumber()));
+    }
+
+    /**
+     * Returns the only batch of merged records, or {@code null} if the loser tree has no winner
+     * after initialization. Compared with heapsort, {@link LoserTree} will only produce one batch.
+     */
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        loserTree.initializeIfNeeded();
+        return loserTree.peekWinner() == null ? null : new SortMergeIterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        loserTree.close();
+    }
+
+    /** The iterator iterates on {@link SortMergeReaderWithLoserTree}. */
+    private class SortMergeIterator implements RecordIterator<T> {
+
+        private boolean released = false;
+
+        /**
+         * Returns the next merged record, or {@code null} once the loser tree is exhausted. Key
+         * groups whose merge result is {@code null} are skipped.
+         */
+        @Nullable
+        @Override
+        public T next() throws IOException {
+            // Check before touching the loser tree, so a call after release fails without
+            // consuming any records.
+            Preconditions.checkState(!released, "SortMergeIterator#next is called after release");
+            while (true) {
+                // Prepare the tree for the next key group; a null winner means nothing is left.
+                loserTree.adjustForNextLoop();
+                KeyValue winner = loserTree.popWinner();
+                if (winner == null) {
+                    return null;
+                }
+                mergeFunctionWrapper.reset();
+                mergeFunctionWrapper.add(winner);
+
+                T result = merge();
+                if (result != null) {
+                    return result;
+                }
+            }
+        }
+
+        /** Adds the remaining records of the current key group and returns the merge result. */
+        @Nullable
+        private T merge() {
+            while (loserTree.peekWinner() != null) {
+                mergeFunctionWrapper.add(loserTree.popWinner());
+            }
+            return mergeFunctionWrapper.getResult();
+        }
+
+        @Override
+        public void releaseBatch() {
+            released = true;
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
similarity index 93%
copy from
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
copy to
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
index 897f8d83a..adca53fdb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
@@ -31,14 +31,8 @@ import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
-/**
- * This reader is to read a list of {@link RecordReader}, which is already
sorted by key and
- * sequence number, and perform a sort merge algorithm. {@link KeyValue}s with
the same key will
- * also be combined during sort merging.
- *
- * <p>NOTE: {@link KeyValue}s from the same {@link RecordReader} must not
contain the same key.
- */
-public class SortMergeReader<T> implements RecordReader<T> {
+/** {@link SortMergeReader} implemented with min-heap. */
+public class SortMergeReaderWithMinHeap<T> implements SortMergeReader<T> {
private final List<RecordReader<KeyValue>> nextBatchReaders;
private final Comparator<InternalRow> userKeyComparator;
@@ -47,7 +41,7 @@ public class SortMergeReader<T> implements RecordReader<T> {
private final PriorityQueue<Element> minHeap;
private final List<Element> polled;
- public SortMergeReader(
+ public SortMergeReaderWithMinHeap(
List<RecordReader<KeyValue>> readers,
Comparator<InternalRow> userKeyComparator,
MergeFunctionWrapper<T> mergeFunctionWrapper) {
@@ -109,7 +103,7 @@ public class SortMergeReader<T> implements RecordReader<T> {
}
}
- /** The iterator iterates on {@link SortMergeReader}. */
+ /** The iterator iterates on {@link SortMergeReaderWithMinHeap}. */
private class SortMergeIterator implements RecordIterator<T> {
private boolean released = false;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 3a056fa8a..8512afb5f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.InternalRow;
@@ -33,6 +34,7 @@ import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -53,6 +55,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.SORT_ENGINE;
import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
@@ -65,6 +68,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private final Comparator<InternalRow> keyComparator;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final boolean valueCountMode;
+ private final SortEngine sortEngine;
@Nullable private int[][] keyProjectedFields;
@@ -99,6 +103,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
+ this.sortEngine =
Options.fromMap(tableSchema.options()).get(SORT_ENGINE);
}
public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -196,7 +201,8 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
? overlappedSectionFactory
: nonOverlappedSectionFactory,
keyComparator,
- mergeFuncWrapper));
+ mergeFuncWrapper,
+ sortEngine));
}
DropDeleteReader reader =
new
DropDeleteReader(ConcatRecordReader.create(sectionReaders));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 72af8fb57..b346168a4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -210,14 +210,24 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
readerFactory,
writerFactory,
keyComparator,
- mfFactory);
+ mfFactory,
+ options.sortEngine());
case LOOKUP:
LookupLevels lookupLevels = createLookupLevels(levels,
readerFactory);
return new LookupMergeTreeCompactRewriter(
- lookupLevels, readerFactory, writerFactory,
keyComparator, mfFactory);
+ lookupLevels,
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ mfFactory,
+ options.sortEngine());
default:
return new MergeTreeCompactRewriter(
- readerFactory, writerFactory, keyComparator,
mfFactory);
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ mfFactory,
+ options.sortEngine());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
similarity index 95%
rename from
paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 3a2ce21cc..5538928e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.BinaryRow;
@@ -85,7 +86,7 @@ import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link MergeTreeReaders} and {@link MergeTreeWriter}. */
-public class MergeTreeTest {
+public abstract class MergeTreeTestBase {
@TempDir java.nio.file.Path tempDir;
private static ExecutorService service;
@@ -99,12 +100,14 @@ public class MergeTreeTest {
private KeyValueFileWriterFactory writerFactory;
private KeyValueFileWriterFactory compactWriterFactory;
private RecordWriter<KeyValue> writer;
+ private SortEngine sortEngine;
@BeforeEach
public void beforeEach() throws IOException {
path = new Path(tempDir.toString());
pathFactory = new FileStorePathFactory(path);
comparator = Comparator.comparingInt(o -> o.getInt(0));
+ sortEngine = getSortEngine();
recreateMergeTree(1024 * 1024);
Path bucketDir =
writerFactory.pathFactory().toPath("ignore").getParent();
LocalFileIO.create().mkdirs(bucketDir);
@@ -433,7 +436,8 @@ public class MergeTreeTest {
dropDelete,
readerFactory,
comparator,
- DeduplicateMergeFunction.factory().create());
+ DeduplicateMergeFunction.factory().create(),
+ sortEngine);
List<TestRecord> records = new ArrayList<>();
try (RecordReaderIterator<KeyValue> iterator = new
RecordReaderIterator<>(reader)) {
while (iterator.hasNext()) {
@@ -462,6 +466,8 @@ public class MergeTreeTest {
return records;
}
+ protected abstract SortEngine getSortEngine();
+
private class TestRewriter extends AbstractCompactRewriter {
@Override
@@ -476,7 +482,8 @@ public class MergeTreeTest {
dropDelete,
compactReaderFactory,
comparator,
- DeduplicateMergeFunction.factory().create());
+ DeduplicateMergeFunction.factory().create(),
+ sortEngine);
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
return new CompactResult(extractFilesFromSections(sections),
writer.result());
@@ -512,4 +519,22 @@ public class MergeTreeTest {
return "TestRecord{" + "kind=" + kind + ", k=" + k + ", v=" + v +
'}';
}
}
+
+ /** {@link MergeTreeTestBase} with {@link SortEngine#LOSER_TREE}. */
+ public static class MergeTreeTestWithLoserTree extends MergeTreeTestBase {
+
+ @Override
+ protected SortEngine getSortEngine() {
+ return SortEngine.LOSER_TREE;
+ }
+ }
+
+ /** {@link MergeTreeTestBase} with {@link SortEngine#MIN_HEAP}. */
+ public static class MergeTreeTestWithMinHeap extends MergeTreeTestBase {
+
+ @Override
+ protected SortEngine getSortEngine() {
+ return SortEngine.MIN_HEAP;
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
index 0012c54aa..cddb0cbde 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/CombiningRecordReaderTestBase.java
@@ -18,13 +18,15 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.ReusingTestData;
import org.apache.paimon.utils.TestReusingRecordReader;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,11 +49,14 @@ public abstract class CombiningRecordReaderTestBase {
protected abstract List<ReusingTestData> getExpected(List<ReusingTestData>
input);
protected abstract RecordReader<KeyValue> createRecordReader(
- List<TestReusingRecordReader> readers);
+ List<TestReusingRecordReader> readers, SortEngine sortEngine);
- @RepeatedTest(100)
- public void testRandom() throws IOException {
- runTest(generateRandomData());
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testRandom(SortEngine sortEngine) throws IOException {
+ for (int i = 0; i < 100; i++) {
+ runTest(generateRandomData(), sortEngine);
+ }
}
protected List<List<ReusingTestData>> parseData(String... stringsData) {
@@ -74,7 +79,8 @@ public abstract class CombiningRecordReaderTestBase {
return readersData;
}
- protected void runTest(List<List<ReusingTestData>> readersData) throws
IOException {
+ protected void runTest(List<List<ReusingTestData>> readersData, SortEngine
sortEngine)
+ throws IOException {
Iterator<ReusingTestData> expectedIterator =
getExpected(
readersData.stream()
@@ -85,7 +91,7 @@ public abstract class CombiningRecordReaderTestBase {
for (List<ReusingTestData> readerData : readersData) {
readers.add(new TestReusingRecordReader(readerData));
}
- RecordReader<KeyValue> recordReader = createRecordReader(readers);
+ RecordReader<KeyValue> recordReader = createRecordReader(readers,
sortEngine);
RecordReader.RecordIterator<KeyValue> batch;
while ((batch = recordReader.readBatch()) != null) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
index 5afaecc2d..6b4a85c0c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
@@ -18,6 +18,8 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.ReusingTestData;
@@ -43,7 +45,8 @@ public class ConcatRecordReaderTest extends
CombiningRecordReaderTestBase {
}
@Override
- protected RecordReader<KeyValue>
createRecordReader(List<TestReusingRecordReader> readers) {
+ protected RecordReader<KeyValue> createRecordReader(
+ List<TestReusingRecordReader> readers, SortEngine sortEngine) {
return new ConcatRecordReader(
readers.stream()
.map(r -> (ConcatRecordReader.ReaderSupplier) () -> r)
@@ -65,4 +68,8 @@ public class ConcatRecordReaderTest extends
CombiningRecordReaderTestBase {
"",
" 12, 60, +, 1200 | 14, 70, -, 1400 | 16, 80, +,
1600 | 18, 90, -, 1800"));
}
+
+ private void runTest(List<List<ReusingTestData>> readersData) throws
IOException {
+ runTest(readersData, CoreOptions.SORT_ENGINE.defaultValue());
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LoserTreeTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LoserTreeTest.java
new file mode 100644
index 000000000..d1547ed02
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LoserTreeTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.ReusingTestData;
+import org.apache.paimon.utils.TestReusingRecordReader;
+
+import org.junit.jupiter.api.RepeatedTest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LoserTree}. */
+public class LoserTreeTest {
+    private static final Comparator<KeyValue> KEY_COMPARATOR =
+            Comparator.comparingInt(o -> o.key().getInt(0));
+    private static final Comparator<KeyValue> SEQUENCE_COMPARATOR =
+            Comparator.comparingLong(KeyValue::sequenceNumber);
+
+    /**
+     * Splits random data into 1 to 20 sorted readers and checks that the loser tree returns
+     * every record exactly once, in the same order as the fully sorted data.
+     */
+    @RepeatedTest(100)
+    public void testLoserTreeIsOrdered() throws IOException {
+        List<ReusingTestData> reusingTestData = ReusingTestData.generateData(1000, false);
+        List<RecordReader<KeyValue>> sortedTestReaders = new ArrayList<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numberReaders = random.nextInt(20) + 1;
+        int size = reusingTestData.size();
+        int lowerBound = 0;
+        for (int i = 0; i < numberReaders; i++) {
+            // Each reader takes a random, possibly empty, contiguous slice; the last reader
+            // takes whatever is left so that no record is dropped.
+            int subUpperBound =
+                    i == numberReaders - 1 ? size : random.nextInt(lowerBound, size + 1);
+            // Copy the slice so that sorting it, and later sorting the whole list, cannot
+            // interfere with the records each reader serves.
+            List<ReusingTestData> subReusingTestData =
+                    new ArrayList<>(reusingTestData.subList(lowerBound, subUpperBound));
+            Collections.sort(subReusingTestData);
+            sortedTestReaders.add(new TestReusingRecordReader(subReusingTestData));
+            lowerBound = subUpperBound;
+        }
+        List<ReusingTestData> expectedData = new ArrayList<>(reusingTestData);
+        Collections.sort(expectedData);
+        checkLoserTree(sortedTestReaders, expectedData);
+    }
+
+    /**
+     * Drains the loser tree with the same call sequence as {@link SortMergeReaderWithLoserTree}
+     * and asserts that it yields exactly {@code expectedData}, in order, with nothing left over.
+     */
+    private void checkLoserTree(
+            List<RecordReader<KeyValue>> sortedTestReaders, List<ReusingTestData> expectedData)
+            throws IOException {
+        // Use the same comparator orientation as SortMergeReaderWithLoserTree, which swaps the
+        // arguments of both comparators; expectedData is in ascending order.
+        try (LoserTree<KeyValue> loserTree =
+                new LoserTree<>(
+                        sortedTestReaders,
+                        KEY_COMPARATOR.reversed(),
+                        SEQUENCE_COMPARATOR.reversed())) {
+            Iterator<ReusingTestData> expectedIterator = expectedData.iterator();
+            // Initialize first, as SortMergeReaderWithLoserTree#readBatch does.
+            loserTree.initializeIfNeeded();
+            while (true) {
+                // Move to the next key group; a null winner here means nothing is left.
+                loserTree.adjustForNextLoop();
+                KeyValue winner = loserTree.popWinner();
+                if (winner == null) {
+                    break;
+                }
+                assertNextExpected(expectedIterator, winner);
+                // Drain the remaining records of the same key group.
+                while (loserTree.peekWinner() != null) {
+                    assertNextExpected(expectedIterator, loserTree.popWinner());
+                }
+            }
+            // Every expected record must have been produced.
+            assertThat(expectedIterator.hasNext()).isFalse();
+        }
+    }
+
+    /** Asserts that {@code actual} matches the next expected record. */
+    private static void assertNextExpected(
+            Iterator<ReusingTestData> expectedIterator, KeyValue actual) {
+        // assertThat(boolean) on its own asserts nothing; isTrue() performs the check.
+        assertThat(expectedIterator.hasNext()).isTrue();
+        expectedIterator.next().assertEquals(actual);
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index d0200c6ab..e19b8113e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -18,12 +18,15 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.ReusingTestData;
import org.apache.paimon.utils.TestReusingRecordReader;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,43 +38,53 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
protected abstract MergeFunction<KeyValue> createMergeFunction();
@Override
- protected RecordReader<KeyValue>
createRecordReader(List<TestReusingRecordReader> readers) {
- return new SortMergeReader<>(
+ protected RecordReader<KeyValue> createRecordReader(
+ List<TestReusingRecordReader> readers, CoreOptions.SortEngine
sortEngine) {
+ return SortMergeReader.createSortMergeReader(
new ArrayList<>(readers),
KEY_COMPARATOR,
- new ReducerMergeFunctionWrapper(createMergeFunction()));
+ new ReducerMergeFunctionWrapper(createMergeFunction()),
+ sortEngine);
}
- @Test
- public void testEmpty() throws IOException {
- runTest(parseData(""));
- runTest(parseData("", "", ""));
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testEmpty(SortEngine sortEngine) throws IOException {
+ runTest(parseData(""), sortEngine);
+ runTest(parseData("", "", ""), sortEngine);
}
- @Test
- public void testAlternateKeys() throws IOException {
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testAlternateKeys(SortEngine sortEngine) throws IOException {
runTest(
parseData(
"1, 1, +, 100 | 3, 2, +, 300 | 5, 3, +, 200 | 7, 4, +,
600 | 9, 20, +, 400",
"0, 5, +, 0",
"0, 10, +, 0",
"",
- "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +,
800"));
+ "2, 6, +, 200 | 4, 7, +, 400 | 6, 8, +, 600 | 8, 9, +,
800"),
+ sortEngine);
}
- @Test
- public void testDuplicateKeys() throws IOException {
- runTest(parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5,
+, 300"));
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testDuplicateKeys(SortEngine sortEngine) throws IOException {
+ runTest(
+ parseData("1, 1, +, 100 | 3, 3, +, 300", "1, 4, +, 200 | 3, 5,
+, 300"),
+ sortEngine);
}
- @Test
- public void testLongTailRecords() throws IOException {
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testLongTailRecords(SortEngine sortEngine) throws IOException {
runTest(
parseData(
"1, 1, +, 100 | 2, 500, +, 200",
"1, 3, +, 100 | 3, 4, +, 300 | 5, 501, +, 500 | 7,
503, +, 700 | "
+ "8, 504, +, 800 | 9, 505, +, 900 | 10, 506,
+, 1000 | "
- + "11, 507, +, 1100 | 12, 508, +, 1200 | 13,
509, +, 1300"));
+ + "11, 507, +, 1100 | 12, 508, +, 1200 | 13,
509, +, 1300"),
+ sortEngine);
}
/** Tests for {@link SortMergeReader} with {@link
DeduplicateMergeFunction}. */
@@ -111,8 +124,9 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
return new ValueCountMergeFunction();
}
- @Test
- public void testCancelingRecords() throws IOException {
+ @ParameterizedTest
+ @EnumSource(SortEngine.class)
+ public void testCancelingRecords(SortEngine sortEngine) throws
IOException {
runTest(
parseData(
"1, 1, +, 100 | 3, 5, +, -300 | 5, 300, +, 300",
@@ -120,8 +134,9 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
"1, 4, +, -200 | 3, 3, +, 300",
"5, 100, +, -200 | 7, 123, +, -500",
"7, 321, +, 200",
- "7, 456, +, 300"));
- runTest(parseData("1, 2, +, 100", "1, 1, +, -100"));
+ "7, 456, +, 300"),
+ sortEngine);
+ runTest(parseData("1, 2, +, 100", "1, 1, +, -100"), sortEngine);
}
}
}