Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5114#discussion_r154725875
--- Diff:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
---
@@ -217,15 +222,13 @@ public String toString() {
dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname);
}
- // use the assigned ports for the TM
- if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
- throw new IllegalArgumentException("unsufficient # of ports assigned");
- }
- for (int i = 0; i < TM_PORT_KEYS.length; i++) {
- int port = assignment.getAssignedPorts().get(i);
- String key = TM_PORT_KEYS[i];
- taskInfo.addResources(ranges("ports", mesosConfiguration.frameworkInfo().getRole(), range(port, port)));
- dynamicProperties.setInteger(key, port);
+ // take needed ports for the TM
+ List<Protos.Resource> portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
+ taskInfo.addAllResources(portResources);
+ Iterator<String> portsToAssign = Iterators.forArray(TM_PORT_KEYS);
+ rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
--- End diff --
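This might fail with a `NoSuchElementException` if `rangeValues(portResources)`
yields more values than `TM_PORT_KEYS` has entries, because `portsToAssign.next()`
would then be called on an exhausted iterator. I assume that we don't have to
fail in this scenario.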
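
One way to avoid the exception, as a rough sketch only: advance both iterators together and stop as soon as either is exhausted. The class `PortAssignmentSketch`, the method `assignPorts`, and the plain `Map` standing in for `dynamicProperties` are hypothetical names for illustration, not code from this PR. It also uses `Arrays.asList(...).iterator()` instead of Guava's `Iterators.forArray` so that it compiles on its own.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the port assignment step; not part of the PR.
class PortAssignmentSketch {

    /**
     * Pairs each port key with one port value, in order.
     * Surplus port values (or surplus keys) are ignored instead of
     * triggering a NoSuchElementException.
     */
    static Map<String, Long> assignPorts(String[] portKeys, List<Long> portValues) {
        // The map plays the role of dynamicProperties in this sketch.
        Map<String, Long> assigned = new LinkedHashMap<>();
        Iterator<String> keysToAssign = Arrays.asList(portKeys).iterator();
        Iterator<Long> ports = portValues.iterator();

        // Stop when either side runs out, so next() is never called
        // on an exhausted iterator.
        while (keysToAssign.hasNext() && ports.hasNext()) {
            assigned.put(keysToAssign.next(), ports.next());
        }
        return assigned;
    }
}
```

With two keys and three port values, this assigns the first two ports and silently drops the third. Whether dropping is the right behavior, or whether too few ports should still be treated as an error as in the removed code, is a separate decision.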
---