Implementation of MYRIAD-229, MYRIAD-237, MYRIAD-238, MYRIAD-225
JIRA:
        [MYRIAD-225] https://issues.apache.org/jira/browse/MYRIAD-225
        [MYRIAD-229] https://issues.apache.org/jira/browse/MYRIAD-239
        [MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237
        [MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237
Pull Request:
  Closes #91

Author:    hokiegeek2 <hokiege...@gmail.com>
Date:      Wed Aug 17 15:21:56 2016 -0400


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/577c30b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/577c30b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/577c30b1

Branch: refs/heads/master
Commit: 577c30b1abdaa30313251c887c014aebac3fd93c
Parents: 7aea259
Author: hokiegeek2 <hokiege...@gmail.com>
Authored: Wed Aug 17 15:21:56 2016 -0400
Committer: darinj <dar...@apache.org>
Committed: Tue Aug 30 13:08:36 2016 -0400

----------------------------------------------------------------------
 build.gradle                                    |   1 -
 gradle/spock.gradle                             |  46 ---
 .../src/main/java/org/apache/myriad/Main.java   |  18 +-
 .../java/org/apache/myriad/MyriadModule.java    |   2 +-
 .../myriad/api/SchedulerStateResource.java      |   2 +-
 .../configuration/NodeManagerConfiguration.java |  18 +-
 .../myriad/scheduler/MyriadOperations.java      |   9 +-
 .../org/apache/myriad/scheduler/Rebalancer.java |   4 +-
 .../apache/myriad/scheduler/SchedulerUtils.java |   2 +-
 .../apache/myriad/scheduler/TaskFactory.java    |   2 +-
 .../apache/myriad/scheduler/TaskTerminator.java |   4 +-
 .../org/apache/myriad/scheduler/TaskUtils.java  |   8 +-
 .../handlers/StatusUpdateEventHandler.java      |   2 +-
 .../scheduler/fgs/NMHeartBeatHandler.java       |  67 +++--
 .../scheduler/fgs/OfferLifecycleManager.java    |  67 +++--
 .../scheduler/fgs/YarnNodeCapacityManager.java  |  10 +-
 .../scheduler/yarn/MyriadFairScheduler.java     |   5 +
 .../org/apache/myriad/state/SchedulerState.java |  14 +-
 .../MyriadFileSystemRMStateStoreTest.java       |  22 +-
 .../org/apache/myriad/BaseConfigurableTest.java |  77 ++++-
 .../org/apache/myriad/MyriadTestModule.java     |  32 +-
 .../org/apache/myriad/TestObjectFactory.java    | 290 +++++++++++++------
 .../myriad/api/ArtifactsResourceTest.java       |  17 ++
 .../myriad/api/SchedulerStateResourceTest.java  |  17 ++
 .../configuration/MyriadConfigurationTest.java  |   5 +-
 .../myriad/health/HealthCheckUtilsTest.java     |  17 ++
 .../health/MesosDriverHealthCheckTest.java      |  17 ++
 .../myriad/scheduler/MyriadOperationsTest.java  |  97 ++++---
 .../myriad/scheduler/SchedulerUtilsSpec.groovy  |  90 ------
 .../myriad/scheduler/SchedulerUtilsTest.java    |  89 ++++++
 .../apache/myriad/scheduler/TestTaskUtils.java  |   3 +-
 .../constraints/LikeConstraintSpec.groovy       |  93 ------
 .../constraints/LikeConstraintTest.java         |  86 ++++++
 .../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 175 -----------
 .../scheduler/fgs/NMHeartBeatHandlerSpec.groovy | 114 --------
 .../scheduler/fgs/NMHeartBeatHandlerTest.java   | 240 +++++++++++++++
 .../myriad/scheduler/fgs/NodeStoreTest.java     |  23 +-
 .../apache/myriad/scheduler/fgs/NodeTest.java   |  89 ++++++
 .../fgs/OfferLifeCycleManagerTest.java          |  22 +-
 .../fgs/YarnNodeCapacityManagerSpec.groovy      | 143 ---------
 .../fgs/YarnNodeCapacityManagerTest.java        | 149 ++++++----
 .../org/apache/myriad/state/ClusterTest.java    |  17 ++
 .../org/apache/myriad/state/MockDispatcher.java |  28 +-
 .../org/apache/myriad/state/MockFuture.java     |  17 ++
 .../java/org/apache/myriad/state/MockRMApp.java |  17 ++
 .../org/apache/myriad/state/MockRMContext.java  |  25 +-
 .../org/apache/myriad/state/MockRMNode.java     |  17 ++
 .../java/org/apache/myriad/state/MockState.java |  17 ++
 .../org/apache/myriad/state/MockVariable.java   |  17 ++
 .../apache/myriad/state/MyriadStateTest.java    |  17 ++
 .../org/apache/myriad/state/NodeTaskTest.java   |  17 ++
 .../apache/myriad/state/SchedulerStateTest.java | 159 +++++++---
 .../state/utils/ByteBufferSupportTest.java      |  17 ++
 .../webapp/HttpConnectorProviderTest.java       |  17 ++
 .../myriad/webapp/MyriadWebServerTest.java      |  17 ++
 .../resources/myriad-config-test-default.yml    |  39 +--
 56 files changed, 1590 insertions(+), 1036 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b48cfc9..a551ba0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,7 +57,6 @@ subprojects {
 
     apply plugin: 'java'
     apply plugin: 'application'
-    apply from: "$rootDir/gradle/spock.gradle"
     apply from: "$rootDir/gradle/quality.gradle"
 
     sourceCompatibility = '1.7'

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/gradle/spock.gradle
----------------------------------------------------------------------
diff --git a/gradle/spock.gradle b/gradle/spock.gradle
deleted file mode 100644
index fc974ec..0000000
--- a/gradle/spock.gradle
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-* 
-*   http://www.apache.org/licenses/LICENSE-2.0
-* 
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-// used for unit tests
-apply plugin: 'groovy'
-
-def spockVersion = '1.0-groovy-2.4'
-def powermockVersion = "1.6.1"
-
-dependencies {
-
-    testCompile "org.codehaus.groovy:groovy-all:2.4.1"
-    testCompile "org.spockframework:spock-core:$spockVersion"
-
-    testCompile 'cglib:cglib-nodep:2.2.2'               // need to mock classes
-
-    // useful to mock out statics and final classes in Java.
-    testCompile "org.powermock:powermock-module-junit4:$powermockVersion"
-    testCompile "org.powermock:powermock-module-junit4-rule:$powermockVersion"
-    testCompile 
"org.powermock:powermock-classloading-xstream:$powermockVersion"
-    testCompile "org.powermock:powermock-api-mockito:$powermockVersion"
-}
-
-// for spock to live in test java tree
-sourceSets {
-    test {
-        groovy { srcDir 'src/test/java' }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
index 8c028f1..463c4c5 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
@@ -79,18 +79,6 @@ public class Main {
 
   private static Injector injector;
 
-  /**
-  * Main is the bootstrap class for the Myriad scheduler, managing the 
lifecycles of
-  * the following components:
-  *
-  *  1. MyriadDriverManager
-  *  2. MyriadWebServer
-  *  3. TaskTerminator
-  *  4. HealthCheckRegistry
-  *
-  *  Main uses the Guice Injector framework to manage the Myriad object graph 
and is
-  *  configured by myriad-config-default.yml
-  */
   public static void initialize(Configuration hadoopConf, 
AbstractYarnScheduler yarnScheduler, RMContext rmContext,
                                 InterceptorRegistry registry) throws Exception 
{
     MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", 
hadoopConf, yarnScheduler, rmContext, registry);
@@ -217,19 +205,19 @@ public class Main {
     SchedulerState schedulerState = injector.getInstance(SchedulerState.class);
 
     Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>();
-    
launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+    
launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
     if (!launchedNMTasks.isEmpty()) {
       LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", 
launchedNMTasks.size());
       return;
     }
 
-    
launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+    
launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
     if (!launchedNMTasks.isEmpty()) {
       LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", 
launchedNMTasks.size());
       return;
     }
 
-    
launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX));
+    
launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX));
     if (!launchedNMTasks.isEmpty()) {
       LOGGER.info("{} NM(s) in active state. Not launching additional NMs", 
launchedNMTasks.size());
       return;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
index bb560a4..4bcb628 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
@@ -102,7 +102,7 @@ public class MyriadModule extends AbstractModule {
     bind(NMHeartBeatHandler.class).asEagerSingleton();
 
     MapBinder<String, TaskFactory> mapBinder = 
MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
-    
mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
+    
mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
 
     Map<String, ServiceConfiguration> auxServicesConfigs = 
cfg.getServiceConfigurations();
     for (Map.Entry<String, ServiceConfiguration> entry : 
auxServicesConfigs.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
index ae21c69..2050199 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java
@@ -51,7 +51,7 @@ public class SchedulerStateResource {
   @GET
   public GetSchedulerStateResponse getState() {
     return new 
GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), 
toStringCollection(
-        state.getStagingTaskIds()), 
toStringCollection(state.getActiveTaskIds()), 
toStringCollection(state.getKillableTasks()));
+        state.getStagingTaskIds()), 
toStringCollection(state.getActiveTaskIds()), 
toStringCollection(state.getKillableTaskIds()));
   }
 
   private Collection<String> toStringCollection(Collection<Protos.TaskID> 
collection) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
index 56ea43d..79a8301 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
@@ -56,8 +56,13 @@ public class NodeManagerConfiguration {
   /**
    * Default NodeManager Mesos task prefix
    */
-  public static final String NM_TASK_PREFIX = "nm";
-
+  public static final String DEFAULT_NM_TASK_PREFIX = "nm";
+  
+  /**
+   * Default max CPU cores for NodeManager JVM
+   */
+  public static final double DEFAULT_NM_MAX_CPUS = 24;
+  
   /**
    * Translates to -Xmx for the NodeManager JVM.
    */
@@ -85,7 +90,10 @@ public class NodeManagerConfiguration {
    */
   @JsonProperty
   private Boolean cgroups;
-
+  
+  @JsonProperty
+  private Double maxCpus;
+  
   private Double generateNodeManagerMemory() {
     return (NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + 
NodeManagerConfiguration.JVM_OVERHEAD);
   }
@@ -117,4 +125,8 @@ public class NodeManagerConfiguration {
   public boolean getCgroups() {
     return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS);
   }
+  
+  public Double getMaxCpus() {
+    return Optional.fromNullable(maxCpus).or(DEFAULT_NM_MAX_CPUS);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
index fb1f1bb..13e83cd 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
@@ -40,6 +40,7 @@ import org.apache.myriad.webapp.MyriadWebServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 
@@ -49,7 +50,6 @@ import com.google.inject.Inject;
 public class MyriadOperations {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadOperations.class);
   private final SchedulerState schedulerState;
-
   private MyriadConfiguration cfg;
   private NodeScaleDownPolicy nodeScaleDownPolicy;
   private MyriadDriverManager driverManager;
@@ -69,12 +69,17 @@ public class MyriadOperations {
       myriadStateStore = (MyriadStateStore) rmContext.getStateStore();
     }
   }
+  
+  @VisibleForTesting
+  protected SchedulerState getSchedulerState() {
+    return schedulerState;
+  }
 
   public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int 
instances, Constraint constraint) {
     Collection<NodeTask> nodes = new HashSet<>();
     for (int i = 0; i < instances; i++) {
       NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
-      nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
+      nodeTask.setTaskPrefix(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
       nodes.add(nodeTask);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
index 3c3ce79..aa09f89 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
@@ -46,8 +46,8 @@ public class Rebalancer implements Runnable {
 
   @Override
   public void run() {
-    final Set<Protos.TaskID> activeIds = 
schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
-    final Set<Protos.TaskID> pendingIds = 
schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX);
+    final Set<Protos.TaskID> activeIds = 
schedulerState.getActiveTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
+    final Set<Protos.TaskID> pendingIds = 
schedulerState.getPendingTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX);
     LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
     if (activeIds.size() < 1 && pendingIds.size() < 1) {
       myriadOperations.flexUpCluster(profileManager.get("small"), 1, null);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
index 693ae63..583be2f 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
@@ -59,7 +59,7 @@ public class SchedulerUtils {
    * @return
    */
   public static boolean isEligibleForFineGrainedScaling(String hostName, 
SchedulerState state) {
-    for (NodeTask activeNMTask : 
state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
+    for (NodeTask activeNMTask : 
state.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)) {
       if (activeNMTask.getProfile().getCpus() == 0 &&
           activeNMTask.getProfile().getMemory() == 0 &&
           activeNMTask.getHostname().equals(hostName)) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
index 7e63e0d..8c806c1 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
@@ -150,7 +150,7 @@ public abstract class TaskFactory {
    * Simple helper to convert Mesos Range Resource to a list of longs.
    */
   protected List<Long> rangesConverter(List<Protos.Resource> rangeResources) {
-    List<Long> ret = new ArrayList();
+    List<Long> ret = new ArrayList<Long>();
     for (Protos.Resource range : rangeResources) {
       ret.add(range.getRanges().getRange(0).getBegin());
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
index 4110b37..e695b40 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
@@ -62,12 +62,12 @@ public class TaskTerminator implements Runnable {
   @Override
   public void run() { 
     //If there are 1..n killable tasks, proceed; otherwise, simply return
-    if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) {
+    if (CollectionUtils.isNotEmpty(schedulerState.getKillableTaskIds())) {
       /*
        * Clone the killable task collection, iterate through all tasks, and 
        * process any pending and/or non-pending tasks
        */
-      Set<TaskID> killableTasks = 
Sets.newHashSet(schedulerState.getKillableTasks());
+      Set<TaskID> killableTasks = 
Sets.newHashSet(schedulerState.getKillableTaskIds());
       Status driverStatus = driverManager.getDriverStatus();
 
       //TODO (hokiegeek2) Can the DriverManager be restarted? If not, should 
the ResourceManager stop?

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
index 4bd60bc..b26bdaf 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
@@ -46,7 +46,11 @@ public class TaskUtils {
   public double getNodeManagerMemory() {
     return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB();
   }
-  
+
+  public double getNodeManagerMaxCpus() {
+    return cfg.getNodeManagerConfiguration().getMaxCpus();
+  }
+
   public double getNodeManagerCpus() {
     return cfg.getNodeManagerConfiguration().getCpus();
   }
@@ -80,7 +84,7 @@ public class TaskUtils {
    */
   public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, 
String name, Double value, Double used) {
     String role = cfg.getFrameworkRole();
-    List<Protos.Resource> resources = new ArrayList<>();
+    List<Protos.Resource> resources = new ArrayList<Protos.Resource>();
 
     double resourceDifference = 0; //used to determine the resource difference 
of value and the resources requested from role *
     //Find role by name, must loop through resources

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
index 079df4b..b787b48 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -124,6 +124,6 @@ public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent>
     LOGGER.info("Removed {} task with id {}", stopReason, taskId);
   }
   private boolean taskIsKillable(TaskID taskId) {
-    return schedulerState.getKillableTasks().contains(taskId);
+    return schedulerState.getKillableTaskIds().contains(taskId);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
index 86bbc8c..e16e8b4 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.Offer;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
 import org.apache.myriad.scheduler.MyriadDriver;
 import org.apache.myriad.scheduler.SchedulerUtils;
 import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
@@ -56,11 +57,12 @@ public class NMHeartBeatHandler extends BaseInterceptor {
   private final OfferLifecycleManager offerLifecycleMgr;
   private final NodeStore nodeStore;
   private final SchedulerState state;
+  private final NodeManagerConfiguration conf;
 
   @Inject
   public NMHeartBeatHandler(InterceptorRegistry registry, 
AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver,
                             YarnNodeCapacityManager yarnNodeCapacityMgr, 
OfferLifecycleManager offerLifecycleMgr,
-                            NodeStore nodeStore, SchedulerState state) {
+                            NodeStore nodeStore, SchedulerState state, 
NodeManagerConfiguration conf) {
 
     if (registry != null) {
       registry.register(this);
@@ -72,6 +74,7 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     this.offerLifecycleMgr = offerLifecycleMgr;
     this.nodeStore = nodeStore;
     this.state = state;
+    this.conf = conf;
   }
 
   @Override
@@ -88,9 +91,11 @@ public class NMHeartBeatHandler extends BaseInterceptor {
   public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
     switch (event.getType()) {
       case STARTED:
+        // Since the RMNode was just started, it should not have a non-zero 
capacity
         RMNode rmNode = context.getRMNodes().get(event.getNodeId());
-        Resource totalCapability = rmNode.getTotalCapability();
-        if (totalCapability.getMemory() != 0 || 
totalCapability.getVirtualCores() != 0) {
+        
+        if (isNonZeroCapacityNode(rmNode)) {
+          Resource totalCapability = rmNode.getTotalCapability();
           logger.warn(
               "FineGrainedScaling feature got invoked for a NM with non-zero 
capacity. Host: {}, Mem: {}, CPU: {}. Setting the " +
               "NM's capacity to (0G,0CPU)", rmNode.getHostName(), 
totalCapability.getMemory(), totalCapability.getVirtualCores());
@@ -109,6 +114,12 @@ public class NMHeartBeatHandler extends BaseInterceptor {
   }
 
   @VisibleForTesting
+  protected boolean isNonZeroCapacityNode(RMNode node) {
+    Resource resource = node.getTotalCapability();
+    return (resource.getMemory() != 0 || resource.getVirtualCores() != 0);
+  }
+  
+  @VisibleForTesting
   protected void handleStatusUpdate(RMNodeEvent event, RMContext context) {
     if (!(event instanceof RMNodeStatusEvent)) {
       logger.error("{} not an instance of {}", event.getClass().getName(), 
RMNodeStatusEvent.class.getName());
@@ -124,25 +135,42 @@ public class NMHeartBeatHandler extends BaseInterceptor {
       host.snapshotRunningContainers();
     }
 
-    // New capacity of the node =
-    // resources under use on the node (due to previous offers) +
-    // new resources offered by mesos for the node
-    yarnNodeCapacityMgr.setNodeCapacity(rmNode, 
Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(
-        hostName)));
+    /*
+     * Set the new node capacity which is the sum of the current node 
resources plus those offered by Mesos. 
+     * If the sum is greater than the max capacity of the node, reject the 
offer.
+     */
+    Resource offeredResources = getNewResourcesOfferedByMesos(hostName);
+    Resource currentResources = getResourcesUnderUse(statusEvent);
+    
+    if (offerWithinResourceLimits(currentResources, offeredResources)) {
+      yarnNodeCapacityMgr.setNodeCapacity(rmNode, 
Resources.add(currentResources, offeredResources));
+      logger.info("Updated resources for {} with {} cores and {} memory", 
rmNode.getNode().getName(), 
+              offeredResources.getVirtualCores(), 
offeredResources.getMemory());
+    } else {
+      logger.info("Did not update {} with {} cores and {} memory, over max cpu 
cores and/or max memory", 
+              rmNode.getNode().getName(), offeredResources.getVirtualCores(), 
offeredResources.getMemory());
+    }
   }
-
-  private Resource getNewResourcesOfferedByMesos(String hostname) {
+  
+  @VisibleForTesting
+  protected boolean offerWithinResourceLimits(Resource currentResources, 
Resource offeredResources) {
+    int newMemory = currentResources.getMemory() + 
offeredResources.getMemory();
+    int newCores = currentResources.getVirtualCores() + 
offeredResources.getVirtualCores();
+       
+    return (newMemory <= conf.getJvmMaxMemoryMB() && newCores <= 
conf.getMaxCpus());
+  }
+  
+  @VisibleForTesting
+  protected Resource getNewResourcesOfferedByMesos(String hostname) {
     OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
-    if (feed == null) {
-      logger.debug("No offer feed for: {}", hostname);
-      return Resource.newInstance(0, 0);
-    }
     List<Offer> offers = new ArrayList<>();
     Protos.Offer offer;
+        
     while ((offer = feed.poll()) != null) {
-      offers.add(offer);
+      offers.add(offer);     
       offerLifecycleMgr.markAsConsumed(offer);
     }
+    
     Resource fromMesosOffers = 
OfferUtils.getYarnResourcesFromMesosOffers(offers);
 
     if (logger.isDebugEnabled()) {
@@ -153,10 +181,11 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     return fromMesosOffers;
   }
 
-  private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
+  @VisibleForTesting
+  protected Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
     Resource usedResources = Resource.newInstance(0, 0);
     for (ContainerStatus status : statusEvent.getContainers()) {
-      if (status.getState() == ContainerState.NEW || status.getState() == 
ContainerState.RUNNING) {
+      if (containerInUse(status)) {
         RMContainer rmContainer = 
yarnScheduler.getRMContainer(status.getContainerId());
         // (sdaingade) This check is needed as RMContainer information may not 
be populated
         // immediately after a RM restart.
@@ -167,4 +196,8 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     }
     return usedResources;
   }
+  
+  private boolean containerInUse(ContainerStatus status) {
+    return (status.getState() == ContainerState.NEW || status.getState() == 
ContainerState.RUNNING);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
index e4cec83..9698fe7 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 
 /**
  * Manages the Mesos offers tracked by Myriad.
@@ -57,10 +58,31 @@ public class OfferLifecycleManager {
     this.myriadDriver = myriadDriver;
   }
 
-  public OfferFeed getOfferFeed(String hostname) {
-    return offerFeedMap.get(hostname);
+  /**
+   * Retrieves the OfferFeed for a host and, if null, creates a new OfferFeed
+   * for a host that has either been added to, or rejoined, the Mesos cluster.
+   * 
+   * @param hostname
+   * @return feed
+   */
+  @VisibleForTesting
+  protected OfferFeed getOfferFeed(String hostname) {
+    OfferFeed feed = offerFeedMap.get(hostname);
+    if (feed == null) {
+      feed = new OfferFeed();
+      offerFeedMap.put(hostname, feed);
+    }
+    return feed;
   }
-
+  
+  protected Optional<Node> getOfferNode(String host) {
+    return Optional.fromNullable(nodeStore.getNode(host));
+  }
+  
+  protected Optional<Offer> getOffer(OfferFeed feed) {
+    return Optional.fromNullable(feed.poll());
+  }
+  
   public void declineOffer(Protos.Offer offer) {
     myriadDriver.getDriver().declineOffer(offer.getId());
     LOGGER.debug("Declined offer {}", offer.getId());
@@ -69,16 +91,12 @@ public class OfferLifecycleManager {
   public void addOffers(Protos.Offer... offers) {
     for (Protos.Offer offer : offers) {
       String hostname = offer.getHostname();
-      Node node = nodeStore.getNode(hostname);
-      if (node != null) {
-        OfferFeed feed = offerFeedMap.get(hostname);
-        if (feed == null) {
-          feed = new OfferFeed();
-          offerFeedMap.put(hostname, feed);
-        }
+   
+      Optional<Node> optNode = getOfferNode(hostname);
+      if (optNode.isPresent()) {
+        OfferFeed feed = getOfferFeed(hostname);
         feed.add(offer);
-
-        node.setSlaveId(offer.getSlaveId());
+        optNode.get().setSlaveId(offer.getSlaveId());
 
         LOGGER.debug("addResourceOffers: caching offer for host {}, offer id 
{}", hostname, offer.getId().getValue());
       } else {
@@ -98,6 +116,17 @@ public class OfferLifecycleManager {
     consumedOffer.add(offer);
   }
 
+  @VisibleForTesting
+  protected ConsumedOffer getConsumedOffer(String hostname) {
+    ConsumedOffer cOffer = consumedOfferMap.get(hostname);
+    if (cOffer == null) {
+      cOffer = new ConsumedOffer();
+      consumedOfferMap.put(hostname, cOffer);
+    }    
+    
+    return cOffer;
+  }
+  
   public ConsumedOffer drainConsumedOffer(String hostname) {
     return consumedOfferMap.remove(hostname);
   }
@@ -105,18 +134,14 @@ public class OfferLifecycleManager {
   public void declineOutstandingOffers(String hostname) {
     int numOutStandingOffers = 0;
     OfferFeed offerFeed = getOfferFeed(hostname);
-    Offer offer;
-    while (offerFeed != null && (offer = offerFeed.poll()) != null) {
-      declineOffer(offer);
+    Optional<Offer> optOffer;
+  
+    while ((optOffer = getOffer(offerFeed)).isPresent()) {
+      declineOffer(optOffer.get());
       numOutStandingOffers++;
     }
     if (numOutStandingOffers > 0) {
       LOGGER.info("Declined {} outstanding offers for host {}", 
numOutStandingOffers, hostname);
     }
   }
-  
-  @VisibleForTesting
-  public ConsumedOffer getConsumedOffer(String hostname) {
-    return consumedOfferMap.get(hostname);
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 8f7c6f5..52cdfeb 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -18,14 +18,12 @@
  */
 package org.apache.myriad.scheduler.fgs;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import javax.inject.Inject;
 
 import org.apache.hadoop.yarn.api.records.Container;
@@ -59,6 +57,10 @@ import org.apache.myriad.state.SchedulerState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 /**
  * Manages the capacity exposed by NodeManager. It uses the offers available
  * from Mesos to inflate the node capacity and lets ResourceManager make the
@@ -316,7 +318,7 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
     // as this is now cached in the NodeTask object in scheduler state.
     Protos.ExecutorInfo executorInfo = node.getExecInfo();
     if (executorInfo == null) {
-      executorInfo = 
Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), 
NodeManagerConfiguration.NM_TASK_PREFIX)
+      executorInfo = 
Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), 
NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)
           .getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build();
       node.setExecInfo(executorInfo);
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
index 5251f19..b81b881 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java
@@ -28,6 +28,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
 import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor;
@@ -101,5 +102,9 @@ public class MyriadFairScheduler extends FairScheduler {
     super.handle(event);
     this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event);
   }
+  
+  public void addNode(FSSchedulerNode node) {
+    this.nodes.put(node.getNodeID(), node);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
index 8b5fb51..7ee2b62 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java
@@ -169,10 +169,10 @@ public class SchedulerState {
    * Return a list of TaskIDs corresponding to all killable tasks
    * @return
    */
-  public synchronized Set<Protos.TaskID> getKillableTasks() {
+  public synchronized Set<Protos.TaskID> getKillableTaskIds() {
     Set<Protos.TaskID> returnSet = new HashSet<>();
     for (Map.Entry<String, SchedulerStateForType> entry : 
statesForTaskType.entrySet()) {
-      returnSet.addAll(entry.getValue().getKillableTasks());
+      returnSet.addAll(entry.getValue().getKillableTaskIds());
     }
     return returnSet;
   }
@@ -183,9 +183,9 @@ public class SchedulerState {
    * @param taskPrefix
    * @return
    */
-  public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) {
+  public synchronized Set<Protos.TaskID> getKillableTaskIds(String taskPrefix) 
{
     SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix);
-    return (stateTask == null ? new HashSet<Protos.TaskID>() : 
stateTask.getKillableTasks());
+    return (stateTask == null ? new HashSet<Protos.TaskID>() : 
stateTask.getKillableTaskIds());
   }
 
   public synchronized void removeTask(Protos.TaskID taskId) {
@@ -384,7 +384,7 @@ public class SchedulerState {
 
     try {
       StoreContext sc = new StoreContext(frameworkId, tasks, 
getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(),
-          getLostTaskIds(), getKillableTasks());
+          getLostTaskIds(), getKillableTaskIds());
       stateStore.storeMyriadState(sc);
     } catch (Exception e) {
       LOGGER.error("Failed to update scheduler state to state store", e);
@@ -411,7 +411,7 @@ public class SchedulerState {
         LOGGER.debug("State Store state includes frameworkId: {}, pending 
tasks count: {}, staging tasks count: {} " +
                      "active tasks count: {}, lost tasks count: {}, and 
killable tasks count: {}", frameworkId.getValue(),
                       this.getPendingTaskIds().size(), 
this.getStagingTaskIds().size(), this.getActiveTaskIds().size(),
-                      this.getLostTaskIds().size(), 
this.getKillableTasks().size());
+                      this.getLostTaskIds().size(), 
this.getKillableTaskIds().size());
       }
     } catch (Exception e) {
       LOGGER.error("Failed to read scheduler state from state store", e);
@@ -545,7 +545,7 @@ public class SchedulerState {
       return Collections.unmodifiableSet(this.lostTasks);
     }
 
-    public synchronized Set<Protos.TaskID> getKillableTasks() {
+    public synchronized Set<Protos.TaskID> getKillableTaskIds() {
       return Collections.unmodifiableSet(this.killableTasks);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
 
b/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
index a0a9ed1..df3e35b 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java
@@ -6,7 +6,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.myriad.state.MockDispatcher;
+import org.apache.myriad.TestObjectFactory;
 import org.apache.myriad.state.MockRMApp;
 import org.junit.Test;
 
@@ -17,13 +17,16 @@ public class MyriadFileSystemRMStateStoreTest {
 
   @Test
   public void testInit() throws Exception {
-    Configuration conf = getConfiguration();
+    Configuration conf = new Configuration();
+    conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///" + 
"/tmp/myriad-file-system-rm-state-store-test");
     MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
     assertTrue(store.isInState(STATE.NOTINITED));
     store.init(conf);
     assertTrue(store.isInState(STATE.INITED));
-    store.startInternal();
+    store.start();
+    assertTrue(store.isInState(STATE.STARTED));
     store.close();
+    assertTrue(store.isInState(STATE.STOPPED));
   }
 
   @Test
@@ -51,19 +54,8 @@ public class MyriadFileSystemRMStateStoreTest {
   }
 
   private MyriadFileSystemRMStateStore getInitializedStore() throws Exception {
-    Configuration conf = getConfiguration();
-    MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore();  
-    store.init(conf);
-    store.startInternal();
-    store.loadState();   
+    MyriadFileSystemRMStateStore store = TestObjectFactory.getStateStore(new 
Configuration(), "/tmp/myriad-file-system-rm-state-store-test");
     store.loadMyriadState();
-    store.setRMDispatcher(new MockDispatcher());
     return store;
   }
-  
-  private Configuration getConfiguration() {
-    Configuration conf = new Configuration();
-    conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
-    return conf;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java 
b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
index e5c3f57..13b97de 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java
@@ -1,6 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad;
 
+import java.io.File;
+import java.net.URL;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.myriad.configuration.MyriadConfiguration;
+import org.junit.After;
 import org.junit.Before;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -15,15 +38,63 @@ public class BaseConfigurableTest {
   protected MyriadConfiguration cfg;
   protected MyriadConfiguration cfgWithRole;
   protected MyriadConfiguration cfgWithDocker;
+  protected String baseStateStoreDirectory = StringUtils.EMPTY;
 
+  /**
+   * This is normally overridden in derived classes. Be sure to invoke this 
implementation; 
+   * otherwise, cfg, cfgWithRole, and cfgWithDocker will all be null.
+   * 
+   * @throws Exception
+   */
   @Before
   public void setUp() throws Exception {
     ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-    cfg = 
mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"),
+    cfg = mapper.readValue(getConfURL("myriad-config-test-default.yml"),
     MyriadConfiguration.class);
-    cfgWithRole = 
mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-framework-role.yml"),
+    cfgWithRole = 
mapper.readValue(getConfURL("myriad-config-test-default-with-framework-role.yml"),
             MyriadConfiguration.class);
-    cfgWithDocker = 
mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-docker-info.yml"),
+    cfgWithDocker = 
mapper.readValue(getConfURL("myriad-config-test-default-with-docker-info.yml"),
                 MyriadConfiguration.class);
   } 
+
+  /**
+   * Deletes the directories and files that back the 
MyriadFileSystemRMStateStore to ensure there
+   * is no stale state within the MyriadFileSystemRMStateStore that could 
result in race conditions
+   * depending upon how the unit tests are executed.
+   * 
+   * @throws Exception
+   */
+  protected void resetStoreState() throws Exception {
+    checkConfiguration();
+    File rootFile = new File(baseStateStoreDirectory + 
"/FSRMStateRoot/RMMyriadRoot");
+    //Delete directory if present and recursively create directory path
+    FileUtils.deleteDirectory(rootFile);
+    FileUtils.forceMkdir(rootFile);
+
+    File storeFile = new File(rootFile.getAbsolutePath() + "/MyriadState");
+
+    if (!storeFile.createNewFile()) {
+      throw new IllegalStateException(rootFile.getAbsolutePath() + 
"/MyriadState could not be created");
+    }
+  }
+
+  /**
+   * Confirms the configuration of object graph is correct
+   * 
+   * @throws IllegalStateException
+   */
+  protected void checkConfiguration() throws IllegalStateException {
+    if (StringUtils.isEmpty(baseStateStoreDirectory)) {
+      throw new IllegalStateException("The baseStateStoreDirectory must be 
set, preferably in overridden setUp method");
+    }
+  }
+
+  private URL getConfURL(String file) {
+    return Thread.currentThread().getContextClassLoader().getResource(file);
+  }
+  
+  @After
+  public void cleanUp() throws Exception {
+    FileUtils.deleteDirectory(new File(baseStateStoreDirectory));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java 
b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
index dc7a00b..4652f1c 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java
@@ -18,15 +18,27 @@
 
 package org.apache.myriad;
 
-import com.fasterxml.jackson.databind.*;
-import com.fasterxml.jackson.dataformat.yaml.*;
-import com.google.inject.*;
-import com.google.inject.multibindings.*;
-import java.io.*;
-import java.util.*;
-import org.apache.myriad.configuration.*;
-import org.apache.myriad.scheduler.*;
-import org.slf4j.*;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.scheduler.ExecutorCommandLineGenerator;
+import org.apache.myriad.scheduler.NMExecutorCommandLineGenerator;
+import org.apache.myriad.scheduler.NMTaskFactory;
+import org.apache.myriad.scheduler.ServiceTaskFactory;
+import org.apache.myriad.scheduler.TaskFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.MapBinder;
 
 /**
  * AbstractModule extension for UnitTests
@@ -57,7 +69,7 @@ public class MyriadTestModule extends AbstractModule {
     bind(MyriadConfiguration.class).toInstance(cfg);
 
     MapBinder<String, TaskFactory> mapBinder = 
MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
-    
mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
+    
mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
     Map<String, ServiceConfiguration> auxServicesConfigs = 
cfg.getServiceConfigurations();
     for (Map.Entry<String, ServiceConfiguration> entry : 
auxServicesConfigs.entrySet()) {
       String taskFactoryClass = 
entry.getValue().getTaskFactoryImplName().orNull();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java 
b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
index 9117e3b..43014fa 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java
@@ -1,40 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad;
 
+import java.util.HashMap;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import 
org.apache.hadoop.yarn.server.api.records.impl.pb.NodeHealthStatusPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
-import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.Value.Type;
+import org.apache.mesos.SchedulerDriver;
 import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
+import org.apache.myriad.scheduler.ExtendedResourceProfile;
 import org.apache.myriad.scheduler.MockSchedulerDriver;
 import org.apache.myriad.scheduler.MyriadDriver;
 import org.apache.myriad.scheduler.MyriadDriverManager;
-import org.apache.myriad.scheduler.MyriadOperations;
+import org.apache.myriad.scheduler.NMProfile;
+import org.apache.myriad.scheduler.ServiceResourceProfile;
+import org.apache.myriad.scheduler.constraints.LikeConstraint;
 import org.apache.myriad.scheduler.yarn.MyriadCapacityScheduler;
+import org.apache.myriad.scheduler.yarn.MyriadFairScheduler;
 import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
 import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import org.apache.myriad.state.MockDispatcher;
 import org.apache.myriad.state.MockRMContext;
-import org.apache.myriad.state.MockRMNode;
+import org.apache.myriad.state.MyriadStateStore;
+import org.apache.myriad.state.NodeTask;
 import org.apache.myriad.state.SchedulerState;
 import org.apache.myriad.webapp.HttpConnectorProvider;
 import org.apache.myriad.webapp.MyriadWebServer;
@@ -44,28 +81,165 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHandler;
 import org.mortbay.jetty.servlet.ServletHolder;
 
+import com.google.common.collect.Lists;
 import com.google.inject.servlet.GuiceFilter;
 
 /**
- * Factory for common standard and mock objects utilized for JUnit tests
+ * Factory for common objects utilized over 1..n JUnit tests
  */
 public class TestObjectFactory {
-  public static SchedulerState getSchedulerState(MyriadConfiguration cfg) 
throws Exception {
-    Configuration conf = new Configuration();
-    SchedulerState state = new 
SchedulerState(TestObjectFactory.getStateStore(conf, false));
+
+  /**
+   * Returns a new RMContainer corresponding to the RMNode and RMContext. The 
RMContainer is the 
+   * ResourceManager's view of an application container per the Hadoop docs
+   * 
+   * @param node
+   * @param context
+   * @param appId
+   * @param cores
+   * @param memory
+   * @return RMContainer
+   */
+  public static RMContainer getRMContainer(RMNode node, RMContext context, int 
appId, int cores, int memory) {
+    ContainerId containerId = 
ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(123456789, 1), 1), appId);
+
+    Container container = Container.newInstance(containerId, node.getNodeID(), 
node.getHttpAddress(),
+        Resources.createResource(memory, cores), null, null);
+    return new RMContainerImpl(container, 
containerId.getApplicationAttemptId(), node.getNodeID(), "user1", context);
+  }
+
+  public static RMNodeStatusEvent getRMStatusEvent(RMNode node) {
+    NodeId id = node.getNodeID();
+    NodeHealthStatus hStatus = NodeHealthStatusPBImpl.newInstance(true, 
"HEALTHY", System.currentTimeMillis());
+    List<ContainerStatus> cStatus = 
Lists.newArrayList(getContainerStatus(node));
+    List<ApplicationId> keepAliveIds = 
Lists.newArrayList(getApplicationId(node.getHttpPort()));
+    NodeHeartbeatResponse response = new NodeHeartbeatResponsePBImpl();
+    
+    return new RMNodeStatusEvent(id, hStatus, cStatus, keepAliveIds, response);
+  }
+  
+  private static ContainerStatus getContainerStatus(RMNode node) {
+    ContainerStatus status = new ContainerStatusPBImpl();
+    return status;
+  }
+
+  private static ApplicationId getApplicationId(int id) {
+    return ApplicationId.newInstance(System.currentTimeMillis(), id);
+  }
+  /**
+   * Returns a ServiceResourceProfile or ExtendedResourceProfile object 
depending upon
+   * whether the execCores and execMemory parameters are null or non-null, 
respectively
+   * 
+   * @param profileName
+   * @param cores
+   * @param memory
+   * @param execCores
+   * @param execMemory
+   * @return ServiceResourceProfile if execCores and execMemory are null, 
ExtendedResourceProfile otherwise
+   */
+  public static ServiceResourceProfile getServiceResourceProfile(String 
profileName, Double cores, Double memory, 
+      Long execCores, Long execMemory) {
+    if (isExtendedResource(execCores, execMemory)) {
+      NMProfile nmProfile = new NMProfile(profileName, execCores, execMemory);
+      return new ExtendedResourceProfile(nmProfile, cores, memory, new 
HashMap<String, Long>());
+    }
+    return new ServiceResourceProfile(profileName, cores, memory, new 
HashMap<String, Long>());
+  }
+  
+  private static boolean isExtendedResource(Long execCores, Long execMemory) {
+    return execCores != null && execMemory != null;
+  }
+  
+  /**
+   * Returns a NodeTask with either a ServiceResourceProfile or an 
ExtendedResourceProfile, 
+   * depending upon whether execCores and execMemory are null or non-null, 
respectively
+   * 
+   * @param profileName
+   * @param hostName
+   * @param cores
+   * @param memory
+   * @param execCores
+   * @param execMemory
+   * @return NodeTask
+   */
+  public static NodeTask getNodeTask(String profileName, String hostName, 
Double cores, Double memory, 
+      Long execCores, Long execMemory) {
+    NodeTask task = new NodeTask(getServiceResourceProfile(profileName, cores, 
memory, execCores, execMemory), 
+        new LikeConstraint(hostName, "host-[0-9]*.example.com"));
+    task.setHostname(hostName);
+    task.setTaskPrefix("nm");
+    task.setSlaveId(SlaveID.newBuilder().setValue(profileName + "-" + 
hostName).build());
+    
task.setExecutorInfo(ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("exec")).
+        
setCommand(org.apache.mesos.Protos.CommandInfo.newBuilder().setValue("command")).build());
+    return task;
+  }
+
+ /**
+  * Returns a NodeTask given a ServiceResourceProfile and hostname
+  * 
+  * @param hostName
+  * @param profile
+  * @return
+  */
+  public static NodeTask getNodeTask(String hostName, ServiceResourceProfile 
profile) {
+    NodeTask task = new NodeTask(profile, new LikeConstraint(hostName, 
"host-[0-9]*.example.com"));
+    task.setHostname(hostName);
+    task.setTaskPrefix("nm");
+    task.setSlaveId(SlaveID.newBuilder().setValue(profile.getName() + "-" + 
hostName).build());
+    
task.setExecutorInfo(ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("exec")).
+         
setCommand(org.apache.mesos.Protos.CommandInfo.newBuilder().setValue("command")).build());
+    return task;
+  }  
+  
+  public static RMNode getRMNode(String host, int port, Resource resource) {
+    NodeId id = NodeId.newInstance(host, port);
+    RMContext context = new MockRMContext();
+    return new RMNodeImpl(id, context, id.getHost(), id.getPort(), 
id.getPort(), new NodeBase(host, "/tmp"), resource, "version-one");
+  }
+
+  public static RMNode getRMNode(String host, int port, int memory, int cores) 
{
+    Resource resource = Resource.newInstance(memory, cores);
+    return getRMNode(host, port, resource);
+  }
+  
+  public static Dispatcher getMockDispatcher() {
+    return new MockDispatcher();
+  }
+  
+  public static SchedulerState getSchedulerState(MyriadConfiguration cfg, 
String baseDir) throws Exception {
+    MyriadStateStore store = TestObjectFactory.getStateStore(new 
Configuration(), baseDir);
+    SchedulerState state = new SchedulerState(store);
     
state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
     return state;  
   }
 
-  public static FileSystemRMStateStore getRMStateStore(Configuration conf) 
throws Exception {
-    FileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
-    conf.set("yarn.resourcemanager.fs.state-store.uri", "/tmp");
-    store.initInternal(conf);
-    return store;
+  public static MyriadFairScheduler getMyriadFairScheduler(RMContext context) {
+    MyriadFairScheduler scheduler = new MyriadFairScheduler();
+    scheduler.setRMContext(context); 
+    return scheduler;
   }
-
+  
+  public static SchedulerNode getSchedulerNode(String host, int port, int 
cores, int memory) {
+    RMNode node = TestObjectFactory.getRMNode(host, port, cores, memory);
+    return new FiCaSchedulerNode(node, false);
+  }
+  
+  public static MyriadFairScheduler getYarnFairScheduler() {
+    RMContext context = new MockRMContext();
+    return getMyriadFairScheduler(context);
+  }  
+  
+  public static MyriadDriverManager getMyriadDriverManager(MyriadDriver 
driver) {
+    return new MyriadDriverManager(driver);
+  }
+  
   public static MyriadDriverManager getMyriadDriverManager() {
-    return new MyriadDriverManager(new MyriadDriver(new 
MockSchedulerDriver()));
+    return getMyriadDriverManager(new MyriadDriver(new MockSchedulerDriver()));
+  }
+  
+  public static MyriadDriver getMyriadDriver(SchedulerDriver driver) {
+    return new MyriadDriver(driver);
   }
 
   public static InterceptorRegistry getInterceptorRegistry() {
@@ -77,7 +251,7 @@ public class TestObjectFactory {
     return scheduler;
   }
 
-  private static Server getJettyServer() {
+  public static Server getJettyServer() {
     Server server = new Server();
     ServletHandler context = new ServletHandler();
     ServletHolder holder = new ServletHolder(DefaultServlet.class);
@@ -97,76 +271,24 @@ public class TestObjectFactory {
     return new MyriadWebServer(server, connector, new GuiceFilter());
   }
   
-  public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, 
boolean loadState) throws Exception {
-    conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
+  public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, 
String baseDir) throws Exception {
+    conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///" + baseDir);
     MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore();
     store.init(conf);
     store.start();
-    if (loadState) {
-      store.loadState(); 
-    }
+    store.loadState();   
     store.setRMDispatcher(new MockDispatcher());
     return store;
   }
-
-  public static Offer getOffer(String host, String slaveId, String 
frameworkId, String offerId) {
+  
+  public static Offer getOffer(String host, String slaveId, String 
frameworkId, String offerId, double cpuCores, double memory) {
     Protos.SlaveID sid = SlaveID.newBuilder().setValue(slaveId).build();
     Protos.FrameworkID fid = 
FrameworkID.newBuilder().setValue(frameworkId).build();
-    return 
Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)).setSlaveId(sid).setFrameworkId(fid).build();
  
-  }
-
-  public static RMContext getRMContext(Configuration conf) throws Exception {
-    conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/");
-    MockRMContext context = null;
-    Dispatcher dispatcher = new MockDispatcher();
-
-    RMApplicationHistoryWriter rmApplicationHistoryWriter = new 
RMApplicationHistoryWriter(); 
-    AMLivelinessMonitor amLivelinessMonitor = new 
AMLivelinessMonitor(dispatcher);
-    AMLivelinessMonitor amFinishingMonitor = new 
AMLivelinessMonitor(dispatcher);    
-    RMDelegationTokenSecretManager delegationTokenSecretManager = new 
RMDelegationTokenSecretManager(1, 1, 1, 1, context);
-
-    context = new MockRMContext();
-    context.setStateStore(TestObjectFactory.getStateStore(conf, false));
-    context.setAmLivelinessMonitor(amLivelinessMonitor);
-    context.setAmFinishingMonitor(amFinishingMonitor);
-    context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
-    context.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
-    return context;
-  }
-  
-  public static MyriadOperations getMyriadOperations(MyriadConfiguration cfg) 
throws Exception {
-    AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = 
TestObjectFactory.getYarnScheduler();
-    SchedulerState sState = TestObjectFactory.getSchedulerState(cfg);
-    
sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
-
-    MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager();
-    MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg);
-    CompositeInterceptor registry = new CompositeInterceptor();
-    LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, 
scheduler, sState);
-    return new MyriadOperations(cfg, sState, policy, manager, webServer, 
TestObjectFactory.getRMContext(new Configuration()));
-  }
-  
-  public static SchedulerNode getSchedulerNode(NodeId nodeId, int vCores, int 
memory) {
-    RMNode node = getMockRMNode(nodeId, vCores, memory);
-    return new FiCaSchedulerNode(node, true);
-  }
-
-  public static RMNode getMockRMNode(NodeId nodeId, int vCores, int memory) {
-    MockRMNode node = new MockRMNode(nodeId, NodeState.NEW, new 
NodeBase("/tmp"));
-    node.setCommandPort(8041);
-    node.setHostName("0.0.0.0");
-    node.setHttpPort(8042);
-    node.setRackName("r01n07");
-    node.setHttpAddress("localhost:8042");
-    node.setTotalCapability(getResource(vCores, memory));
- 
-    return node;
-  }
-
-  public static Resource getResource(int vCores, int memory) {
-    Resource resource = new ResourcePBImpl();
-    resource.setVirtualCores(vCores);
-    resource.setMemory(memory);
-    return resource;
+    Protos.Value.Scalar cores = 
Protos.Value.Scalar.newBuilder().setValue(cpuCores).build();
+    Protos.Value.Scalar mem = 
Protos.Value.Scalar.newBuilder().setValue(memory).build();
+    Protos.Resource cpuResource = 
Protos.Resource.newBuilder().setName("cpus").setScalar(cores).setType(Type.SCALAR).build();
+    Protos.Resource memResource  = 
Protos.Resource.newBuilder().setName("mem").setScalar(mem).setType(Type.SCALAR).build();
+    return 
Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)).
+        
setSlaveId(sid).setFrameworkId(fid).addResources(cpuResource).addResources(memResource).build();
  
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
index 5d7bb75..b30e2a7 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.api;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
index e0eda0f..7965f02 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.api;
 
 import static org.junit.Assert.assertNotNull;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
index 562d128..9aef9a9 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java
@@ -89,8 +89,9 @@ public class MyriadConfigurationTest extends 
BaseConfigurableTest {
     NodeManagerConfiguration config = cfg.getNodeManagerConfiguration();
 
     assertFalse(config.getCgroups());
-    assertEquals(new Double(0.2), config.getCpus());
-    assertEquals(new Double(1024.0), config.getJvmMaxMemoryMB());
+    assertEquals(new Double(0.8), config.getCpus());
+    assertEquals(new Double(2048.0), config.getJvmMaxMemoryMB());
+    assertEquals(new Double(4.0), config.getMaxCpus());
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
index e403f90..4adc6e5 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.health;
 
 import java.net.ServerSocket;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
index cebf2c7..7d1a786 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.myriad.health;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
index 29087e7..cd4683a 100644
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java
@@ -19,10 +19,17 @@ package org.apache.myriad.scheduler;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.mesos.Protos.FrameworkID;
 import org.apache.myriad.BaseConfigurableTest;
 import org.apache.myriad.TestObjectFactory;
@@ -31,95 +38,99 @@ import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
 import org.apache.myriad.scheduler.constraints.Constraint;
 import org.apache.myriad.scheduler.constraints.LikeConstraint;
 import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor;
+import org.apache.myriad.state.MockDispatcher;
+import org.apache.myriad.state.MockRMContext;
 import org.apache.myriad.state.SchedulerState;
 import org.apache.myriad.webapp.MyriadWebServer;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.TreeMap;
-
 /**
  * Unit tests for MyriadOperations class
  */
 public class MyriadOperationsTest extends BaseConfigurableTest {
   ServiceResourceProfile small;
   Constraint constraint = new LikeConstraint("localhost", 
"host-[0-9]*.example.com");
-  MyriadWebServer webServer;
 
-  private SchedulerState getSchedulerState() throws Exception {
-    SchedulerState state = TestObjectFactory.getSchedulerState(this.cfg);
-    
state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
-    return state;
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    this.baseStateStoreDirectory = "/tmp/myriad-operations-test";
+    generateProfiles();
   }
-  
-  private MyriadOperations getMyriadOperations(SchedulerState state) throws 
Exception {
-    MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager();
 
+  private MyriadOperations initialize() throws Exception {
+    resetStoreState();
+    SchedulerState sState = TestObjectFactory.getSchedulerState(cfg, 
"tmp/myriad-operations-test");
+    
sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build());
     AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = 
TestObjectFactory.getYarnScheduler();
+    MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager();
+    MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg);
     CompositeInterceptor registry = new CompositeInterceptor();
-    LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, 
scheduler, state);
+    LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, 
scheduler, sState);
+
     manager.startDriver();
 
-    return new MyriadOperations(cfg, state, policy, manager, webServer, 
TestObjectFactory.getRMContext(new Configuration()));    
+    return new MyriadOperations(cfg, sState, policy, manager, webServer, 
generateRMContext(scheduler));
   }
-  
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-    webServer = TestObjectFactory.getMyriadWebServer(cfg);
-    generateProfiles();
+  private void generateProfiles() {
+    small = new ServiceResourceProfile("small", new Double(0.1), new 
Double(512.0), new HashMap<String, Long>());
   }
 
-  private void generateProfiles() {
-    TreeMap<String, Long> ports = new TreeMap<>();
-    small = new ServiceResourceProfile("small", 0.1, 512.0, ports);
+  private RMContext generateRMContext(AbstractYarnScheduler<FiCaSchedulerApp, 
FiCaSchedulerNode> scheduler) throws Exception {
+    Configuration conf = new Configuration();
+    MockRMContext context = null;
+    Dispatcher dispatcher = new MockDispatcher();
+
+    RMApplicationHistoryWriter rmApplicationHistoryWriter = new 
RMApplicationHistoryWriter(); 
+    AMLivelinessMonitor amLivelinessMonitor = new 
AMLivelinessMonitor(dispatcher);
+    AMLivelinessMonitor amFinishingMonitor = new 
AMLivelinessMonitor(dispatcher);    
+    RMDelegationTokenSecretManager delegationTokenSecretManager = new 
RMDelegationTokenSecretManager(1, 1, 1, 1, context);
+
+    context = new MockRMContext();
+    context.setStateStore(TestObjectFactory.getStateStore(conf, 
"tmp/myriad-operations-test"));
+    context.setAmLivelinessMonitor(amLivelinessMonitor);
+    context.setAmFinishingMonitor(amFinishingMonitor);
+    context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+    context.setRMDelegationTokenSecretManager(delegationTokenSecretManager);
+    return context;
   }
 
   @Test 
   public void testFlexUpAndFlexDownCluster() throws Exception {
-    SchedulerState sState = this.getSchedulerState();
-    MyriadOperations ops = this.getMyriadOperations(sState);
-    assertEquals(0, sState.getPendingTaskIds().size());
+    MyriadOperations ops = initialize();
+    assertEquals(0, ops.getSchedulerState().getPendingTaskIds().size());
     ops.flexUpCluster(small, 1, constraint);
-    assertEquals(1, sState.getPendingTaskIds().size());
+    assertEquals(1, ops.getSchedulerState().getPendingTaskIds().size());
     ops.flexDownCluster(small, constraint, 1);
-    assertEquals(0, sState.getPendingTaskIds().size());
+    assertEquals(0, ops.getSchedulerState().getPendingTaskIds().size());
   }
-  
+
   @Test
   public void testFlexUpAndFlexDownService() throws Exception {
-    SchedulerState sState = this.getSchedulerState();
-    MyriadOperations ops = this.getMyriadOperations(sState);
+    MyriadOperations ops = initialize();
     ops.flexUpAService(1, "jobhistory");
-    assertEquals(1, sState.getPendingTasksByType("jobhistory").size());
+    assertEquals(1, 
ops.getSchedulerState().getPendingTasksByType("jobhistory").size());
     ops.flexDownAService(1, "jobhistory");
-    assertEquals(0, sState.getPendingTasksByType("jobhistory").size());
+    assertEquals(0, 
ops.getSchedulerState().getPendingTasksByType("jobhistory").size());
   }
 
   @Test(expected = MyriadBadConfigurationException.class)
   public void testFlexUpAServiceOverMaxInstances() throws Exception {
-    SchedulerState sState = this.getSchedulerState();
-    MyriadOperations ops = this.getMyriadOperations(sState);
-    /*
-     * There is 1 jobhhistory task loaded from configuration file, so flexing 
up
-     * by two should result in MyriadBadConfigurationException
-     */
-    ops.flexUpAService(2, "jobhistory");
+    MyriadOperations ops = initialize();
+    ops.flexUpAService(3, "jobhistory");
   }
 
   @Test
   public void testGetFlexibleInstances() throws Exception {
-    SchedulerState sState = this.getSchedulerState();
-    MyriadOperations ops = this.getMyriadOperations(sState);
-    assertEquals(0, ops.getFlexibleInstances("jobhistory").intValue());
+    MyriadOperations ops = initialize();
     ops.flexUpAService(1, "jobhistory");
     assertEquals(1, ops.getFlexibleInstances("jobhistory").intValue());
   }
 
   @Test
   public void testShutdownCluster() throws Exception {
-    SchedulerState sState = this.getSchedulerState();
-    MyriadOperations ops  = this.getMyriadOperations(sState);
+    MyriadOperations ops = initialize();
     ops.shutdownFramework();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
deleted file mode 100644
index 6555f04..0000000
--- 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler
-
-import org.apache.mesos.Protos
-import org.apache.myriad.configuration.NodeManagerConfiguration
-import org.apache.myriad.state.NodeTask
-import org.apache.myriad.state.SchedulerState
-import spock.lang.Specification
-
-/**
- *
- * @author kensipe
- */
-class SchedulerUtilsSpec extends Specification {
-
-    def "is unique host name"() {
-        given:
-        def offer = Mock(Protos.OfferOrBuilder)
-        offer.getHostname() >> "hostname"
-
-        expect:
-        returnValue == SchedulerUtils.isUniqueHostname(offer, launchTask, 
tasks)
-
-        where:
-        tasks                                              | launchTask        
             | returnValue
-        []                                                 | null              
             | true
-        null                                               | null              
             | true
-        createNodeTaskList("hostname")                     | 
createNodeTask("hostname")     | false
-        createNodeTaskList("missinghost")                  | 
createNodeTask("hostname")     | true
-        createNodeTaskList("missinghost1", "missinghost2") | 
createNodeTask("missinghost3") | true
-        createNodeTaskList("missinghost1", "hostname")     | 
createNodeTask("hostname")     | false
-
-    }
-
-    def "is eligible for Fine Grained Scaling"() {
-        given:
-        def state = Mock(SchedulerState)
-        def tasks = []
-        def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new 
NMProfile("zero", 0, 0), 1.0, 2.0, new HashMap<String, Long>()), null)
-        def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new 
NMProfile("low", 2, 4096), 1.0, 2.0, new HashMap<String, Long>()), null)
-        fgsNMTask.setHostname("test_fgs_hostname")
-        cgsNMTask.setHostname("test_cgs_hostname")
-        tasks << fgsNMTask << cgsNMTask
-        state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX) >> 
tasks
-
-        expect:
-        returnValue == 
SchedulerUtils.isEligibleForFineGrainedScaling(hostName, state)
-
-        where:
-        hostName            | returnValue
-        "test_fgs_hostname" | true
-        "test_cgs_hostname" | false
-        "blah"              | false
-        ""                  | false
-        null                | false
-    }
-
-    ArrayList<NodeTask> createNodeTaskList(String... hostnames) {
-        def list = []
-        hostnames.each { hostname ->
-            list << createNodeTask(hostname)
-        }
-        return list
-    }
-
-
-    NodeTask createNodeTask(String hostname) {
-        def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 
1, 1), 1.0, 1.0, new HashMap<String, Long>()), null)
-        node.hostname = hostname
-        node.taskPrefix = "nm"
-        node
-    }
-}


Reply via email to