zhuzhurk commented on a change in pull request #15199:
URL: https://github.com/apache/flink/pull/15199#discussion_r598681288



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
##########
@@ -119,13 +121,33 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
                         config.networkBufferSize(),
                         config.getRequestSegmentsTimeout());
 
+        // we create a separated buffer pool here for batch shuffle instead of reusing the network
+        // buffer pool directly to avoid potential side effects of memory contention, for example,
+        // dead lock or "insufficient network buffer" error
+        BatchShuffleReadBufferPool batchShuffleReadBufferPool =
+                new BatchShuffleReadBufferPool(
+                        config.batchShuffleReadMemoryBytes(), config.networkBufferSize());
+
+        // we create a separated IO executor pool here for batch shuffle instead of reusing the
+        // TaskManager IO executor pool directly to avoid the potential side effects of execution
+        // contention, for example, too long IO or waiting time leading to starvation or timeout
+        BatchShuffleReadIOExecutor batchShuffleReadIOExecutor =
+                new BatchShuffleReadIOExecutor(
+                        Math.max(

Review comment:
       Maybe we could do this min/max clamping in the constructor of `BatchShuffleReadIOExecutor`? Then `BatchShuffleReadIOExecutor.MIN_NUM_THREADS` and `BatchShuffleReadIOExecutor.MAX_NUM_THREADS` can be private.
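
       A rough sketch of what this suggestion could look like. `ClampedReadIOExecutor` is a hypothetical stand-in for `BatchShuffleReadIOExecutor`, and the bound values 4 and 32 are placeholders assumed for illustration, not the values used in this PR. The point is only that the factory passes the raw requested thread count and the constructor clamps it, so both constants can stay private.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for BatchShuffleReadIOExecutor; not the actual Flink class. */
final class ClampedReadIOExecutor implements AutoCloseable {

    // Assumed bounds for illustration only; the real values live in the PR.
    private static final int MIN_NUM_THREADS = 4;
    private static final int MAX_NUM_THREADS = 32;

    private final int numThreads;
    private final ExecutorService executor;

    ClampedReadIOExecutor(int requestedNumThreads) {
        // Clamp here so that callers never need to reference the bounds.
        this.numThreads =
                Math.max(MIN_NUM_THREADS, Math.min(MAX_NUM_THREADS, requestedNumThreads));
        this.executor = Executors.newFixedThreadPool(numThreads);
    }

    int getNumThreads() {
        return numThreads;
    }

    void execute(Runnable task) {
        executor.execute(task);
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

       With this shape, the call site in the factory would shrink to a plain constructor call with the configured thread count, and the `Math.max(...)` expression there would no longer be needed.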



