http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
new file mode 100644
index 0000000..4505676
--- /dev/null
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandlerSpec.groovy
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs
+
+import org.apache.hadoop.yarn.api.records.ContainerState
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent
+import org.apache.hadoop.yarn.util.resource.Resources
+import org.apache.mesos.Protos
+import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
+import org.apache.myriad.state.SchedulerState
+import org.slf4j.Logger
+
+/**
+ *
+ * Tests for NMHeartBeatHandler
+ *
+ */
+class NMHeartBeatHandlerSpec extends FGSTestBaseSpec {
+
+    def "Node Manager registration"() {
+        given:
+        def hbHandler = getNMHeartBeatHandler()
+        hbHandler.logger = Mock(Logger)
+
+        def nonZeroNM = getRMNode(2, 2048, "test_host1", null)
+        def zeroNM = getRMNode(0, 0, "test_host2", null)
+
+        when:
+        hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(nonZeroNM), 
rmContext)
+
+        then:
+        1 * hbHandler.logger.warn('FineGrainedScaling feature got invoked for 
a NM with non-zero capacity. ' +
+                'Host: {}, Mem: {}, CPU: {}. Setting the NM\'s capacity to 
(0G,0CPU)', 'test_host1', 2048, 2)
+        nonZeroNM.getTotalCapability().getMemory() == 0
+        nonZeroNM.getTotalCapability().getVirtualCores() == 0
+
+        when:
+        hbHandler.beforeRMNodeEventHandled(getNMRegistrationEvent(zeroNM), 
rmContext)
+
+        then:
+        0 * hbHandler.logger.warn(_) // no logger.warn invoked
+        nonZeroNM.getTotalCapability().getMemory() == 0
+        nonZeroNM.getTotalCapability().getVirtualCores() == 0
+    }
+
+    def "Node Manager HeartBeat"() {
+        given:
+        def host = "test_host"
+        def slaveId = Protos.SlaveID.newBuilder().setValue(host + 
"_slave_id").build()
+        def zeroNM = getRMNode(0, 0, host, slaveId)
+
+        def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, 
ContainerState.RUNNING)
+        def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, 
ContainerState.COMPLETE)
+        def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, 
ContainerState.RUNNING)
+
+        addOfferToFeed(slaveId, host, 2, 2048)
+
+        def yarnNodeCapacityManager = Mock(YarnNodeCapacityManager)
+        def hbHandler = getNMHeartBeatHandler(yarnNodeCapacityManager)
+
+        when:
+        hbHandler.handleStatusUpdate(
+                getHBEvent(
+                        zeroNM,
+                        fgsContainer1.containerStatus,
+                        fgsContainer2.containerStatus,
+                        fgsContainer3.containerStatus),
+                rmContext)
+
+        then:
+        nodeStore.getNode(host).getContainerSnapshot().size() == 3
+        1 * yarnNodeCapacityManager.setNodeCapacity(zeroNM, 
Resources.createResource(4096, 4))
+    }
+
+
+    RMNodeStartedEvent getNMRegistrationEvent(RMNode node) {
+        new RMNodeStartedEvent(node.getNodeID(), null, null)
+    }
+
+    RMNodeStatusEvent getHBEvent(RMNode node, ContainerStatus... statuses) {
+        return new RMNodeStatusEvent(node.getNodeID(), null, 
Arrays.asList(statuses), null, null)
+    }
+
+    NMHeartBeatHandler getNMHeartBeatHandler() {
+        return getNMHeartBeatHandler(Mock(YarnNodeCapacityManager))
+    }
+
+    NMHeartBeatHandler getNMHeartBeatHandler(YarnNodeCapacityManager 
yarnNodeCapacityMgr) {
+        def registry = Mock(InterceptorRegistry)
+        def state = Mock(SchedulerState)
+        return new NMHeartBeatHandler(registry, yarnScheduler, myriadDriver,
+                yarnNodeCapacityMgr, offerLifecycleManager, nodeStore, state)
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
new file mode 100644
index 0000000..bdebef2
--- /dev/null
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler.fgs
+
+import org.apache.hadoop.yarn.api.records.ContainerState
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent
+import org.apache.hadoop.yarn.util.resource.Resources
+import org.apache.mesos.Protos
+import org.apache.myriad.configuration.NodeManagerConfiguration
+import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
+import org.apache.myriad.state.NodeTask
+import org.apache.myriad.state.SchedulerState
+
+/**
+ *
+ * Tests for YarnNodeCapacityManager
+ *
+ */
+class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
+
+    def "No Containers Allocated Due To Mesos Offers"() {
+        given:
+        def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
+
+        def host = "test_host"
+        def slaveId = Protos.SlaveID.newBuilder().setValue(host + 
"_slave_id").build()
+        def zeroNM = getRMNode(0, 0, host, slaveId)
+
+        // have a mesos offer before HB
+        def offer = addOfferToFeed(slaveId, host, 4, 4096)
+        offerLifecycleManager.markAsConsumed(offer)
+
+        //  2 containers before HB.
+        def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, 
ContainerState.RUNNING)
+        def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, 
ContainerState.RUNNING)
+        nodeStore.getNode(host).snapshotRunningContainers()
+
+        // Node's capacity set to match the size of 2 containers + mesos offers
+        yarnNodeCapacityMgr.setNodeCapacity(zeroNM, 
Resources.createResource(6144, 6))
+
+        // no new container allocations
+
+        when:
+        yarnNodeCapacityMgr.handleContainerAllocation(zeroNM)
+
+        then:
+        nodeStore.getNode(host).getNode().getRunningContainers().size() == 2  
// 2 containers still running
+        1 * mesosDriver.declineOffer(offer.getId())   // offer rejected, as 
it's not used to allocate more containers
+        zeroNM.getTotalCapability().getVirtualCores() == 2 // capacity returns 
back to match size of running containers
+        zeroNM.getTotalCapability().getMemory() == 2048
+        nodeStore.getNode(host).getContainerSnapshot() == null // container 
snapshot is released
+    }
+
+    def "Containers Allocated Due To Mesos Offers"() {
+        given:
+        def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
+
+        def host = "test_host"
+        def slaveId = Protos.SlaveID.newBuilder().setValue(host + 
"_slave_id").build()
+        def zeroNM = getRMNode(0, 0, host, slaveId)
+
+        // have a mesos offer before HB
+        def offer = addOfferToFeed(slaveId, host, 4, 4096)
+        offerLifecycleManager.markAsConsumed(offer)
+
+        //  2 containers before HB.
+        def fgsContainer1 = getFGSContainer(zeroNM, 1, 1, 1024, 
ContainerState.RUNNING)
+        def fgsContainer2 = getFGSContainer(zeroNM, 2, 1, 1024, 
ContainerState.RUNNING)
+        nodeStore.getNode(host).snapshotRunningContainers()
+
+        // Node's capacity set to match the size of 2 running containers + 
mesos offers
+        yarnNodeCapacityMgr.setNodeCapacity(zeroNM, 
Resources.createResource(6144, 6))
+
+        // 2 new containers allocated after HB
+        def fgsContainer3 = getFGSContainer(zeroNM, 3, 1, 1024, 
ContainerState.NEW)
+        def fgsContainer4 = getFGSContainer(zeroNM, 4, 1, 1024, 
ContainerState.NEW)
+
+        when:
+        yarnNodeCapacityMgr.handleContainerAllocation(zeroNM)
+
+        then:
+        nodeStore.getNode(host).getNode().getRunningContainers().size() == 4  
// 2 running + 2 new
+        1 * mesosDriver.launchTasks(_ as Collection<Protos.OfferID>, _ as 
List<Protos.TaskInfo>) // for place holder tasks
+        zeroNM.getTotalCapability().getVirtualCores() == 4 // capacity equals 
size of running + new containers
+        zeroNM.getTotalCapability().getMemory() == 4096
+        nodeStore.getNode(host).getContainerSnapshot() == null // container 
snapshot is released
+    }
+
+    def "Set Node Capacity"() {
+        given:
+        def zeroNM = getRMNode(0, 0, "test_host", null)
+        def yarnNodeCapacityMgr = getYarnNodeCapacityManager()
+
+        when:
+        yarnNodeCapacityMgr.setNodeCapacity(zeroNM, 
Resources.createResource(2048, 2))
+
+        then:
+        zeroNM.getTotalCapability().getMemory() == 2048
+        zeroNM.getTotalCapability().getVirtualCores() == 2
+        1 * rmContext.getDispatcher().getEventHandler().handle(_ as 
NodeResourceUpdateSchedulerEvent)
+    }
+
+    YarnNodeCapacityManager getYarnNodeCapacityManager() {
+        def registry = Mock(InterceptorRegistry)
+        def executorInfo = Protos.ExecutorInfo.newBuilder()
+                
.setExecutorId(Protos.ExecutorID.newBuilder().setValue("some_id"))
+                .setCommand(Protos.CommandInfo.newBuilder())
+                .build()
+        def nodeTask = Mock(NodeTask) {
+            getExecutorInfo() >> executorInfo
+        }
+        def state = Mock(SchedulerState) {
+            getNodeTask(_, NodeManagerConfiguration.NM_TASK_PREFIX) >> nodeTask
+        }
+        return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext,
+                myriadDriver, offerLifecycleManager, nodeStore, state)
+
+    }
+}

Reply via email to