This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2f76a305693 [FLINK-28376][network] Restrict the number of threads for sort-shuffle data read
2f76a305693 is described below
commit 2f76a30569332728def4df0c316ee1a4f3409aa9
Author: Tan Yuxin <[email protected]>
AuthorDate: Thu Jul 21 11:24:16 2022 +0800
[FLINK-28376][network] Restrict the number of threads for sort-shuffle data read
This closes #20308.
---
.../io/network/NettyShuffleServiceFactory.java | 27 +++++++++++++++-------
.../runtime/shuffle/ShuffleEnvironmentContext.java | 14 +++++++++++
.../runtime/taskexecutor/TaskManagerServices.java | 2 ++
.../io/network/NettyShuffleEnvironmentBuilder.java | 5 +++-
.../SharedPoolNettyShuffleServiceFactory.java | 4 +++-
5 files changed, 42 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index e3789924fa8..6f9c0bb29d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import
org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
-import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
@@ -81,7 +80,9 @@ public class NettyShuffleServiceFactory
shuffleEnvironmentContext.getTaskExecutorResourceId(),
shuffleEnvironmentContext.getEventPublisher(),
shuffleEnvironmentContext.getParentMetricGroup(),
- shuffleEnvironmentContext.getIoExecutor());
+ shuffleEnvironmentContext.getIoExecutor(),
+ shuffleEnvironmentContext.getNumberOfSlots(),
+ shuffleEnvironmentContext.getTmpDirPaths());
}
@VisibleForTesting
@@ -90,14 +91,18 @@ public class NettyShuffleServiceFactory
ResourceID taskExecutorResourceId,
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ int numberOfSlots,
+ String[] tmpDirPaths) {
return createNettyShuffleEnvironment(
config,
taskExecutorResourceId,
taskEventPublisher,
new ResultPartitionManager(),
metricGroup,
- ioExecutor);
+ ioExecutor,
+ numberOfSlots,
+ tmpDirPaths);
}
@VisibleForTesting
@@ -107,7 +112,9 @@ public class NettyShuffleServiceFactory
TaskEventPublisher taskEventPublisher,
ResultPartitionManager resultPartitionManager,
MetricGroup metricGroup,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ int numberOfSlots,
+ String[] tmpDirPaths) {
NettyConfig nettyConfig = config.nettyConfig();
ConnectionManager connectionManager =
nettyConfig != null
@@ -125,7 +132,9 @@ public class NettyShuffleServiceFactory
resultPartitionManager,
connectionManager,
metricGroup,
- ioExecutor);
+ ioExecutor,
+ numberOfSlots,
+ tmpDirPaths);
}
@VisibleForTesting
@@ -136,7 +145,9 @@ public class NettyShuffleServiceFactory
ResultPartitionManager resultPartitionManager,
ConnectionManager connectionManager,
MetricGroup metricGroup,
- Executor ioExecutor) {
+ Executor ioExecutor,
+ int numberOfSlots,
+ String[] tmpDirPaths) {
checkNotNull(config);
checkNotNull(taskExecutorResourceId);
checkNotNull(taskEventPublisher);
@@ -177,7 +188,7 @@ public class NettyShuffleServiceFactory
1,
Math.min(
batchShuffleReadBufferPool.getMaxConcurrentRequests(),
- 4 * Hardware.getNumberCPUCores())),
+ Math.max(numberOfSlots,
tmpDirPaths.length))),
new ExecutorThreadFactory("blocking-shuffle-io"));
registerShuffleMetrics(metricGroup, networkBufferPool);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index bf86acb295c..c50fcb0c0f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -38,6 +38,8 @@ public class ShuffleEnvironmentContext {
private final InetAddress hostAddress;
private final TaskEventPublisher eventPublisher;
private final MetricGroup parentMetricGroup;
+ private final int numberOfSlots;
+ private final String[] tmpDirPaths;
private final Executor ioExecutor;
@@ -47,6 +49,8 @@ public class ShuffleEnvironmentContext {
MemorySize networkMemorySize,
boolean localCommunicationOnly,
InetAddress hostAddress,
+ int numberOfSlots,
+ String[] tmpDirPaths,
TaskEventPublisher eventPublisher,
MetricGroup parentMetricGroup,
Executor ioExecutor) {
@@ -58,6 +62,8 @@ public class ShuffleEnvironmentContext {
this.eventPublisher = checkNotNull(eventPublisher);
this.parentMetricGroup = checkNotNull(parentMetricGroup);
this.ioExecutor = ioExecutor;
+ this.numberOfSlots = numberOfSlots;
+ this.tmpDirPaths = checkNotNull(tmpDirPaths);
}
public Configuration getConfiguration() {
@@ -91,4 +97,12 @@ public class ShuffleEnvironmentContext {
public Executor getIoExecutor() {
return ioExecutor;
}
+
+ public int getNumberOfSlots() {
+ return numberOfSlots;
+ }
+
+ public String[] getTmpDirPaths() {
+ return tmpDirPaths;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 78c1e282823..5bc5f292092 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -414,6 +414,8 @@ public class TaskManagerServices {
taskManagerServicesConfiguration.getNetworkMemorySize(),
taskManagerServicesConfiguration.isLocalCommunicationOnly(),
taskManagerServicesConfiguration.getBindAddress(),
+ taskManagerServicesConfiguration.getNumberOfSlots(),
+ taskManagerServicesConfiguration.getTmpDirPaths(),
taskEventDispatcher,
taskManagerMetricGroup,
ioExecutor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 5b6bd50ee21..1f9ed2e21b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Executor;
/** Builder for the {@link NettyShuffleEnvironment}. */
public class NettyShuffleEnvironmentBuilder {
+ private static final int DEFAULT_NUM_SLOTS = 1;
private static final int DEFAULT_NETWORK_BUFFER_SIZE = 32 << 10;
private static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
@@ -231,6 +232,8 @@ public class NettyShuffleEnvironmentBuilder {
new TaskEventDispatcher(),
resultPartitionManager,
metricGroup,
- ioExecutor);
+ ioExecutor,
+ DEFAULT_NUM_SLOTS,
+ DEFAULT_TEMP_DIRS);
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
index e22187cc6d2..184b61bfc56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
@@ -99,6 +99,8 @@ public final class SharedPoolNettyShuffleServiceFactory
resultPartitionManager,
connectionManager,
shuffleEnvironmentContext.getParentMetricGroup(),
- shuffleEnvironmentContext.getIoExecutor());
+ shuffleEnvironmentContext.getIoExecutor(),
+ shuffleEnvironmentContext.getNumberOfSlots(),
+ shuffleEnvironmentContext.getTmpDirPaths());
}
}