afchung commented on a change in pull request #3717:
URL: https://github.com/apache/hadoop/pull/3717#discussion_r763059347
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java
##########
@@ -20,74 +20,239 @@
import java.util.Collection;
import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
*/
public class ClusterNode {
- private final AtomicInteger queueLength = new AtomicInteger(0);
- private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+ private int queueLength = 0;
+ private int queueWaitTime = -1;
private long timestamp;
final NodeId nodeId;
private int queueCapacity = 0;
private final HashSet<String> labels;
+ private Resource capability = Resources.none();
+ private Resource allocatedResource = Resources.none();
+ private final ReentrantReadWriteLock.WriteLock writeLock;
+ private final ReentrantReadWriteLock.ReadLock readLock;
public ClusterNode(NodeId nodeId) {
this.nodeId = nodeId;
this.labels = new HashSet<>();
+ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ writeLock = lock.writeLock();
+ readLock = lock.readLock();
updateTimestamp();
}
+ public ClusterNode setCapability(Resource nodeCapability) {
+ writeLock.lock();
+ try {
+ if (nodeCapability == null) {
+ this.capability = Resources.none();
+ } else {
+ this.capability = nodeCapability;
+ }
+ return this;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ public ClusterNode setAllocatedResource(
+ Resource allocResource) {
+ writeLock.lock();
+ try {
+ if (allocResource == null) {
+ this.allocatedResource = Resources.none();
+ } else {
+ this.allocatedResource = allocResource;
+ }
+ return this;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Resource getAllocatedResource() {
+ readLock.lock();
+ try {
+ return this.allocatedResource;
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ public Resource getAvailableResource() {
+ readLock.lock();
+ try {
+ return Resources.subtractNonNegative(capability, allocatedResource);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public Resource getCapability() {
+ readLock.lock();
+ try {
+ return this.capability;
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
public ClusterNode setQueueLength(int qLength) {
- this.queueLength.set(qLength);
- return this;
+ writeLock.lock();
+ try {
+ this.queueLength = qLength;
+ return this;
+ } finally {
+ writeLock.unlock();
+ }
}
public ClusterNode setQueueWaitTime(int wTime) {
- this.queueWaitTime.set(wTime);
- return this;
+ writeLock.lock();
+ try {
+ this.queueWaitTime = wTime;
+ return this;
+ } finally {
+ writeLock.unlock();
+ }
}
public ClusterNode updateTimestamp() {
- this.timestamp = System.currentTimeMillis();
- return this;
+ writeLock.lock();
+ try {
+ this.timestamp = System.currentTimeMillis();
+ return this;
+ } finally {
+ writeLock.unlock();
+ }
}
public ClusterNode setQueueCapacity(int capacity) {
- this.queueCapacity = capacity;
- return this;
+ writeLock.lock();
+ try {
+ this.queueCapacity = capacity;
+ return this;
+ } finally {
+ writeLock.unlock();
+ }
}
public ClusterNode setNodeLabels(Collection<String> labelsToAdd) {
- labels.clear();
- labels.addAll(labelsToAdd);
- return this;
+ writeLock.lock();
+ try {
+ labels.clear();
+ labels.addAll(labelsToAdd);
+ return this;
+ } finally {
+ writeLock.unlock();
+ }
}
public boolean hasLabel(String label) {
- return this.labels.contains(label);
+ readLock.lock();
+ try {
+ return this.labels.contains(label);
+ } finally {
+ readLock.unlock();
+ }
}
public long getTimestamp() {
- return this.timestamp;
+ readLock.lock();
+ try {
+ return this.timestamp;
+ } finally {
+ readLock.unlock();
+ }
}
- public AtomicInteger getQueueLength() {
- return this.queueLength;
+ public int getQueueLength() {
+ readLock.lock();
+ try {
+ return this.queueLength;
+ } finally {
+ readLock.unlock();
+ }
}
- public AtomicInteger getQueueWaitTime() {
- return this.queueWaitTime;
+ public int getQueueWaitTime() {
+ readLock.lock();
+ try {
+ return this.queueWaitTime;
+ } finally {
+ readLock.unlock();
+ }
}
public int getQueueCapacity() {
- return this.queueCapacity;
+ readLock.lock();
+ try {
+ return this.queueCapacity;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public boolean compareAndIncrementAllocation(
+ final int incrementQLen,
+ final ResourceCalculator resourceCalculator,
+ final Resource requested) {
+ writeLock.lock();
+ try {
+ final Resource currAvailable = Resources.subtractNonNegative(
+ capability, allocatedResource);
+ if (resourceCalculator.fitsIn(requested, currAvailable)) {
+ allocatedResource = Resources.add(allocatedResource, requested);
+ return true;
+ }
+
+ if (!resourceCalculator.fitsIn(requested, capability)) {
+ // If does not fit at all, do not allocate
+ return false;
+ }
+
+ return compareAndIncrementAllocation(incrementQLen);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public boolean compareAndIncrementAllocation(
Review comment:
Fixed, good catch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]