This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d363cf97 [flink] SortOperator should use disk from Flink IOManager
5d363cf97 is described below

commit 5d363cf97f2a40236f65bfd3c2dd64977039319a
Author: Jingsong <[email protected]>
AuthorDate: Tue Sep 12 16:19:01 2023 +0800

    [flink] SortOperator should use disk from Flink IOManager
---
 .../org/apache/paimon/flink/sorter/SortOperator.java | 20 +++++++++-----------
 .../org/apache/paimon/flink/sorter/SortUtils.java    | 10 +++++++---
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 368639756..b2da6972b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -34,16 +34,11 @@ import org.apache.paimon.sort.BinaryInMemorySortBuffer;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.MutableObjectIterator;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 
-import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
-
 /** SortOperator to sort the `InternalRow`s by the `KeyType`. */
 public class SortOperator extends TableStreamOperator<InternalRow>
         implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
@@ -52,13 +47,15 @@ public class SortOperator extends TableStreamOperator<InternalRow>
     private final long maxMemory;
     private final int pageSize;
     private final int arity;
+    private final int spillSortMaxNumFiles;
     private transient BinaryExternalSortBuffer buffer;
 
-    public SortOperator(RowType rowType, long maxMemory, int pageSize) {
+    public SortOperator(RowType rowType, long maxMemory, int pageSize, int spillSortMaxNumFiles) {
         this.rowType = rowType;
         this.maxMemory = maxMemory;
         this.pageSize = pageSize;
         this.arity = rowType.getFieldCount();
+        this.spillSortMaxNumFiles = spillSortMaxNumFiles;
     }
 
     @Override
@@ -78,17 +75,18 @@ public class SortOperator extends TableStreamOperator<InternalRow>
                 BinaryInMemorySortBuffer.createBuffer(
                         normalizedKeyComputer, serializer, keyComparator, memoryPool);
 
-        Configuration jobConfig = getContainingTask().getJobConfiguration();
-
         buffer =
                 new BinaryExternalSortBuffer(
                         new BinaryRowSerializer(serializer.getArity()),
                         keyComparator,
                         memoryPool.pageSize(),
                         inMemorySortBuffer,
-                        new IOManagerImpl(splitPaths(jobConfig.get(CoreOptions.TMP_DIRS))),
-                        jobConfig.getInteger(
-                                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES));
+                        new IOManagerImpl(
+                                getContainingTask()
+                                        .getEnvironment()
+                                        .getIOManager()
+                                        .getSpillingDirectoriesPaths()),
+                        spillSortMaxNumFiles);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 754086221..bd1932282 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sorter;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
@@ -87,8 +88,7 @@ public class SortUtils {
 
         final RowType valueRowType = table.rowType();
         final int parallelism = inputStream.getParallelism();
-        final long maxSortMemory = table.coreOptions().writeBufferSize();
-        final int pageSize = table.coreOptions().pageSize();
+        CoreOptions options = table.coreOptions();
 
         String sinkParallelismValue =
                 table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
@@ -154,7 +154,11 @@ public class SortUtils {
                 .transform(
                         "LOCAL SORT",
                         internalRowType,
-                        new SortOperator(longRowType, maxSortMemory, pageSize))
+                        new SortOperator(
+                                longRowType,
+                                options.writeBufferSize(),
+                                options.pageSize(),
+                                options.localSortMaxNumFileHandles()))
                 .setParallelism(sinkParallelism)
                 // remove the key column from every row
                 .map(

Reply via email to