This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new 40e9501a5fc [FLINK-30989][runtime] Some config options related to
sorting and spilling are not valid.
40e9501a5fc is described below
commit 40e9501a5fcd7a71af4a7e79cd1556e190488137
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Feb 17 01:16:09 2023 +0800
[FLINK-30989][runtime] Some config options related to sorting and spilling
are not valid.
---
.../streaming/api/operators/sort/MultiInputSortingDataInput.java | 6 +++---
.../flink/streaming/api/operators/sort/SortingDataInput.java | 9 +++++----
.../runtime/io/StreamMultipleInputProcessorFactory.java | 2 +-
.../streaming/runtime/io/StreamTwoInputProcessorFactory.java | 2 +-
.../apache/flink/streaming/runtime/tasks/OneInputStreamTask.java | 2 +-
5 files changed, 11 insertions(+), 10 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
index 7355e5b0fde..e398028f787 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java
@@ -150,7 +150,7 @@ public final class MultiInputSortingDataInput<IN, K>
implements StreamTaskInput<
IOManager ioManager,
boolean objectReuse,
double managedMemoryFraction,
- Configuration jobConfiguration,
+ Configuration taskManagerConfiguration,
ExecutionConfig executionConfig) {
int keyLength = keySerializer.getLength();
final TypeComparator<Tuple2<byte[], StreamRecord<Object>>> comparator;
@@ -196,11 +196,11 @@ public final class MultiInputSortingDataInput<IN, K>
implements StreamTaskInput<
/
numberOfInputs)
.enableSpilling(
ioManager,
-
jobConfiguration.get(
+
taskManagerConfiguration.get(
AlgorithmOptions
.SORT_SPILLING_THRESHOLD))
.maxNumFileHandles(
-
jobConfiguration.get(
+
taskManagerConfiguration.get(
AlgorithmOptions
.SPILLING_MAX_FAN)
/
numberOfInputs)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
index f68d4962c8c..088519a2dec 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/SortingDataInput.java
@@ -86,7 +86,7 @@ public final class SortingDataInput<T, K> implements
StreamTaskInput<T> {
IOManager ioManager,
boolean objectReuse,
double managedMemoryFraction,
- Configuration jobConfiguration,
+ Configuration taskManagerConfiguration,
TaskInvokable containingTask,
ExecutionConfig executionConfig) {
try {
@@ -115,12 +115,13 @@ public final class SortingDataInput<T, K> implements
StreamTaskInput<T> {
.memoryFraction(managedMemoryFraction)
.enableSpilling(
ioManager,
-
jobConfiguration.get(AlgorithmOptions.SORT_SPILLING_THRESHOLD))
+ taskManagerConfiguration.get(
+
AlgorithmOptions.SORT_SPILLING_THRESHOLD))
.maxNumFileHandles(
-
jobConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
+
taskManagerConfiguration.get(AlgorithmOptions.SPILLING_MAX_FAN))
.objectReuse(objectReuse)
.largeRecords(
- jobConfiguration.get(
+ taskManagerConfiguration.get(
AlgorithmOptions.USE_LARGE_RECORDS_HANDLER))
.build();
} catch (MemoryAllocationException e) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
index f318e796abe..ee504eb76cf 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
@@ -180,7 +180,7 @@ public class StreamMultipleInputProcessorFactory {
ManagedMemoryUseCase.OPERATOR,
taskManagerConfig,
userClassloader),
- jobConfig,
+ taskManagerConfig,
executionConfig);
StreamTaskInput<?>[] sortedInputs =
selectableSortingInputs.getSortedInputs();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index 3b2dc87a911..dbfc295ace0 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -159,7 +159,7 @@ public class StreamTwoInputProcessorFactory {
ManagedMemoryUseCase.OPERATOR,
taskManagerConfig,
userClassloader),
- jobConfig,
+ taskManagerConfig,
executionConfig);
inputSelectable = selectableSortingInputs.getInputSelectable();
StreamTaskInput<?>[] sortedInputs =
selectableSortingInputs.getSortedInputs();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 025e04cfa08..bc3abb489ec 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -149,7 +149,7 @@ public class OneInputStreamTask<IN, OUT> extends
StreamTask<OUT, OneInputStreamO
ManagedMemoryUseCase.OPERATOR,
getEnvironment().getTaskConfiguration(),
userCodeClassLoader),
- getJobConfiguration(),
+ getEnvironment().getTaskManagerInfo().getConfiguration(),
this,
getExecutionConfig());
}