YARN-7934. [GQ] Refactor preemption calculators to allow overriding for 
Federation Global Algos. (Contributed by curino)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/514794e1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/514794e1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/514794e1

Branch: refs/heads/HDFS-12996
Commit: 514794e1a5a39ca61de3981d53a05547ae17f5e4
Parents: 95904f6
Author: Carlo Curino <cur...@apache.org>
Authored: Thu Feb 22 18:12:12 2018 -0800
Committer: Carlo Curino <cur...@apache.org>
Committed: Thu Feb 22 18:12:12 2018 -0800

----------------------------------------------------------------------
 .../AbstractPreemptableResourceCalculator.java  |  38 +++++--
 .../capacity/AbstractPreemptionEntity.java      |   4 +
 .../CapacitySchedulerPreemptionContext.java     |   6 +-
 .../capacity/PreemptableResourceCalculator.java |  21 ++--
 .../monitor/capacity/TempQueuePerPartition.java | 106 +++++++++++++++----
 .../webapp/dao/ResourceInfo.java                |   5 +-
 6 files changed, 139 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 5196831..2589970 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
@@ -26,12 +32,6 @@ import 
org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
 /**
  * Calculate how much resources need to be preempted for each queue,
  * will be used by {@link PreemptionCandidatesSelector}.
@@ -126,11 +126,18 @@ public class AbstractPreemptableResourceCalculator {
       TempQueuePerPartition q = i.next();
       Resource used = q.getUsed();
 
+      Resource initIdealAssigned;
       if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
-        q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
+        initIdealAssigned =
+            Resources.add(q.getGuaranteed(), q.untouchableExtra);
       } else {
-        q.idealAssigned = Resources.clone(used);
+        initIdealAssigned = Resources.clone(used);
       }
+
+      // perform initial assignment
+      initIdealAssignment(totGuarant, q, initIdealAssigned);
+
+
       Resources.subtractFrom(unassigned, q.idealAssigned);
       // If idealAssigned < (allocated + used + pending), q needs more
       // resources, so
@@ -188,6 +195,21 @@ public class AbstractPreemptableResourceCalculator {
     }
   }
 
+
+  /**
+   * This method is visible to allow sub-classes to override the initialization
+   * behavior.
+   *
+   * @param totGuarant total resources (useful for {@code ResourceCalculator}
+   *          operations)
+   * @param q the {@code TempQueuePerPartition} being initialized
+   * @param initIdealAssigned the proposed initialization value.
+   */
+  protected void initIdealAssignment(Resource totGuarant,
+      TempQueuePerPartition q, Resource initIdealAssigned) {
+    q.idealAssigned = initIdealAssigned;
+  }
+
   /**
    * Computes a normalizedGuaranteed capacity based on active queues.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
index dbd1f0a..cb4d7af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
@@ -59,6 +59,10 @@ public class AbstractPreemptionEntity {
     this.selected = Resource.newInstance(0, 0);
   }
 
+  public String getQueueName() {
+    return queueName;
+  }
+
   public Resource getUsed() {
     return current;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index d6f3f6c..098acdd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -30,7 +30,11 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
-interface CapacitySchedulerPreemptionContext {
+/**
+ * This interface provides context for the calculation of ideal allocation
+ * and preemption for the {@code CapacityScheduler}.
+ */
+public interface CapacitySchedulerPreemptionContext {
   CapacityScheduler getScheduler();
 
   TempQueuePerPartition getQueueByPartition(String queueName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 907785e..2d2cdf6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -26,11 +31,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 /**
  * Calculate how much resources need to be preempted for each queue,
  * will be used by {@link PreemptionCandidatesSelector}
@@ -70,7 +70,7 @@ public class PreemptableResourceCalculator
    * @param totalPreemptionAllowed total amount of preemption we allow
    * @param tot_guarant the amount of capacity assigned to this pool of queues
    */
-  private void computeIdealResourceDistribution(ResourceCalculator rc,
+  protected void computeIdealResourceDistribution(ResourceCalculator rc,
       List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
       Resource tot_guarant) {
 
@@ -138,14 +138,13 @@ public class PreemptableResourceCalculator
   /**
    * This method recursively computes the ideal assignment of resources to each
    * level of the hierarchy. This ensures that leafs that are over-capacity but
-   * with parents within capacity will not be preemptionCandidates. 
Preemptions are allowed
-   * within each subtree according to local over/under capacity.
+   * with parents within capacity will not be preemptionCandidates. Preemptions
+   * are allowed within each subtree according to local over/under capacity.
    *
    * @param root the root of the cloned queue hierachy
    * @param totalPreemptionAllowed maximum amount of preemption allowed
-   * @return a list of leaf queues updated with preemption targets
    */
-  private void recursivelyComputeIdealAssignment(
+  protected void recursivelyComputeIdealAssignment(
       TempQueuePerPartition root, Resource totalPreemptionAllowed) {
     if (root.getChildren() != null &&
         root.getChildren().size() > 0) {
@@ -242,7 +241,7 @@ public class PreemptableResourceCalculator
 
       // compute the ideal distribution of resources among queues
       // updates cloned queues state accordingly
-      tRoot.idealAssigned = tRoot.getGuaranteed();
+      tRoot.initializeRootIdealWithGuarangeed();
       recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index fdeee52..9d8297d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -18,22 +18,20 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 import org.apache.hadoop.yarn.api.records.Resource;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .ParentQueue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 /**
  * Temporary data-structure tracking resource availability, pending resource
  * need, current utilization. This is per-queue-per-partition data structure
@@ -74,7 +72,8 @@ public class TempQueuePerPartition extends 
AbstractPreemptionEntity {
   // idealAssigned, used etc.
   Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
 
-  TempQueuePerPartition(String queueName, Resource current,
+  @SuppressWarnings("checkstyle:parameternumber")
+  public TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
       Resource reserved, CSQueue queue, Resource effMinRes,
@@ -94,7 +93,7 @@ public class TempQueuePerPartition extends 
AbstractPreemptionEntity {
       pendingDeductReserved = Resources.createResource(0);
     }
 
-    if (ParentQueue.class.isAssignableFrom(queue.getClass())) {
+    if (queue != null && ParentQueue.class.isAssignableFrom(queue.getClass())) 
{
       parentQueue = (ParentQueue) queue;
     }
 
@@ -179,15 +178,14 @@ public class TempQueuePerPartition extends 
AbstractPreemptionEntity {
     // Because for a satisfied parent queue, it could have some under-utilized
     // leaf queues. Such under-utilized leaf queue could preemption resources
     // from over-utilized leaf queue located at other hierarchies.
-    if (null == children || children.isEmpty()) {
-      Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
-          Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
-          idealAssigned);
-      maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
-          maxOfGuranteedAndUsedDeductAssigned, Resources.none());
-      accepted = Resources.min(rc, clusterResource, accepted,
-          maxOfGuranteedAndUsedDeductAssigned);
-    }
+
+    accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+
+    // accepted so far contains the "quota acceptable" amount; we now filter it
+    // down to the "locality acceptable" amount
+
+    accepted = acceptedByLocality(rc, accepted);
+
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;
@@ -329,4 +327,72 @@ public class TempQueuePerPartition extends 
AbstractPreemptionEntity {
   public Map<String, TempUserPerPartition> getUsersPerPartition() {
     return usersPerPartition;
   }
+
+  public void setPending(Resource pending) {
+    this.pending = pending;
+  }
+
+  public Resource getIdealAssigned() {
+    return idealAssigned;
+  }
+
+  public String toGlobalString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\n").append(toString());
+    for (TempQueuePerPartition c : children) {
+      sb.append(c.toGlobalString());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * This method is visible to allow sub-classes to override the behavior,
+   * specifically to take into account locality-based limitations of how much
+   * the queue can consume.
+   *
+   * @param rc the ResourceCalculator to be used.
+   * @param offered the input amount of Resource offered to this queue.
+   *
+   * @return  the subset of Resource(s) that the queue can consume after
+   *          accounting for locality effects.
+   */
+  protected Resource acceptedByLocality(ResourceCalculator rc,
+      Resource offered) {
+    return offered;
+  }
+
+  /**
+   * This method is visible to allow sub-classes to override the behavior,
+   * specifically for federation purposes we do not want to cap resources as it
+   * is done here.
+   *
+   * @param rc the {@code ResourceCalculator} to be used
+   * @param clusterResource the total cluster resources
+   * @param offered the resources offered to this queue
+   * @return the amount of resources accepted after considering max and
+   *         deducting assigned.
+   */
+  protected Resource filterByMaxDeductAssigned(ResourceCalculator rc,
+      Resource clusterResource, Resource offered) {
+    if (null == children || children.isEmpty()) {
+      Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
+          Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
+          idealAssigned);
+      maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
+          maxOfGuranteedAndUsedDeductAssigned, Resources.none());
+      offered = Resources.min(rc, clusterResource, offered,
+          maxOfGuranteedAndUsedDeductAssigned);
+    }
+    return offered;
+  }
+
+  /**
+   * This method is visible to allow sub-classes to override the behavior,
+   * specifically for federation purposes we need to initialize per-sub-cluster
+   * roots as well as the global one.
+   */
+  protected void initializeRootIdealWithGuarangeed() {
+    idealAssigned = Resources.clone(getGuaranteed());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/514794e1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
index 5bed936..9a335e9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourceInfo.java
@@ -70,7 +70,7 @@ public class ResourceInfo {
 
   @Override
   public String toString() {
-    return resources.toString();
+    return getResource().toString();
   }
 
   public void setMemory(int memory) {
@@ -90,6 +90,9 @@ public class ResourceInfo {
   }
 
   public Resource getResource() {
+    if (resources == null) {
+      resources = Resource.newInstance(memory, vCores);
+    }
     return Resource.newInstance(resources);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to