YARN-1959. Fix headroom calculation in FairScheduler. (Anubhav Dhoot via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/568d3dc2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/568d3dc2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/568d3dc2

Branch: refs/heads/HDFS-6581
Commit: 568d3dc2bbe43b7d2833d5da2b0e6d75eb86e5dd
Parents: a9a55db
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon Sep 22 23:49:39 2014 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon Sep 22 23:49:39 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../scheduler/fair/FSAppAttempt.java            | 27 +++++++++
 .../scheduler/fair/SchedulingPolicy.java        | 15 +++++
 .../DominantResourceFairnessPolicy.java         | 17 +++++-
 .../fair/policies/FairSharePolicy.java          | 11 ++++
 .../scheduler/fair/policies/FifoPolicy.java     | 12 ++++
 .../scheduler/fair/TestFSAppAttempt.java        | 63 ++++++++++++++++++++
 7 files changed, 147 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ec96302..b8a938d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -244,6 +244,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2539. FairScheduler: Set the default value for maxAMShare to 0.5. 
     (Wei Yan via kasha)
 
+    YARN-1959. Fix headroom calculation in FairScheduler. 
+    (Anubhav Dhoot via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 825c398..b9966e7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -171,6 +171,33 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
         + priority + "; currentReservation " + currentReservation);
   }
 
+  /** Returns the policy-computed headroom for this application attempt. */
+  @Override
+  public synchronized Resource getHeadroom() {
+    final FSQueue queue = (FSQueue) this.queue;
+    SchedulingPolicy policy = queue.getPolicy();
+    // Cluster availability is total cluster resource minus root queue usage.
+    Resource queueFairShare = queue.getFairShare();
+    Resource queueUsage = queue.getResourceUsage();
+    Resource clusterResource = this.scheduler.getClusterResource();
+    Resource clusterUsage = this.scheduler.getRootQueueMetrics()
+        .getAllocatedResources();
+    Resource clusterAvailableResource = Resources.subtract(clusterResource,
+        clusterUsage);
+    Resource headroom = policy.getHeadroom(queueFairShare,
+        queueUsage, clusterAvailableResource);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Headroom calculation for " + this.getName() + ": Min(" +
+          "(queueFairShare=" + queueFairShare +
+          " - queueUsage=" + queueUsage + ")," +
+          " clusterAvailableResource=" + clusterAvailableResource +
+          " (clusterResource=" + clusterResource +
+          " - clusterUsage=" + clusterUsage + "))" +
+          " Headroom=" + headroom);
+    }
+    return headroom;
+  }
+
   public synchronized float getLocalityWaitFactor(
       Priority priority, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
index ca006c5..4f3123d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
@@ -175,4 +175,19 @@ public abstract class SchedulingPolicy {
    */
   public abstract boolean checkIfAMResourceUsageOverLimit(
       Resource usage, Resource maxAMResource);
+
+  /**
+   * Get headroom by calculating the min of <code>clusterAvailable</code> and
+   * (<code>queueFairShare</code> - <code>queueUsage</code>) resources that are
+   * applicable to this policy. For example, a memory-only policy leaves other
+   * resources such as CPU the same as <code>clusterAvailable</code>.
+   *
+   * @param queueFairShare fair share of the queue
+   * @param queueUsage resources used in the queue
+   * @param clusterAvailable available resource in cluster
+   * @return calculated headroom
+   */
+  public abstract Resource getHeadroom(Resource queueFairShare,
+      Resource queueUsage, Resource clusterAvailable);
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 42044bc..3f6cbd1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -77,7 +77,7 @@ public class DominantResourceFairnessPolicy extends 
SchedulingPolicy {
       ComputeFairShares.computeSteadyShares(queues, totalResources, type);
     }
   }
-  
+
   @Override
   public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) 
{
     return !Resources.fitsIn(usage, fairShare);
@@ -89,6 +89,21 @@ public class DominantResourceFairnessPolicy extends 
SchedulingPolicy {
   }
 
   @Override
+  public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+                              Resource clusterAvailable) {
+    // Unused share of the queue per resource, clamped at zero.
+    int memLeft = Math.max(
+        queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    int vcoresLeft = Math.max(
+        queueFairShare.getVirtualCores() - queueUsage.getVirtualCores(), 0);
+    // Both dimensions are additionally capped by clusterAvailable.
+    int headroomMem = Math.min(clusterAvailable.getMemory(), memLeft);
+    int headroomVcores =
+        Math.min(clusterAvailable.getVirtualCores(), vcoresLeft);
+    return Resources.createResource(headroomMem, headroomVcores);
+  }
+
+  @Override
   public void initialize(Resource clusterCapacity) {
     comparator.setClusterCapacity(clusterCapacity);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 66bb88b..97669cb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -115,6 +115,17 @@ public class FairSharePolicy extends SchedulingPolicy {
   }
 
   @Override
+  public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+      Resource clusterAvailable) {
+    // Only memory is capped by the queue share; vcores mirror clusterAvailable.
+    int unusedShare =
+        Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    int memory = Math.min(clusterAvailable.getMemory(), unusedShare);
+    int vcores = clusterAvailable.getVirtualCores();
+    return Resources.createResource(memory, vcores);
+  }
+
+  @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
     ComputeFairShares.computeShares(schedulables, totalResources, 
ResourceType.MEMORY);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 591ee49..a2e17ec 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -108,6 +108,18 @@ public class FifoPolicy extends SchedulingPolicy {
   }
 
   @Override
+  public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
+      Resource clusterAvailable) {
+    // Memory is limited by the queue's unused share; vcores are taken as-is.
+    final int shareLeft =
+        Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
+    final int memory = Math.min(clusterAvailable.getMemory(), shareLeft);
+    final int vcores = clusterAvailable.getVirtualCores();
+    return Resources.createResource(memory, vcores);
+  }
+
+
+  @Override
   public byte getApplicableDepth() {
     return SchedulingPolicy.DEPTH_LEAF;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/568d3dc2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 0ab1f70..f560690 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import static org.junit.Assert.assertEquals;
 
@@ -26,7 +27,12 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -185,4 +191,61 @@ public class TestFSAppAttempt extends 
FairSchedulerTestBase {
     assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
         prio, 10, -1.0, -1.0));
   }
+
+  @Test
+  public void testHeadroom() {
+    final FairScheduler mockScheduler = Mockito.mock(FairScheduler.class);
+    Mockito.when(mockScheduler.getClock()).thenReturn(scheduler.getClock());
+    // Checks getHeadroom() under the DRF, Fair and Fifo policies using mocks.
+    final FSLeafQueue mockQueue = Mockito.mock(FSLeafQueue.class);
+    final Resource queueFairShare = Resources.createResource(4096, 4);
+    final Resource queueUsage = Resource.newInstance(1024, 1);
+    final Resource clusterResource = Resources.createResource(8192, 8);
+    final Resource clusterUsage = Resources.createResource(6144, 2);
+    final QueueMetrics fakeRootQueueMetrics = Mockito.mock(QueueMetrics.class);
+
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    RMContext rmContext = resourceManager.getRMContext();
+    FSAppAttempt schedulerApp =
+        new FSAppAttempt(mockScheduler, applicationAttemptId, "user1", 
mockQueue ,
+            null, rmContext);
+
+    Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
+    Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+    Mockito.when(mockScheduler.getClusterResource()).thenReturn
+        (clusterResource);
+    Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
+        (clusterUsage);
+    Mockito.when(mockScheduler.getRootQueueMetrics()).thenReturn
+        (fakeRootQueueMetrics);
+    // Cluster free: 8192-6144 memory (below queue's 4096-1024), 8-2 vcores.
+    int minClusterAvailableMemory = 2048;
+    int minClusterAvailableCPU = 6;
+    int minQueueAvailableCPU = 3;  // queue fair share 4 - usage 1
+
+    // Min of Memory and CPU across cluster and queue is used in
+    // DominantResourceFairnessPolicy
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(DominantResourceFairnessPolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minQueueAvailableCPU);
+
+    // Fair and Fifo ignore CPU of queue, so use cluster available CPU
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(FairSharePolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minClusterAvailableCPU);
+
+    Mockito.when(mockQueue.getPolicy()).thenReturn(SchedulingPolicy
+        .getInstance(FifoPolicy.class));
+    verifyHeadroom(schedulerApp, minClusterAvailableMemory,
+        minClusterAvailableCPU);
+  }
+
+  protected void verifyHeadroom(FSAppAttempt schedulerApp, int expectedMemory,
+      int expectedCPU) {  // checks both dimensions of the reported headroom
+    final Resource actual = schedulerApp.getHeadroom();
+    assertEquals(expectedMemory, actual.getMemory());
+    assertEquals(expectedCPU, actual.getVirtualCores());
+  }
 }

Reply via email to