Repository: incubator-myriad Updated Branches: refs/heads/master fe493af32 -> 12a679c2b
Continuation of Myriad 171, started to work towards public methods incrementResources and decrementResources, as it's easier to reason about purely additive functions in multithreaded environments. Fixed minor bugs in previous Myriad-171 patch, placed the guard for Node Managers having negative resources in setNodeCapacity and reverted back from calling yarnScheduler.updateNode directly. Very well tested. Todo: Figure out how to make setNodeCapacity private. JIRA: [Myriad-171] https://issues.apache.org/jira/browse/MYRIAD-171 Pull Request: Closes #70 Author: DarinJ <dar...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/12a679c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/12a679c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/12a679c2 Branch: refs/heads/master Commit: 12a679c2b05e550e6edabe2a0033aaf206786e6f Parents: fe493af Author: darinj <darinj.w...@gmail.com> Authored: Tue May 10 16:47:02 2016 -0400 Committer: darinj <dar...@apache.org> Committed: Thu May 12 00:23:27 2016 -0400 ---------------------------------------------------------------------- .../scheduler/fgs/YarnNodeCapacityManager.java | 60 ++++++++++++++------ .../fgs/YarnNodeCapacityManagerSpec.groovy | 2 +- 2 files changed, 43 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/12a679c2/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 1dee5fa..e922fc6 100644 --- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -24,6 +24,8 @@ import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import javax.inject.Inject; import org.apache.hadoop.yarn.api.records.Container; @@ -77,6 +79,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { private final OfferLifecycleManager offerLifecycleMgr; private final NodeStore nodeStore; private final SchedulerState state; + private static final Lock yarnSchedulerLock = new ReentrantLock(); private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); private TaskUtils taskUtils; @@ -123,7 +126,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { } } - private synchronized void removeYarnTask(RMContainer rmContainer) { + private void removeYarnTask(RMContainer rmContainer) { if (rmContainer != null && rmContainer.getContainer() != null) { Protos.TaskID taskId = containerToTaskId(rmContainer); //TODO (darinj) Reliable messaging @@ -134,8 +137,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { if (node != null) { RMNode rmNode = node.getNode().getRMNode(); Resource resource = rmContainer.getContainer().getResource(); - Resource diff = ResourceUtils.componentwiseMax(ZERO_RESOURCE, Resources.subtract(rmNode.getTotalCapability(), resource)); - setNodeCapacity(rmNode, diff); + decrementNodeCapacity(rmNode, resource); LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and {} mem.", rmContainer.getContainer().toString(), rmContainer.getContainerExitStatus(), resource.getVirtualCores(), resource.getMemory()); } else { @@ -206,8 +208,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { for (Protos.Offer offer : 
consumedOffer.getOffers()) { offerLifecycleMgr.declineOffer(offer); } - setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers( - consumedOffer.getOffers()))); + decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers())); } else { LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size()); @@ -223,8 +224,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { // Reduce node capacity to account for unused offers Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()); Resource resUnused = Resources.subtract(resOffered, resUsed); - setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused)); - + decrementNodeCapacity(rmNode, resUnused); myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks); } @@ -232,6 +232,15 @@ public class YarnNodeCapacityManager extends BaseInterceptor { node.removeContainerSnapshot(); } + + public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) { + setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity)); + } + + public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) { + setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity)); + } + /** * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity. * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler. 
@@ -243,19 +252,34 @@ public class YarnNodeCapacityManager extends BaseInterceptor { @SuppressWarnings("unchecked") public void setNodeCapacity(RMNode rmNode, Resource newCapacity) { //NOOP prevent YARN warning changing to same size - if (!Resources.equals(rmNode.getTotalCapability(), newCapacity)) { - rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); - rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); - LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); - // updates the scheduler with the new capacity for the NM. - synchronized (yarnScheduler) { - if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) != null) { - yarnScheduler.updateNodeResource(rmNode, - ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); - } else { - LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID()); + if ((Resources.equals(rmNode.getTotalCapability(), newCapacity))) { + return; + } + if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) == null) { + LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID()); + return; + } + yarnSchedulerLock.lock(); + try { + if (newCapacity.getMemory() < 0 || newCapacity.getVirtualCores() < 0) { + Resource zeroed = ResourceUtils.componentwiseMax(ZERO_RESOURCE, newCapacity); + rmNode.getTotalCapability().setMemory(zeroed.getMemory()); + rmNode.getTotalCapability().setVirtualCores(zeroed.getVirtualCores()); + LOGGER.warn("Asked to set Node {} to a value less than zero! 
Had {}, setting to {}.", + rmNode.getHttpAddress(), rmNode.getTotalCapability().toString(), zeroed.toString()); + } else { + rmNode.getTotalCapability().setMemory(newCapacity.getMemory()); + rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); } } + // updates the scheduler with the new capacity for the NM. + // the event is handled by the scheduler asynchronously + rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance( + rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + } finally { + yarnSchedulerLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/12a679c2/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy index 5d59c68..f7d8c43 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy @@ -117,7 +117,7 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec { then: zeroNM.getTotalCapability().getMemory() == 2048 zeroNM.getTotalCapability().getVirtualCores() == 2 - 1 * yarnScheduler.updateNodeResource( _ as RMNode, _ as ResourceOption) + 1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent) } YarnNodeCapacityManager getYarnNodeCapacityManager() {