Repository: incubator-myriad
Updated Branches:
  refs/heads/master fe493af32 -> 12a679c2b


Continuation of MYRIAD-171: moved toward the public methods 
incrementNodeCapacity and decrementNodeCapacity, since purely additive 
functions are easier to reason about in multithreaded environments.  Fixed 
minor bugs in the previous MYRIAD-171 patch, placed the guard against Node 
Managers having negative resources in setNodeCapacity, and reverted to 
dispatching a NodeResourceUpdateSchedulerEvent instead of calling 
yarnScheduler.updateNodeResource directly.  Well tested; the 
YarnNodeCapacityManagerSpec unit tests were updated to match.

Todo: Figure out how to make setNodeCapacity private.
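
For illustration, the additive pattern reduces to the standalone sketch 
below (a hypothetical CapacityCounter class, not the Myriad code itself): 
callers hand over deltas, and the read-modify-write of the running total 
happens under a single lock, so interleaved increments and decrements cannot 
lose updates the way racing absolute setters can.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical standalone sketch of the additive API; illustration only.
public class CapacityCounter {
  private final Lock lock = new ReentrantLock();
  private int capacity;

  // The public surface is purely additive: callers express deltas, not totals.
  public void increment(int delta) { adjust(delta); }

  public void decrement(int delta) { adjust(-delta); }

  // The absolute set stays private, which is the end state the Todo above
  // targets for setNodeCapacity.
  private void adjust(int delta) {
    lock.lock();
    try {
      // Clamp at zero, mirroring the negative-resource guard in setNodeCapacity.
      capacity = Math.max(0, capacity + delta);
    } finally {
      lock.unlock();
    }
  }
}
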
JIRA:
  [Myriad-171] https://issues.apache.org/jira/browse/MYRIAD-171
Pull Request:
  Closes #70
Author:
  DarinJ <dar...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/12a679c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/12a679c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/12a679c2

Branch: refs/heads/master
Commit: 12a679c2b05e550e6edabe2a0033aaf206786e6f
Parents: fe493af
Author: darinj <darinj.w...@gmail.com>
Authored: Tue May 10 16:47:02 2016 -0400
Committer: darinj <dar...@apache.org>
Committed: Thu May 12 00:23:27 2016 -0400

----------------------------------------------------------------------
 .../scheduler/fgs/YarnNodeCapacityManager.java  | 60 ++++++++++++++------
 .../fgs/YarnNodeCapacityManagerSpec.groovy      |  2 +-
 2 files changed, 43 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/12a679c2/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 1dee5fa..e922fc6 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -24,6 +24,8 @@ import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.inject.Inject;
 
 import org.apache.hadoop.yarn.api.records.Container;
@@ -77,6 +79,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
   private final OfferLifecycleManager offerLifecycleMgr;
   private final NodeStore nodeStore;
   private final SchedulerState state;
+  private static final Lock yarnSchedulerLock = new ReentrantLock();
   private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
   private TaskUtils taskUtils;
 
@@ -123,7 +126,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     }
   }
 
-  private synchronized void removeYarnTask(RMContainer rmContainer) {
+  private void removeYarnTask(RMContainer rmContainer) {
     if (rmContainer != null && rmContainer.getContainer() != null) {
       Protos.TaskID taskId = containerToTaskId(rmContainer);
       //TODO (darinj) Reliable messaging
@@ -134,8 +137,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       if (node != null) {
         RMNode rmNode = node.getNode().getRMNode();
         Resource resource = rmContainer.getContainer().getResource();
-        Resource diff = ResourceUtils.componentwiseMax(ZERO_RESOURCE, Resources.subtract(rmNode.getTotalCapability(), resource));
-        setNodeCapacity(rmNode, diff);
+        decrementNodeCapacity(rmNode, resource);
         LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and 
{} mem.", rmContainer.getContainer().toString(),
             rmContainer.getContainerExitStatus(), resource.getVirtualCores(), 
resource.getMemory());
       } else {
@@ -206,8 +208,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       for (Protos.Offer offer : consumedOffer.getOffers()) {
         offerLifecycleMgr.declineOffer(offer);
       }
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(
-          consumedOffer.getOffers())));
+      decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()));
     } else {
       LOGGER.debug("Containers allocated using Mesos offers for host: {} 
count: {}", host, containersAllocatedByMesosOffer.size());
 
@@ -223,8 +224,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       // Reduce node capacity to account for unused offers
       Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
       Resource resUnused = Resources.subtract(resOffered, resUsed);
-      setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused));
-
+      decrementNodeCapacity(rmNode, resUnused);
       myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
     }
 
@@ -232,6 +232,15 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     node.removeContainerSnapshot();
   }
 
+
+  public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) {
+    setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity));
+  }
+
+  public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) {
+    setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity));
+  }
+
   /**
    * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity.
    * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler.
@@ -243,19 +252,34 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
   @SuppressWarnings("unchecked")
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
     //NOOP prevent YARN warning changing to same size
-    if (!Resources.equals(rmNode.getTotalCapability(), newCapacity)) {
-      rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
-      rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-      LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
-      // updates the scheduler with the new capacity for the NM.
-      synchronized (yarnScheduler) {
-        if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) != null) {
-          yarnScheduler.updateNodeResource(rmNode,
-              ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
-        } else {
-          LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
+    if (Resources.equals(rmNode.getTotalCapability(), newCapacity)) {
+      return;
+    }
+    if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) == null) {
+      LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
+      return;
+    }
+    yarnSchedulerLock.lock();
+    try {
+      if (newCapacity.getMemory() < 0 || newCapacity.getVirtualCores() < 0) {
+        Resource zeroed = ResourceUtils.componentwiseMax(ZERO_RESOURCE, newCapacity);
+        LOGGER.warn("Asked to set Node {} to a value less than zero!  Had {}, setting to {}.",
+            rmNode.getHttpAddress(), rmNode.getTotalCapability().toString(), zeroed.toString());
+        rmNode.getTotalCapability().setMemory(zeroed.getMemory());
+        rmNode.getTotalCapability().setVirtualCores(zeroed.getVirtualCores());
+      } else {
+        rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
+        rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
         }
       }
+      // updates the scheduler with the new capacity for the NM.
+      // the event is handled by the scheduler asynchronously
+      rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(
+          rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    } finally {
+      yarnSchedulerLock.unlock();
     }
   }
 

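With the absolute setter wrapped this way, call sites only ever express 
deltas. As a usage illustration, a hypothetical call site (not part of this 
commit) that grows a node's YARN capacity once Mesos offers are accepted 
might look like:

// Hypothetical call site, for illustration only: grow the node's YARN
// capacity by the resources carried in newly accepted Mesos offers.
void onOffersAccepted(YarnNodeCapacityManager capacityMgr, RMNode rmNode,
    List<Protos.Offer> offers) {
  Resource offered = OfferUtils.getYarnResourcesFromMesosOffers(offers);
  capacityMgr.incrementNodeCapacity(rmNode, offered);
}
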
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/12a679c2/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
index 5d59c68..f7d8c43 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -117,7 +117,7 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
         then:
         zeroNM.getTotalCapability().getMemory() == 2048
         zeroNM.getTotalCapability().getVirtualCores() == 2
-        1 * yarnScheduler.updateNodeResource( _ as RMNode, _ as ResourceOption)
+        1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent)
     }
 
     YarnNodeCapacityManager getYarnNodeCapacityManager() {
