This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5d363cf97 [flink] SortOperator should use disk from Flink IOManager
5d363cf97 is described below
commit 5d363cf97f2a40236f65bfd3c2dd64977039319a
Author: Jingsong <[email protected]>
AuthorDate: Tue Sep 12 16:19:01 2023 +0800
[flink] SortOperator should use disk from Flink IOManager
---
.../org/apache/paimon/flink/sorter/SortOperator.java | 20 +++++++++-----------
.../org/apache/paimon/flink/sorter/SortUtils.java | 10 +++++++---
2 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 368639756..b2da6972b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -34,16 +34,11 @@ import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.MutableObjectIterator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
-
/** SortOperator to sort the `InternalRow`s by the `KeyType`. */
public class SortOperator extends TableStreamOperator<InternalRow>
implements OneInputStreamOperator<InternalRow, InternalRow>,
BoundedOneInput {
@@ -52,13 +47,15 @@ public class SortOperator extends TableStreamOperator<InternalRow>
private final long maxMemory;
private final int pageSize;
private final int arity;
+ private final int spillSortMaxNumFiles;
private transient BinaryExternalSortBuffer buffer;
- public SortOperator(RowType rowType, long maxMemory, int pageSize) {
+ public SortOperator(RowType rowType, long maxMemory, int pageSize, int spillSortMaxNumFiles) {
this.rowType = rowType;
this.maxMemory = maxMemory;
this.pageSize = pageSize;
this.arity = rowType.getFieldCount();
+ this.spillSortMaxNumFiles = spillSortMaxNumFiles;
}
@Override
@@ -78,17 +75,18 @@ public class SortOperator extends TableStreamOperator<InternalRow>
BinaryInMemorySortBuffer.createBuffer(
normalizedKeyComputer, serializer, keyComparator, memoryPool);
- Configuration jobConfig = getContainingTask().getJobConfiguration();
-
buffer =
new BinaryExternalSortBuffer(
new BinaryRowSerializer(serializer.getArity()),
keyComparator,
memoryPool.pageSize(),
inMemorySortBuffer,
- new IOManagerImpl(splitPaths(jobConfig.get(CoreOptions.TMP_DIRS))),
- jobConfig.getInteger(
- ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES));
+ new IOManagerImpl(
+ getContainingTask()
+ .getEnvironment()
+ .getIOManager()
+ .getSpillingDirectoriesPaths()),
+ spillSortMaxNumFiles);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 754086221..bd1932282 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sorter;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
@@ -87,8 +88,7 @@ public class SortUtils {
final RowType valueRowType = table.rowType();
final int parallelism = inputStream.getParallelism();
- final long maxSortMemory = table.coreOptions().writeBufferSize();
- final int pageSize = table.coreOptions().pageSize();
+ CoreOptions options = table.coreOptions();
String sinkParallelismValue =
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
@@ -154,7 +154,11 @@ public class SortUtils {
.transform(
"LOCAL SORT",
internalRowType,
- new SortOperator(longRowType, maxSortMemory, pageSize))
+ new SortOperator(
+ longRowType,
+ options.writeBufferSize(),
+ options.pageSize(),
+ options.localSortMaxNumFileHandles()))
.setParallelism(sinkParallelism)
// remove the key column from every row
.map(