Repository: incubator-myriad
Updated Branches:
  refs/heads/master 79ba4a5f0 -> 4bce035ec


[Myriad-188] - NodeManager switching to UNHEALTHY causes an NPE in the ResourceManager.

JIRA:
  [Myriad-188] https://issues.apache.org/jira/browse/MYRIAD-188
  [Myriad-156] https://issues.apache.org/jira/browse/MYRIAD-156

Pull Request:
  Closes #62

Author:
  darinj dar...@apache.org


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/4bce035e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/4bce035e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/4bce035e

Branch: refs/heads/master
Commit: 4bce035ec8feff66f4804cc121c3051f5ecff4fa
Parents: 79ba4a5
Author: darinj <darinj.w...@gmail.com>
Authored: Tue Mar 8 05:51:51 2016 -0500
Committer: darinj <darinj.w...@gmail.com>
Committed: Sun Mar 13 17:03:54 2016 -0400

----------------------------------------------------------------------
 .../myriad/scheduler/fgs/YarnNodeCapacityManager.java   | 12 +++++++++---
 .../apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy  |  7 +++++--
 .../scheduler/fgs/YarnNodeCapacityManagerSpec.groovy    |  5 +++--
 3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/4bce035e/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 1a5d185..1dee5fa 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -70,6 +70,7 @@ import org.slf4j.LoggerFactory;
  */
 public class YarnNodeCapacityManager extends BaseInterceptor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnNodeCapacityManager.class);
+
   private final AbstractYarnScheduler yarnScheduler;
   private final RMContext rmContext;
   private final MyriadDriver myriadDriver;
@@ -247,9 +248,14 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
       
rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
       LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
       // updates the scheduler with the new capacity for the NM.
-      // the event is handled by the scheduler asynchronously
-      rmContext.getDispatcher().getEventHandler().handle(new 
NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(
-          rmNode.getTotalCapability(), 
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+      synchronized (yarnScheduler) {
+        if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) != null) {
+          yarnScheduler.updateNodeResource(rmNode,
+              ResourceOption.newInstance(rmNode.getTotalCapability(), 
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
+        } else {
+          LOGGER.info("Yarn Scheduler doesn't have node {}, probably 
UNHEALTHY", rmNode.getNodeID());
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/4bce035e/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
index 5b15e59..c769999 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode
 import org.apache.hadoop.yarn.util.resource.Resources
 import org.apache.mesos.Protos
 import org.apache.mesos.SchedulerDriver
@@ -65,6 +66,7 @@ class FGSTestBaseSpec extends Specification {
 
     def rmNodes = new ConcurrentHashMap<NodeId, RMNode>()
 
+
     RMNode getRMNode(int cpu, int mem, String host, Protos.SlaveID slaveId) {
         RMNode rmNode = MockNodes.newNodeInfo(0, Resources.createResource(mem, 
cpu), 0, host)
         if (rmNodes[rmNode.getNodeID()]) {
@@ -80,11 +82,9 @@ class FGSTestBaseSpec extends Specification {
 
     SchedulerNode getSchedulerNode(RMNode rmNode) {
         SchedulerNode schedulerNode = new SchedulerNode(rmNode, false) {
-
             @Override
             void reserveResource(SchedulerApplicationAttempt attempt, Priority 
priority, RMContainer container) {
             }
-
             @Override
             void unreserveResource(SchedulerApplicationAttempt attempt) {
             }
@@ -92,6 +92,7 @@ class FGSTestBaseSpec extends Specification {
         return schedulerNode
     }
 
+
     /******************* RMContext Related ****************/
 
     def publisher = Mock(SystemMetricsPublisher) {}
@@ -143,6 +144,8 @@ class FGSTestBaseSpec extends Specification {
 
     AbstractYarnScheduler yarnScheduler = Mock(AbstractYarnScheduler) {
         getRMContainer(_ as ContainerId) >> { ContainerId cid -> 
fgsContainers.get(cid).rmContainer }
+        getSchedulerNode(_ as NodeId) >> { NodeId nodeId -> 
getSchedulerNode(rmNodes.get(nodeId)) }
+        updateNodeResource(_ as RMNode, _ as ResourceOption) >> { }
     }
 
     FGSContainer getFGSContainer(RMNode node, int cid, int cpu, int mem, 
ContainerState state) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/4bce035e/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
index 57e6384..5d59c68 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -19,6 +19,8 @@
 package org.apache.myriad.scheduler.fgs
 
 import org.apache.hadoop.yarn.api.records.ContainerState
+import org.apache.hadoop.yarn.api.records.ResourceOption
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent
 import org.apache.hadoop.yarn.util.resource.Resources
 import org.apache.mesos.Protos
@@ -115,7 +117,7 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
         then:
         zeroNM.getTotalCapability().getMemory() == 2048
         zeroNM.getTotalCapability().getVirtualCores() == 2
-        1 * rmContext.getDispatcher().getEventHandler().handle(_ as 
NodeResourceUpdateSchedulerEvent)
+        1 * yarnScheduler.updateNodeResource( _ as RMNode, _ as ResourceOption)
     }
 
     YarnNodeCapacityManager getYarnNodeCapacityManager() {
@@ -137,6 +139,5 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
         def taskUtils = new TaskUtils(cfg)
         return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext,
                 myriadDriver, offerLifecycleManager, nodeStore, state, 
taskUtils)
-
     }
 }

Reply via email to