http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy new file mode 100644 index 0000000..4505676 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.fgs + +import org.apache.hadoop.yarn.api.records.ContainerState +import org.apache.hadoop.yarn.api.records.ContainerStatus +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent +import org.apache.hadoop.yarn.util.resource.Resources +import org.apache.mesos.Protos +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry +import org.apache.myriad.state.SchedulerState +import org.slf4j.Logger + +/** + * + * Tests for NMHeartBeatHandler + * + */ +class NMHeartBeatHandlerSpec extends FGSTestBaseSpec { + + def "Node Manager registration"() { + given: + def hbHandler = getNMHeartBeatHandler() + hbHandler.logger = Mock(Logger) + + def nonZeroNM = getRMNode(2, 2048, "test_host1", null) + def zeroNM = getRMNode(0, 0, "test_host2", null) + + when: + hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(nonZeroNM), rmContext) + + then: + 1 * hbHandler.logger.warn('FineGrainedScaling feature got invoked for a NM with non-zero capacity. ' + + 'Host: {}, Mem: {}, CPU: {}. Setting the NM\'s capacity to (0G,0CPU)', 'test_host1', 2048, 2) + nonZeroNM.getTotalCapability().getMemory() == 0 + nonZeroNM.getTotalCapability().getVirtualCores() == 0 + + when: + hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(zeroNM), rmContext) + + then: + 0 * hbHandler.logger.warn(_) // no logger.warn invoked + nonZeroNM.getTotalCapability().getMemory() == 0 + nonZeroNM.getTotalCapability().getVirtualCores() == 0 + } + + def "Node Manager HeartBeat"() { + given: + def host = "test_host" + def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build() + def zeroNM = getRMNode(0, 0, host, slaveId) + + def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING) + def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.COMPLETE) + def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, ContainerState.RUNNING) + + addOfferToFeed(slaveId, host, 2, 2048) + + def yarnNodeCapacityManager = Mock(YarnNodeCapacityManager) + def hbHandler = getNMHeartBeatHandler(yarnNodeCapacityManager) + + when: + hbHandler.handleStatusUpdate( + getHBEvent( + zeroNM, + fgsContainer1.containerStatus, + fgsContainer2.containerStatus, + fgsContainer3.containerStatus), + rmContext) + + then: + nodeStore.getNode(host).getContainerSnapshot().size() == 3 + 1 * yarnNodeCapacityManager.setNodeCapacity(zeroNM, Resources.createResource(4096, 4)) + } + + + RMNodeStartedEvent getNMRegistrationEvent(RMNode node) { + new RMNodeStartedEvent(node.getNodeID(), null, null) + } + + RMNodeStatusEvent getHBEvent(RMNode node, ContainerStatus... statuses) { + return new RMNodeStatusEvent(node.getNodeID(), null, Arrays.asList(statuses), null, null) + } + + NMHeartBeatHandler getNMHeartBeatHandler() { + return getNMHeartBeatHandler(Mock(YarnNodeCapacityManager)) + } + + NMHeartBeatHandler getNMHeartBeatHandler(YarnNodeCapacityManager yarnNodeCapacityMgr) { + def registry = Mock(InterceptorRegistry) + def state = Mock(SchedulerState) + return new NMHeartBeatHandler(registry, yarnScheduler, myriadDriver, + yarnNodeCapacityMgr, offerLifecycleManager, nodeStore, state) + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy new file mode 100644 index 0000000..bdebef2 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.fgs + +import org.apache.hadoop.yarn.api.records.ContainerState +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent +import org.apache.hadoop.yarn.util.resource.Resources +import org.apache.mesos.Protos +import org.apache.myriad.configuration.NodeManagerConfiguration +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry +import org.apache.myriad.state.NodeTask +import org.apache.myriad.state.SchedulerState + +/** + * + * Tests for YarnNodeCapacityManager + * + */ +class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec { + + def "No Containers Allocated Due To Mesos Offers"() { + given: + def yarnNodeCapacityMgr = getYarnNodeCapacityManager() + + def host = "test_host" + def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build() + def zeroNM = getRMNode(0, 0, host, slaveId) + + // have a mesos offer before HB + def offer = addOfferToFeed(slaveId, host, 4, 4096) + offerLifecycleManager.markAsConsumed(offer) + + // 2 containers before HB. + def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING) + def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.RUNNING) + nodeStore.getNode(host).snapshotRunningContainers() + + // Node's capacity set to match the size of 2 containers + mesos offers + yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(6144, 6)) + + // no new container allocations + + when: + yarnNodeCapacityMgr.handleContainerAllocation(zeroNM) + + then: + nodeStore.getNode(host).getNode().getRunningContainers().size() == 2 // 2 containers still running + 1 * mesosDriver.declineOffer(offer.getId()) // offer rejected, as it's not used to allocate more containers + zeroNM.getTotalCapability().getVirtualCores() == 2 // capacity returns back to match size of running containers + zeroNM.getTotalCapability().getMemory() == 2048 + nodeStore.getNode(host).getContainerSnapshot() == null // container snapshot is released + } + + def "Containers Allocated Due To Mesos Offers"() { + given: + def yarnNodeCapacityMgr = getYarnNodeCapacityManager() + + def host = "test_host" + def slaveId = Protos.SlaveID.newBuilder().setValue(host + "_slave_id").build() + def zeroNM = getRMNode(0, 0, host, slaveId) + + // have a mesos offer before HB + def offer = addOfferToFeed(slaveId, host, 4, 4096) + offerLifecycleManager.markAsConsumed(offer) + + // 2 containers before HB. + def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, ContainerState.RUNNING) + def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, ContainerState.RUNNING) + nodeStore.getNode(host).snapshotRunningContainers() + + // Node's capacity set to match the size of 2 running containers + mesos offers + yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(6144, 6)) + + // 2 new containers allocated after HB + def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, ContainerState.NEW) + def fgsContainer4 = getFGSContainer(zeroNM, 4, 1, 1024, ContainerState.NEW) + + when: + yarnNodeCapacityMgr.handleContainerAllocation(zeroNM) + + then: + nodeStore.getNode(host).getNode().getRunningContainers().size() == 4 // 2 running + 2 new + 1 * mesosDriver.launchTasks(_ as Collection<Protos.OfferID>, _ as List<Protos.TaskInfo>) // for place holder tasks + zeroNM.getTotalCapability().getVirtualCores() == 4 // capacity equals size of running + new containers + zeroNM.getTotalCapability().getMemory() == 4096 + nodeStore.getNode(host).getContainerSnapshot() == null // container snapshot is released + } + + def "Set Node Capacity"() { + given: + def zeroNM = getRMNode(0, 0, "test_host", null) + def yarnNodeCapacityMgr = getYarnNodeCapacityManager() + + when: + yarnNodeCapacityMgr.setNodeCapacity(zeroNM, Resources.createResource(2048, 2)) + + then: + zeroNM.getTotalCapability().getMemory() == 2048 + zeroNM.getTotalCapability().getVirtualCores() == 2 + 1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent) + } + + YarnNodeCapacityManager getYarnNodeCapacityManager() { + def registry = Mock(InterceptorRegistry) + def executorInfo = Protos.ExecutorInfo.newBuilder() + .setExecutorId(Protos.ExecutorID.newBuilder().setValue("some_id")) + .setCommand(Protos.CommandInfo.newBuilder()) + .build() + def nodeTask = Mock(NodeTask) { + getExecutorInfo() >> executorInfo + } + def state = Mock(SchedulerState) { + getNodeTask(_, NodeManagerConfiguration.NM_TASK_PREFIX) >> nodeTask + } + return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext, + myriadDriver, offerLifecycleManager, nodeStore, state) + + } +}