This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new edaac231d [core] Introduce sort-spill-threshold (#1431)
edaac231d is described below

commit edaac231dbccce9cddd48937e37c74e84eb638f0
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 26 11:55:55 2023 +0800

    [core] Introduce sort-spill-threshold (#1431)
---
 .../shortcodes/generated/core_configuration.html   |  18 +-
 .../apache/paimon/memory/CachelessSegmentPool.java |  61 ++++
 .../org/apache/paimon/reader/RecordReader.java     |  21 ++
 .../paimon/utils/ConsumerWithIOException.java      |  32 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  46 +--
 .../java/org/apache/paimon/disk/IOManagerImpl.java |  10 +-
 .../org/apache/paimon/mergetree/MergeSorter.java   | 312 +++++++++++++++++++++
 .../apache/paimon/mergetree/MergeTreeReaders.java  |  43 +--
 .../paimon/mergetree/SortBufferWriteBuffer.java    |  46 ---
 .../compact/ChangelogMergeTreeRewriter.java        |  23 +-
 .../FullChangelogMergeTreeCompactRewriter.java     |   6 +-
 .../compact/LookupMergeTreeCompactRewriter.java    |   6 +-
 .../compact/MergeTreeCompactRewriter.java          |  10 +-
 .../mergetree/compact/UniversalCompaction.java     |  13 +-
 .../paimon/operation/KeyValueFileStoreRead.java    |  19 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  15 +-
 .../paimon/table/sink/BatchWriteBuilderImpl.java   |   4 +-
 .../paimon/table/source/KeyValueTableRead.java     |   7 +
 .../org/apache/paimon/table/source/TableRead.java  |   5 +
 .../apache/paimon/mergetree/MergeSorterTest.java   | 151 ++++++++++
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  10 +-
 .../compact/ForceUpLevel0CompactionTest.java       |   2 +-
 .../mergetree/compact/UniversalCompactionTest.java |  55 +---
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  24 +-
 .../apache/paimon/flink/source/FlinkSource.java    |  10 +-
 .../paimon/flink/source/operator/ReadOperator.java |   9 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  14 +
 .../apache/paimon/spark/SparkReaderFactory.java    |   8 +-
 .../java/org/apache/paimon/spark/SparkUtils.java   |  32 +++
 .../spark/commands/WriteIntoPaimonTable.scala      |   2 +
 .../org/apache/paimon/spark/SparkWriteITCase.java  |   8 +
 31 files changed, 811 insertions(+), 211 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 53c514d36..bf4265e27 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,12 +68,6 @@ under the License.
             <td>Integer</td>
             <td>The size amplification is defined as the amount (in 
percentage) of additional storage needed to store a single byte of data in the 
merge tree for changelog mode table.</td>
         </tr>
-        <tr>
-            <td><h5>compaction.max-sorted-run-num</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
-            <td>Integer</td>
-            <td>The maximum sorted run number to pick for compaction. This 
value avoids merging too much sorted runs at the same time during compaction, 
which may lead to OutOfMemoryError.</td>
-        </tr>
         <tr>
             <td><h5>compaction.max.file-num</h5></td>
             <td style="word-wrap: break-word;">50</td>
@@ -410,6 +404,18 @@ under the License.
             <td><p>Enum</p></td>
             <td>Specify the sort engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway 
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared 
with heapsort, loser-tree has fewer comparisons and is more 
efficient.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>sort-spill-buffer-size</h5></td>
+            <td style="word-wrap: break-word;">64 mb</td>
+            <td>MemorySize</td>
+            <td>Amount of data to spill records to disk in spilled sort.</td>
+        </tr>
+        <tr>
+            <td><h5>sort-spill-threshold</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>If the maximum number of sort readers exceeds this value, a 
spill will be attempted. This prevents too many readers from consuming too much 
memory and causing OOM.</td>
+        </tr>
         <tr>
             <td><h5>source.split.open-file-cost</h5></td>
             <td style="word-wrap: break-word;">4 mb</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
 
b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
new file mode 100644
index 000000000..cec1db858
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import java.util.List;
+
+/**
+ * A {@link MemorySegmentPool} without cache: each granted page is a newly allocated heap segment,
+ * and returned pages merely give their share of the page budget back.
+ */
+public class CachelessSegmentPool implements MemorySegmentPool {
+
+    private final int maxPages;
+    private final int pageSize;
+
+    /** Number of pages currently handed out. */
+    private int numPage;
+
+    /**
+     * Creates a pool that hands out at most {@code maxMemory / pageSize} pages at a time.
+     *
+     * @param maxMemory memory budget of the pool
+     * @param pageSize size of every page allocated by {@link #nextSegment()}
+     */
+    public CachelessSegmentPool(long maxMemory, int pageSize) {
+        // Clamp before narrowing: a plain (int) cast silently wraps around once the budget
+        // exceeds Integer.MAX_VALUE pages and may even produce a negative page limit.
+        this.maxPages = (int) Math.min(Integer.MAX_VALUE, maxMemory / pageSize);
+        this.pageSize = pageSize;
+        this.numPage = 0;
+    }
+
+    /**
+     * Allocates a new heap page, or returns {@code null} once {@code maxPages} pages are handed
+     * out.
+     */
+    @Override
+    public MemorySegment nextSegment() {
+        if (numPage < maxPages) {
+            numPage++;
+            return MemorySegment.allocateHeapMemory(pageSize);
+        }
+
+        return null;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /** Gives the page count back to the budget; the segments themselves are not retained. */
+    @Override
+    public void returnAll(List<MemorySegment> memory) {
+        numPage -= memory.size();
+    }
+
+    /** Returns the number of pages that can still be requested. */
+    @Override
+    public int freePages() {
+        return maxPages - numPage;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 474e33b4d..79fab867d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.reader;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.ConsumerWithIOException;
 import org.apache.paimon.utils.Filter;
 
 import javax.annotation.Nullable;
@@ -141,6 +142,26 @@ public interface RecordReader<T> extends Closeable {
         }
     }
 
+    /**
+     * Drains this {@link RecordReader}: every remaining record is passed to {@code action}, batch
+     * by batch, and each fully consumed batch is released. The reader is closed afterwards,
+     * whether draining completed or an exception was thrown.
+     */
+    default void forIOEachRemaining(ConsumerWithIOException<? super T> action) throws IOException {
+        try {
+            for (RecordReader.RecordIterator<T> iterator = readBatch();
+                    iterator != null;
+                    iterator = readBatch()) {
+                for (T element = iterator.next(); element != null; element = iterator.next()) {
+                    action.accept(element);
+                }
+                iterator.releaseBatch();
+            }
+        } finally {
+            close();
+        }
+    }
+
     /** Returns a {@link RecordReader} that applies {@code function} to each 
element. */
     default <R> RecordReader<R> transform(Function<T, R> function) {
         RecordReader<T> thisReader = this;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ConsumerWithIOException.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/ConsumerWithIOException.java
new file mode 100644
index 000000000..cd62c9ec2
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/ConsumerWithIOException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+
+/** A single-argument consumer whose {@code accept} may throw {@link IOException}. */
+@FunctionalInterface
+public interface ConsumerWithIOException<T> {
+
+    /**
+     * Performs this operation on the given argument.
+     *
+     * @param t the input argument
+     */
+    void accept(T t) throws IOException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index aafbd2c72..d8ce5747f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -199,6 +199,20 @@ public class CoreOptions implements Serializable {
                     .defaultValue(SortEngine.LOSER_TREE)
                     .withDescription("Specify the sort engine for table with 
primary key.");
 
+    public static final ConfigOption<Integer> SORT_SPILL_THRESHOLD =
+            key("sort-spill-threshold")
+                    .intType()
+                    .noDefaultValue() // unset by default; sortSpillThreshold() supplies the fallback
+                    .withDescription(
+                            "If the maximum number of sort readers exceeds 
this value, a spill will be attempted. "
+                                    + "This prevents too many readers from 
consuming too much memory and causing OOM.");
+
+    public static final ConfigOption<MemorySize> SORT_SPILL_BUFFER_SIZE =
+            key("sort-spill-buffer-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64 mb")) // read via sortSpillBufferSize()
+                    .withDescription("Amount of data to spill records to disk 
in spilled sort.");
+
     @Immutable
     public static final ConfigOption<WriteMode> WRITE_MODE =
             key("write-mode")
@@ -337,15 +351,6 @@ public class CoreOptions implements Serializable {
                                     + "for append-only table, even if 
sum(size(f_i)) < targetFileSize. This value "
                                     + "avoids pending too much small files, 
which slows down the performance.");
 
-    public static final ConfigOption<Integer> COMPACTION_MAX_SORTED_RUN_NUM =
-            key("compaction.max-sorted-run-num")
-                    .intType()
-                    .defaultValue(Integer.MAX_VALUE)
-                    .withDescription(
-                            "The maximum sorted run number to pick for 
compaction. "
-                                    + "This value avoids merging too much 
sorted runs at the same time during compaction, "
-                                    + "which may lead to OutOfMemoryError.");
-
     public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
             key("changelog-producer")
                     .enumType(ChangelogProducer.class)
@@ -785,6 +790,15 @@ public class CoreOptions implements Serializable {
         return options.get(SORT_ENGINE);
     }
 
+    /**
+     * Returns the number of sort readers above which a spill is attempted. When {@link
+     * #SORT_SPILL_THRESHOLD} is not set, falls back to one more than {@link
+     * #numSortedRunStopTrigger()}.
+     */
+    public int sortSpillThreshold() {
+        Integer configured = options.get(SORT_SPILL_THRESHOLD);
+        if (configured != null) {
+            return configured;
+        }
+
+        int stopTrigger = numSortedRunStopTrigger();
+        // stopTrigger + 1 wraps to a negative value at Integer.MAX_VALUE; Math.max then keeps
+        // stopTrigger itself.
+        return Math.max(stopTrigger, stopTrigger + 1);
+    }
+
     public long splitTargetSize() {
         return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
     }
@@ -802,6 +816,10 @@ public class CoreOptions implements Serializable {
         return 
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || 
!isStreaming);
     }
 
+    public long sortSpillBufferSize() {
+        return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
+    }
+
     public Duration continuousDiscoveryInterval() {
         return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
     }
@@ -834,11 +852,7 @@ public class CoreOptions implements Serializable {
         // By default, this ensures that the compaction does not fall to level 
0, but at least to
         // level 1
         Integer numLevels = options.get(NUM_LEVELS);
-        int expectedRuns =
-                maxSortedRunNum() == Integer.MAX_VALUE
-                        ? numSortedRunCompactionTrigger()
-                        : numSortedRunStopTrigger();
-        numLevels = numLevels == null ? expectedRuns + 1 : numLevels;
+        numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : 
numLevels;
         return numLevels;
     }
 
@@ -862,10 +876,6 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_MAX_FILE_NUM);
     }
 
-    public int maxSortedRunNum() {
-        return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
-    }
-
     public long dynamicBucketTargetRowNum() {
         return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index 55be7a0f6..cae471aa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -25,6 +25,8 @@ import org.apache.paimon.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -47,7 +49,7 @@ public class IOManagerImpl implements IOManager {
      *
      * @param tempDirs The basic directories for files underlying anonymous 
channels.
      */
-    public IOManagerImpl(String[] tempDirs) {
+    public IOManagerImpl(String... tempDirs) {
         this.fileChannelManager =
                 new 
FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
         if (LOG.isInfoEnabled()) {
@@ -122,4 +124,10 @@ public class IOManagerImpl implements IOManager {
     public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID) 
throws IOException {
         return new BufferFileReaderImpl(channelID);
     }
+
+    public static String[] splitPaths(@Nonnull String separatedPaths) {
+        return separatedPaths.length() > 0
+                ? separatedPaths.split(",|" + File.pathSeparator)
+                : new String[0];
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
new file mode 100644
index 000000000..9f5b34e12
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.NormalizedKeyComputer;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.memory.CachelessSegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.mergetree.compact.SortMergeReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.sort.SortBuffer;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.MutableObjectIterator;
+import org.apache.paimon.utils.OffsetRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
+import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
+
+/** The merge sorter to sort and merge readers with key overlap. */
+public class MergeSorter {
+
+    private final RowType keyType;
+    private final RowType valueType;
+
+    private final SortEngine sortEngine;
+    private final int spillThreshold;
+    private final int spillSortMaxNumFiles;
+
+    private final MemorySegmentPool memoryPool;
+
+    @Nullable private IOManager ioManager;
+
+    /**
+     * Creates a sorter configured from {@code options}.
+     *
+     * @param options supplies the sort engine, the spill threshold, the max number of spill file
+     *     handles, and the size and page size of the spill memory pool
+     * @param keyType row type of the key
+     * @param valueType row type of the value
+     * @param ioManager I/O manager handed to the spill sort buffer; may be {@code null}, in which
+     *     case {@code mergeSort} does not spill unless one is set later
+     */
+    public MergeSorter(
+            CoreOptions options,
+            RowType keyType,
+            RowType valueType,
+            @Nullable IOManager ioManager) {
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.ioManager = ioManager;
+
+        this.sortEngine = options.sortEngine();
+        this.spillThreshold = options.sortSpillThreshold();
+        this.spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
+
+        long spillBufferSize = options.sortSpillBufferSize();
+        int pageSize = options.pageSize();
+        this.memoryPool = new CachelessSegmentPool(spillBufferSize, pageSize);
+    }
+
+    public MemorySegmentPool memoryPool() {
+        return memoryPool; // the pool this sorter builds its spill sort buffer on
+    }
+
+    public void setIOManager(IOManager ioManager) {
+        this.ioManager = ioManager; // mergeSort() spills only while this is non-null
+    }
+
+    /**
+     * Merges the given readers into one reader of merged results.
+     *
+     * <p>When an {@link IOManager} is available and there are more readers than the spill
+     * threshold, the merge goes through {@link #spillMergeSort}. Otherwise all readers are opened
+     * and handed to a {@link SortMergeReader}.
+     */
+    public <T> RecordReader<T> mergeSort(
+            List<ReaderSupplier<KeyValue>> lazyReaders,
+            Comparator<InternalRow> keyComparator,
+            MergeFunctionWrapper<T> mergeFunction)
+            throws IOException {
+        boolean spill = ioManager != null && lazyReaders.size() > spillThreshold;
+        if (spill) {
+            return spillMergeSort(lazyReaders, keyComparator, mergeFunction);
+        }
+
+        List<RecordReader<KeyValue>> opened = new ArrayList<>(lazyReaders.size());
+        try {
+            for (ReaderSupplier<KeyValue> lazyReader : lazyReaders) {
+                opened.add(lazyReader.get());
+            }
+        } catch (IOException e) {
+            // Creating one of the readers failed: close those already opened, then propagate.
+            for (RecordReader<KeyValue> reader : opened) {
+                IOUtils.closeQuietly(reader);
+            }
+            throw e;
+        }
+
+        return SortMergeReader.createSortMergeReader(
+                opened, keyComparator, mergeFunction, sortEngine);
+    }
+
+    /**
+     * Merge-sorts the readers through an {@link ExternalSorterWithLevel}: all records are first
+     * written into the sort buffer, then read back in sorted order and merged per key.
+     *
+     * <p>The returned reader yields a single batch; closing it clears the sort buffer. If filling
+     * or sorting fails, the sort buffer is cleared here before the exception is propagated.
+     */
+    private <T> RecordReader<T> spillMergeSort(
+            List<ReaderSupplier<KeyValue>> readers,
+            Comparator<InternalRow> keyComparator,
+            MergeFunctionWrapper<T> mergeFunction)
+            throws IOException {
+        ExternalSorterWithLevel sorter = new ExternalSorterWithLevel();
+        final NoReusingMergeIterator<T> iterator;
+        try {
+            ConcatRecordReader.create(readers).forIOEachRemaining(sorter::put);
+            sorter.flushMemory();
+            iterator = sorter.newIterator(keyComparator, mergeFunction);
+        } catch (IOException | RuntimeException e) {
+            // No reader is handed out on failure, so nobody else could ever clear the buffer.
+            sorter.clear();
+            throw e;
+        }
+
+        return new RecordReader<T>() {
+
+            // The whole sorted result is exposed as exactly one batch.
+            private boolean read = false;
+
+            @Nullable
+            @Override
+            public RecordIterator<T> readBatch() {
+                if (read) {
+                    return null;
+                }
+
+                read = true;
+                return new RecordIterator<T>() {
+                    @Override
+                    public T next() throws IOException {
+                        return iterator.next();
+                    }
+
+                    @Override
+                    public void releaseBatch() {}
+                };
+            }
+
+            @Override
+            public void close() {
+                sorter.clear();
+            }
+        };
+    }
+
+    /**
+     * Sort buffer that keeps the level of every {@link KeyValue} next to its other fields.
+     *
+     * <p>Here can not use {@link SortBufferWriteBuffer} for two reasons:
+     *
+     * <p>1.Changelog-producer: full-compaction and lookup need to know the level of the KeyValue.
+     *
+     * <p>2.Changelog-producer: full-compaction and lookup need to store the reference of
+     * update_before.
+     *
+     * <p>Rows are written as: key fields, sequence number, value kind, level, value fields.
+     */
+    private class ExternalSorterWithLevel {
+
+        private final SortBuffer buffer;
+
+        /**
+         * Builds the external sort buffer on top of the enclosing sorter's memory pool.
+         *
+         * @throws IllegalArgumentException if the memory pool has fewer than 3 free pages
+         */
+        public ExternalSorterWithLevel() {
+            // user key + sequenceNumber
+            List<DataType> sortKeyTypes = new ArrayList<>(keyType.getFieldTypes());
+            sortKeyTypes.add(new BigIntType(false));
+
+            // for sort binary buffer
+            NormalizedKeyComputer normalizedKeyComputer =
+                    CodeGenUtils.newNormalizedKeyComputer(sortKeyTypes, "MemTableKeyComputer");
+            RecordComparator keyComparator =
+                    CodeGenUtils.newRecordComparator(sortKeyTypes, "MemTableComparator");
+
+            // The pool is sized by 'sort-spill-buffer-size', not by the write buffer, so the
+            // message has to point at that option.
+            if (memoryPool.freePages() < 3) {
+                throw new IllegalArgumentException(
+                        "Sort spill buffer requires a minimum of 3 page memory, "
+                                + "please increase 'sort-spill-buffer-size'.");
+            }
+
+            List<DataField> fields = new ArrayList<>(keyType.getFields());
+            fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false)));
+            fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
+            fields.add(new DataField(2, "_LEVEL", new IntType(false)));
+            fields.addAll(valueType.getFields());
+            RowType schemaWithLevel = new RowType(fields);
+            InternalRowSerializer serializer = InternalSerializers.create(schemaWithLevel);
+
+            this.buffer =
+                    new BinaryExternalSortBuffer(
+                            new BinaryRowSerializer(serializer.getArity()),
+                            keyComparator,
+                            memoryPool.pageSize(),
+                            BinaryInMemorySortBuffer.createBuffer(
+                                    normalizedKeyComputer, serializer, keyComparator, memoryPool),
+                            ioManager,
+                            spillSortMaxNumFiles);
+        }
+
+        /**
+         * Writes one record as a joined row of key, meta (sequence number, value kind, level) and
+         * value.
+         *
+         * @return the result of the underlying buffer write
+         */
+        public boolean put(KeyValue keyValue) throws IOException {
+            GenericRow meta = new GenericRow(3);
+            meta.setField(0, keyValue.sequenceNumber());
+            meta.setField(1, keyValue.valueKind().toByteValue());
+            meta.setField(2, keyValue.level());
+            JoinedRow row =
+                    new JoinedRow()
+                            .replace(
+                                    new JoinedRow().replace(keyValue.key(), meta),
+                                    keyValue.value());
+            return buffer.write(row);
+        }
+
+        /** Delegates to {@link SortBuffer#flushMemory()}. */
+        public boolean flushMemory() throws IOException {
+            return buffer.flushMemory();
+        }
+
+        /** Delegates to {@link SortBuffer#clear()}. */
+        public void clear() {
+            buffer.clear();
+        }
+
+        /** Creates an iterator that merges the sorted rows of this buffer key by key. */
+        public <T> NoReusingMergeIterator<T> newIterator(
+                Comparator<InternalRow> keyComparator, MergeFunctionWrapper<T> mergeFunction)
+                throws IOException {
+            return new NoReusingMergeIterator<>(
+                    buffer.sortedIterator(), keyComparator, mergeFunction);
+        }
+    }
+
+    private class NoReusingMergeIterator<T> {
+        // Merges each run of consecutive equal-key rows read from kvIter into one result.
+        private final MutableObjectIterator<BinaryRow> kvIter;
+        private final Comparator<InternalRow> keyComparator;
+        private final MergeFunctionWrapper<T> mergeFunc;
+
+        private KeyValue left; // look-ahead record that already belongs to the next key, if any
+
+        private boolean isEnd; // set once kvIter has returned null
+
+        private NoReusingMergeIterator(
+                MutableObjectIterator<BinaryRow> kvIter,
+                Comparator<InternalRow> keyComparator,
+                MergeFunctionWrapper<T> mergeFunction) {
+            this.kvIter = kvIter;
+            this.keyComparator = keyComparator;
+            this.mergeFunc = mergeFunction;
+            this.isEnd = false;
+        }
+
+        public T next() throws IOException {
+            if (isEnd) {
+                return null;
+            }
+            // Keep merging key groups until one yields a non-null result or the input ends.
+            T result;
+            do {
+                mergeFunc.reset();
+                InternalRow key = null;
+                KeyValue keyValue;
+                while ((keyValue = readOnce()) != null) {
+                    if (key != null && keyComparator.compare(keyValue.key(), 
key) != 0) {
+                        break;
+                    }
+                    key = keyValue.key();
+                    mergeFunc.add(keyValue);
+                }
+                left = keyValue; // first record of the next key, or null at the end of input
+                if (key == null) { // nothing was read in this round
+                    return null;
+                }
+                result = mergeFunc.getResult();
+            } while (result == null);
+            return result;
+        }
+
+        private KeyValue readOnce() throws IOException {
+            if (left != null) {
+                KeyValue ret = left;
+                left = null;
+                return ret;
+            }
+            BinaryRow row = kvIter.next();
+            if (row == null) {
+                isEnd = true;
+                return null;
+            }
+            // Row layout: key fields, sequence number, value kind, level, value fields.
+            int keyArity = keyType.getFieldCount();
+            int valueArity = valueType.getFieldCount();
+            return new KeyValue()
+                    .replace(
+                            new OffsetRow(keyArity, 0).replace(row),
+                            row.getLong(keyArity),
+                            RowKind.fromByteValue(row.getByte(keyArity + 1)),
+                            new OffsetRow(valueArity, keyArity + 
3).replace(row))
+                    .setLevel(row.getInt(keyArity + 2));
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index ed2468056..aa2bb4a30 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -18,18 +18,16 @@
 
 package org.apache.paimon.mergetree;
 
-import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
-import org.apache.paimon.mergetree.compact.SortMergeReader;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.IOUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,9 +45,9 @@ public class MergeTreeReaders {
             KeyValueFileReaderFactory readerFactory,
             Comparator<InternalRow> userKeyComparator,
             MergeFunction<KeyValue> mergeFunction,
-            SortEngine sortEngine)
+            MergeSorter mergeSorter)
             throws IOException {
-        List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new 
ArrayList<>();
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (List<SortedRun> section : sections) {
             readers.add(
                     () ->
@@ -58,7 +56,7 @@ public class MergeTreeReaders {
                                     readerFactory,
                                     userKeyComparator,
                                     new 
ReducerMergeFunctionWrapper(mergeFunction),
-                                    sortEngine));
+                                    mergeSorter));
         }
         RecordReader<KeyValue> reader = ConcatRecordReader.create(readers);
         if (dropDelete) {
@@ -67,25 +65,23 @@ public class MergeTreeReaders {
         return reader;
     }
 
-    public static RecordReader<KeyValue> readerForSection(
+    public static <T> RecordReader<T> readerForSection(
             List<SortedRun> section,
             KeyValueFileReaderFactory readerFactory,
             Comparator<InternalRow> userKeyComparator,
-            MergeFunctionWrapper<KeyValue> mergeFunctionWrapper,
-            SortEngine sortEngine)
+            MergeFunctionWrapper<T> mergeFunctionWrapper,
+            MergeSorter mergeSorter)
             throws IOException {
-        List<RecordReader<KeyValue>> readers = readerForSection(section, 
readerFactory);
-        if (readers.size() == 1) {
-            return readers.get(0);
-        } else {
-            return SortMergeReader.createSortMergeReader(
-                    readers, userKeyComparator, mergeFunctionWrapper, 
sortEngine);
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+        for (SortedRun run : section) {
+            readers.add(() -> readerForRun(run, readerFactory));
         }
+        return mergeSorter.mergeSort(readers, userKeyComparator, 
mergeFunctionWrapper);
     }
 
     public static RecordReader<KeyValue> readerForRun(
             SortedRun run, KeyValueFileReaderFactory readerFactory) throws 
IOException {
-        List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new 
ArrayList<>();
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (DataFileMeta file : run.files()) {
             readers.add(
                     () ->
@@ -94,19 +90,4 @@ public class MergeTreeReaders {
         }
         return ConcatRecordReader.create(readers);
     }
-
-    public static List<RecordReader<KeyValue>> readerForSection(
-            List<SortedRun> runs, KeyValueFileReaderFactory readerFactory) 
throws IOException {
-        List<RecordReader<KeyValue>> readers = new ArrayList<>();
-        try {
-            for (SortedRun run : runs) {
-                readers.add(readerForRun(run, readerFactory));
-            }
-        } catch (IOException e) {
-            // if one of the readers creating failed, we need to close them 
all.
-            readers.forEach(IOUtils::closeQuietly);
-            throw e;
-        }
-        return readers;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 8ff9d536c..53cc0969e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -47,7 +47,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 
 /** A {@link WriteBuffer} which stores records in {@link 
BinaryInMemorySortBuffer}. */
@@ -248,49 +247,4 @@ public class SortBufferWriteBuffer implements WriteBuffer {
             currentRow = tmpRow;
         }
     }
-
-    private class RawIterator implements Iterator<KeyValue> {
-        private final MutableObjectIterator<BinaryRow> kvIter;
-        private final KeyValueSerializer current;
-
-        private BinaryRow currentRow;
-        private boolean advanced;
-
-        private RawIterator(MutableObjectIterator<BinaryRow> kvIter) {
-            this.kvIter = kvIter;
-            this.current = new KeyValueSerializer(keyType, valueType);
-            this.currentRow =
-                    new BinaryRow(keyType.getFieldCount() + 2 + 
valueType.getFieldCount());
-            this.advanced = false;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (!advanced) {
-                advanceNext();
-            }
-            return currentRow != null;
-        }
-
-        @Override
-        public KeyValue next() {
-            if (!hasNext()) {
-                return null;
-            }
-            advanced = false;
-            return current.getReusedKv();
-        }
-
-        private void advanceNext() {
-            try {
-                currentRow = kvIter.next(currentRow);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            if (currentRow != null) {
-                current.fromRow(currentRow);
-            }
-            advanced = true;
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 332798964..84d884483 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.compact.CompactResult;
@@ -27,9 +26,9 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 
 import java.util.ArrayList;
@@ -48,10 +47,10 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            SortEngine sortEngine,
+            MergeSorter mergeSorter,
             RecordEqualiser valueEqualiser,
             boolean changelogRowDeduplicate) {
-        super(readerFactory, writerFactory, keyComparator, mfFactory, 
sortEngine);
+        super(readerFactory, writerFactory, keyComparator, mfFactory, 
mergeSorter);
         this.valueEqualiser = valueEqualiser;
         this.changelogRowDeduplicate = changelogRowDeduplicate;
     }
@@ -78,15 +77,13 @@ public abstract class ChangelogMergeTreeRewriter extends 
MergeTreeCompactRewrite
         List<ConcatRecordReader.ReaderSupplier<ChangelogResult>> 
sectionReaders = new ArrayList<>();
         for (List<SortedRun> section : sections) {
             sectionReaders.add(
-                    () -> {
-                        List<RecordReader<KeyValue>> runReaders =
-                                MergeTreeReaders.readerForSection(section, 
readerFactory);
-                        return SortMergeReader.createSortMergeReader(
-                                runReaders,
-                                keyComparator,
-                                createMergeWrapper(outputLevel),
-                                sortEngine);
-                    });
+                    () ->
+                            MergeTreeReaders.readerForSection(
+                                    section,
+                                    readerFactory,
+                                    keyComparator,
+                                    createMergeWrapper(outputLevel),
+                                    mergeSorter));
         }
 
         RecordReaderIterator<ChangelogResult> iterator = null;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 09c79fe48..2cc02ec1d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -18,13 +18,13 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.utils.Preconditions;
 
@@ -43,7 +43,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            SortEngine sortEngine,
+            MergeSorter mergeSorter,
             RecordEqualiser valueComparator,
             boolean changelogRowDeduplicate) {
         super(
@@ -51,7 +51,7 @@ public class FullChangelogMergeTreeCompactRewriter extends 
ChangelogMergeTreeRew
                 writerFactory,
                 keyComparator,
                 mfFactory,
-                sortEngine,
+                mergeSorter,
                 valueComparator,
                 changelogRowDeduplicate);
         this.maxLevel = maxLevel;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 07eef93e9..d0c710590 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
@@ -26,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 
 import java.io.IOException;
@@ -47,7 +47,7 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            SortEngine sortEngine,
+            MergeSorter mergeSorter,
             RecordEqualiser valueEqualiser,
             boolean changelogRowDeduplicate) {
         super(
@@ -55,7 +55,7 @@ public class LookupMergeTreeCompactRewriter extends 
ChangelogMergeTreeRewriter {
                 writerFactory,
                 keyComparator,
                 mfFactory,
-                sortEngine,
+                mergeSorter,
                 valueEqualiser,
                 changelogRowDeduplicate);
         this.lookupLevels = lookupLevels;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index a4691bcb4..228a3aa11 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.data.InternalRow;
@@ -26,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.reader.RecordReader;
@@ -41,19 +41,19 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
     protected final KeyValueFileWriterFactory writerFactory;
     protected final Comparator<InternalRow> keyComparator;
     protected final MergeFunctionFactory<KeyValue> mfFactory;
-    protected final SortEngine sortEngine;
+    protected final MergeSorter mergeSorter;
 
     public MergeTreeCompactRewriter(
             KeyValueFileReaderFactory readerFactory,
             KeyValueFileWriterFactory writerFactory,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
-            SortEngine sortEngine) {
+            MergeSorter mergeSorter) {
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.keyComparator = keyComparator;
         this.mfFactory = mfFactory;
-        this.sortEngine = sortEngine;
+        this.mergeSorter = mergeSorter;
     }
 
     @Override
@@ -73,7 +73,7 @@ public class MergeTreeCompactRewriter extends 
AbstractCompactRewriter {
                         readerFactory,
                         keyComparator,
                         mfFactory.create(),
-                        sortEngine);
+                        mergeSorter);
         writer.write(new RecordReaderIterator<>(sectionsReader));
         writer.close();
         return new CompactResult(extractFilesFromSections(sections), 
writer.result());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 32c838879..86e7597a1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -43,14 +43,11 @@ public class UniversalCompaction implements CompactStrategy 
{
     private final int maxSizeAmp;
     private final int sizeRatio;
     private final int numRunCompactionTrigger;
-    private final int maxSortedRunNum;
 
-    public UniversalCompaction(
-            int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger, int 
maxSortedRunNum) {
+    public UniversalCompaction(int maxSizeAmp, int sizeRatio, int 
numRunCompactionTrigger) {
         this.maxSizeAmp = maxSizeAmp;
         this.sizeRatio = sizeRatio;
         this.numRunCompactionTrigger = numRunCompactionTrigger;
-        this.maxSortedRunNum = maxSortedRunNum;
     }
 
     @Override
@@ -138,7 +135,7 @@ public class UniversalCompaction implements CompactStrategy 
{
         }
 
         if (forcePick || candidateCount > 1) {
-            return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum);
+            return createUnit(runs, maxLevel, candidateCount);
         }
 
         return null;
@@ -153,12 +150,8 @@ public class UniversalCompaction implements 
CompactStrategy {
     }
 
     @VisibleForTesting
-    static CompactUnit createUnit(
-            List<LevelSortedRun> runs, int maxLevel, int runCount, int 
maxSortedRunNum) {
+    static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int 
runCount) {
         int outputLevel;
-        if (runCount > maxSortedRunNum) {
-            runCount = maxSortedRunNum;
-        }
         if (runCount == runs.size()) {
             outputLevel = maxLevel;
         } else {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 29a298bb0..fdfb3bb6d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -18,15 +18,17 @@
 
 package org.apache.paimon.operation;
 
-import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.mergetree.DropDeleteReader;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
@@ -35,7 +37,6 @@ import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -56,7 +57,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.SORT_ENGINE;
 import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
 import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
@@ -69,7 +69,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
     private final Comparator<InternalRow> keyComparator;
     private final MergeFunctionFactory<KeyValue> mfFactory;
     private final boolean valueCountMode;
-    private final SortEngine sortEngine;
+    private final MergeSorter mergeSorter;
 
     @Nullable private int[][] keyProjectedFields;
 
@@ -105,7 +105,9 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         this.keyComparator = keyComparator;
         this.mfFactory = mfFactory;
         this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
-        this.sortEngine = 
Options.fromMap(tableSchema.options()).get(SORT_ENGINE);
+        this.mergeSorter =
+                new MergeSorter(
+                        CoreOptions.fromMap(tableSchema.options()), keyType, 
valueType, null);
     }
 
     public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -124,6 +126,11 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         return this;
     }
 
+    public KeyValueFileStoreRead withIOManager(IOManager ioManager) {
+        this.mergeSorter.setIOManager(ioManager);
+        return this;
+    }
+
     @Override
     public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
         List<Predicate> allFilters = new ArrayList<>();
@@ -218,7 +225,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                                                 : nonOverlappedSectionFactory,
                                         keyComparator,
                                         mergeFuncWrapper,
-                                        sortEngine));
+                                        mergeSorter));
             }
             DropDeleteReader reader =
                     new 
DropDeleteReader(ConcatRecordReader.create(sectionReaders));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 1cb311633..01cf7b526 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -37,6 +37,7 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeWriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
 import org.apache.paimon.mergetree.compact.CompactStrategy;
@@ -153,8 +154,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 new UniversalCompaction(
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
-                        options.numSortedRunCompactionTrigger(),
-                        options.maxSortedRunNum());
+                        options.numSortedRunCompactionTrigger());
         CompactStrategy compactStrategy =
                 options.changelogProducer() == ChangelogProducer.LOOKUP
                         ? new LookupCompaction(universalCompaction)
@@ -211,6 +211,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         bucket,
                         options.fileCompressionPerLevel(),
                         options.fileCompression());
+        MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, 
ioManager);
         switch (options.changelogProducer()) {
             case FULL_COMPACTION:
                 return new FullChangelogMergeTreeCompactRewriter(
@@ -219,7 +220,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         writerFactory,
                         keyComparator,
                         mfFactory,
-                        options.sortEngine(),
+                        mergeSorter,
                         valueEqualiserSupplier.get(),
                         options.changelogRowDeduplicate());
             case LOOKUP:
@@ -230,16 +231,12 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         writerFactory,
                         keyComparator,
                         mfFactory,
-                        options.sortEngine(),
+                        mergeSorter,
                         valueEqualiserSupplier.get(),
                         options.changelogRowDeduplicate());
             default:
                 return new MergeTreeCompactRewriter(
-                        readerFactory,
-                        writerFactory,
-                        keyComparator,
-                        mfFactory,
-                        options.sortEngine());
+                        readerFactory, writerFactory, keyComparator, 
mfFactory, mergeSorter);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 4acd6be33..bf4cf8800 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -58,7 +58,9 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
 
     @Override
     public BatchTableWrite newWrite() {
-        return 
table.newWrite(commitUser).withIgnorePreviousFiles(staticPartition != null);
+        return table.newWrite(commitUser)
+                .withIgnorePreviousFiles(staticPartition != null)
+                .isStreamingMode(false);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index ec861d1ce..a838806ee 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.KeyValueFileStoreRead;
 import org.apache.paimon.reader.RecordReader;
 
@@ -39,6 +40,12 @@ public abstract class KeyValueTableRead implements 
InnerTableRead {
         this.read = read;
     }
 
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        read.withIOManager(ioManager);
+        return this;
+    }
+
     @Override
     public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
         return new RowDataRecordReader(read.createReader((DataSplit) split));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index f496e574a..8ba6eed3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.reader.RecordReader;
@@ -36,6 +37,10 @@ import java.util.List;
 @Public
 public interface TableRead {
 
+    default TableRead withIOManager(IOManager ioManager) {
+        return this;
+    }
+
     RecordReader<InternalRow> createReader(Split split) throws IOException;
 
     default RecordReader<InternalRow> createReader(List<Split> splits) throws 
IOException {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
new file mode 100644
index 000000000..b13eea843
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MergeSorter}. */
+public class MergeSorterTest {
+
+    private static final int MEMORY_SIZE = 1024 * 1024 * 32;
+
+    private final RowType keyType = RowType.builder().field("k", 
DataTypes.INT()).build();
+
+    private final RowType valueType = RowType.builder().field("v", 
DataTypes.INT()).build();
+
+    @TempDir Path tempDir;
+
+    private IOManager ioManager;
+    private MergeSorter sorter;
+    private int totalPages;
+
+    @BeforeEach
+    public void beforeTest() {
+        ioManager = IOManager.create(tempDir.toString());
+        Options options = new Options();
+        options.set(CoreOptions.SORT_SPILL_BUFFER_SIZE, new 
MemorySize(MEMORY_SIZE));
+        sorter = new MergeSorter(new CoreOptions(options), keyType, valueType, 
ioManager);
+        totalPages = sorter.memoryPool().freePages();
+    }
+
+    @AfterEach
+    public void afterTest() throws Exception {
+        assertThat(sorter.memoryPool().freePages()).isEqualTo(totalPages);
+        List<File> files =
+                Files.walk(tempDir)
+                        .map(Path::toFile)
+                        .filter(f -> !f.isDirectory())
+                        .collect(Collectors.toList());
+        assertThat(files).isEmpty();
+        this.ioManager.close();
+    }
+
+    @Test
+    public void testSortAndMerge() throws Exception {
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+        Random rnd = new Random();
+        List<KeyValue> expectedKvs = new ArrayList<>();
+        Set<Long> distinctSeq = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            List<KeyValue> kvs = new ArrayList<>();
+            for (int j = 0; j < 100; j++) {
+                long seq = rnd.nextLong();
+                while (distinctSeq.contains(seq)) {
+                    seq = rnd.nextLong(); // re-draw until the sequence number is unique
+                }
+                distinctSeq.add(seq);
+                kvs.add(
+                        new KeyValue()
+                                .replace(
+                                        GenericRow.of(rnd.nextInt(100)),
+                                        seq,
+                                        RowKind.fromByteValue((byte) 
rnd.nextInt(4)),
+                                        GenericRow.of(rnd.nextInt()))
+                                .setLevel(rnd.nextInt(100)));
+            }
+            expectedKvs.addAll(kvs);
+            readers.add(() -> new IteratorRecordReader<>(kvs.iterator()));
+        }
+
+        expectedKvs.sort(
+                Comparator.comparingInt((KeyValue o) -> o.key().getInt(0))
+                        .thenComparingLong(KeyValue::sequenceNumber));
+
+        MergeFunctionWrapper<List<KeyValue>> collectFunc =
+                new MergeFunctionWrapper<List<KeyValue>>() {
+
+                    private List<KeyValue> result;
+
+                    @Override
+                    public void reset() {
+                        result = new ArrayList<>();
+                    }
+
+                    @Override
+                    public void add(KeyValue kv) {
+                        result.add(kv);
+                    }
+
+                    @Nullable
+                    @Override
+                    public List<KeyValue> getResult() {
+                        return result;
+                    }
+                };
+        List<KeyValue> all = new ArrayList<>();
+        sorter.mergeSort(readers, Comparator.comparingInt(o -> o.getInt(0)), 
collectFunc)
+                .forEachRemaining(all::addAll);
+
+        
assertThat(toString(all)).containsExactlyElementsOf(toString(expectedKvs));
+    }
+
+    private List<String> toString(List<KeyValue> kvs) {
+        return kvs.stream().map(kv -> kv.toString(keyType, 
valueType)).collect(Collectors.toList());
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 7cc4c79c7..fb4533a27 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -287,8 +287,7 @@ public abstract class MergeTreeTestBase {
                         new UniversalCompaction(
                                 options.maxSizeAmplificationPercent(),
                                 options.sortedRunSizeRatio(),
-                                options.numSortedRunCompactionTrigger(),
-                                options.maxSortedRunNum()),
+                                options.numSortedRunCompactionTrigger()),
                         comparator,
                         options.targetFileSize(),
                         options.numSortedRunStopTrigger(),
@@ -448,8 +447,7 @@ public abstract class MergeTreeTestBase {
                 new UniversalCompaction(
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
-                        options.numSortedRunCompactionTrigger(),
-                        options.maxSortedRunNum());
+                        options.numSortedRunCompactionTrigger());
         return new MergeTreeCompactManager(
                 compactExecutor,
                 new Levels(comparator, files, options.numLevels()),
@@ -562,7 +560,7 @@ public abstract class MergeTreeTestBase {
                         readerFactory,
                         comparator,
                         DeduplicateMergeFunction.factory().create(),
-                        sortEngine);
+                        new MergeSorter(options, null, null, null));
         List<TestRecord> records = new ArrayList<>();
         try (RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader)) {
             while (iterator.hasNext()) {
@@ -608,7 +606,7 @@ public abstract class MergeTreeTestBase {
                             compactReaderFactory,
                             comparator,
                             DeduplicateMergeFunction.factory().create(),
-                            sortEngine);
+                            new MergeSorter(options, null, null, null));
             writer.write(new RecordReaderIterator<>(sectionsReader));
             writer.close();
             return new CompactResult(extractFilesFromSections(sections), writer.result());
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
index 8e64ef5c3..9c8fac85b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
@@ -37,7 +37,7 @@ public class ForceUpLevel0CompactionTest {
     @Test
     public void testForceCompaction0() {
         ForceUpLevel0Compaction compaction =
-                new ForceUpLevel0Compaction(new UniversalCompaction(200, 1, 5, Integer.MAX_VALUE));
+                new ForceUpLevel0Compaction(new UniversalCompaction(200, 1, 5));
 
         Optional<CompactUnit> result = compaction.pick(3, Arrays.asList(run(0, 1), run(0, 1)));
         assertThat(result).isPresent();
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index ef865f098..d549954a7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -39,21 +39,16 @@ public class UniversalCompactionTest {
 
     @Test
     public void testOutputLevel() {
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1, Integer.MAX_VALUE).outputLevel())
-                .isEqualTo(1);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2, Integer.MAX_VALUE).outputLevel())
-                .isEqualTo(1);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3, Integer.MAX_VALUE).outputLevel())
-                .isEqualTo(2);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4, Integer.MAX_VALUE).outputLevel())
-                .isEqualTo(3);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5, Integer.MAX_VALUE).outputLevel())
-                .isEqualTo(5);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(1);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(1);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5);
     }
 
     @Test
     public void testPick() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, Integer.MAX_VALUE);
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
 
         // by size amplification
         Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
@@ -79,37 +74,9 @@ public class UniversalCompactionTest {
         assertThat(results).isEqualTo(new long[] {1, 2, 3});
     }
 
-    @Test
-    public void testPickWithMaxSortedRunNum() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2);
-
-        // by size amplification
-        Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
-        assertThat(pick.isPresent()).isTrue();
-        long[] results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
-        assertThat(results).isEqualTo(new long[] {1, 2, 3, 3});
-
-        // by size ratio
-        pick =
-                compaction.pick(
-                        4, Arrays.asList(level(0, 1), level(1, 1), level(2, 1), level(3, 50)));
-        assertThat(pick.isPresent()).isTrue();
-        results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
-        assertThat(results).isEqualTo(new long[] {1, 1});
-
-        // by file num
-        pick =
-                compaction.pick(
-                        4, Arrays.asList(level(0, 1), level(1, 2), level(2, 3), level(3, 50)));
-        assertThat(pick.isPresent()).isTrue();
-        results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
-        // 3 should be in the candidate, by size ratio after picking by file num
-        assertThat(results).isEqualTo(new long[] {1, 2});
-    }
-
     @Test
     public void testNoOutputLevel0() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2);
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
 
         Optional<CompactUnit> pick =
                 compaction.pick(
@@ -129,7 +96,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testSizeAmplification() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1, Integer.MAX_VALUE);
+        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
         long[] sizes = new long[] {1};
         sizes = appendAndPickForSizeAmp(compaction, sizes);
         assertThat(sizes).isEqualTo(new long[] {2});
@@ -169,7 +136,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testSizeRatio() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5, Integer.MAX_VALUE);
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
         long[] sizes = new long[] {1, 1, 1, 1};
         sizes = appendAndPickForSizeRatio(compaction, sizes);
         assertThat(sizes).isEqualTo(new long[] {5});
@@ -222,9 +189,9 @@ public class UniversalCompactionTest {
     @Test
     public void testSizeRatioThreshold() {
         long[] sizes = new long[] {8, 9, 10};
-        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2, Integer.MAX_VALUE), sizes))
+        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
                 .isEqualTo(new long[] {8, 9, 10});
-        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2, Integer.MAX_VALUE), sizes))
+        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
                 .isEqualTo(new long[] {27});
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 33878452e..46e740333 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.operation.ScanKind;
@@ -358,13 +359,26 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
     @Test
     public void testStreamingFullChangelog() throws Exception {
+        innerTestStreamingFullChangelog(options -> {});
+    }
+
+    @Test
+    public void testStreamingFullChangelogWithSpill() throws Exception {
+        innerTestStreamingFullChangelog(
+                options -> options.set(CoreOptions.SORT_SPILL_THRESHOLD, 2));
+    }
+
+    private void innerTestStreamingFullChangelog(Consumer<Options> configure) throws Exception {
         FileStoreTable table =
                 createFileStoreTable(
-                        conf ->
-                                conf.set(
-                                        CoreOptions.CHANGELOG_PRODUCER,
-                                        ChangelogProducer.FULL_COMPACTION));
-        StreamTableWrite write = table.newWrite(commitUser);
+                        conf -> {
+                            conf.set(
+                                    CoreOptions.CHANGELOG_PRODUCER,
+                                    ChangelogProducer.FULL_COMPACTION);
+                            configure.accept(conf);
+                        });
+        StreamTableWrite write =
+                table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString()));
         StreamTableCommit commit = table.newCommit(commitUser);
 
         write.write(rowData(1, 10, 110L));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index bbc493a19..aec2faa35 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.connector.source.Source;
@@ -25,10 +27,13 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
 /** A Flink {@link Source} for paimon. */
 public abstract class FlinkSource
         implements Source<RowData, FileStoreSourceSplit, PendingSplitsCheckpoint> {
@@ -46,7 +51,10 @@ public abstract class FlinkSource
 
     @Override
     public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
-        return new FileStoreSourceReader(context, readBuilder.newRead(), limit);
+        IOManager ioManager =
+                new IOManagerImpl(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+        return new FileStoreSourceReader(
+                context, readBuilder.newRead().withIOManager(ioManager), limit);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index f33e31b5f..d6834ee70 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -53,7 +54,13 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     @Override
     public void open() throws Exception {
         super.open();
-        this.read = readBuilder.newRead();
+        IOManagerImpl ioManager =
+                new IOManagerImpl(
+                        getContainingTask()
+                                .getEnvironment()
+                                .getIOManager()
+                                .getSpillingDirectoriesPaths());
+        this.read = readBuilder.newRead().withIOManager(ioManager);
         this.reuseRow = new FlinkRowData(null);
         this.reuseRecord = new StreamRecord<>(reuseRow);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 88eb01619..a5eb105e5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -146,4 +146,18 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasRootCauseMessage("Tag 'unknown' doesn't exist.");
     }
+
+    @Test
+    public void testSortSpillMerge() {
+        sql(
+                "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('sort-spill-threshold'='2')");
+        sql("INSERT INTO KT VALUES (1, 1)");
+        sql("INSERT INTO KT VALUES (1, 2)");
+        sql("INSERT INTO KT VALUES (1, 3)");
+        sql("INSERT INTO KT VALUES (1, 4)");
+        sql("INSERT INTO KT VALUES (1, 5)");
+        sql("INSERT INTO KT VALUES (1, 6)");
+        sql("INSERT INTO KT VALUES (1, 7)");
+        assertThat(sql("SELECT * FROM KT")).containsExactlyInAnyOrder(Row.of(1, 7));
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
index abb8c336c..64c74c5f8 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
@@ -29,6 +29,8 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
+import static org.apache.paimon.spark.SparkUtils.createIOManager;
+
 /** A Spark {@link PartitionReaderFactory} for paimon. */
 public class SparkReaderFactory implements PartitionReaderFactory {
 
@@ -45,7 +47,11 @@ public class SparkReaderFactory implements PartitionReaderFactory {
             InputPartition partition) {
         RecordReader<InternalRow> reader;
         try {
-            reader = readBuilder.newRead().createReader(((SparkInputPartition) partition).split());
+            reader =
+                    readBuilder
+                            .newRead()
+                            .withIOManager(createIOManager())
+                            .createReader(((SparkInputPartition) partition).split());
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkUtils.java
new file mode 100644
index 000000000..2a293261f
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+
+import org.apache.spark.SparkEnv;
+
+/** Utils for Spark. */
+public class SparkUtils {
+
+    public static IOManager createIOManager() {
+        String[] localDirs = SparkEnv.get().blockManager().diskBlockManager().localDirsString();
+        return new IOManagerImpl(localDirs);
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 83d1a238a..06c9f0660 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.index.PartitionIndex
 import org.apache.paimon.spark.SparkRow
+import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageSerializer, DynamicBucketRow, InnerTableCommit, RowPartitionKeyExtractor}
 import org.apache.paimon.types.RowType
@@ -94,6 +95,7 @@ case class WriteIntoPaimonTable(table: FileStoreTable, overwrite: Boolean, data:
         .mapPartitions {
           iter =>
             val write = writeBuilder.newWrite()
+            write.withIOManager(createIOManager)
             try {
               iter.foreach {
                 row =>
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 8ee592326..e214adb5f 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -79,6 +79,14 @@ public class SparkWriteITCase {
         innerSimpleWrite();
     }
 
+    @Test
+    public void testSortSpill() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+                        + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro', 'sort-spill-threshold'='1')");
+        innerSimpleWrite();
+    }
+
     private void innerSimpleWrite() {
         spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
         List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();

Reply via email to