This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f76a305693 [FLINK-28376][network] Restrict the number of threads for sort-shuffle data read
2f76a305693 is described below

commit 2f76a30569332728def4df0c316ee1a4f3409aa9
Author: Tan Yuxin <[email protected]>
AuthorDate: Thu Jul 21 11:24:16 2022 +0800

    [FLINK-28376][network] Restrict the number of threads for sort-shuffle data read
    
    This closes #20308.
---
 .../io/network/NettyShuffleServiceFactory.java     | 27 +++++++++++++++-------
 .../runtime/shuffle/ShuffleEnvironmentContext.java | 14 +++++++++++
 .../runtime/taskexecutor/TaskManagerServices.java  |  2 ++
 .../io/network/NettyShuffleEnvironmentBuilder.java |  5 +++-
 .../SharedPoolNettyShuffleServiceFactory.java      |  4 +++-
 5 files changed, 42 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index e3789924fa8..6f9c0bb29d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
-import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
@@ -81,7 +80,9 @@ public class NettyShuffleServiceFactory
                 shuffleEnvironmentContext.getTaskExecutorResourceId(),
                 shuffleEnvironmentContext.getEventPublisher(),
                 shuffleEnvironmentContext.getParentMetricGroup(),
-                shuffleEnvironmentContext.getIoExecutor());
+                shuffleEnvironmentContext.getIoExecutor(),
+                shuffleEnvironmentContext.getNumberOfSlots(),
+                shuffleEnvironmentContext.getTmpDirPaths());
     }
 
     @VisibleForTesting
@@ -90,14 +91,18 @@ public class NettyShuffleServiceFactory
             ResourceID taskExecutorResourceId,
             TaskEventPublisher taskEventPublisher,
             MetricGroup metricGroup,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            int numberOfSlots,
+            String[] tmpDirPaths) {
         return createNettyShuffleEnvironment(
                 config,
                 taskExecutorResourceId,
                 taskEventPublisher,
                 new ResultPartitionManager(),
                 metricGroup,
-                ioExecutor);
+                ioExecutor,
+                numberOfSlots,
+                tmpDirPaths);
     }
 
     @VisibleForTesting
@@ -107,7 +112,9 @@ public class NettyShuffleServiceFactory
             TaskEventPublisher taskEventPublisher,
             ResultPartitionManager resultPartitionManager,
             MetricGroup metricGroup,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            int numberOfSlots,
+            String[] tmpDirPaths) {
         NettyConfig nettyConfig = config.nettyConfig();
         ConnectionManager connectionManager =
                 nettyConfig != null
@@ -125,7 +132,9 @@ public class NettyShuffleServiceFactory
                 resultPartitionManager,
                 connectionManager,
                 metricGroup,
-                ioExecutor);
+                ioExecutor,
+                numberOfSlots,
+                tmpDirPaths);
     }
 
     @VisibleForTesting
@@ -136,7 +145,9 @@ public class NettyShuffleServiceFactory
             ResultPartitionManager resultPartitionManager,
             ConnectionManager connectionManager,
             MetricGroup metricGroup,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            int numberOfSlots,
+            String[] tmpDirPaths) {
         checkNotNull(config);
         checkNotNull(taskExecutorResourceId);
         checkNotNull(taskEventPublisher);
@@ -177,7 +188,7 @@ public class NettyShuffleServiceFactory
                                 1,
                                 Math.min(
                                         batchShuffleReadBufferPool.getMaxConcurrentRequests(),
-                                        4 * Hardware.getNumberCPUCores())),
+                                        Math.max(numberOfSlots, tmpDirPaths.length))),
                         new ExecutorThreadFactory("blocking-shuffle-io"));
 
         registerShuffleMetrics(metricGroup, networkBufferPool);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
index bf86acb295c..c50fcb0c0f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java
@@ -38,6 +38,8 @@ public class ShuffleEnvironmentContext {
     private final InetAddress hostAddress;
     private final TaskEventPublisher eventPublisher;
     private final MetricGroup parentMetricGroup;
+    private final int numberOfSlots;
+    private final String[] tmpDirPaths;
 
     private final Executor ioExecutor;
 
@@ -47,6 +49,8 @@ public class ShuffleEnvironmentContext {
             MemorySize networkMemorySize,
             boolean localCommunicationOnly,
             InetAddress hostAddress,
+            int numberOfSlots,
+            String[] tmpDirPaths,
             TaskEventPublisher eventPublisher,
             MetricGroup parentMetricGroup,
             Executor ioExecutor) {
@@ -58,6 +62,8 @@ public class ShuffleEnvironmentContext {
         this.eventPublisher = checkNotNull(eventPublisher);
         this.parentMetricGroup = checkNotNull(parentMetricGroup);
         this.ioExecutor = ioExecutor;
+        this.numberOfSlots = numberOfSlots;
+        this.tmpDirPaths = checkNotNull(tmpDirPaths);
     }
 
     public Configuration getConfiguration() {
@@ -91,4 +97,12 @@ public class ShuffleEnvironmentContext {
     public Executor getIoExecutor() {
         return ioExecutor;
     }
+
+    public int getNumberOfSlots() {
+        return numberOfSlots;
+    }
+
+    public String[] getTmpDirPaths() {
+        return tmpDirPaths;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 78c1e282823..5bc5f292092 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -414,6 +414,8 @@ public class TaskManagerServices {
                         taskManagerServicesConfiguration.getNetworkMemorySize(),
                         taskManagerServicesConfiguration.isLocalCommunicationOnly(),
                         taskManagerServicesConfiguration.getBindAddress(),
+                        taskManagerServicesConfiguration.getNumberOfSlots(),
+                        taskManagerServicesConfiguration.getTmpDirPaths(),
                         taskEventDispatcher,
                         taskManagerMetricGroup,
                         ioExecutor);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 5b6bd50ee21..1f9ed2e21b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Executor;
 /** Builder for the {@link NettyShuffleEnvironment}. */
 public class NettyShuffleEnvironmentBuilder {
 
+    private static final int DEFAULT_NUM_SLOTS = 1;
     private static final int DEFAULT_NETWORK_BUFFER_SIZE = 32 << 10;
     private static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
 
@@ -231,6 +232,8 @@ public class NettyShuffleEnvironmentBuilder {
                 new TaskEventDispatcher(),
                 resultPartitionManager,
                 metricGroup,
-                ioExecutor);
+                ioExecutor,
+                DEFAULT_NUM_SLOTS,
+                DEFAULT_TEMP_DIRS);
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
index e22187cc6d2..184b61bfc56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SharedPoolNettyShuffleServiceFactory.java
@@ -99,6 +99,8 @@ public final class SharedPoolNettyShuffleServiceFactory
                 resultPartitionManager,
                 connectionManager,
                 shuffleEnvironmentContext.getParentMetricGroup(),
-                shuffleEnvironmentContext.getIoExecutor());
+                shuffleEnvironmentContext.getIoExecutor(),
+                shuffleEnvironmentContext.getNumberOfSlots(),
+                shuffleEnvironmentContext.getTmpDirPaths());
     }
 }

Reply via email to