Repository: incubator-myriad Updated Branches: refs/heads/master 79ba4a5f0 -> 4bce035ec
[Myriad 188] - NodeManager switch to UNHEALTHY causes NPE on ResourceManager. JIRA: [Myriad-188] https://issues.apache.org/jira/browse/MYRIAD-188 [Myriad-156] https://issues.apache.org/jira/browse/MYRIAD-156 Pull Request: Closes #62 Author: darinj dar...@apache.org Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/4bce035e Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/4bce035e Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/4bce035e Branch: refs/heads/master Commit: 4bce035ec8feff66f4804cc121c3051f5ecff4fa Parents: 79ba4a5 Author: darinj <darinj.w...@gmail.com> Authored: Tue Mar 8 05:51:51 2016 -0500 Committer: darinj <darinj.w...@gmail.com> Committed: Sun Mar 13 17:03:54 2016 -0400 ---------------------------------------------------------------------- .../myriad/scheduler/fgs/YarnNodeCapacityManager.java | 12 +++++++++--- .../apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 7 +++++-- .../scheduler/fgs/YarnNodeCapacityManagerSpec.groovy | 5 +++-- 3 files changed, 17 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/4bce035e/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 1a5d185..1dee5fa 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -70,6 +70,7 @@ import org.slf4j.LoggerFactory; */ public class YarnNodeCapacityManager extends BaseInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(YarnNodeCapacityManager.class); + private final AbstractYarnScheduler yarnScheduler; private final RMContext rmContext; private final MyriadDriver myriadDriver; @@ -247,9 +248,14 @@ public class YarnNodeCapacityManager extends BaseInterceptor { rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); // updates the scheduler with the new capacity for the NM. - // the event is handled by the scheduler asynchronously - rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance( - rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + synchronized (yarnScheduler) { + if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) != null) { + yarnScheduler.updateNodeResource(rmNode, + ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); + } else { + LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID()); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/4bce035e/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy index 5b15e59..c769999 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/FGSTestBaseSpec.groovy @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode import org.apache.hadoop.yarn.util.resource.Resources import org.apache.mesos.Protos import org.apache.mesos.SchedulerDriver @@ -65,6 +66,7 @@ class FGSTestBaseSpec extends Specification { def rmNodes = new ConcurrentHashMap<NodeId, RMNode>() + RMNode getRMNode(int cpu, int mem, String host, Protos.SlaveID slaveId) { RMNode rmNode = MockNodes.newNodeInfo(0, Resources.createResource(mem, cpu), 0, host) if (rmNodes[rmNode.getNodeID()]) { @@ -80,11 +82,9 @@ class FGSTestBaseSpec extends Specification { SchedulerNode getSchedulerNode(RMNode rmNode) { SchedulerNode schedulerNode = new SchedulerNode(rmNode, false) { - @Override void reserveResource(SchedulerApplicationAttempt attempt, Priority priority, RMContainer container) { } - @Override void unreserveResource(SchedulerApplicationAttempt attempt) { } @@ -92,6 +92,7 @@ class FGSTestBaseSpec extends Specification { return schedulerNode } + /******************* RMContext Related ****************/ def publisher = Mock(SystemMetricsPublisher) {} @@ -143,6 +144,8 @@ class FGSTestBaseSpec extends Specification { AbstractYarnScheduler yarnScheduler = Mock(AbstractYarnScheduler) { getRMContainer(_ as ContainerId) >> { ContainerId cid -> fgsContainers.get(cid).rmContainer } + getSchedulerNode(_ as NodeId) >> { NodeId nodeId -> getSchedulerNode(rmNodes.get(nodeId)) } + updateNodeResource(_ as RMNode, _ as ResourceOption) >> { } } FGSContainer getFGSContainer(RMNode node, int cid, int cpu, int mem, ContainerState state) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/4bce035e/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy index 57e6384..5d59c68 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy @@ -19,6 +19,8 @@ package org.apache.myriad.scheduler.fgs import org.apache.hadoop.yarn.api.records.ContainerState +import org.apache.hadoop.yarn.api.records.ResourceOption +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent import org.apache.hadoop.yarn.util.resource.Resources import org.apache.mesos.Protos @@ -115,7 +117,7 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec { then: zeroNM.getTotalCapability().getMemory() == 2048 zeroNM.getTotalCapability().getVirtualCores() == 2 - 1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent) + 1 * yarnScheduler.updateNodeResource( _ as RMNode, _ as ResourceOption) } YarnNodeCapacityManager getYarnNodeCapacityManager() { @@ -137,6 +139,5 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec { def taskUtils = new TaskUtils(cfg) return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext, myriadDriver, offerLifecycleManager, nodeStore, state, taskUtils) - } }