This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new edaac231d [core] Introduce sort-spill-threshold (#1431)
edaac231d is described below
commit edaac231dbccce9cddd48937e37c74e84eb638f0
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 26 11:55:55 2023 +0800
[core] Introduce sort-spill-threshold (#1431)
---
.../shortcodes/generated/core_configuration.html | 18 +-
.../apache/paimon/memory/CachelessSegmentPool.java | 61 ++++
.../org/apache/paimon/reader/RecordReader.java | 21 ++
.../paimon/utils/ConsumerWithIOException.java | 32 +++
.../main/java/org/apache/paimon/CoreOptions.java | 46 +--
.../java/org/apache/paimon/disk/IOManagerImpl.java | 10 +-
.../org/apache/paimon/mergetree/MergeSorter.java | 312 +++++++++++++++++++++
.../apache/paimon/mergetree/MergeTreeReaders.java | 43 +--
.../paimon/mergetree/SortBufferWriteBuffer.java | 46 ---
.../compact/ChangelogMergeTreeRewriter.java | 23 +-
.../FullChangelogMergeTreeCompactRewriter.java | 6 +-
.../compact/LookupMergeTreeCompactRewriter.java | 6 +-
.../compact/MergeTreeCompactRewriter.java | 10 +-
.../mergetree/compact/UniversalCompaction.java | 13 +-
.../paimon/operation/KeyValueFileStoreRead.java | 19 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 15 +-
.../paimon/table/sink/BatchWriteBuilderImpl.java | 4 +-
.../paimon/table/source/KeyValueTableRead.java | 7 +
.../org/apache/paimon/table/source/TableRead.java | 5 +
.../apache/paimon/mergetree/MergeSorterTest.java | 151 ++++++++++
.../apache/paimon/mergetree/MergeTreeTestBase.java | 10 +-
.../compact/ForceUpLevel0CompactionTest.java | 2 +-
.../mergetree/compact/UniversalCompactionTest.java | 55 +---
.../table/ChangelogWithKeyFileStoreTableTest.java | 24 +-
.../apache/paimon/flink/source/FlinkSource.java | 10 +-
.../paimon/flink/source/operator/ReadOperator.java | 9 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 14 +
.../apache/paimon/spark/SparkReaderFactory.java | 8 +-
.../java/org/apache/paimon/spark/SparkUtils.java | 32 +++
.../spark/commands/WriteIntoPaimonTable.scala | 2 +
.../org/apache/paimon/spark/SparkWriteITCase.java | 8 +
31 files changed, 811 insertions(+), 211 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 53c514d36..bf4265e27 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,12 +68,6 @@ under the License.
<td>Integer</td>
<td>The size amplification is defined as the amount (in
percentage) of additional storage needed to store a single byte of data in the
merge tree for changelog mode table.</td>
</tr>
- <tr>
- <td><h5>compaction.max-sorted-run-num</h5></td>
- <td style="word-wrap: break-word;">2147483647</td>
- <td>Integer</td>
- <td>The maximum sorted run number to pick for compaction. This
value avoids merging too much sorted runs at the same time during compaction,
which may lead to OutOfMemoryError.</td>
- </tr>
<tr>
<td><h5>compaction.max.file-num</h5></td>
<td style="word-wrap: break-word;">50</td>
@@ -410,6 +404,18 @@ under the License.
<td><p>Enum</p></td>
<td>Specify the sort engine for table with primary key.<br /><br
/>Possible values:<ul><li>"min-heap": Use min-heap for multiway
sorting.</li><li>"loser-tree": Use loser-tree for multiway sorting. Compared
with heapsort, loser-tree has fewer comparisons and is more
efficient.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>sort-spill-buffer-size</h5></td>
+ <td style="word-wrap: break-word;">64 mb</td>
+ <td>MemorySize</td>
+ <td>Amount of data to spill records to disk in spilled sort.</td>
+ </tr>
+ <tr>
+ <td><h5>sort-spill-threshold</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>If the maximum number of sort readers exceeds this value, a
spill will be attempted. This prevents too many readers from consuming too much
memory and causing OOM.</td>
+ </tr>
<tr>
<td><h5>source.split.open-file-cost</h5></td>
<td style="word-wrap: break-word;">4 mb</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
new file mode 100644
index 000000000..cec1db858
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/CachelessSegmentPool.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import java.util.List;
+
+/**
+ * A {@link MemorySegmentPool} without cache: every {@link #nextSegment()} call allocates a fresh
+ * heap segment, and returned segments are only subtracted from the page counter instead of being
+ * kept for reuse.
+ */
+public class CachelessSegmentPool implements MemorySegmentPool {
+
+    private final int maxPages;
+    private final int pageSize;
+
+    private int numPage;
+
+    /**
+     * Creates a pool that hands out at most {@code maxMemory / pageSize} pages.
+     *
+     * @param maxMemory upper bound of the memory handed out by this pool
+     * @param pageSize size of each allocated segment, in the same unit as {@code maxMemory}
+     */
+    public CachelessSegmentPool(long maxMemory, int pageSize) {
+        // Clamp before narrowing: a plain (int) cast silently truncates a page count larger than
+        // Integer.MAX_VALUE, possibly to a negative limit that would make the pool unusable.
+        this.maxPages = (int) Math.min(maxMemory / pageSize, Integer.MAX_VALUE);
+        this.pageSize = pageSize;
+        this.numPage = 0;
+    }
+
+    /** Allocates a new heap segment, or returns {@code null} once the page limit is reached. */
+    @Override
+    public MemorySegment nextSegment() {
+        if (numPage < maxPages) {
+            numPage++;
+            return MemorySegment.allocateHeapMemory(pageSize);
+        }
+
+        return null;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /** Gives the pages back to the budget; the segments themselves are not retained. */
+    @Override
+    public void returnAll(List<MemorySegment> memory) {
+        numPage -= memory.size();
+    }
+
+    /** Returns how many more pages can be allocated before the limit is reached. */
+    @Override
+    public int freePages() {
+        return maxPages - numPage;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index 474e33b4d..79fab867d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.reader;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.ConsumerWithIOException;
import org.apache.paimon.utils.Filter;
import javax.annotation.Nullable;
@@ -141,6 +142,26 @@ public interface RecordReader<T> extends Closeable {
}
}
+ /**
+ * Performs the given action for each remaining element in {@link
RecordReader} until all
+ * elements have been processed or the action throws an exception.
+ *
+ * <p>This reader is closed before the method returns, whether it completes normally or
+ * exceptionally.
+ *
+ * @param action the action to perform on each record; may throw {@link IOException}
+ * @throws IOException if thrown while reading or by the action
+ */
+ default void forIOEachRemaining(ConsumerWithIOException<? super T> action)
throws IOException {
+ RecordReader.RecordIterator<T> batch;
+ T record;
+
+ try {
+ while ((batch = readBatch()) != null) {
+ while ((record = batch.next()) != null) {
+ action.accept(record);
+ }
+ batch.releaseBatch();
+ }
+ } finally {
+ close();
+ }
+ }
+
/** Returns a {@link RecordReader} that applies {@code function} to each
element. */
default <R> RecordReader<R> transform(Function<T, R> function) {
RecordReader<T> thisReader = this;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ConsumerWithIOException.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ConsumerWithIOException.java
new file mode 100644
index 000000000..cd62c9ec2
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/ConsumerWithIOException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+
+/** A consumer with {@link IOException}. */
+@FunctionalInterface
+public interface ConsumerWithIOException<T> {
+
+ /**
+ * Performs this operation on the given argument.
+ *
+ * @param t the input argument
+ * @throws IOException if the operation fails with an I/O error
+ */
+ void accept(T t) throws IOException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index aafbd2c72..d8ce5747f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -199,6 +199,20 @@ public class CoreOptions implements Serializable {
.defaultValue(SortEngine.LOSER_TREE)
.withDescription("Specify the sort engine for table with
primary key.");
+ public static final ConfigOption<Integer> SORT_SPILL_THRESHOLD =
+ key("sort-spill-threshold")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "If the maximum number of sort readers exceeds
this value, a spill will be attempted. "
+ + "This prevents too many readers from
consuming too much memory and causing OOM.");
+
+ public static final ConfigOption<MemorySize> SORT_SPILL_BUFFER_SIZE =
+ key("sort-spill-buffer-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("64 mb"))
+ .withDescription("Amount of data to spill records to disk
in spilled sort.");
+
@Immutable
public static final ConfigOption<WriteMode> WRITE_MODE =
key("write-mode")
@@ -337,15 +351,6 @@ public class CoreOptions implements Serializable {
+ "for append-only table, even if
sum(size(f_i)) < targetFileSize. This value "
+ "avoids pending too much small files,
which slows down the performance.");
- public static final ConfigOption<Integer> COMPACTION_MAX_SORTED_RUN_NUM =
- key("compaction.max-sorted-run-num")
- .intType()
- .defaultValue(Integer.MAX_VALUE)
- .withDescription(
- "The maximum sorted run number to pick for
compaction. "
- + "This value avoids merging too much
sorted runs at the same time during compaction, "
- + "which may lead to OutOfMemoryError.");
-
public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
key("changelog-producer")
.enumType(ChangelogProducer.class)
@@ -785,6 +790,15 @@ public class CoreOptions implements Serializable {
return options.get(SORT_ENGINE);
}
+ /**
+ * Returns the value of {@code sort-spill-threshold}, or a default derived from {@code
+ * numSortedRunStopTrigger()} when the option is not set.
+ */
+ public int sortSpillThreshold() {
+ Integer maxSortedRunNum = options.get(SORT_SPILL_THRESHOLD);
+ if (maxSortedRunNum == null) {
+ int stopNum = numSortedRunStopTrigger();
+ // stopNum + 1, except that Math.max keeps stopNum itself when the increment overflows int
+ maxSortedRunNum = Math.max(stopNum, stopNum + 1);
+ }
+ return maxSortedRunNum;
+ }
+
public long splitTargetSize() {
return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
}
@@ -802,6 +816,10 @@ public class CoreOptions implements Serializable {
return
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore ||
!isStreaming);
}
+ /** Returns the configured {@code sort-spill-buffer-size} in bytes. */
+ public long sortSpillBufferSize() {
+ return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
+ }
+
public Duration continuousDiscoveryInterval() {
return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
}
@@ -834,11 +852,7 @@ public class CoreOptions implements Serializable {
// By default, this ensures that the compaction does not fall to level
0, but at least to
// level 1
Integer numLevels = options.get(NUM_LEVELS);
- int expectedRuns =
- maxSortedRunNum() == Integer.MAX_VALUE
- ? numSortedRunCompactionTrigger()
- : numSortedRunStopTrigger();
- numLevels = numLevels == null ? expectedRuns + 1 : numLevels;
+ numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 :
numLevels;
return numLevels;
}
@@ -862,10 +876,6 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_MAX_FILE_NUM);
}
- public int maxSortedRunNum() {
- return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
- }
-
public long dynamicBucketTargetRowNum() {
return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index 55be7a0f6..cae471aa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -25,6 +25,8 @@ import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -47,7 +49,7 @@ public class IOManagerImpl implements IOManager {
*
* @param tempDirs The basic directories for files underlying anonymous
channels.
*/
- public IOManagerImpl(String[] tempDirs) {
+ public IOManagerImpl(String... tempDirs) {
this.fileChannelManager =
new
FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
if (LOG.isInfoEnabled()) {
@@ -122,4 +124,10 @@ public class IOManagerImpl implements IOManager {
public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID)
throws IOException {
return new BufferFileReaderImpl(channelID);
}
+
+ /**
+ * Splits a list of paths separated by commas or by {@link File#pathSeparator}.
+ *
+ * @param separatedPaths the separated path list
+ * @return the individual paths, or an empty array when the input is empty
+ */
+ public static String[] splitPaths(@Nonnull String separatedPaths) {
+ return separatedPaths.length() > 0
+ ? separatedPaths.split(",|" + File.pathSeparator)
+ : new String[0];
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
new file mode 100644
index 000000000..9f5b34e12
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.NormalizedKeyComputer;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.memory.CachelessSegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.mergetree.compact.SortMergeReader;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.sort.SortBuffer;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.MutableObjectIterator;
+import org.apache.paimon.utils.OffsetRow;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
+import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
+
+/** The merge sorter to sort and merge readers with key overlap. */
+public class MergeSorter {
+
+ private final RowType keyType;
+ private final RowType valueType;
+
+ private final SortEngine sortEngine;
+ private final int spillThreshold;
+ private final int spillSortMaxNumFiles;
+
+ private final MemorySegmentPool memoryPool;
+
+ @Nullable private IOManager ioManager;
+
+ /**
+ * Creates a merge sorter configured from the given options.
+ *
+ * @param options source of the sort engine, spill threshold, max spill file handles, spill
+ * buffer size and page size
+ * @param keyType row type of the keys
+ * @param valueType row type of the values
+ * @param ioManager IO manager used for spilling; when {@code null}, {@link #mergeSort} never
+ * spills
+ */
+ public MergeSorter(
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType,
+ @Nullable IOManager ioManager) {
+ this.sortEngine = options.sortEngine();
+ this.spillThreshold = options.sortSpillThreshold();
+ this.spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
+ this.keyType = keyType;
+ this.valueType = valueType;
+ // dedicated pool for spilled sorts, sized by sort-spill-buffer-size
+ this.memoryPool =
+ new CachelessSegmentPool(options.sortSpillBufferSize(),
options.pageSize());
+ this.ioManager = ioManager;
+ }
+
+ /** Returns the memory pool used by spilled sorts. */
+ public MemorySegmentPool memoryPool() {
+ return memoryPool;
+ }
+
+ /** Replaces the IO manager; {@link #mergeSort} only spills when it is non-null. */
+ public void setIOManager(IOManager ioManager) {
+ this.ioManager = ioManager;
+ }
+
+ /**
+ * Merges the given readers into one reader of merged results.
+ *
+ * <p>When an IO manager is available and the number of readers exceeds the spill threshold,
+ * the merge goes through the spilling path; otherwise all readers are created up front and
+ * merged by a {@link SortMergeReader}.
+ *
+ * @param lazyReaders suppliers of the readers to merge
+ * @param keyComparator comparator of the keys
+ * @param mergeFunction merge function applied to the records sharing a key
+ * @throws IOException if creating a reader or spilling fails
+ */
+ public <T> RecordReader<T> mergeSort(
+ List<ReaderSupplier<KeyValue>> lazyReaders,
+ Comparator<InternalRow> keyComparator,
+ MergeFunctionWrapper<T> mergeFunction)
+ throws IOException {
+ if (ioManager != null && lazyReaders.size() > spillThreshold) {
+ return spillMergeSort(lazyReaders, keyComparator, mergeFunction);
+ }
+
+ List<RecordReader<KeyValue>> readers = new
ArrayList<>(lazyReaders.size());
+ for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
+ try {
+ readers.add(supplier.get());
+ } catch (IOException e) {
+ // if one of the readers creating failed, we need to close
them all.
+ readers.forEach(IOUtils::closeQuietly);
+ throw e;
+ }
+ }
+
+ return SortMergeReader.createSortMergeReader(
+ readers, keyComparator, mergeFunction, sortEngine);
+ }
+
+    /**
+     * Merges the given readers through an external sort: all records are first written into an
+     * {@link ExternalSorterWithLevel}, then the sorted output is merged key by key.
+     *
+     * <p>The input is fully consumed (and closed) before this method returns. The returned reader
+     * exposes the merged result as a single batch; closing it clears the sorter.
+     *
+     * @throws IOException if reading the input or writing to the sorter fails; the sorter is
+     *     cleared before the exception is propagated
+     */
+    private <T> RecordReader<T> spillMergeSort(
+            List<ReaderSupplier<KeyValue>> readers,
+            Comparator<InternalRow> keyComparator,
+            MergeFunctionWrapper<T> mergeFunction)
+            throws IOException {
+        ExternalSorterWithLevel sorter = new ExternalSorterWithLevel();
+        NoReusingMergeIterator<T> iterator;
+        try {
+            ConcatRecordReader.create(readers).forIOEachRemaining(sorter::put);
+            sorter.flushMemory();
+            iterator = sorter.newIterator(keyComparator, mergeFunction);
+        } catch (IOException | RuntimeException e) {
+            // No reader is handed out on this path, so nobody else could ever clear the sorter.
+            sorter.clear();
+            throw e;
+        }
+
+        return new RecordReader<T>() {
+
+            // the whole sorted result is exposed as one batch
+            private boolean read = false;
+
+            @Nullable
+            @Override
+            public RecordIterator<T> readBatch() {
+                if (read) {
+                    return null;
+                }
+
+                read = true;
+                return new RecordIterator<T>() {
+                    @Override
+                    public T next() throws IOException {
+                        return iterator.next();
+                    }
+
+                    @Override
+                    public void releaseBatch() {}
+                };
+            }
+
+            @Override
+            public void close() {
+                sorter.clear();
+            }
+        };
+    }
+
+    /**
+     * External sorter that stores each {@link KeyValue} together with its level, sorted by user
+     * key and sequence number.
+     *
+     * <p>Here can not use {@link SortBufferWriteBuffer} for two reasons:
+     *
+     * <p>1.Changelog-producer: full-compaction and lookup need to know the level of the KeyValue.
+     *
+     * <p>2.Changelog-producer: full-compaction and lookup need to store the reference of
+     * update_before.
+     */
+    private class ExternalSorterWithLevel {
+
+        private final SortBuffer buffer;
+
+        /**
+         * Builds the sort buffer on top of the enclosing sorter's memory pool and IO manager.
+         *
+         * @throws IllegalArgumentException if the memory pool has fewer than 3 free pages
+         */
+        public ExternalSorterWithLevel() {
+            // user key + sequenceNumber
+            List<DataType> sortKeyTypes = new ArrayList<>(keyType.getFieldTypes());
+            sortKeyTypes.add(new BigIntType(false));
+
+            // for sort binary buffer
+            NormalizedKeyComputer normalizedKeyComputer =
+                    CodeGenUtils.newNormalizedKeyComputer(sortKeyTypes, "MemTableKeyComputer");
+            RecordComparator keyComparator =
+                    CodeGenUtils.newRecordComparator(sortKeyTypes, "MemTableComparator");
+
+            // This pool is sized by 'sort-spill-buffer-size', not by the write buffer, so the
+            // message must point the user at that option.
+            if (memoryPool.freePages() < 3) {
+                throw new IllegalArgumentException(
+                        "Sort spill buffer requires a minimum of 3 page memory, "
+                                + "please increase 'sort-spill-buffer-size'.");
+            }
+
+            // stored row layout: key fields, sequence number, value kind, level, value fields
+            List<DataField> fields = new ArrayList<>(keyType.getFields());
+            fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false)));
+            fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
+            fields.add(new DataField(2, "_LEVEL", new IntType(false)));
+            fields.addAll(valueType.getFields());
+            RowType schemaWithLevel = new RowType(fields);
+            InternalRowSerializer serializer = InternalSerializers.create(schemaWithLevel);
+
+            this.buffer =
+                    new BinaryExternalSortBuffer(
+                            new BinaryRowSerializer(serializer.getArity()),
+                            keyComparator,
+                            memoryPool.pageSize(),
+                            BinaryInMemorySortBuffer.createBuffer(
+                                    normalizedKeyComputer, serializer, keyComparator, memoryPool),
+                            ioManager,
+                            spillSortMaxNumFiles);
+        }
+
+        /**
+         * Writes one record into the sort buffer.
+         *
+         * @return the result of the underlying buffer write
+         */
+        public boolean put(KeyValue keyValue) throws IOException {
+            // (sequence number, value kind, level) sits between the key and the value
+            GenericRow meta = new GenericRow(3);
+            meta.setField(0, keyValue.sequenceNumber());
+            meta.setField(1, keyValue.valueKind().toByteValue());
+            meta.setField(2, keyValue.level());
+            JoinedRow row =
+                    new JoinedRow()
+                            .replace(
+                                    new JoinedRow().replace(keyValue.key(), meta),
+                                    keyValue.value());
+            return buffer.write(row);
+        }
+
+        public boolean flushMemory() throws IOException {
+            return buffer.flushMemory();
+        }
+
+        public void clear() {
+            buffer.clear();
+        }
+
+        /** Creates an iterator that merges the sorted records key by key. */
+        public <T> NoReusingMergeIterator<T> newIterator(
+                Comparator<InternalRow> keyComparator, MergeFunctionWrapper<T> mergeFunction)
+                throws IOException {
+            return new NoReusingMergeIterator<>(
+                    buffer.sortedIterator(), keyComparator, mergeFunction);
+        }
+    }
+
+ /**
+ * Iterator over sorted rows that feeds all records sharing a key into the merge function and
+ * returns one merged result per call. A new {@link KeyValue} is created for every row read.
+ */
+ private class NoReusingMergeIterator<T> {
+
+ private final MutableObjectIterator<BinaryRow> kvIter;
+ private final Comparator<InternalRow> keyComparator;
+ private final MergeFunctionWrapper<T> mergeFunc;
+
+ // first record of the next key group, read ahead while finishing the previous group
+ private KeyValue left;
+
+ // set once the underlying iterator has returned null
+ private boolean isEnd;
+
+ private NoReusingMergeIterator(
+ MutableObjectIterator<BinaryRow> kvIter,
+ Comparator<InternalRow> keyComparator,
+ MergeFunctionWrapper<T> mergeFunction) {
+ this.kvIter = kvIter;
+ this.keyComparator = keyComparator;
+ this.mergeFunc = mergeFunction;
+ this.isEnd = false;
+ }
+
+ /**
+ * Returns the next non-null merge result, or {@code null} when the input is exhausted.
+ * Key groups whose merge result is {@code null} are skipped.
+ */
+ public T next() throws IOException {
+ if (isEnd) {
+ return null;
+ }
+
+ T result;
+ do {
+ mergeFunc.reset();
+ InternalRow key = null;
+ KeyValue keyValue;
+ while ((keyValue = readOnce()) != null) {
+ if (key != null && keyComparator.compare(keyValue.key(),
key) != 0) {
+ break;
+ }
+ key = keyValue.key();
+ mergeFunc.add(keyValue);
+ }
+ // keep the record that ended this group (null at end of input) for the next call
+ left = keyValue;
+ if (key == null) {
+ return null;
+ }
+ result = mergeFunc.getResult();
+ } while (result == null);
+ return result;
+ }
+
+ /** Returns the read-ahead record if there is one, otherwise decodes the next sorted row. */
+ private KeyValue readOnce() throws IOException {
+ if (left != null) {
+ KeyValue ret = left;
+ left = null;
+ return ret;
+ }
+ BinaryRow row = kvIter.next();
+ if (row == null) {
+ isEnd = true;
+ return null;
+ }
+
+ // row layout: key fields, sequence number, value kind, level, value fields
+ int keyArity = keyType.getFieldCount();
+ int valueArity = valueType.getFieldCount();
+ return new KeyValue()
+ .replace(
+ new OffsetRow(keyArity, 0).replace(row),
+ row.getLong(keyArity),
+ RowKind.fromByteValue(row.getByte(keyArity + 1)),
+ new OffsetRow(valueArity, keyArity +
3).replace(row))
+ .setLevel(row.getInt(keyArity + 2));
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index ed2468056..aa2bb4a30 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -18,18 +18,16 @@
package org.apache.paimon.mergetree;
-import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
-import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.IOUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,9 +45,9 @@ public class MergeTreeReaders {
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
MergeFunction<KeyValue> mergeFunction,
- SortEngine sortEngine)
+ MergeSorter mergeSorter)
throws IOException {
- List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new
ArrayList<>();
+ List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (List<SortedRun> section : sections) {
readers.add(
() ->
@@ -58,7 +56,7 @@ public class MergeTreeReaders {
readerFactory,
userKeyComparator,
new
ReducerMergeFunctionWrapper(mergeFunction),
- sortEngine));
+ mergeSorter));
}
RecordReader<KeyValue> reader = ConcatRecordReader.create(readers);
if (dropDelete) {
@@ -67,25 +65,23 @@ public class MergeTreeReaders {
return reader;
}
- public static RecordReader<KeyValue> readerForSection(
+ public static <T> RecordReader<T> readerForSection(
List<SortedRun> section,
KeyValueFileReaderFactory readerFactory,
Comparator<InternalRow> userKeyComparator,
- MergeFunctionWrapper<KeyValue> mergeFunctionWrapper,
- SortEngine sortEngine)
+ MergeFunctionWrapper<T> mergeFunctionWrapper,
+ MergeSorter mergeSorter)
throws IOException {
- List<RecordReader<KeyValue>> readers = readerForSection(section,
readerFactory);
- if (readers.size() == 1) {
- return readers.get(0);
- } else {
- return SortMergeReader.createSortMergeReader(
- readers, userKeyComparator, mergeFunctionWrapper,
sortEngine);
+ List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+ for (SortedRun run : section) {
+ readers.add(() -> readerForRun(run, readerFactory));
}
+ return mergeSorter.mergeSort(readers, userKeyComparator,
mergeFunctionWrapper);
}
public static RecordReader<KeyValue> readerForRun(
SortedRun run, KeyValueFileReaderFactory readerFactory) throws
IOException {
- List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new
ArrayList<>();
+ List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
for (DataFileMeta file : run.files()) {
readers.add(
() ->
@@ -94,19 +90,4 @@ public class MergeTreeReaders {
}
return ConcatRecordReader.create(readers);
}
-
- public static List<RecordReader<KeyValue>> readerForSection(
- List<SortedRun> runs, KeyValueFileReaderFactory readerFactory)
throws IOException {
- List<RecordReader<KeyValue>> readers = new ArrayList<>();
- try {
- for (SortedRun run : runs) {
- readers.add(readerForRun(run, readerFactory));
- }
- } catch (IOException e) {
- // if one of the readers creating failed, we need to close them
all.
- readers.forEach(IOUtils::closeQuietly);
- throw e;
- }
- return readers;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 8ff9d536c..53cc0969e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -47,7 +47,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
/** A {@link WriteBuffer} which stores records in {@link
BinaryInMemorySortBuffer}. */
@@ -248,49 +247,4 @@ public class SortBufferWriteBuffer implements WriteBuffer {
currentRow = tmpRow;
}
}
-
- private class RawIterator implements Iterator<KeyValue> {
- private final MutableObjectIterator<BinaryRow> kvIter;
- private final KeyValueSerializer current;
-
- private BinaryRow currentRow;
- private boolean advanced;
-
- private RawIterator(MutableObjectIterator<BinaryRow> kvIter) {
- this.kvIter = kvIter;
- this.current = new KeyValueSerializer(keyType, valueType);
- this.currentRow =
- new BinaryRow(keyType.getFieldCount() + 2 +
valueType.getFieldCount());
- this.advanced = false;
- }
-
- @Override
- public boolean hasNext() {
- if (!advanced) {
- advanceNext();
- }
- return currentRow != null;
- }
-
- @Override
- public KeyValue next() {
- if (!hasNext()) {
- return null;
- }
- advanced = false;
- return current.getReusedKv();
- }
-
- private void advanceNext() {
- try {
- currentRow = kvIter.next(currentRow);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if (currentRow != null) {
- current.fromRow(currentRow);
- }
- advanced = true;
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 332798964..84d884483 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactResult;
@@ -27,9 +26,9 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import java.util.ArrayList;
@@ -48,10 +47,10 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- SortEngine sortEngine,
+ MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
- super(readerFactory, writerFactory, keyComparator, mfFactory,
sortEngine);
+ super(readerFactory, writerFactory, keyComparator, mfFactory,
mergeSorter);
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}
@@ -78,15 +77,13 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
List<ConcatRecordReader.ReaderSupplier<ChangelogResult>>
sectionReaders = new ArrayList<>();
for (List<SortedRun> section : sections) {
sectionReaders.add(
- () -> {
- List<RecordReader<KeyValue>> runReaders =
- MergeTreeReaders.readerForSection(section,
readerFactory);
- return SortMergeReader.createSortMergeReader(
- runReaders,
- keyComparator,
- createMergeWrapper(outputLevel),
- sortEngine);
- });
+ () ->
+ MergeTreeReaders.readerForSection(
+ section,
+ readerFactory,
+ keyComparator,
+ createMergeWrapper(outputLevel),
+ mergeSorter));
}
RecordReaderIterator<ChangelogResult> iterator = null;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 09c79fe48..2cc02ec1d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -18,13 +18,13 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.Preconditions;
@@ -43,7 +43,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- SortEngine sortEngine,
+ MergeSorter mergeSorter,
RecordEqualiser valueComparator,
boolean changelogRowDeduplicate) {
super(
@@ -51,7 +51,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
writerFactory,
keyComparator,
mfFactory,
- sortEngine,
+ mergeSorter,
valueComparator,
changelogRowDeduplicate);
this.maxLevel = maxLevel;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 07eef93e9..d0c710590 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
@@ -26,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import java.io.IOException;
@@ -47,7 +47,7 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- SortEngine sortEngine,
+ MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
super(
@@ -55,7 +55,7 @@ public class LookupMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
writerFactory,
keyComparator,
mfFactory,
- sortEngine,
+ mergeSorter,
valueEqualiser,
changelogRowDeduplicate);
this.lookupLevels = lookupLevels;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index a4691bcb4..228a3aa11 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
@@ -26,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReader;
@@ -41,19 +41,19 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
protected final KeyValueFileWriterFactory writerFactory;
protected final Comparator<InternalRow> keyComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
- protected final SortEngine sortEngine;
+ protected final MergeSorter mergeSorter;
public MergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
- SortEngine sortEngine) {
+ MergeSorter mergeSorter) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
- this.sortEngine = sortEngine;
+ this.mergeSorter = mergeSorter;
}
@Override
@@ -73,7 +73,7 @@ public class MergeTreeCompactRewriter extends
AbstractCompactRewriter {
readerFactory,
keyComparator,
mfFactory.create(),
- sortEngine);
+ mergeSorter);
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
return new CompactResult(extractFilesFromSections(sections),
writer.result());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 32c838879..86e7597a1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -43,14 +43,11 @@ public class UniversalCompaction implements CompactStrategy
{
private final int maxSizeAmp;
private final int sizeRatio;
private final int numRunCompactionTrigger;
- private final int maxSortedRunNum;
- public UniversalCompaction(
- int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger, int
maxSortedRunNum) {
+ public UniversalCompaction(int maxSizeAmp, int sizeRatio, int
numRunCompactionTrigger) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
this.numRunCompactionTrigger = numRunCompactionTrigger;
- this.maxSortedRunNum = maxSortedRunNum;
}
@Override
@@ -138,7 +135,7 @@ public class UniversalCompaction implements CompactStrategy
{
}
if (forcePick || candidateCount > 1) {
- return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum);
+ return createUnit(runs, maxLevel, candidateCount);
}
return null;
@@ -153,12 +150,8 @@ public class UniversalCompaction implements
CompactStrategy {
}
@VisibleForTesting
- static CompactUnit createUnit(
- List<LevelSortedRun> runs, int maxLevel, int runCount, int
maxSortedRunNum) {
+ static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int
runCount) {
int outputLevel;
- if (runCount > maxSortedRunNum) {
- runCount = maxSortedRunNum;
- }
if (runCount == runs.size()) {
outputLevel = maxLevel;
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 29a298bb0..fdfb3bb6d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -18,15 +18,17 @@
package org.apache.paimon.operation;
-import org.apache.paimon.CoreOptions.SortEngine;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.mergetree.DropDeleteReader;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
@@ -35,7 +37,6 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
-import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -56,7 +57,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.SORT_ENGINE;
import static org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
@@ -69,7 +69,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private final Comparator<InternalRow> keyComparator;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final boolean valueCountMode;
- private final SortEngine sortEngine;
+ private final MergeSorter mergeSorter;
@Nullable private int[][] keyProjectedFields;
@@ -105,7 +105,9 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
- this.sortEngine =
Options.fromMap(tableSchema.options()).get(SORT_ENGINE);
+ this.mergeSorter =
+ new MergeSorter(
+ CoreOptions.fromMap(tableSchema.options()), keyType,
valueType, null);
}
public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
@@ -124,6 +126,11 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
return this;
}
+ public KeyValueFileStoreRead withIOManager(IOManager ioManager) {
+ this.mergeSorter.setIOManager(ioManager);
+ return this;
+ }
+
@Override
public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
List<Predicate> allFilters = new ArrayList<>();
@@ -218,7 +225,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
: nonOverlappedSectionFactory,
keyComparator,
mergeFuncWrapper,
- sortEngine));
+ mergeSorter));
}
DropDeleteReader reader =
new
DropDeleteReader(ConcatRecordReader.create(sectionReaders));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 1cb311633..01cf7b526 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -37,6 +37,7 @@ import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
@@ -153,8 +154,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.maxSortedRunNum());
+ options.numSortedRunCompactionTrigger());
CompactStrategy compactStrategy =
options.changelogProducer() == ChangelogProducer.LOOKUP
? new LookupCompaction(universalCompaction)
@@ -211,6 +211,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
bucket,
options.fileCompressionPerLevel(),
options.fileCompression());
+ MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType,
ioManager);
switch (options.changelogProducer()) {
case FULL_COMPACTION:
return new FullChangelogMergeTreeCompactRewriter(
@@ -219,7 +220,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
writerFactory,
keyComparator,
mfFactory,
- options.sortEngine(),
+ mergeSorter,
valueEqualiserSupplier.get(),
options.changelogRowDeduplicate());
case LOOKUP:
@@ -230,16 +231,12 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
writerFactory,
keyComparator,
mfFactory,
- options.sortEngine(),
+ mergeSorter,
valueEqualiserSupplier.get(),
options.changelogRowDeduplicate());
default:
return new MergeTreeCompactRewriter(
- readerFactory,
- writerFactory,
- keyComparator,
- mfFactory,
- options.sortEngine());
+ readerFactory, writerFactory, keyComparator,
mfFactory, mergeSorter);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 4acd6be33..bf4cf8800 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -58,7 +58,9 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
@Override
public BatchTableWrite newWrite() {
- return
table.newWrite(commitUser).withIgnorePreviousFiles(staticPartition != null);
+ return table.newWrite(commitUser)
+ .withIgnorePreviousFiles(staticPartition != null)
+ .isStreamingMode(false);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index ec861d1ce..a838806ee 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.reader.RecordReader;
@@ -39,6 +40,12 @@ public abstract class KeyValueTableRead implements
InnerTableRead {
this.read = read;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ read.withIOManager(ioManager);
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
return new RowDataRecordReader(read.createReader((DataSplit) split));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index f496e574a..8ba6eed3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.reader.RecordReader;
@@ -36,6 +37,10 @@ import java.util.List;
@Public
public interface TableRead {
+ default TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
RecordReader<InternalRow> createReader(Split split) throws IOException;
default RecordReader<InternalRow> createReader(List<Split> splits) throws
IOException {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
new file mode 100644
index 000000000..b13eea843
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link MergeSorter}.
+ *
+ * <p>After each test, {@link #afterTest()} checks that the free page count of the sorter's memory
+ * pool is back to its initial value and that no files are left in the temporary directory.
+ */
+public class MergeSorterTest {
+
+    /** Value given to {@link CoreOptions#SORT_SPILL_BUFFER_SIZE}; presumably bytes (32 MiB). */
+    private static final int MEMORY_SIZE = 1024 * 1024 * 32;
+
+    private final RowType keyType = RowType.builder().field("k", DataTypes.INT()).build();
+
+    private final RowType valueType = RowType.builder().field("v", DataTypes.INT()).build();
+
+    @TempDir Path tempDir;
+
+    private IOManager ioManager;
+    private MergeSorter sorter;
+
+    /** Free page count of the sorter's memory pool, captured before each test runs. */
+    private int totalPages;
+
+    @BeforeEach
+    public void beforeTest() {
+        ioManager = IOManager.create(tempDir.toString());
+        Options options = new Options();
+        options.set(CoreOptions.SORT_SPILL_BUFFER_SIZE, new MemorySize(MEMORY_SIZE));
+        sorter = new MergeSorter(new CoreOptions(options), keyType, valueType, ioManager);
+        // Baseline for the free-page check in afterTest().
+        totalPages = sorter.memoryPool().freePages();
+    }
+
+    @AfterEach
+    public void afterTest() throws Exception {
+        assertThat(sorter.memoryPool().freePages()).isEqualTo(totalPages);
+        List<File> files;
+        // Files.walk keeps directories open until the returned stream is closed.
+        try (Stream<Path> paths = Files.walk(tempDir)) {
+            files =
+                    paths.map(Path::toFile)
+                            .filter(f -> !f.isDirectory())
+                            .collect(Collectors.toList());
+        }
+        assertThat(files).isEmpty();
+        this.ioManager.close();
+    }
+
+    /**
+     * Merges ten unsorted readers of 100 random records each and expects the output to contain
+     * every record, ordered by key and then by sequence number.
+     */
+    @Test
+    public void testSortAndMerge() throws Exception {
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+        Random rnd = new Random();
+        List<KeyValue> expectedKvs = new ArrayList<>();
+        Set<Long> distinctSeq = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            List<KeyValue> kvs = new ArrayList<>();
+            for (int j = 0; j < 100; j++) {
+                long seq = rnd.nextLong();
+                // Re-draw until the sequence number is unused; the new value must be assigned,
+                // otherwise a collision would make this loop spin forever.
+                while (distinctSeq.contains(seq)) {
+                    seq = rnd.nextLong();
+                }
+                distinctSeq.add(seq);
+                kvs.add(
+                        new KeyValue()
+                                .replace(
+                                        GenericRow.of(rnd.nextInt(100)),
+                                        seq,
+                                        RowKind.fromByteValue((byte) rnd.nextInt(4)),
+                                        GenericRow.of(rnd.nextInt()))
+                                .setLevel(rnd.nextInt(100)));
+            }
+            expectedKvs.addAll(kvs);
+            readers.add(() -> new IteratorRecordReader<>(kvs.iterator()));
+        }
+
+        expectedKvs.sort(
+                Comparator.comparingInt((KeyValue o) -> o.key().getInt(0))
+                        .thenComparingLong(KeyValue::sequenceNumber));
+
+        // Gathers the records it is given into a list without combining them, so every input
+        // record shows up in the output.
+        MergeFunctionWrapper<List<KeyValue>> collectFunc =
+                new MergeFunctionWrapper<List<KeyValue>>() {
+
+                    private List<KeyValue> result;
+
+                    @Override
+                    public void reset() {
+                        result = new ArrayList<>();
+                    }
+
+                    @Override
+                    public void add(KeyValue kv) {
+                        result.add(kv);
+                    }
+
+                    @Nullable
+                    @Override
+                    public List<KeyValue> getResult() {
+                        return result;
+                    }
+                };
+        List<KeyValue> all = new ArrayList<>();
+        sorter.mergeSort(readers, Comparator.comparingInt(o -> o.getInt(0)), collectFunc)
+                .forEachRemaining(all::addAll);
+
+        assertThat(toString(all)).containsExactlyElementsOf(toString(expectedKvs));
+    }
+
+    /** Renders each record with the key and value row types so two lists can be compared. */
+    private List<String> toString(List<KeyValue> kvs) {
+        return kvs.stream().map(kv -> kv.toString(keyType, valueType)).collect(Collectors.toList());
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 7cc4c79c7..fb4533a27 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -287,8 +287,7 @@ public abstract class MergeTreeTestBase {
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.maxSortedRunNum()),
+ options.numSortedRunCompactionTrigger()),
comparator,
options.targetFileSize(),
options.numSortedRunStopTrigger(),
@@ -448,8 +447,7 @@ public abstract class MergeTreeTestBase {
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.maxSortedRunNum());
+ options.numSortedRunCompactionTrigger());
return new MergeTreeCompactManager(
compactExecutor,
new Levels(comparator, files, options.numLevels()),
@@ -562,7 +560,7 @@ public abstract class MergeTreeTestBase {
readerFactory,
comparator,
DeduplicateMergeFunction.factory().create(),
- sortEngine);
+ new MergeSorter(options, null, null, null));
List<TestRecord> records = new ArrayList<>();
try (RecordReaderIterator<KeyValue> iterator = new
RecordReaderIterator<>(reader)) {
while (iterator.hasNext()) {
@@ -608,7 +606,7 @@ public abstract class MergeTreeTestBase {
compactReaderFactory,
comparator,
DeduplicateMergeFunction.factory().create(),
- sortEngine);
+ new MergeSorter(options, null, null, null));
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
return new CompactResult(extractFilesFromSections(sections),
writer.result());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
index 8e64ef5c3..9c8fac85b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
@@ -37,7 +37,7 @@ public class ForceUpLevel0CompactionTest {
@Test
public void testForceCompaction0() {
ForceUpLevel0Compaction compaction =
- new ForceUpLevel0Compaction(new UniversalCompaction(200, 1, 5,
Integer.MAX_VALUE));
+ new ForceUpLevel0Compaction(new UniversalCompaction(200, 1,
5));
Optional<CompactUnit> result = compaction.pick(3, Arrays.asList(run(0,
1), run(0, 1)));
assertThat(result).isPresent();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index ef865f098..d549954a7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -39,21 +39,16 @@ public class UniversalCompactionTest {
@Test
public void testOutputLevel() {
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1,
Integer.MAX_VALUE).outputLevel())
- .isEqualTo(1);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2,
Integer.MAX_VALUE).outputLevel())
- .isEqualTo(1);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3,
Integer.MAX_VALUE).outputLevel())
- .isEqualTo(2);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4,
Integer.MAX_VALUE).outputLevel())
- .isEqualTo(3);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5,
Integer.MAX_VALUE).outputLevel())
- .isEqualTo(5);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5,
1).outputLevel()).isEqualTo(1);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5,
2).outputLevel()).isEqualTo(1);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5,
3).outputLevel()).isEqualTo(2);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5,
4).outputLevel()).isEqualTo(3);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5,
5).outputLevel()).isEqualTo(5);
}
@Test
public void testPick() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3,
Integer.MAX_VALUE);
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
// by size amplification
Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
@@ -79,37 +74,9 @@ public class UniversalCompactionTest {
assertThat(results).isEqualTo(new long[] {1, 2, 3});
}
- @Test
- public void testPickWithMaxSortedRunNum() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2);
-
- // by size amplification
- Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
- assertThat(pick.isPresent()).isTrue();
- long[] results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
- assertThat(results).isEqualTo(new long[] {1, 2, 3, 3});
-
- // by size ratio
- pick =
- compaction.pick(
- 4, Arrays.asList(level(0, 1), level(1, 1), level(2,
1), level(3, 50)));
- assertThat(pick.isPresent()).isTrue();
- results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
- assertThat(results).isEqualTo(new long[] {1, 1});
-
- // by file num
- pick =
- compaction.pick(
- 4, Arrays.asList(level(0, 1), level(1, 2), level(2,
3), level(3, 50)));
- assertThat(pick.isPresent()).isTrue();
- results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
- // 3 should be in the candidate, by size ratio after picking by file
num
- assertThat(results).isEqualTo(new long[] {1, 2});
- }
-
@Test
public void testNoOutputLevel0() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2);
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
Optional<CompactUnit> pick =
compaction.pick(
@@ -129,7 +96,7 @@ public class UniversalCompactionTest {
@Test
public void testSizeAmplification() {
- UniversalCompaction compaction = new UniversalCompaction(25, 0, 1,
Integer.MAX_VALUE);
+ UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
long[] sizes = new long[] {1};
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {2});
@@ -169,7 +136,7 @@ public class UniversalCompactionTest {
@Test
public void testSizeRatio() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 5,
Integer.MAX_VALUE);
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
long[] sizes = new long[] {1, 1, 1, 1};
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {5});
@@ -222,9 +189,9 @@ public class UniversalCompactionTest {
@Test
public void testSizeRatioThreshold() {
long[] sizes = new long[] {8, 9, 10};
- assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2,
Integer.MAX_VALUE), sizes))
+ assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
.isEqualTo(new long[] {8, 9, 10});
- assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2,
Integer.MAX_VALUE), sizes))
+ assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
.isEqualTo(new long[] {27});
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 33878452e..46e740333 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.WriteMode;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.ScanKind;
@@ -358,13 +359,26 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
@Test
public void testStreamingFullChangelog() throws Exception {
+ innerTestStreamingFullChangelog(options -> {});
+ }
+
+ @Test
+ public void testStreamingFullChangelogWithSpill() throws Exception {
+ innerTestStreamingFullChangelog(
+ options -> options.set(CoreOptions.SORT_SPILL_THRESHOLD, 2));
+ }
+
+ private void innerTestStreamingFullChangelog(Consumer<Options> configure)
throws Exception {
FileStoreTable table =
createFileStoreTable(
- conf ->
- conf.set(
- CoreOptions.CHANGELOG_PRODUCER,
- ChangelogProducer.FULL_COMPACTION));
- StreamTableWrite write = table.newWrite(commitUser);
+ conf -> {
+ conf.set(
+ CoreOptions.CHANGELOG_PRODUCER,
+ ChangelogProducer.FULL_COMPACTION);
+ configure.accept(conf);
+ });
+ StreamTableWrite write =
+ table.newWrite(commitUser).withIOManager(new
IOManagerImpl(tempDir.toString()));
StreamTableCommit commit = table.newCommit(commitUser);
write.write(rowData(1, 10, 110L));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index bbc493a19..aec2faa35 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.flink.api.connector.source.Source;
@@ -25,10 +27,13 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
/** A Flink {@link Source} for paimon. */
public abstract class FlinkSource
implements Source<RowData, FileStoreSourceSplit,
PendingSplitsCheckpoint> {
@@ -46,7 +51,10 @@ public abstract class FlinkSource
@Override
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
- return new FileStoreSourceReader(context, readBuilder.newRead(),
limit);
+ IOManager ioManager =
+ new
IOManagerImpl(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+ return new FileStoreSourceReader(
+ context, readBuilder.newRead().withIOManager(ioManager),
limit);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index f33e31b5f..d6834ee70 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -53,7 +54,13 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
@Override
public void open() throws Exception {
super.open();
- this.read = readBuilder.newRead();
+ IOManagerImpl ioManager =
+ new IOManagerImpl(
+ getContainingTask()
+ .getEnvironment()
+ .getIOManager()
+ .getSpillingDirectoriesPaths());
+ this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(reuseRow);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 88eb01619..a5eb105e5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -146,4 +146,18 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage("Tag 'unknown' doesn't exist.");
}
+
+ @Test
+ public void testSortSpillMerge() {
+ sql(
+ "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('sort-spill-threshold'='2')");
+ sql("INSERT INTO KT VALUES (1, 1)");
+ sql("INSERT INTO KT VALUES (1, 2)");
+ sql("INSERT INTO KT VALUES (1, 3)");
+ sql("INSERT INTO KT VALUES (1, 4)");
+ sql("INSERT INTO KT VALUES (1, 5)");
+ sql("INSERT INTO KT VALUES (1, 6)");
+ sql("INSERT INTO KT VALUES (1, 7)");
+ assertThat(sql("SELECT * FROM KT")).containsExactlyInAnyOrder(Row.of(1, 7));
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
index abb8c336c..64c74c5f8 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
@@ -29,6 +29,8 @@ import
org.apache.spark.sql.connector.read.PartitionReaderFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
+import static org.apache.paimon.spark.SparkUtils.createIOManager;
+
/** A Spark {@link PartitionReaderFactory} for paimon. */
public class SparkReaderFactory implements PartitionReaderFactory {
@@ -45,7 +47,11 @@ public class SparkReaderFactory implements
PartitionReaderFactory {
InputPartition partition) {
RecordReader<InternalRow> reader;
try {
- reader = readBuilder.newRead().createReader(((SparkInputPartition) partition).split());
+ reader =
+ readBuilder
+ .newRead()
+ .withIOManager(createIOManager())
+ .createReader(((SparkInputPartition) partition).split());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkUtils.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkUtils.java
new file mode 100644
index 000000000..2a293261f
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+
+import org.apache.spark.SparkEnv;
+
+/** Utils for Spark. */
+public class SparkUtils {
+
+ public static IOManager createIOManager() {
+ String[] localDirs = SparkEnv.get().blockManager().diskBlockManager().localDirsString();
+ return new IOManagerImpl(localDirs);
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 83d1a238a..06c9f0660 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.index.PartitionIndex
import org.apache.paimon.spark.SparkRow
+import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageSerializer, DynamicBucketRow, InnerTableCommit, RowPartitionKeyExtractor}
import org.apache.paimon.types.RowType
@@ -94,6 +95,7 @@ case class WriteIntoPaimonTable(table: FileStoreTable,
overwrite: Boolean, data:
.mapPartitions {
iter =>
val write = writeBuilder.newWrite()
+ write.withIOManager(createIOManager)
try {
iter.foreach {
row =>
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 8ee592326..e214adb5f 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -79,6 +79,14 @@ public class SparkWriteITCase {
innerSimpleWrite();
}
+ @Test
+ public void testSortSpill() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+ + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro', 'sort-spill-threshold'='1')");
+ innerSimpleWrite();
+ }
+
private void innerSimpleWrite() {
spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();