[hotfix][yarn] Extract number of task slots once from configuration

Let the YarnResourceManager extract the number of task slots only once from
the provided configuration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a49587a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a49587a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a49587a6

Branch: refs/heads/master
Commit: a49587a6e9e1500e55e6bad8510791f5ec01c216
Parents: 49156e8
Author: Till Rohrmann <[email protected]>
Authored: Mon Jul 2 16:46:14 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon Jul 2 23:48:42 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnResourceManager.java    | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a49587a6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ab031be..572e6ba 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -97,9 +97,9 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
        @Nullable
        private final String webInterfaceUrl;
 
-       private final int defaultTaskManagerMemoryMB;
+       private final int numberOfTaskSlots;
 
-       private final int defaultNumSlots;
+       private final int defaultTaskManagerMemoryMB;
 
        private final int defaultCpus;
 
@@ -161,9 +161,9 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                numPendingContainerRequests = 0;
 
                this.webInterfaceUrl = webInterfaceUrl;
+               this.numberOfTaskSlots = 
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
                this.defaultTaskManagerMemoryMB = 
flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
-               this.defaultNumSlots = 
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-               this.defaultCpus = 
flinkConfig.getInteger(YarnConfigOptions.VCORES, defaultNumSlots);
+               this.defaultCpus = 
flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
        }
 
        protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
@@ -460,10 +460,8 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                // init the ContainerLaunchContext
                final String currDir = 
env.get(ApplicationConstants.Environment.PWD.key());
 
-               final int numSlots = 
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-
                final ContaineredTaskManagerParameters taskManagerParameters =
-                               
ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 
numSlots);
+                               
ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 
numberOfTaskSlots);
 
                log.debug("TaskExecutor {} will be started with container size 
{} MB, JVM heap size {} MB, " +
                                "JVM direct memory limit {} MB",
@@ -515,7 +513,7 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
         */
        private void internalRequestYarnContainer(Resource resource, Priority 
priority) {
                int pendingSlotRequests = getNumberPendingSlotRequests();
-               int pendingSlotAllocation = numPendingContainerRequests * 
defaultNumSlots;
+               int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
                if (pendingSlotRequests > pendingSlotAllocation) {
                        requestYarnContainer(resource, priority);
                }

Reply via email to