This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 10a982e1c [core] Optimize MergeSorter to avoid spill all data (#3798)
10a982e1c is described below

commit 10a982e1c870a2a34986b78e407fe6ca2dc93f17
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 24 10:43:59 2024 +0800

    [core] Optimize MergeSorter to avoid spill all data (#3798)
---
 .../org/apache/paimon/reader/ReaderSupplier.java   |  20 +-
 .../apache/paimon/reader/SizedReaderSupplier.java  |  18 +-
 .../src/main/java/org/apache/paimon/KeyValue.java  |  11 +
 .../org/apache/paimon/disk/ChannelWithMeta.java    |  12 +-
 .../paimon/disk/ChannelWriterOutputView.java       |   7 +-
 .../org/apache/paimon/disk/ExternalBuffer.java     |   1 -
 .../org/apache/paimon/mergetree/MergeSorter.java   | 300 +++++++++------------
 .../apache/paimon/mergetree/MergeTreeReaders.java  |  18 +-
 .../mergetree/compact/ConcatRecordReader.java      |  12 +-
 .../paimon/operation/MergeFileSplitRead.java       |   2 +-
 .../apache/paimon/operation/RawFileSplitRead.java  |   3 +-
 .../org/apache/paimon/schema/SystemColumns.java    |   1 +
 .../paimon/sort/AbstractBinaryExternalMerger.java  |   5 +-
 .../paimon/sort/BinaryExternalSortBuffer.java      |   7 +-
 .../org/apache/paimon/table/source/TableRead.java  |   3 +-
 .../source/splitread/IncrementalDiffSplitRead.java |   2 +-
 .../KeyValueWithLevelNoReusingSerializer.java      |  60 +++++
 .../apache/paimon/mergetree/MergeSorterTest.java   |  20 +-
 .../mergetree/compact/ConcatRecordReaderTest.java  |   5 +-
 .../paimon/operation/OrphanFilesCleanTest.java     |   5 +-
 .../paimon/table/FileStoreTableTestBase.java       |   2 +-
 .../paimon/table/SchemaEvolutionTableTestBase.java |   3 +-
 .../table/source/snapshot/ScannerTestBase.java     |   3 +-
 .../paimon/flink/lookup/LookupStreamingReader.java |   3 +-
 24 files changed, 264 insertions(+), 259 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java
similarity index 59%
copy from paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
copy to paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java
index f45960706..4664f5aea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java
@@ -16,20 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.schema;
+package org.apache.paimon.reader;
 
-import java.util.Arrays;
-import java.util.List;
+import java.io.IOException;
 
-/** System columns for key value store. */
-public class SystemColumns {
-
-    /** System field names. */
-    public static final String KEY_FIELD_PREFIX = "_KEY_";
-
-    public static final String VALUE_COUNT = "_VALUE_COUNT";
-    public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
-    public static final String VALUE_KIND = "_VALUE_KIND";
-    public static final List<String> SYSTEM_FIELD_NAMES =
-            Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
+/** Supplier to get {@link RecordReader}. */
+@FunctionalInterface
+public interface ReaderSupplier<T> {
+    RecordReader<T> get() throws IOException;
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java
similarity index 59%
copy from paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
copy to 
paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java
index f45960706..e2dfd2864 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java
@@ -16,20 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.schema;
+package org.apache.paimon.reader;
 
-import java.util.Arrays;
-import java.util.List;
+/** Supplier to get {@link RecordReader} with size. */
+public interface SizedReaderSupplier<T> extends ReaderSupplier<T> {
 
-/** System columns for key value store. */
-public class SystemColumns {
-
-    /** System field names. */
-    public static final String KEY_FIELD_PREFIX = "_KEY_";
-
-    public static final String VALUE_COUNT = "_VALUE_COUNT";
-    public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
-    public static final String VALUE_KIND = "_VALUE_KIND";
-    public static final List<String> SYSTEM_FIELD_NAMES =
-            Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
+    long estimateSize();
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index bbab367e4..31d3749bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
@@ -33,6 +34,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.schema.SystemColumns.LEVEL;
 import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
 import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -120,6 +122,15 @@ public class KeyValue {
         return new RowType(fields);
     }
 
+    public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
+        RowType.Builder builder = RowType.builder();
+        schema(keyType, valueType)
+                .getFields()
+                .forEach(f -> builder.field(f.name(), f.type(), 
f.description()));
+        builder.field(LEVEL, DataTypes.INT().notNull());
+        return builder.build();
+    }
+
     /**
      * Create key-value fields, we need to add a const value to the id of 
value field to ensure that
      * they are consistent when compared by field id. For example, there are 
two table with key
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java
index a3cb7b0e3..5c61cc44c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java
@@ -23,17 +23,11 @@ public class ChannelWithMeta {
 
     private final FileIOChannel.ID channel;
     private final int blockCount;
-    private final int numBytesInLastBlock;
     private final long numBytes;
 
-    public ChannelWithMeta(
-            FileIOChannel.ID channel,
-            int blockCount,
-            int numBytesInLastBlock,
-            long numEstimatedBytes) {
+    public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, long 
numEstimatedBytes) {
         this.channel = channel;
         this.blockCount = blockCount;
-        this.numBytesInLastBlock = numBytesInLastBlock;
         this.numBytes = numEstimatedBytes;
     }
 
@@ -45,10 +39,6 @@ public class ChannelWithMeta {
         return blockCount;
     }
 
-    public int getNumBytesInLastBlock() {
-        return numBytesInLastBlock;
-    }
-
     public long getNumBytes() {
         return numBytes;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
index 6a1838e9b..c2d3cc205 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.memory.Buffer;
 import org.apache.paimon.memory.MemorySegment;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 /**
@@ -32,7 +33,7 @@ import java.io.IOException;
  * output stream. The view will compress its data before writing it in blocks 
to the underlying
  * channel.
  */
-public final class ChannelWriterOutputView extends AbstractPagedOutputView {
+public final class ChannelWriterOutputView extends AbstractPagedOutputView 
implements Closeable {
 
     private final MemorySegment compressedBuffer;
     private final BlockCompressor compressor;
@@ -60,7 +61,8 @@ public final class ChannelWriterOutputView extends 
AbstractPagedOutputView {
         return writer;
     }
 
-    public int close() throws IOException {
+    @Override
+    public void close() throws IOException {
         if (!writer.isClosed()) {
             int currentPositionInSegment = getCurrentPositionInSegment();
             writeCompressed(currentSegment, currentPositionInSegment);
@@ -68,7 +70,6 @@ public final class ChannelWriterOutputView extends 
AbstractPagedOutputView {
             this.writeBytes = writer.getSize();
             this.writer.close();
         }
-        return -1;
     }
 
     public void closeAndDelete() throws IOException {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 529d63d0c..34c082371 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -188,7 +188,6 @@ public class ExternalBuffer implements RowBuffer {
                 new ChannelWithMeta(
                         channel,
                         inMemoryBuffer.getNumRecordBuffers(),
-                        inMemoryBuffer.getNumBytesInLastBuffer(),
                         channelWriterOutputView.getNumBytes()));
 
         inMemoryBuffer.reset();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index 420613899..f02cdaa3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -21,31 +21,30 @@ package org.apache.paimon.mergetree;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.disk.ChannelReaderInputView;
+import org.apache.paimon.disk.ChannelReaderInputViewIterator;
+import org.apache.paimon.disk.ChannelWithMeta;
+import org.apache.paimon.disk.ChannelWriterOutputView;
+import org.apache.paimon.disk.FileChannelUtil;
+import org.apache.paimon.disk.FileIOChannel;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.memory.CachelessSegmentPool;
 import org.apache.paimon.memory.MemorySegmentPool;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.SortMergeReader;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.sort.BinaryExternalSortBuffer;
-import org.apache.paimon.sort.SortBuffer;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowKind;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.reader.SizedReaderSupplier;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.MutableObjectIterator;
-import org.apache.paimon.utils.OffsetRow;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
 
 import javax.annotation.Nullable;
 
@@ -53,10 +52,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.stream.IntStream;
 
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
-import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** The merge sorter to sort and merge readers with key overlap. */
 public class MergeSorter {
@@ -66,9 +63,7 @@ public class MergeSorter {
 
     private final SortEngine sortEngine;
     private final int spillThreshold;
-    private final int spillSortMaxNumFiles;
     private final String compression;
-    private final MemorySize maxDiskSize;
 
     private final MemorySegmentPool memoryPool;
 
@@ -81,14 +76,12 @@ public class MergeSorter {
             @Nullable IOManager ioManager) {
         this.sortEngine = options.sortEngine();
         this.spillThreshold = options.sortSpillThreshold();
-        this.spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
         this.compression = options.spillCompression();
         this.keyType = keyType;
         this.valueType = valueType;
         this.memoryPool =
                 new CachelessSegmentPool(options.sortSpillBufferSize(), 
options.pageSize());
         this.ioManager = ioManager;
-        this.maxDiskSize = options.writeBufferSpillDiskSize();
     }
 
     public MemorySegmentPool memoryPool() {
@@ -108,7 +101,7 @@ public class MergeSorter {
     }
 
     public <T> RecordReader<T> mergeSort(
-            List<ReaderSupplier<KeyValue>> lazyReaders,
+            List<SizedReaderSupplier<KeyValue>> lazyReaders,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionWrapper<T> mergeFunction)
@@ -118,6 +111,16 @@ public class MergeSorter {
                     lazyReaders, keyComparator, userDefinedSeqComparator, 
mergeFunction);
         }
 
+        return mergeSortNoSpill(
+                lazyReaders, keyComparator, userDefinedSeqComparator, 
mergeFunction);
+    }
+
+    public <T> RecordReader<T> mergeSortNoSpill(
+            List<? extends ReaderSupplier<KeyValue>> lazyReaders,
+            Comparator<InternalRow> keyComparator,
+            @Nullable FieldsComparator userDefinedSeqComparator,
+            MergeFunctionWrapper<T> mergeFunction)
+            throws IOException {
         List<RecordReader<KeyValue>> readers = new 
ArrayList<>(lazyReaders.size());
         for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
             try {
@@ -134,192 +137,129 @@ public class MergeSorter {
     }
 
     private <T> RecordReader<T> spillMergeSort(
-            List<ReaderSupplier<KeyValue>> readers,
+            List<SizedReaderSupplier<KeyValue>> inputReaders,
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionWrapper<T> mergeFunction)
             throws IOException {
-        ExternalSorterWithLevel sorter = new 
ExternalSorterWithLevel(userDefinedSeqComparator);
-        ConcatRecordReader.create(readers).forIOEachRemaining(sorter::put);
-        sorter.flushMemory();
-
-        NoReusingMergeIterator<T> iterator = sorter.newIterator(keyComparator, 
mergeFunction);
-        return new RecordReader<T>() {
-
-            private boolean read = false;
-
-            @Nullable
-            @Override
-            public RecordIterator<T> readBatch() {
-                if (read) {
-                    return null;
-                }
-
-                read = true;
-                return new RecordIterator<T>() {
-                    @Override
-                    public T next() throws IOException {
-                        return iterator.next();
-                    }
-
-                    @Override
-                    public void releaseBatch() {}
-                };
-            }
+        List<SizedReaderSupplier<KeyValue>> sortedReaders = new 
ArrayList<>(inputReaders);
+        
sortedReaders.sort(Comparator.comparingLong(SizedReaderSupplier::estimateSize));
+        int spillSize = inputReaders.size() - spillThreshold;
+
+        List<ReaderSupplier<KeyValue>> readers =
+                new ArrayList<>(sortedReaders.subList(spillSize, 
sortedReaders.size()));
+        for (ReaderSupplier<KeyValue> supplier : sortedReaders.subList(0, 
spillSize)) {
+            readers.add(spill(supplier));
+        }
 
-            @Override
-            public void close() {
-                sorter.clear();
-            }
-        };
+        return mergeSortNoSpill(readers, keyComparator, 
userDefinedSeqComparator, mergeFunction);
     }
 
-    /**
-     * Here can not use {@link SortBufferWriteBuffer} for two reasons:
-     *
-     * <p>1.Changelog-producer: full-compaction and lookup need to know the 
level of the KeyValue.
-     *
-     * <p>2.Changelog-producer: full-compaction and lookup need to store the 
reference of
-     * update_before.
-     */
-    private class ExternalSorterWithLevel {
-
-        private final SortBuffer buffer;
-
-        public ExternalSorterWithLevel(@Nullable FieldsComparator 
userDefinedSeqComparator) {
-            if (memoryPool.freePages() < 3) {
-                throw new IllegalArgumentException(
-                        "Write buffer requires a minimum of 3 page memory, 
please increase write buffer memory size.");
-            }
-
-            // key fields
-            IntStream sortFields = IntStream.range(0, keyType.getFieldCount());
-
-            // user define sequence fields
-            if (userDefinedSeqComparator != null) {
-                IntStream udsFields =
-                        IntStream.of(userDefinedSeqComparator.compareFields())
-                                .map(operand -> operand + 
keyType.getFieldCount() + 3);
-                sortFields = IntStream.concat(sortFields, udsFields);
+    private ReaderSupplier<KeyValue> spill(ReaderSupplier<KeyValue> 
readerSupplier)
+            throws IOException {
+        checkArgument(ioManager != null);
+
+        FileIOChannel.ID channel = ioManager.createChannel();
+        KeyValueWithLevelNoReusingSerializer serializer =
+                new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+        BlockCompressionFactory compressFactory = 
BlockCompressionFactory.create(compression);
+        int compressBlock = (int) MemorySize.parse("64 kb").getBytes();
+
+        ChannelWithMeta channelWithMeta;
+        ChannelWriterOutputView out =
+                FileChannelUtil.createOutputView(
+                        ioManager, channel, compressFactory, compressBlock);
+        try (RecordReader<KeyValue> reader = readerSupplier.get(); ) {
+            RecordIterator<KeyValue> batch;
+            KeyValue record;
+            while ((batch = reader.readBatch()) != null) {
+                while ((record = batch.next()) != null) {
+                    serializer.serialize(record, out);
+                }
+                batch.releaseBatch();
             }
-
-            // sequence field
-            sortFields = IntStream.concat(sortFields, 
IntStream.of(keyType.getFieldCount()));
-
-            // row type
-            List<DataField> fields = new ArrayList<>(keyType.getFields());
-            fields.add(new DataField(0, SEQUENCE_NUMBER, new 
BigIntType(false)));
-            fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
-            fields.add(new DataField(2, "_LEVEL", new IntType(false)));
-            fields.addAll(valueType.getFields());
-
-            this.buffer =
-                    BinaryExternalSortBuffer.create(
-                            ioManager,
-                            new RowType(fields),
-                            sortFields.toArray(),
-                            memoryPool,
-                            spillSortMaxNumFiles,
-                            compression,
-                            maxDiskSize);
+        } finally {
+            out.close();
+            channelWithMeta =
+                    new ChannelWithMeta(channel, out.getBlockCount(), 
out.getWriteBytes());
         }
 
-        public boolean put(KeyValue keyValue) throws IOException {
-            GenericRow meta = new GenericRow(3);
-            meta.setField(0, keyValue.sequenceNumber());
-            meta.setField(1, keyValue.valueKind().toByteValue());
-            meta.setField(2, keyValue.level());
-            JoinedRow row =
-                    new JoinedRow()
-                            .replace(
-                                    new JoinedRow().replace(keyValue.key(), 
meta),
-                                    keyValue.value());
-            return buffer.write(row);
-        }
-
-        public boolean flushMemory() throws IOException {
-            return buffer.flushMemory();
-        }
+        return new SpilledReaderSupplier(
+                channelWithMeta, compressFactory, compressBlock, serializer);
+    }
 
-        public void clear() {
-            buffer.clear();
+    private class SpilledReaderSupplier implements ReaderSupplier<KeyValue> {
+
+        private final ChannelWithMeta channel;
+        private final BlockCompressionFactory compressFactory;
+        private final int compressBlock;
+        private final KeyValueWithLevelNoReusingSerializer serializer;
+
+        public SpilledReaderSupplier(
+                ChannelWithMeta channel,
+                BlockCompressionFactory compressFactory,
+                int compressBlock,
+                KeyValueWithLevelNoReusingSerializer serializer) {
+            this.channel = channel;
+            this.compressFactory = compressFactory;
+            this.compressBlock = compressBlock;
+            this.serializer = serializer;
         }
 
-        public <T> NoReusingMergeIterator<T> newIterator(
-                Comparator<InternalRow> keyComparator, MergeFunctionWrapper<T> 
mergeFunction)
-                throws IOException {
-            return new NoReusingMergeIterator<>(
-                    buffer.sortedIterator(), keyComparator, mergeFunction);
+        @Override
+        public RecordReader<KeyValue> get() throws IOException {
+            ChannelReaderInputView view =
+                    FileChannelUtil.createInputView(
+                            ioManager, channel, new ArrayList<>(), 
compressFactory, compressBlock);
+            BinaryRowSerializer rowSerializer = new 
BinaryRowSerializer(serializer.numFields());
+            ChannelReaderInputViewIterator iterator =
+                    new ChannelReaderInputViewIterator(view, null, 
rowSerializer);
+            return new ChannelReaderReader(view, iterator, serializer);
         }
     }
 
-    private class NoReusingMergeIterator<T> {
-
-        private final MutableObjectIterator<BinaryRow> kvIter;
-        private final Comparator<InternalRow> keyComparator;
-        private final MergeFunctionWrapper<T> mergeFunc;
+    private static class ChannelReaderReader implements RecordReader<KeyValue> 
{
 
-        private KeyValue left;
+        private final ChannelReaderInputView view;
+        private final ChannelReaderInputViewIterator iterator;
+        private final KeyValueWithLevelNoReusingSerializer serializer;
 
-        private boolean isEnd;
-
-        private NoReusingMergeIterator(
-                MutableObjectIterator<BinaryRow> kvIter,
-                Comparator<InternalRow> keyComparator,
-                MergeFunctionWrapper<T> mergeFunction) {
-            this.kvIter = kvIter;
-            this.keyComparator = keyComparator;
-            this.mergeFunc = mergeFunction;
-            this.isEnd = false;
+        private ChannelReaderReader(
+                ChannelReaderInputView view,
+                ChannelReaderInputViewIterator iterator,
+                KeyValueWithLevelNoReusingSerializer serializer) {
+            this.view = view;
+            this.iterator = iterator;
+            this.serializer = serializer;
         }
 
-        public T next() throws IOException {
-            if (isEnd) {
+        private boolean read = false;
+
+        @Override
+        public RecordIterator<KeyValue> readBatch() {
+            if (read) {
                 return null;
             }
 
-            T result;
-            do {
-                mergeFunc.reset();
-                InternalRow key = null;
-                KeyValue keyValue;
-                while ((keyValue = readOnce()) != null) {
-                    if (key != null && keyComparator.compare(keyValue.key(), 
key) != 0) {
-                        break;
+            read = true;
+            return new RecordIterator<KeyValue>() {
+                @Override
+                public KeyValue next() throws IOException {
+                    BinaryRow noReuseRow = iterator.next();
+                    if (noReuseRow == null) {
+                        return null;
                     }
-                    key = keyValue.key();
-                    mergeFunc.add(keyValue);
+                    return serializer.fromRow(noReuseRow);
                 }
-                left = keyValue;
-                if (key == null) {
-                    return null;
-                }
-                result = mergeFunc.getResult();
-            } while (result == null);
-            return result;
-        }
 
-        private KeyValue readOnce() throws IOException {
-            if (left != null) {
-                KeyValue ret = left;
-                left = null;
-                return ret;
-            }
-            BinaryRow row = kvIter.next();
-            if (row == null) {
-                isEnd = true;
-                return null;
-            }
+                @Override
+                public void releaseBatch() {}
+            };
+        }
 
-            int keyArity = keyType.getFieldCount();
-            int valueArity = valueType.getFieldCount();
-            return new KeyValue()
-                    .replace(
-                            new OffsetRow(keyArity, 0).replace(row),
-                            row.getLong(keyArity),
-                            RowKind.fromByteValue(row.getByte(keyArity + 1)),
-                            new OffsetRow(valueArity, keyArity + 
3).replace(row))
-                    .setLevel(row.getInt(keyArity + 2));
+        @Override
+        public void close() throws IOException {
+            view.getChannel().closeAndDelete();
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 766dccca2..88a9c2593 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -23,9 +23,10 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.SizedReaderSupplier;
 import org.apache.paimon.utils.FieldsComparator;
 
 import javax.annotation.Nullable;
@@ -71,9 +72,20 @@ public class MergeTreeReaders {
             MergeFunctionWrapper<T> mergeFunctionWrapper,
             MergeSorter mergeSorter)
             throws IOException {
-        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+        List<SizedReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (SortedRun run : section) {
-            readers.add(() -> readerForRun(run, readerFactory));
+            readers.add(
+                    new SizedReaderSupplier<KeyValue>() {
+                        @Override
+                        public long estimateSize() {
+                            return run.totalSize();
+                        }
+
+                        @Override
+                        public RecordReader<KeyValue> get() throws IOException 
{
+                            return readerForRun(run, readerFactory);
+                        }
+                    });
         }
         return mergeSorter.mergeSort(
                 readers, userKeyComparator, userDefinedSeqComparator, 
mergeFunctionWrapper);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
index 3b8e6908f..c9f5bebfc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.utils.Preconditions;
 
@@ -40,14 +41,15 @@ public class ConcatRecordReader<T> implements 
RecordReader<T> {
 
     private RecordReader<T> current;
 
-    protected ConcatRecordReader(List<ReaderSupplier<T>> readerFactories) {
+    protected ConcatRecordReader(List<? extends ReaderSupplier<T>> 
readerFactories) {
         readerFactories.forEach(
                 supplier ->
                         Preconditions.checkNotNull(supplier, "Reader factory 
must not be null."));
         this.queue = new LinkedList<>(readerFactories);
     }
 
-    public static <R> RecordReader<R> create(List<ReaderSupplier<R>> readers) 
throws IOException {
+    public static <R> RecordReader<R> create(List<? extends ReaderSupplier<R>> 
readers)
+            throws IOException {
         return readers.size() == 1 ? readers.get(0).get() : new 
ConcatRecordReader<>(readers);
     }
 
@@ -81,10 +83,4 @@ public class ConcatRecordReader<T> implements 
RecordReader<T> {
             current.close();
         }
     }
-
-    /** Supplier to get {@link RecordReader}. */
-    @FunctionalInterface
-    public interface ReaderSupplier<T> {
-        RecordReader<T> get() throws IOException;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 8002b62f0..20bee30c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -33,13 +33,13 @@ import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.IntervalPartition;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 7e6e91731..777a4588a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -37,6 +37,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.EmptyRecordReader;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.IndexCastMapping;
 import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -131,7 +132,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     public RecordReader<InternalRow> createReader(DataSplit split) throws 
IOException {
         DataFilePathFactory dataFilePathFactory =
                 pathFactory.createDataFilePathFactory(split.partition(), 
split.bucket());
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new 
ArrayList<>();
+        List<ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
         if (split.beforeFiles().size() > 0) {
             LOG.info("Ignore split before files: " + split.beforeFiles());
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
index f45960706..f6350f44a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
@@ -30,6 +30,7 @@ public class SystemColumns {
     public static final String VALUE_COUNT = "_VALUE_COUNT";
     public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
     public static final String VALUE_KIND = "_VALUE_KIND";
+    public static final String LEVEL = "_LEVEL";
     public static final List<String> SYSTEM_FIELD_NAMES =
             Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
 
b/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
index 56b526313..e1db28c16 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
@@ -183,7 +183,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> 
implements Closeable {
                             compressionCodecFactory,
                             compressionBlockSize);
             writeMergingOutput(mergeIterator, output);
-            numBytesInLastBlock = output.close();
+            output.close();
             numBlocksWritten = output.getBlockCount();
         } catch (IOException e) {
             if (output != null) {
@@ -202,8 +202,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> 
implements Closeable {
             }
         }
 
-        return new ChannelWithMeta(
-                mergedChannelID, numBlocksWritten, numBytesInLastBlock, 
output.getWriteBytes());
+        return new ChannelWithMeta(mergedChannelID, numBlocksWritten, 
output.getWriteBytes());
     }
 
     // 
-------------------------------------------------------------------------------------------
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index c709dcc1a..72003408a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -251,7 +251,6 @@ public class BinaryExternalSortBuffer implements SortBuffer 
{
         channelManager.addChannel(channel);
 
         ChannelWriterOutputView output = null;
-        int bytesInLastBuffer;
         int blockCount;
 
         try {
@@ -260,7 +259,7 @@ public class BinaryExternalSortBuffer implements SortBuffer 
{
                             ioManager, channel, compressionCodecFactory, 
compressionBlockSize);
             new QuickSort().sort(inMemorySortBuffer);
             inMemorySortBuffer.writeToOutput(output);
-            bytesInLastBuffer = output.close();
+            output.close();
             blockCount = output.getBlockCount();
         } catch (IOException e) {
             if (output != null) {
@@ -270,9 +269,7 @@ public class BinaryExternalSortBuffer implements SortBuffer 
{
             throw e;
         }
 
-        spillChannelIDs.add(
-                new ChannelWithMeta(
-                        channel, blockCount, bytesInLastBuffer, 
output.getWriteBytes()));
+        spillChannelIDs.add(new ChannelWithMeta(channel, blockCount, 
output.getWriteBytes()));
         inMemorySortBuffer.clear();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 1b2c6299b..7b78d5aed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 
 import java.io.IOException;
@@ -44,7 +45,7 @@ public interface TableRead {
     RecordReader<InternalRow> createReader(Split split) throws IOException;
 
     default RecordReader<InternalRow> createReader(List<Split> splits) throws 
IOException {
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
+        List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
         for (Split split : splits) {
             readers.add(() -> createReader(split));
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 0519da9fd..b610ff60b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -117,7 +117,7 @@ public class IncrementalDiffSplitRead implements 
SplitRead<InternalRow> {
             MergeSorter sorter,
             boolean keepDelete)
             throws IOException {
-        return sorter.mergeSort(
+        return sorter.mergeSortNoSpill(
                 Arrays.asList(
                         () -> wrapLevelToReader(beforeReader, BEFORE_LEVEL),
                         () -> wrapLevelToReader(afterReader, AFTER_LEVEL)),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
new file mode 100644
index 000000000..456dc43a0
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import static org.apache.paimon.data.JoinedRow.join;
+
+/** Serializer for {@link KeyValue} with Level. */
+public class KeyValueWithLevelNoReusingSerializer extends 
ObjectSerializer<KeyValue> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int keyArity;
+    private final int valueArity;
+
+    public KeyValueWithLevelNoReusingSerializer(RowType keyType, RowType 
valueType) {
+        super(KeyValue.schemaWithLevel(keyType, valueType));
+
+        this.keyArity = keyType.getFieldCount();
+        this.valueArity = valueType.getFieldCount();
+    }
+
+    @Override
+    public InternalRow toRow(KeyValue kv) {
+        GenericRow meta = GenericRow.of(kv.sequenceNumber(), 
kv.valueKind().toByteValue());
+        return join(join(join(kv.key(), meta), kv.value()), 
GenericRow.of(kv.level()));
+    }
+
+    @Override
+    public KeyValue fromRow(InternalRow row) {
+        return new KeyValue()
+                .replace(
+                        new OffsetRow(keyArity, 0).replace(row),
+                        row.getLong(keyArity),
+                        RowKind.fromByteValue(row.getByte(keyArity + 1)),
+                        new OffsetRow(valueArity, keyArity + 2).replace(row))
+                .setLevel(row.getInt(keyArity + 2 + valueArity));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
index 557bd2959..b368e6014 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
@@ -24,10 +24,11 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.SizedReaderSupplier;
 import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
 import org.apache.paimon.testutils.junit.parameterized.Parameters;
 import org.apache.paimon.types.DataTypes;
@@ -139,11 +140,11 @@ public class MergeSorterTest {
         }
         comparator = comparator.thenComparingLong(KeyValue::sequenceNumber);
 
-        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+        List<SizedReaderSupplier<KeyValue>> readers = new ArrayList<>();
         Random rnd = new Random();
         List<KeyValue> expectedKvs = new ArrayList<>();
         Set<Long> distinctSeq = new HashSet<>();
-        for (int i = 0; i < rnd.nextInt(10) + 3; i++) {
+        for (int i = 0; i < rnd.nextInt(20) + 3; i++) {
             List<KeyValue> kvs = new ArrayList<>();
             Set<Integer> distinctKeys = new HashSet<>();
             for (int j = 0; j < 100; j++) {
@@ -172,7 +173,18 @@ public class MergeSorterTest {
             }
             expectedKvs.addAll(kvs);
             kvs.sort(comparator);
-            readers.add(() -> new IteratorRecordReader<>(kvs.iterator()));
+            readers.add(
+                    new SizedReaderSupplier<KeyValue>() {
+                        @Override
+                        public long estimateSize() {
+                            return kvs.size();
+                        }
+
+                        @Override
+                        public RecordReader<KeyValue> get() {
+                            return new IteratorRecordReader<>(kvs.iterator());
+                        }
+                    });
         }
 
         expectedKvs.sort(comparator);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
index 6b4a85c0c..ca04d8a7b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.utils.ReusingTestData;
 import org.apache.paimon.utils.TestReusingRecordReader;
@@ -48,9 +49,7 @@ public class ConcatRecordReaderTest extends 
CombiningRecordReaderTestBase {
     protected RecordReader<KeyValue> createRecordReader(
             List<TestReusingRecordReader> readers, SortEngine sortEngine) {
         return new ConcatRecordReader(
-                readers.stream()
-                        .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r)
-                        .collect(Collectors.toList()));
+                readers.stream().map(r -> (ReaderSupplier) () -> 
r).collect(Collectors.toList()));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index ec9197b8b..ff67dbc27 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
@@ -284,7 +285,7 @@ public class OrphanFilesCleanTest {
         while (id <= max) {
             List<Split> splits = scan.plan().splits();
             if (!splits.isEmpty()) {
-                List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = 
new ArrayList<>();
+                List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
                 for (Split split : splits) {
                     readers.add(() -> scanTable.newRead().createReader(split));
                 }
@@ -318,7 +319,7 @@ public class OrphanFilesCleanTest {
 
     private void validateSnapshot(Snapshot snapshot, List<TestPojo> data) 
throws Exception {
         List<Split> splits = 
table.newSnapshotReader().withSnapshot(snapshot).read().splits();
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
+        List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
         for (Split split : splits) {
             readers.add(() -> table.newRead().createReader(split));
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index daf5db716..937005d21 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -37,11 +37,11 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.paimon.operation.FileStoreTestUtils;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.SchemaChange;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index f5874bed7..277892410 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
@@ -446,7 +447,7 @@ public abstract class SchemaEvolutionTableTestBase {
     protected List<String> getResult(
             TableRead read, List<Split> splits, Function<InternalRow, String> 
rowDataToString) {
         try {
-            List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
+            List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
             for (Split split : splits) {
                 readers.add(() -> read.createReader(split));
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index defbe24ec..2ed0d5c9b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
@@ -109,7 +110,7 @@ public abstract class ScannerTestBase {
     }
 
     protected List<String> getResult(TableRead read, List<Split> splits) 
throws Exception {
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
+        List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
         for (Split split : splits) {
             readers.add(() -> read.createReader(split));
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ea9256830..ceb40c1a8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -25,6 +25,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
@@ -144,7 +145,7 @@ public class LookupStreamingReader {
                             options.pageSize(),
                             new 
Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
         } else {
-            List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
+            List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
             for (Split split : splits) {
                 readers.add(() -> readerSupplier.apply(split));
             }

Reply via email to