goiri commented on a change in pull request #3717:
URL: https://github.com/apache/hadoop/pull/3717#discussion_r762122292



##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java
##########
@@ -20,74 +20,239 @@
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
  */
 public class ClusterNode {
-  private final AtomicInteger queueLength = new AtomicInteger(0);
-  private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+  private int queueLength = 0;
+  private int queueWaitTime = -1;
   private long timestamp;
   final NodeId nodeId;
   private int queueCapacity = 0;
   private final HashSet<String> labels;
+  private Resource capability = Resources.none();
+  private Resource allocatedResource = Resources.none();
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  private final ReentrantReadWriteLock.ReadLock readLock;
 
   public ClusterNode(NodeId nodeId) {
     this.nodeId = nodeId;
     this.labels = new HashSet<>();
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    writeLock = lock.writeLock();
+    readLock = lock.readLock();
     updateTimestamp();
   }
 
+  public ClusterNode setCapability(Resource nodeCapability) {
+    writeLock.lock();
+    try {
+      if (nodeCapability == null) {
+        this.capability = Resources.none();
+      } else {
+        this.capability = nodeCapability;
+      }
+      return this;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public ClusterNode setAllocatedResource(
+      Resource allocResource) {
+    writeLock.lock();
+    try {
+      if (allocResource == null) {
+        this.allocatedResource = Resources.none();
+      } else {
+        this.allocatedResource = allocResource;
+      }
+      return this;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public Resource getAllocatedResource() {
+    readLock.lock();
+    try {
+      return this.allocatedResource;
+    }

Review comment:
       } finally {

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
##########
@@ -64,39 +69,141 @@
    * of two Nodes are compared.
    */
   public enum LoadComparator implements Comparator<ClusterNode> {
+    /**
+     * This policy only considers queue length.
+     * When allocating, increments queue length without looking at resources
+     * available on the node, and when sorting, also only sorts by queue length.
+     */
     QUEUE_LENGTH,
-    QUEUE_WAIT_TIME;
+    /**
+     * This policy only considers the wait time of containers in the queue.
+     * Neither looks at resources nor at queue length.
+     */
+    QUEUE_WAIT_TIME,
+    /**
+     * This policy considers both queue length and resources.
+     * When allocating, first decrements resources available on a node.
+     * If resources are available, does not place OContainers on the node queue.
+     * When sorting, it first sorts by queue length,
+     * then by available resources.
+     */
+    QUEUE_LENGTH_THEN_RESOURCES;
+
+    private Resource clusterResource = Resources.none();
+    private final DominantResourceCalculator resourceCalculator =
+        new DominantResourceCalculator();
 
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
-      if (getMetric(o1) == getMetric(o2)) {
-        return (int)(o2.getTimestamp() - o1.getTimestamp());
+      int diff = 0;
+      switch (this) {
+      case QUEUE_LENGTH_THEN_RESOURCES:
+        diff = o1.getQueueLength() - o2.getQueueLength();
+        if (diff != 0) {
+          break;
+        }
+
+        // NOTE: cluster resource should be

Review comment:
       Do we just go ahead with leaving a note?
   It doesn't seem very maintainable.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java
##########
@@ -20,74 +20,239 @@
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
  */
 public class ClusterNode {
-  private final AtomicInteger queueLength = new AtomicInteger(0);
-  private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+  private int queueLength = 0;
+  private int queueWaitTime = -1;
   private long timestamp;
   final NodeId nodeId;
   private int queueCapacity = 0;
   private final HashSet<String> labels;
+  private Resource capability = Resources.none();
+  private Resource allocatedResource = Resources.none();
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  private final ReentrantReadWriteLock.ReadLock readLock;
 
   public ClusterNode(NodeId nodeId) {
     this.nodeId = nodeId;
     this.labels = new HashSet<>();
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    writeLock = lock.writeLock();
+    readLock = lock.readLock();
     updateTimestamp();
   }
 
+  public ClusterNode setCapability(Resource nodeCapability) {
+    writeLock.lock();
+    try {
+      if (nodeCapability == null) {
+        this.capability = Resources.none();
+      } else {
+        this.capability = nodeCapability;
+      }
+      return this;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public ClusterNode setAllocatedResource(
+      Resource allocResource) {
+    writeLock.lock();
+    try {
+      if (allocResource == null) {
+        this.allocatedResource = Resources.none();
+      } else {
+        this.allocatedResource = allocResource;
+      }
+      return this;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public Resource getAllocatedResource() {
+    readLock.lock();
+    try {
+      return this.allocatedResource;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public Resource getAvailableResource() {
+    readLock.lock();
+    try {
+      return Resources.subtractNonNegative(capability, allocatedResource);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public Resource getCapability() {
+    readLock.lock();
+    try {
+      return this.capability;
+    }

Review comment:
       } finally {

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
##########
@@ -64,39 +69,141 @@
    * of two Nodes are compared.
    */
   public enum LoadComparator implements Comparator<ClusterNode> {
+    /**
+     * This policy only considers queue length.
+     * When allocating, increments queue length without looking at resources
+     * available on the node, and when sorting, also only sorts by queue length.
+     */
     QUEUE_LENGTH,
-    QUEUE_WAIT_TIME;
+    /**
+     * This policy only considers the wait time of containers in the queue.
+     * Neither looks at resources nor at queue length.
+     */
+    QUEUE_WAIT_TIME,
+    /**
+     * This policy considers both queue length and resources.
+     * When allocating, first decrements resources available on a node.
+     * If resources are available, does not place OContainers on the node queue.
+     * When sorting, it first sorts by queue length,
+     * then by available resources.
+     */
+    QUEUE_LENGTH_THEN_RESOURCES;
+
+    private Resource clusterResource = Resources.none();
+    private final DominantResourceCalculator resourceCalculator =
+        new DominantResourceCalculator();
 
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
-      if (getMetric(o1) == getMetric(o2)) {
-        return (int)(o2.getTimestamp() - o1.getTimestamp());
+      int diff = 0;
+      switch (this) {
+      case QUEUE_LENGTH_THEN_RESOURCES:
+        diff = o1.getQueueLength() - o2.getQueueLength();

Review comment:
       This math is a little intense.
   It might be better to extract it to a function just for this so you can return things right away and have comments throughout.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
##########
@@ -374,7 +491,7 @@ public RMNode selectRackLocalNode(String rackName, Set<String> blacklist) {
     return null;
   }
 
-  public RMNode selectAnyNode(Set<String> blacklist) {
+  public RMNode selectAnyNode(Set<String> blacklist, Resource request) {

Review comment:
       We should add javadocs explaining what these parameters mean.

##########
File path: 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java
##########
@@ -20,74 +20,239 @@
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
  * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
  */
 public class ClusterNode {
-  private final AtomicInteger queueLength = new AtomicInteger(0);
-  private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+  private int queueLength = 0;
+  private int queueWaitTime = -1;
   private long timestamp;
   final NodeId nodeId;
   private int queueCapacity = 0;
   private final HashSet<String> labels;
+  private Resource capability = Resources.none();
+  private Resource allocatedResource = Resources.none();
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  private final ReentrantReadWriteLock.ReadLock readLock;
 
   public ClusterNode(NodeId nodeId) {
     this.nodeId = nodeId;
     this.labels = new HashSet<>();
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    writeLock = lock.writeLock();
+    readLock = lock.readLock();
     updateTimestamp();
   }
 
+  public ClusterNode setCapability(Resource nodeCapability) {
+    writeLock.lock();
+    try {
+      if (nodeCapability == null) {
+        this.capability = Resources.none();
+      } else {
+        this.capability = nodeCapability;
+      }
+      return this;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public ClusterNode setAllocatedResource(
+      Resource allocResource) {
+    writeLock.lock();
+    try {
+      if (allocResource == null) {
+        this.allocatedResource = Resources.none();
+      } else {
+        this.allocatedResource = allocResource;
+      }
+      return this;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  public Resource getAllocatedResource() {
+    readLock.lock();
+    try {
+      return this.allocatedResource;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public Resource getAvailableResource() {
+    readLock.lock();
+    try {
+      return Resources.subtractNonNegative(capability, allocatedResource);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public Resource getCapability() {
+    readLock.lock();
+    try {
+      return this.capability;
+    }
+      finally {
+      readLock.unlock();
+    }
+  }
+
   public ClusterNode setQueueLength(int qLength) {
-    this.queueLength.set(qLength);
-    return this;
+    writeLock.lock();
+    try {
+      this.queueLength = qLength;
+      return this;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public ClusterNode setQueueWaitTime(int wTime) {
-    this.queueWaitTime.set(wTime);
-    return this;
+    writeLock.lock();
+    try {
+      this.queueWaitTime = wTime;
+      return this;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public ClusterNode updateTimestamp() {
-    this.timestamp = System.currentTimeMillis();
-    return this;
+    writeLock.lock();
+    try {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public ClusterNode setQueueCapacity(int capacity) {
-    this.queueCapacity = capacity;
-    return this;
+    writeLock.lock();
+    try {
+      this.queueCapacity = capacity;
+      return this;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public ClusterNode setNodeLabels(Collection<String> labelsToAdd) {
-    labels.clear();
-    labels.addAll(labelsToAdd);
-    return this;
+    writeLock.lock();
+    try {
+      labels.clear();
+      labels.addAll(labelsToAdd);
+      return this;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public boolean hasLabel(String label) {
-    return this.labels.contains(label);
+    readLock.lock();
+    try {
+      return this.labels.contains(label);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public long getTimestamp() {
-    return this.timestamp;
+    readLock.lock();
+    try {
+      return this.timestamp;
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public AtomicInteger getQueueLength() {
-    return this.queueLength;
+  public int getQueueLength() {
+    readLock.lock();
+    try {
+      return this.queueLength;
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public AtomicInteger getQueueWaitTime() {
-    return this.queueWaitTime;
+  public int getQueueWaitTime() {
+    readLock.lock();
+    try {
+      return this.queueWaitTime;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public int getQueueCapacity() {
-    return this.queueCapacity;
+    readLock.lock();
+    try {
+      return this.queueCapacity;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public boolean compareAndIncrementAllocation(
+      final int incrementQLen,
+      final ResourceCalculator resourceCalculator,
+      final Resource requested) {
+    writeLock.lock();
+    try {
+      final Resource currAvailable = Resources.subtractNonNegative(
+          capability, allocatedResource);
+      if (resourceCalculator.fitsIn(requested, currAvailable)) {
+        allocatedResource = Resources.add(allocatedResource, requested);
+        return true;
+      }
+
+      if (!resourceCalculator.fitsIn(requested, capability)) {
+        // If does not fit at all, do not allocate
+        return false;
+      }
+
+      return compareAndIncrementAllocation(incrementQLen);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public boolean compareAndIncrementAllocation(

Review comment:
       Doesn't this fit in one line?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to