tillrohrmann commented on a change in pull request #12264:
URL: https://github.com/apache/flink/pull/12264#discussion_r431066056



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
##########
@@ -270,7 +271,8 @@ public static TaskManagerServicesConfiguration 
fromConfiguration(
 
                final String[] alwaysParentFirstLoaderPatterns = 
CoreOptions.getParentFirstLoaderPatterns(configuration);
 
-               final int numIoThreads = 
configuration.get(TaskManagerOptions.NUM_IO_THREADS);
+               // multiply core-count to be on the safer side, since we used a 
pool with size=64 in the past
+               final int numIoThreads = 
ClusterEntrypointUtils.getPoolSize(configuration) * 4;

Review comment:
       ```suggestion
                final int numIoThreads = 
ClusterEntrypointUtils.getPoolSize(configuration);
   ```
   
   I will update `getPoolSize` to return the new default value of `4 * cores` 
as part of changing the type of the thread pool on the JM side.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/executor/TestExecutorResource.java
##########
@@ -19,35 +19,37 @@
 
 import org.junit.rules.ExternalResource;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
 /**
- * Resource which starts an {@link ExecutorService} for testing purposes.
+ * Resource which starts/stops an {@link ExecutorService} for testing purposes.
  */
-public class TestExecutorServiceResource extends ExternalResource {
+public class TestExecutorResource extends ExternalResource {
 
        private final Supplier<ExecutorService> serviceFactory;
 
-       private ExecutorService executorService;
+       private ExecutorService executor;

Review comment:
       The name `executorService` could have been kept.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
##########
@@ -366,11 +367,15 @@ public static TaskExecutor startTaskManager(
                        resourceID,
                        
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
+               final ExecutorService ioExecutor = 
Executors.newCachedThreadPool(
+                       taskManagerServicesConfiguration.getNumIoThreads(),
+                       new ExecutorThreadFactory("flink-taskexecutor-io"));

Review comment:
       This could be moved into the `fromConfiguration` method. Given that the 
`TaskManagerServices` is responsible for managing the created 
`ExecutorService`, I think it is fine to move the creation into the 
`fromConfiguration` method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to