[hotfix][yarn] Extract number of task slots once from configuration Let the YarnResourceManager only extract the number of task slots once from the provided configuration.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1fcd458 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1fcd458 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1fcd458 Branch: refs/heads/release-1.5 Commit: e1fcd458a4faf21c756181bd01342e9ab92b7967 Parents: 4b0aca5 Author: Till Rohrmann <[email protected]> Authored: Mon Jul 2 16:46:14 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Mon Jul 2 23:50:22 2018 +0200 ---------------------------------------------------------------------- .../org/apache/flink/yarn/YarnResourceManager.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e1fcd458/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index ab031be..572e6ba 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -97,9 +97,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme @Nullable private final String webInterfaceUrl; - private final int defaultTaskManagerMemoryMB; + private final int numberOfTaskSlots; - private final int defaultNumSlots; + private final int defaultTaskManagerMemoryMB; private final int defaultCpus; @@ -161,9 +161,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme numPendingContainerRequests = 0; this.webInterfaceUrl = webInterfaceUrl; + this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); this.defaultTaskManagerMemoryMB = flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); - this.defaultNumSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); - this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, defaultNumSlots); + this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots); } protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient( @@ -460,10 +460,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme // init the ContainerLaunchContext final String currDir = env.get(ApplicationConstants.Environment.PWD.key()); - final int numSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); - final ContaineredTaskManagerParameters taskManagerParameters = - ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numSlots); + ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots); log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " + "JVM direct memory limit {} MB", @@ -515,7 +513,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme */ private void internalRequestYarnContainer(Resource resource, Priority priority) { int pendingSlotRequests = getNumberPendingSlotRequests(); - int pendingSlotAllocation = numPendingContainerRequests * defaultNumSlots; + int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots; if (pendingSlotRequests > pendingSlotAllocation) { requestYarnContainer(resource, priority); }
