bibinchundatt commented on a change in pull request #3717:
URL: https://github.com/apache/hadoop/pull/3717#discussion_r761812542



##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
##########
@@ -64,40 +69,165 @@
    * of two Nodes are compared.
    */
   public enum LoadComparator implements Comparator<ClusterNode> {
+    /**
+     * This policy only considers queue length.
+     * When allocating, increments queue length without looking at resources
+     * available on the node, and when sorting, also only sorts by queue length.
+     */
     QUEUE_LENGTH,
-    QUEUE_WAIT_TIME;
+    /**
+     * This policy only considers the wait time of containers in the queue.
+     * Neither looks at resources nor at queue length.
+     */
+    QUEUE_WAIT_TIME,
+    /**
+     * This policy considers both queue length and resources.
+     * When allocating, first decrements resources available on a node.
+     * If resources are available, does not place OContainers on the node queue.
+     * When sorting, it first sorts by queue length,
+     * then by available resources.
+     */
+    QUEUE_LENGTH_THEN_RESOURCES;
+
+    private Resource clusterResource = Resources.none();
+    private final DominantResourceCalculator resourceCalculator =
+        new DominantResourceCalculator();
+
+    private Resource computeAvailableResource(final ClusterNode clusterNode) {
+      return Resources.subtractNonNegative(
+          clusterNode.getCapability(),
+          clusterNode.getAllocatedResource());
+    }
 
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
-      if (getMetric(o1) == getMetric(o2)) {
-        return (int)(o2.getTimestamp() - o1.getTimestamp());
+      int diff = 0;
+      switch (this) {
+      case QUEUE_LENGTH_THEN_RESOURCES:
+        diff = o1.getQueueLength().get() - o2.getQueueLength().get();
+        if (diff != 0) {
+          break;
+        }
+
+        // NOTE: cluster resource should be
+        // set always before LoadComparator is used
+        final Resource availableResource1 = computeAvailableResource(o1);
+        final Resource availableResource2 = computeAvailableResource(o2);
+        final boolean isClusterResourceLeq0 = resourceCalculator
+            .isAnyMajorResourceZeroOrNegative(clusterResource);
+        if (!isClusterResourceLeq0) {
+          // Takes the least available resource of the two nodes,
+          // normalized to the overall cluster resource
+          final float availableRatio1 =
+              resourceCalculator.minRatio(availableResource1, clusterResource);
+          final float availableRatio2 =
+              resourceCalculator.minRatio(availableResource2, clusterResource);
+
+          // The one with more available resources should be placed first
+          diff = Precision
+              .compareTo(availableRatio2, availableRatio1, Precision.EPSILON);
+        }
+
+        if (diff == 0) {
+          // Compare absolute value if ratios are the same
+          diff = availableResource2.getVirtualCores() - availableResource1.getVirtualCores();
+        }
+
+        if (diff == 0) {
+          diff = Long.compare(availableResource2.getMemorySize(),
+              availableResource1.getMemorySize());
+        }
+        break;
+      case QUEUE_WAIT_TIME:
+      case QUEUE_LENGTH:
+      default:
+        diff = getMetric(o1) - getMetric(o2);
+        break;
+      }
+
+      if (diff == 0) {
+        return (int) (o2.getTimestamp() - o1.getTimestamp());
       }
-      return getMetric(o1) - getMetric(o2);
+      return diff;
+    }
+
+    private void setClusterResource(Resource clusterResource) {
+      this.clusterResource = clusterResource;
+    }
+
+    public ResourceCalculator getResourceCalculator() {
+      return resourceCalculator;
     }
 
     public int getMetric(ClusterNode c) {
-      return (this == QUEUE_LENGTH) ?
-          c.getQueueLength().get() : c.getQueueWaitTime().get();
+      switch (this) {
+      case QUEUE_WAIT_TIME:
+        return c.getQueueWaitTime().get();
+      case QUEUE_LENGTH:
+      case QUEUE_LENGTH_THEN_RESOURCES:
+      default:
+        return c.getQueueLength().get();
+      }
     }
 
     /**
      * Increment the metric by a delta if it is below the threshold.
      * @param c ClusterNode
      * @param incrementSize increment size
+     * @param requested the requested resource
      * @return true if the metric was below threshold and was incremented.
      */
-    public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
-      if(this == QUEUE_LENGTH) {
-        int ret = c.getQueueLength().addAndGet(incrementSize);
-        if (ret <= c.getQueueCapacity()) {
+    public boolean compareAndIncrement(
+        ClusterNode c, int incrementSize, Resource requested) {
+      if (this == QUEUE_LENGTH_THEN_RESOURCES) {
+        // Assignment and getting value is atomic
+        // Can be slightly inaccurate here, don't grab lock for performance
+        final Resource capability = c.getCapability();
+        final Resource currAllocated = c.getAllocatedResource();
+        final Resource currAvailable = Resources.subtractNonNegative(
+            capability, currAllocated);
+        if (resourceCalculator.fitsIn(requested, currAvailable)) {
+          final Resource newAllocated = Resources.add(currAllocated, requested);
+          c.setAllocatedResource(newAllocated);

Review comment:
       With multiple applications running in parallel, the allocated-resource
       update could be wrong here. Do we take a ClusterNode-level lock?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to