This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 40e9501a5fc [FLINK-30989][runtime] Some config options related to 
sorting and spilling are not valid.
40e9501a5fc is described below

commit 40e9501a5fcd7a71af4a7e79cd1556e190488137
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Feb 17 01:16:09 2023 +0800

    [FLINK-30989][runtime] Some config options related to sorting and spilling 
are not valid.
---
 .../streaming/api/operators/sort/MultiInputSortingDataInput.java | 6 +++---
 .../flink/streaming/api/operators/sort/SortingDataInput.java     | 9 +++++----
 .../runtime/io/StreamMultipleInputProcessorFactory.java          | 2 +-
 .../streaming/runtime/io/StreamTwoInputProcessorFactory.java     | 2 +-
 .../apache/flink/streaming/runtime/tasks/OneInputStreamTask.java | 2 +-
 5 files changed, 11 insertions(+), 10 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
index 7355e5b0fde..e398028f787 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
@@ -150,7 +150,7 @@ public final class MultiInputSortingDataInput<IN, K> 
implements StreamTaskInput<
             IOManager ioManager,
             boolean objectReuse,
             double managedMemoryFraction,
-            Configuration jobConfiguration,
+            Configuration taskManagerConfiguration,
             ExecutionConfig executionConfig) {
         int keyLength = keySerializer.getLength();
         final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator;
@@ -196,11 +196,11 @@ public final class MultiInputSortingDataInput<IN, K> 
implements StreamTaskInput<
                                                                         / 
numberOfInputs)
                                                         .enableSpilling(
                                                                 ioManager,
-                                                                
jobConfiguration.get(
+                                                                
taskManagerConfiguration.get(
                                                                         
AlgorithmOptions
                                                                                
 .SORT_SPILLING_THRESHOLD))
                                                         .maxNumFileHandles(
-                                                                
jobConfiguration.get(
+                                                                
taskManagerConfiguration.get(
                                                                                
 AlgorithmOptions
                                                                                
         .SPILLING_MAX_FAN)
                                                                         / 
numberOfInputs)
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
index f68d4962c8c..088519a2dec 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
@@ -86,7 +86,7 @@ public final class SortingDataInput<T, K> implements 
StreamTaskInput<T> {
             IOManager ioManager,
             boolean objectReuse,
             double managedMemoryFraction,
-            Configuration jobConfiguration,
+            Configuration taskManagerConfiguration,
             TaskInvokable containingTask,
             ExecutionConfig executionConfig) {
         try {
@@ -115,12 +115,13 @@ public final class SortingDataInput<T, K> implements 
StreamTaskInput<T> {
                             .memoryFraction(managedMemoryFraction)
                             .enableSpilling(
                                     ioManager,
-                                    
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+                                    taskManagerConfiguration.get(
+                                            
AlgorithmOptions.SORT_SPILLING_THRESHOLD))
                             .maxNumFileHandles(
-                                    
jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+                                    
taskManagerConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
                             .objectReuse(objectReuse)
                             .largeRecords(
-                                    jobConfiguration.get(
+                                    taskManagerConfiguration.get(
                                             
AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
                             .build();
         } catch (MemoryAllocationException e) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index f318e796abe..ee504eb76cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -180,7 +180,7 @@ public class StreamMultipleInputProcessorFactory {
                                     ManagedMemoryUseCase.OPERATOR,
                                     taskManagerConfig,
                                     userClassloader),
-                            jobConfig,
+                            taskManagerConfig,
                             executionConfig);
 
             StreamTaskInput<?>[] sortedInputs = 
selectableSortingInputs.getSortedInputs();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index 3b2dc87a911..dbfc295ace0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -159,7 +159,7 @@ public class StreamTwoInputProcessorFactory {
                                     ManagedMemoryUseCase.OPERATOR,
                                     taskManagerConfig,
                                     userClassloader),
-                            jobConfig,
+                            taskManagerConfig,
                             executionConfig);
             inputSelectable = selectableSortingInputs.getInputSelectable();
             StreamTaskInput<?>[] sortedInputs = 
selectableSortingInputs.getSortedInputs();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 025e04cfa08..bc3abb489ec 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -149,7 +149,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                         ManagedMemoryUseCase.OPERATOR,
                         getEnvironment().getTaskConfiguration(),
                         userCodeClassLoader),
-                getJobConfiguration(),
+                getEnvironment().getTaskManagerInfo().getConfiguration(),
                 this,
                 getExecutionConfig());
     }

Reply via email to