This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 10a982e1c [core] Optimize MergeSorter to avoid spill all data (#3798)
10a982e1c is described below
commit 10a982e1c870a2a34986b78e407fe6ca2dc93f17
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 24 10:43:59 2024 +0800
[core] Optimize MergeSorter to avoid spill all data (#3798)
---
.../org/apache/paimon/reader/ReaderSupplier.java | 20 +-
.../apache/paimon/reader/SizedReaderSupplier.java | 18 +-
.../src/main/java/org/apache/paimon/KeyValue.java | 11 +
.../org/apache/paimon/disk/ChannelWithMeta.java | 12 +-
.../paimon/disk/ChannelWriterOutputView.java | 7 +-
.../org/apache/paimon/disk/ExternalBuffer.java | 1 -
.../org/apache/paimon/mergetree/MergeSorter.java | 300 +++++++++------------
.../apache/paimon/mergetree/MergeTreeReaders.java | 18 +-
.../mergetree/compact/ConcatRecordReader.java | 12 +-
.../paimon/operation/MergeFileSplitRead.java | 2 +-
.../apache/paimon/operation/RawFileSplitRead.java | 3 +-
.../org/apache/paimon/schema/SystemColumns.java | 1 +
.../paimon/sort/AbstractBinaryExternalMerger.java | 5 +-
.../paimon/sort/BinaryExternalSortBuffer.java | 7 +-
.../org/apache/paimon/table/source/TableRead.java | 3 +-
.../source/splitread/IncrementalDiffSplitRead.java | 2 +-
.../KeyValueWithLevelNoReusingSerializer.java | 60 +++++
.../apache/paimon/mergetree/MergeSorterTest.java | 20 +-
.../mergetree/compact/ConcatRecordReaderTest.java | 5 +-
.../paimon/operation/OrphanFilesCleanTest.java | 5 +-
.../paimon/table/FileStoreTableTestBase.java | 2 +-
.../paimon/table/SchemaEvolutionTableTestBase.java | 3 +-
.../table/source/snapshot/ScannerTestBase.java | 3 +-
.../paimon/flink/lookup/LookupStreamingReader.java | 3 +-
24 files changed, 264 insertions(+), 259 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java b/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java
similarity index 59%
copy from paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
copy to paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java
index f45960706..4664f5aea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java
@@ -16,20 +16,12 @@
* limitations under the License.
*/
-package org.apache.paimon.schema;
+package org.apache.paimon.reader;
-import java.util.Arrays;
-import java.util.List;
+import java.io.IOException;
-/** System columns for key value store. */
-public class SystemColumns {
-
- /** System field names. */
- public static final String KEY_FIELD_PREFIX = "_KEY_";
-
- public static final String VALUE_COUNT = "_VALUE_COUNT";
- public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
- public static final String VALUE_KIND = "_VALUE_KIND";
- public static final List<String> SYSTEM_FIELD_NAMES =
- Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
+/** Supplier to get {@link RecordReader}. */
+@FunctionalInterface
+public interface ReaderSupplier<T> {
+ RecordReader<T> get() throws IOException;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java b/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java
similarity index 59%
copy from paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
copy to paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java
index f45960706..e2dfd2864 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java
@@ -16,20 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.schema;
+package org.apache.paimon.reader;
-import java.util.Arrays;
-import java.util.List;
+/** Supplier to get {@link RecordReader} with size. */
+public interface SizedReaderSupplier<T> extends ReaderSupplier<T> {
-/** System columns for key value store. */
-public class SystemColumns {
-
- /** System field names. */
- public static final String KEY_FIELD_PREFIX = "_KEY_";
-
- public static final String VALUE_COUNT = "_VALUE_COUNT";
- public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
- public static final String VALUE_KIND = "_VALUE_KIND";
- public static final List<String> SYSTEM_FIELD_NAMES =
- Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
+ long estimateSize();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index bbab367e4..31d3749bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
@@ -33,6 +34,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.schema.SystemColumns.LEVEL;
import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -120,6 +122,15 @@ public class KeyValue {
return new RowType(fields);
}
+ public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
+ RowType.Builder builder = RowType.builder();
+ schema(keyType, valueType)
+ .getFields()
+ .forEach(f -> builder.field(f.name(), f.type(), f.description()));
+ builder.field(LEVEL, DataTypes.INT().notNull());
+ return builder.build();
+ }
+
/**
* Create key-value fields, we need to add a const value to the id of value field to ensure that
* they are consistent when compared by field id. For example, there are two table with key
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java
index a3cb7b0e3..5c61cc44c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java
@@ -23,17 +23,11 @@ public class ChannelWithMeta {
private final FileIOChannel.ID channel;
private final int blockCount;
- private final int numBytesInLastBlock;
private final long numBytes;
- public ChannelWithMeta(
- FileIOChannel.ID channel,
- int blockCount,
- int numBytesInLastBlock,
- long numEstimatedBytes) {
+ public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, long numEstimatedBytes) {
this.channel = channel;
this.blockCount = blockCount;
- this.numBytesInLastBlock = numBytesInLastBlock;
this.numBytes = numEstimatedBytes;
}
@@ -45,10 +39,6 @@ public class ChannelWithMeta {
return blockCount;
}
- public int getNumBytesInLastBlock() {
- return numBytesInLastBlock;
- }
-
public long getNumBytes() {
return numBytes;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
index 6a1838e9b..c2d3cc205 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
+import java.io.Closeable;
import java.io.IOException;
/**
@@ -32,7 +33,7 @@ import java.io.IOException;
* output stream. The view will compress its data before writing it in blocks to the underlying
* channel.
*/
-public final class ChannelWriterOutputView extends AbstractPagedOutputView {
+public final class ChannelWriterOutputView extends AbstractPagedOutputView implements Closeable {
private final MemorySegment compressedBuffer;
private final BlockCompressor compressor;
@@ -60,7 +61,8 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
return writer;
}
- public int close() throws IOException {
+ @Override
+ public void close() throws IOException {
if (!writer.isClosed()) {
int currentPositionInSegment = getCurrentPositionInSegment();
writeCompressed(currentSegment, currentPositionInSegment);
@@ -68,7 +70,6 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
this.writeBytes = writer.getSize();
this.writer.close();
}
- return -1;
}
public void closeAndDelete() throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 529d63d0c..34c082371 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -188,7 +188,6 @@ public class ExternalBuffer implements RowBuffer {
new ChannelWithMeta(
channel,
inMemoryBuffer.getNumRecordBuffers(),
- inMemoryBuffer.getNumBytesInLastBuffer(),
channelWriterOutputView.getNumBytes()));
inMemoryBuffer.reset();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index 420613899..f02cdaa3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -21,31 +21,30 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.disk.ChannelReaderInputView;
+import org.apache.paimon.disk.ChannelReaderInputViewIterator;
+import org.apache.paimon.disk.ChannelWithMeta;
+import org.apache.paimon.disk.ChannelWriterOutputView;
+import org.apache.paimon.disk.FileChannelUtil;
+import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.CachelessSegmentPool;
import org.apache.paimon.memory.MemorySegmentPool;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.sort.BinaryExternalSortBuffer;
-import org.apache.paimon.sort.SortBuffer;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowKind;
+import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.reader.SizedReaderSupplier;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.MutableObjectIterator;
-import org.apache.paimon.utils.OffsetRow;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import javax.annotation.Nullable;
@@ -53,10 +52,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.stream.IntStream;
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
-import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** The merge sorter to sort and merge readers with key overlap. */
public class MergeSorter {
@@ -66,9 +63,7 @@ public class MergeSorter {
private final SortEngine sortEngine;
private final int spillThreshold;
- private final int spillSortMaxNumFiles;
private final String compression;
- private final MemorySize maxDiskSize;
private final MemorySegmentPool memoryPool;
@@ -81,14 +76,12 @@ public class MergeSorter {
@Nullable IOManager ioManager) {
this.sortEngine = options.sortEngine();
this.spillThreshold = options.sortSpillThreshold();
- this.spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
this.compression = options.spillCompression();
this.keyType = keyType;
this.valueType = valueType;
this.memoryPool =
new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize());
this.ioManager = ioManager;
- this.maxDiskSize = options.writeBufferSpillDiskSize();
}
public MemorySegmentPool memoryPool() {
@@ -108,7 +101,7 @@ public class MergeSorter {
}
public <T> RecordReader<T> mergeSort(
- List<ReaderSupplier<KeyValue>> lazyReaders,
+ List<SizedReaderSupplier<KeyValue>> lazyReaders,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionWrapper<T> mergeFunction)
@@ -118,6 +111,16 @@ public class MergeSorter {
lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
}
+ return mergeSortNoSpill(
+ lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
+ }
+
+ public <T> RecordReader<T> mergeSortNoSpill(
+ List<? extends ReaderSupplier<KeyValue>> lazyReaders,
+ Comparator<InternalRow> keyComparator,
+ @Nullable FieldsComparator userDefinedSeqComparator,
+ MergeFunctionWrapper<T> mergeFunction)
+ throws IOException {
List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
try {
@@ -134,192 +137,129 @@ public class MergeSorter {
}
private <T> RecordReader<T> spillMergeSort(
- List<ReaderSupplier<KeyValue>> readers,
+ List<SizedReaderSupplier<KeyValue>> inputReaders,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionWrapper<T> mergeFunction)
throws IOException {
- ExternalSorterWithLevel sorter = new ExternalSorterWithLevel(userDefinedSeqComparator);
- ConcatRecordReader.create(readers).forIOEachRemaining(sorter::put);
- sorter.flushMemory();
-
- NoReusingMergeIterator<T> iterator = sorter.newIterator(keyComparator, mergeFunction);
- return new RecordReader<T>() {
-
- private boolean read = false;
-
- @Nullable
- @Override
- public RecordIterator<T> readBatch() {
- if (read) {
- return null;
- }
-
- read = true;
- return new RecordIterator<T>() {
- @Override
- public T next() throws IOException {
- return iterator.next();
- }
-
- @Override
- public void releaseBatch() {}
- };
- }
+ List<SizedReaderSupplier<KeyValue>> sortedReaders = new ArrayList<>(inputReaders);
+ sortedReaders.sort(Comparator.comparingLong(SizedReaderSupplier::estimateSize));
+ int spillSize = inputReaders.size() - spillThreshold;
+
+ List<ReaderSupplier<KeyValue>> readers =
+ new ArrayList<>(sortedReaders.subList(spillSize, sortedReaders.size()));
+ for (ReaderSupplier<KeyValue> supplier : sortedReaders.subList(0, spillSize)) {
+ readers.add(spill(supplier));
+ }
- @Override
- public void close() {
- sorter.clear();
- }
- };
+ return mergeSortNoSpill(readers, keyComparator, userDefinedSeqComparator, mergeFunction);
}
- /**
- * Here can not use {@link SortBufferWriteBuffer} for two reasons:
- *
- * <p>1.Changelog-producer: full-compaction and lookup need to know the level of the KeyValue.
- *
- * <p>2.Changelog-producer: full-compaction and lookup need to store the reference of
- * update_before.
- */
- private class ExternalSorterWithLevel {
-
- private final SortBuffer buffer;
-
- public ExternalSorterWithLevel(@Nullable FieldsComparator userDefinedSeqComparator) {
- if (memoryPool.freePages() < 3) {
- throw new IllegalArgumentException(
- "Write buffer requires a minimum of 3 page memory, please increase write buffer memory size.");
- }
-
- // key fields
- IntStream sortFields = IntStream.range(0, keyType.getFieldCount());
-
- // user define sequence fields
- if (userDefinedSeqComparator != null) {
- IntStream udsFields =
- IntStream.of(userDefinedSeqComparator.compareFields())
- .map(operand -> operand + keyType.getFieldCount() + 3);
- sortFields = IntStream.concat(sortFields, udsFields);
+ private ReaderSupplier<KeyValue> spill(ReaderSupplier<KeyValue> readerSupplier)
+ throws IOException {
+ checkArgument(ioManager != null);
+
+ FileIOChannel.ID channel = ioManager.createChannel();
+ KeyValueWithLevelNoReusingSerializer serializer =
+ new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+ BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression);
+ int compressBlock = (int) MemorySize.parse("64 kb").getBytes();
+
+ ChannelWithMeta channelWithMeta;
+ ChannelWriterOutputView out =
+ FileChannelUtil.createOutputView(
+ ioManager, channel, compressFactory, compressBlock);
+ try (RecordReader<KeyValue> reader = readerSupplier.get(); ) {
+ RecordIterator<KeyValue> batch;
+ KeyValue record;
+ while ((batch = reader.readBatch()) != null) {
+ while ((record = batch.next()) != null) {
+ serializer.serialize(record, out);
+ }
+ batch.releaseBatch();
}
-
- // sequence field
- sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));
-
- // row type
- List<DataField> fields = new ArrayList<>(keyType.getFields());
- fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false)));
- fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
- fields.add(new DataField(2, "_LEVEL", new IntType(false)));
- fields.addAll(valueType.getFields());
-
- this.buffer =
- BinaryExternalSortBuffer.create(
- ioManager,
- new RowType(fields),
- sortFields.toArray(),
- memoryPool,
- spillSortMaxNumFiles,
- compression,
- maxDiskSize);
+ } finally {
+ out.close();
+ channelWithMeta =
+ new ChannelWithMeta(channel, out.getBlockCount(), out.getWriteBytes());
}
- public boolean put(KeyValue keyValue) throws IOException {
- GenericRow meta = new GenericRow(3);
- meta.setField(0, keyValue.sequenceNumber());
- meta.setField(1, keyValue.valueKind().toByteValue());
- meta.setField(2, keyValue.level());
- JoinedRow row =
- new JoinedRow()
- .replace(
- new JoinedRow().replace(keyValue.key(), meta),
- keyValue.value());
- return buffer.write(row);
- }
-
- public boolean flushMemory() throws IOException {
- return buffer.flushMemory();
- }
+ return new SpilledReaderSupplier(
+ channelWithMeta, compressFactory, compressBlock, serializer);
+ }
- public void clear() {
- buffer.clear();
+ private class SpilledReaderSupplier implements ReaderSupplier<KeyValue> {
+
+ private final ChannelWithMeta channel;
+ private final BlockCompressionFactory compressFactory;
+ private final int compressBlock;
+ private final KeyValueWithLevelNoReusingSerializer serializer;
+
+ public SpilledReaderSupplier(
+ ChannelWithMeta channel,
+ BlockCompressionFactory compressFactory,
+ int compressBlock,
+ KeyValueWithLevelNoReusingSerializer serializer) {
+ this.channel = channel;
+ this.compressFactory = compressFactory;
+ this.compressBlock = compressBlock;
+ this.serializer = serializer;
}
- public <T> NoReusingMergeIterator<T> newIterator(
- Comparator<InternalRow> keyComparator, MergeFunctionWrapper<T> mergeFunction)
- throws IOException {
- return new NoReusingMergeIterator<>(
- buffer.sortedIterator(), keyComparator, mergeFunction);
+ @Override
+ public RecordReader<KeyValue> get() throws IOException {
+ ChannelReaderInputView view =
+ FileChannelUtil.createInputView(
+ ioManager, channel, new ArrayList<>(), compressFactory, compressBlock);
+ BinaryRowSerializer rowSerializer = new BinaryRowSerializer(serializer.numFields());
+ ChannelReaderInputViewIterator iterator =
+ new ChannelReaderInputViewIterator(view, null, rowSerializer);
+ return new ChannelReaderReader(view, iterator, serializer);
}
}
- private class NoReusingMergeIterator<T> {
-
- private final MutableObjectIterator<BinaryRow> kvIter;
- private final Comparator<InternalRow> keyComparator;
- private final MergeFunctionWrapper<T> mergeFunc;
+ private static class ChannelReaderReader implements RecordReader<KeyValue> {
- private KeyValue left;
+ private final ChannelReaderInputView view;
+ private final ChannelReaderInputViewIterator iterator;
+ private final KeyValueWithLevelNoReusingSerializer serializer;
- private boolean isEnd;
-
- private NoReusingMergeIterator(
- MutableObjectIterator<BinaryRow> kvIter,
- Comparator<InternalRow> keyComparator,
- MergeFunctionWrapper<T> mergeFunction) {
- this.kvIter = kvIter;
- this.keyComparator = keyComparator;
- this.mergeFunc = mergeFunction;
- this.isEnd = false;
+ private ChannelReaderReader(
+ ChannelReaderInputView view,
+ ChannelReaderInputViewIterator iterator,
+ KeyValueWithLevelNoReusingSerializer serializer) {
+ this.view = view;
+ this.iterator = iterator;
+ this.serializer = serializer;
}
- public T next() throws IOException {
- if (isEnd) {
+ private boolean read = false;
+
+ @Override
+ public RecordIterator<KeyValue> readBatch() {
+ if (read) {
return null;
}
- T result;
- do {
- mergeFunc.reset();
- InternalRow key = null;
- KeyValue keyValue;
- while ((keyValue = readOnce()) != null) {
- if (key != null && keyComparator.compare(keyValue.key(), key) != 0) {
- break;
+ read = true;
+ return new RecordIterator<KeyValue>() {
+ @Override
+ public KeyValue next() throws IOException {
+ BinaryRow noReuseRow = iterator.next();
+ if (noReuseRow == null) {
+ return null;
}
- key = keyValue.key();
- mergeFunc.add(keyValue);
+ return serializer.fromRow(noReuseRow);
}
- left = keyValue;
- if (key == null) {
- return null;
- }
- result = mergeFunc.getResult();
- } while (result == null);
- return result;
- }
- private KeyValue readOnce() throws IOException {
- if (left != null) {
- KeyValue ret = left;
- left = null;
- return ret;
- }
- BinaryRow row = kvIter.next();
- if (row == null) {
- isEnd = true;
- return null;
- }
+ @Override
+ public void releaseBatch() {}
+ };
+ }
- int keyArity = keyType.getFieldCount();
- int valueArity = valueType.getFieldCount();
- return new KeyValue()
- .replace(
- new OffsetRow(keyArity, 0).replace(row),
- row.getLong(keyArity),
- RowKind.fromByteValue(row.getByte(keyArity + 1)),
- new OffsetRow(valueArity, keyArity + 3).replace(row))
- .setLevel(row.getInt(keyArity + 2));
+ @Override
+ public void close() throws IOException {
+ view.getChannel().closeAndDelete();
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 766dccca2..88a9c2593 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -23,9 +23,10 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.SizedReaderSupplier;
import org.apache.paimon.utils.FieldsComparator;
import javax.annotation.Nullable;
@@ -71,9 +72,20 @@ public class MergeTreeReaders {
MergeFunctionWrapper<T> mergeFunctionWrapper,
MergeSorter mergeSorter)
throws IOException {
- List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+ List<SizedReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (SortedRun run : section) {
- readers.add(() -> readerForRun(run, readerFactory));
+ readers.add(
+ new SizedReaderSupplier<KeyValue>() {
+ @Override
+ public long estimateSize() {
+ return run.totalSize();
+ }
+
+ @Override
+ public RecordReader<KeyValue> get() throws IOException {
+ return readerForRun(run, readerFactory);
+ }
+ });
}
return mergeSorter.mergeSort(
readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
index 3b8e6908f..c9f5bebfc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.Preconditions;
@@ -40,14 +41,15 @@ public class ConcatRecordReader<T> implements RecordReader<T> {
private RecordReader<T> current;
- protected ConcatRecordReader(List<ReaderSupplier<T>> readerFactories) {
+ protected ConcatRecordReader(List<? extends ReaderSupplier<T>> readerFactories) {
readerFactories.forEach(
supplier ->
Preconditions.checkNotNull(supplier, "Reader factory must not be null."));
this.queue = new LinkedList<>(readerFactories);
}
- public static <R> RecordReader<R> create(List<ReaderSupplier<R>> readers) throws IOException {
+ public static <R> RecordReader<R> create(List<? extends ReaderSupplier<R>> readers)
+ throws IOException {
return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader<>(readers);
}
@@ -81,10 +83,4 @@ public class ConcatRecordReader<T> implements RecordReader<T> {
current.close();
}
}
-
- /** Supplier to get {@link RecordReader}. */
- @FunctionalInterface
- public interface ReaderSupplier<T> {
- RecordReader<T> get() throws IOException;
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 8002b62f0..20bee30c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -33,13 +33,13 @@ import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 7e6e91731..777a4588a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -37,6 +37,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.EmptyRecordReader;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -131,7 +132,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
public RecordReader<InternalRow> createReader(DataSplit split) throws IOException {
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(split.partition(), split.bucket());
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
+ List<ReaderSupplier<InternalRow>> suppliers = new ArrayList<>();
if (split.beforeFiles().size() > 0) {
LOG.info("Ignore split before files: " + split.beforeFiles());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
index f45960706..f6350f44a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
@@ -30,6 +30,7 @@ public class SystemColumns {
public static final String VALUE_COUNT = "_VALUE_COUNT";
public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
public static final String VALUE_KIND = "_VALUE_KIND";
+ public static final String LEVEL = "_LEVEL";
public static final List<String> SYSTEM_FIELD_NAMES =
Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java b/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
index 56b526313..e1db28c16 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java
@@ -183,7 +183,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
compressionCodecFactory,
compressionBlockSize);
writeMergingOutput(mergeIterator, output);
- numBytesInLastBlock = output.close();
+ output.close();
numBlocksWritten = output.getBlockCount();
} catch (IOException e) {
if (output != null) {
@@ -202,8 +202,7 @@ public abstract class AbstractBinaryExternalMerger<Entry> implements Closeable {
}
}
- return new ChannelWithMeta(
- mergedChannelID, numBlocksWritten, numBytesInLastBlock, output.getWriteBytes());
+ return new ChannelWithMeta(mergedChannelID, numBlocksWritten, output.getWriteBytes());
}
//
-------------------------------------------------------------------------------------------
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index c709dcc1a..72003408a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -251,7 +251,6 @@ public class BinaryExternalSortBuffer implements SortBuffer {
channelManager.addChannel(channel);
ChannelWriterOutputView output = null;
- int bytesInLastBuffer;
int blockCount;
try {
@@ -260,7 +259,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
ioManager, channel, compressionCodecFactory,
compressionBlockSize);
new QuickSort().sort(inMemorySortBuffer);
inMemorySortBuffer.writeToOutput(output);
- bytesInLastBuffer = output.close();
+ output.close();
blockCount = output.getBlockCount();
} catch (IOException e) {
if (output != null) {
@@ -270,9 +269,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
throw e;
}
- spillChannelIDs.add(
- new ChannelWithMeta(
- channel, blockCount, bytesInLastBuffer, output.getWriteBytes()));
+ spillChannelIDs.add(new ChannelWithMeta(channel, blockCount, output.getWriteBytes()));
inMemorySortBuffer.clear();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 1b2c6299b..7b78d5aed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
@@ -44,7 +45,7 @@ public interface TableRead {
RecordReader<InternalRow> createReader(Split split) throws IOException;
default RecordReader<InternalRow> createReader(List<Split> splits) throws
IOException {
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
+ List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> createReader(split));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 0519da9fd..b610ff60b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -117,7 +117,7 @@ public class IncrementalDiffSplitRead implements
SplitRead<InternalRow> {
MergeSorter sorter,
boolean keepDelete)
throws IOException {
- return sorter.mergeSort(
+ return sorter.mergeSortNoSpill(
Arrays.asList(
() -> wrapLevelToReader(beforeReader, BEFORE_LEVEL),
() -> wrapLevelToReader(afterReader, AFTER_LEVEL)),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
new file mode 100644
index 000000000..456dc43a0
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import static org.apache.paimon.data.JoinedRow.join;
+
+/**
+ * Serializer for {@link KeyValue} with Level that does not reuse objects: every {@link
+ * #fromRow} call allocates a new {@link KeyValue} and new {@link OffsetRow} wrappers.
+ *
+ * <p>The flattened row written by {@link #toRow} and read back by {@link #fromRow} has the
+ * layout: key fields, sequence number (long), value kind (byte), value fields, level (int).
+ */
+public class KeyValueWithLevelNoReusingSerializer extends
ObjectSerializer<KeyValue> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int keyArity; // number of key fields; also the index of the sequence number
+ private final int valueArity; // number of value fields
+ /**
+ * Creates a serializer for key-values whose key and value rows have the given types.
+ *
+ * @param keyType type of the key row; its field count fixes the offsets of all later fields
+ * @param valueType type of the value row; its field count fixes the offset of the level field
+ */
+ public KeyValueWithLevelNoReusingSerializer(RowType keyType, RowType
valueType) {
+ super(KeyValue.schemaWithLevel(keyType, valueType));
+
+ this.keyArity = keyType.getFieldCount();
+ this.valueArity = valueType.getFieldCount();
+ }
+ /**
+ * Flattens {@code kv} into a single row: key fields, then sequence number and value kind (as a
+ * byte), then value fields, then level. The row is assembled with nested {@code join} calls over
+ * {@code kv}'s key row, a metadata row, {@code kv}'s value row and a one-field level row.
+ *
+ * @param kv the key-value to flatten
+ * @return the joined row in the layout described on the class
+ */
+ @Override
+ public InternalRow toRow(KeyValue kv) {
+ GenericRow meta = GenericRow.of(kv.sequenceNumber(),
kv.valueKind().toByteValue()); // (sequence number, value kind byte)
+ return join(join(join(kv.key(), meta), kv.value()),
GenericRow.of(kv.level()));
+ }
+ /**
+ * Rebuilds a new {@link KeyValue} from a row laid out as produced by {@link #toRow}.
+ *
+ * <p>NOTE(review): key and value are wrapped around {@code row} via {@link OffsetRow} rather than
+ * copied out, so presumably {@code row} must not be mutated or reused while the returned
+ * key-value is still in use; confirm with callers.
+ *
+ * @param row a row in the layout described on the class
+ * @return a freshly allocated key-value with its level set
+ */
+ @Override
+ public KeyValue fromRow(InternalRow row) {
+ return new KeyValue()
+ .replace(
+ new OffsetRow(keyArity, 0).replace(row), // key fields start at 0
+ row.getLong(keyArity), // sequence number follows the key
+ RowKind.fromByteValue(row.getByte(keyArity + 1)), // value kind byte
+ new OffsetRow(valueArity, keyArity + 2).replace(row)) // value after key + 2 meta fields
+ .setLevel(row.getInt(keyArity + 2 + valueArity)); // level is the last field
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
index 557bd2959..b368e6014 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
@@ -24,10 +24,11 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.SizedReaderSupplier;
import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.DataTypes;
@@ -139,11 +140,11 @@ public class MergeSorterTest {
}
comparator = comparator.thenComparingLong(KeyValue::sequenceNumber);
- List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+ List<SizedReaderSupplier<KeyValue>> readers = new ArrayList<>();
Random rnd = new Random();
List<KeyValue> expectedKvs = new ArrayList<>();
Set<Long> distinctSeq = new HashSet<>();
- for (int i = 0; i < rnd.nextInt(10) + 3; i++) {
+ for (int i = 0; i < rnd.nextInt(20) + 3; i++) {
List<KeyValue> kvs = new ArrayList<>();
Set<Integer> distinctKeys = new HashSet<>();
for (int j = 0; j < 100; j++) {
@@ -172,7 +173,18 @@ public class MergeSorterTest {
}
expectedKvs.addAll(kvs);
kvs.sort(comparator);
- readers.add(() -> new IteratorRecordReader<>(kvs.iterator()));
+ readers.add(
+ new SizedReaderSupplier<KeyValue>() {
+ @Override
+ public long estimateSize() {
+ return kvs.size();
+ }
+
+ @Override
+ public RecordReader<KeyValue> get() {
+ return new IteratorRecordReader<>(kvs.iterator());
+ }
+ });
}
expectedKvs.sort(comparator);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
index 6b4a85c0c..ca04d8a7b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.ReusingTestData;
import org.apache.paimon.utils.TestReusingRecordReader;
@@ -48,9 +49,7 @@ public class ConcatRecordReaderTest extends
CombiningRecordReaderTestBase {
protected RecordReader<KeyValue> createRecordReader(
List<TestReusingRecordReader> readers, SortEngine sortEngine) {
return new ConcatRecordReader(
- readers.stream()
- .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r)
- .collect(Collectors.toList()));
+ readers.stream().map(r -> (ReaderSupplier) () ->
r).collect(Collectors.toList()));
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
index ec9197b8b..ff67dbc27 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
@@ -284,7 +285,7 @@ public class OrphanFilesCleanTest {
while (id <= max) {
List<Split> splits = scan.plan().splits();
if (!splits.isEmpty()) {
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers =
new ArrayList<>();
+ List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> scanTable.newRead().createReader(split));
}
@@ -318,7 +319,7 @@ public class OrphanFilesCleanTest {
private void validateSnapshot(Snapshot snapshot, List<TestPojo> data)
throws Exception {
List<Split> splits =
table.newSnapshotReader().withSnapshot(snapshot).read().splits();
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
+ List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> table.newRead().createReader(split));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index daf5db716..937005d21 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -37,11 +37,11 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.operation.FileStoreTestUtils;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.SchemaChange;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index f5874bed7..277892410 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
@@ -446,7 +447,7 @@ public abstract class SchemaEvolutionTableTestBase {
protected List<String> getResult(
TableRead read, List<Split> splits, Function<InternalRow, String>
rowDataToString) {
try {
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
+ List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> read.createReader(split));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index defbe24ec..2ed0d5c9b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
@@ -109,7 +110,7 @@ public abstract class ScannerTestBase {
}
protected List<String> getResult(TableRead read, List<Split> splits)
throws Exception {
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
+ List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> read.createReader(split));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index ea9256830..ceb40c1a8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -25,6 +25,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
@@ -144,7 +145,7 @@ public class LookupStreamingReader {
options.pageSize(),
new
Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
} else {
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new
ArrayList<>();
+ List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : splits) {
readers.add(() -> readerSupplier.apply(split));
}