Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/5834#discussion_r184031845 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java --- @@ -355,13 +359,53 @@ public static RpcService createRpcService( taskManagerHostname, taskManagerAddress.getHostAddress()); } - final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT, "0"); - checkState(rpcPort >= 0 && rpcPort <= 65535, "Invalid value for " + - "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " + - "use 0 to let the system choose port automatically.", - ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort); + // parse port range definition and create port iterator + Iterator<Integer> portsIterator; + try { + portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition); + } + + while (portsIterator.hasNext()) { + // first, we check if the port is available by opening a socket + // if the actor system fails to start on the port, we try further + ServerSocket availableSocket = NetUtils.createSocketFromPorts( --- End diff -- @zentol the documentation has always stated that it supports a port range: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#taskmanager-rpc-port
---