tillrohrmann commented on a change in pull request #11957:
URL: https://github.com/apache/flink/pull/11957#discussion_r425287489
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -304,8 +305,12 @@ public void start() throws Exception {
RpcUtils.getHostname(commonRpcService),
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+ final int poolSize =
configuration.getInteger(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE,
Hardware.getNumberCPUCores());
+ Preconditions.checkArgument(poolSize > 0,
+ String.format("Illegal pool size (%s)
of io-executor for mini cluster, please re-configure '%s'.",
+ poolSize,
ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE.key()));
Review comment:
The logic to obtain the `poolSize` and to verify it could be factored
out into a method which is shared by the `MiniCluster` and the
`ClusterEntrypoint`. This would avoid code duplication.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
##########
@@ -157,8 +159,12 @@ public static JobManagerSharedServices fromConfiguration(
throw new
IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
}
+ final int poolSize =
config.getInteger(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE,
Hardware.getNumberCPUCores());
+ Preconditions.checkArgument(poolSize > 0,
+ String.format("Illegal pool size (%s) of io-executor
for cluster entry-point, please re-configure '%s'.",
+ poolSize,
ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE.key()));
final ScheduledExecutorService futureExecutor =
Executors.newScheduledThreadPool(
- Hardware.getNumberCPUCores(),
+ poolSize,
new ExecutorThreadFactory("jobmanager-future"));
Review comment:
Instead of making the future executor of the `JobManager` configurable I
would suggest to make the `ioExecutor` of the `ClusterEntrypoint` configurable:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L263
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]