Refactor which addresses Myriad 213, 214, and 136 in the process.
-Refactored ExecutorCommandLineGenerator classes to use this class (resolves 
Myriad-214 in the process).
-Refactor TaskFactory classes as necessary to work with this. This adds and
removes several methods in TaskUtils to get port Resources.
-Created a ResourceOfferContainer, moving the work of checking offers and
creating the resources from them out of the ResourceOffersEventHandler and
TaskFactory classes.
-Remove NMPorts and Ports classes.
JIRA:
        [MYRIAD-136] https://issues.apache.org/jira/browse/MYRIAD-136
        [MYRIAD-213] https://issues.apache.org/jira/browse/MYRIAD-213
        [MYRIAD-214] https://issues.apache.org/jira/browse/MYRIAD-214
Pull Request:
  Closes #79


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/7aea259c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/7aea259c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/7aea259c

Branch: refs/heads/master
Commit: 7aea259cf231655bdbd033f61584eec7715fc97f
Parents: df4cbc0
Author: DarinJ <dar...@apache.org>
Authored: Thu Jun 9 10:08:46 2016 -0400
Committer: darinj <dar...@apache.org>
Committed: Fri Aug 12 09:27:14 2016 -0400

----------------------------------------------------------------------
 build.gradle                                    |   4 +
 .../myriad/executor/MyriadExecutorDefaults.java |  23 --
 .../src/main/java/org/apache/myriad/Main.java   |  47 ++-
 .../java/org/apache/myriad/MyriadModule.java    |  29 +-
 .../configuration/MyriadConfiguration.java      |  15 +-
 .../configuration/NodeManagerConfiguration.java |  25 +-
 .../configuration/ServiceConfiguration.java     |   6 +-
 .../scheduler/DownloadNMExecutorCLGenImpl.java  |  90 ------
 .../scheduler/ExecutorCommandLineGenerator.java | 148 ++++++++-
 .../scheduler/ExtendedResourceProfile.java      |  27 +-
 .../myriad/scheduler/MyriadOperations.java      |   2 +-
 .../myriad/scheduler/NMExecutorCLGenImpl.java   | 176 -----------
 .../NMExecutorCommandLineGenerator.java         | 150 +++++++++
 .../org/apache/myriad/scheduler/NMPorts.java    |  78 -----
 .../apache/myriad/scheduler/NMTaskFactory.java  |  74 +++++
 .../java/org/apache/myriad/scheduler/Ports.java |  26 --
 .../scheduler/ServiceCommandLineGenerator.java  |  86 +++++-
 .../scheduler/ServiceResourceProfile.java       |  30 +-
 .../scheduler/ServiceTaskConstraints.java       |  49 ---
 .../myriad/scheduler/ServiceTaskFactory.java    |  76 +++++
 .../scheduler/ServiceTaskFactoryImpl.java       | 253 ---------------
 .../myriad/scheduler/TaskConstraints.java       |  35 ---
 .../scheduler/TaskConstraintsManager.java       |  48 ---
 .../apache/myriad/scheduler/TaskFactory.java    | 304 +++++++------------
 .../org/apache/myriad/scheduler/TaskUtils.java  |  86 +-----
 .../handlers/ResourceOffersEventHandler.java    | 175 ++---------
 .../scheduler/resource/RangeResource.java       | 218 +++++++++++++
 .../resource/ResourceOfferContainer.java        | 207 +++++++++++++
 .../scheduler/resource/ScalarResource.java      |  88 ++++++
 .../scheduler/yarn/MyriadFairScheduler.java     |   1 -
 .../org/apache/myriad/MyriadTestModule.java     |  14 +-
 .../myriad/api/SchedulerStateResourceTest.java  |   9 +-
 .../myriad/scheduler/MockSchedulerDriver.java   |  17 ++
 .../myriad/scheduler/MyriadDriverTest.java      |  17 ++
 .../myriad/scheduler/MyriadOperationsTest.java  |  22 +-
 .../myriad/scheduler/NMProfileManagerTest.java  |  17 ++
 .../myriad/scheduler/SchedulerUtilsSpec.groovy  |   6 +-
 .../scheduler/ServiceResourceProfileTest.java   |  24 +-
 .../myriad/scheduler/TMSTaskFactoryImpl.java    |   9 +-
 .../scheduler/TaskConstraintsManagerTest.java   |  32 --
 .../myriad/scheduler/TestNMTaskFactory.java     |  72 +++++
 .../myriad/scheduler/TestRandomPorts.java       | 203 -------------
 .../scheduler/TestServiceCommandLine.java       |  97 ++++--
 .../scheduler/TestServiceTaskFactory.java       |  77 +++++
 .../apache/myriad/scheduler/TestTaskUtils.java  |  37 +--
 .../fgs/YarnNodeCapacityManagerTest.java        |   4 +-
 .../myriad/scheduler/offer/OfferBuilder.java    | 117 +++++++
 .../resource/TestResourceOfferContainer.java    | 166 ++++++++++
 .../org/apache/myriad/state/ClusterTest.java    |   9 +-
 .../org/apache/myriad/state/NodeTaskTest.java   |   9 +-
 .../apache/myriad/state/SchedulerStateTest.java |   5 +-
 .../state/utils/ByteBufferSupportTest.java      |   5 +-
 ...iad-config-test-default-with-docker-info.yml |   1 +
 .../resources/myriad-config-test-default.yml    |  32 +-
 54 files changed, 1928 insertions(+), 1649 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 55e8d85..b48cfc9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,6 +21,10 @@ allprojects {
     apply plugin: 'eclipse'
 }
 
+tasks.withType(JavaCompile) {
+  options.compilerArgs << "-Xlint:unchecked" << "-Werror"
+}
+
 buildscript {
     repositories {
         jcenter()

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
----------------------------------------------------------------------
diff --git 
a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
 
b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
index c7e4515..250c812 100644
--- 
a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
+++ 
b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java
@@ -36,29 +36,6 @@ public class MyriadExecutorDefaults {
       "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
 
   /**
-   * YARN class to help handle LCE resources
-   */
-  public static final String KEY_YARN_NM_LCE_RH_CLASS = 
"yarn.nodemanager.linux-container-executor.resources-handler.class";
-
-  public static final String VAL_YARN_NM_LCE_RH_CLASS = 
"org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
-
-  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = 
"yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
-
-  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = 
"yarn.nodemanager.linux-container-executor.cgroups.mount";
-
-  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = 
"yarn.nodemanager.linux-container-executor.cgroups.mount-path";
-
-  public static final String KEY_YARN_NM_LCE_GROUP = 
"yarn.nodemanager.linux-container-executor.group";
-
-  public static final String KEY_YARN_NM_LCE_PATH = 
"yarn.nodemanager.linux-container-executor.path";
-
-  public static final String KEY_YARN_HOME = "yarn.home";
-
-  public static final String KEY_NM_RESOURCE_CPU_VCORES = 
"nodemanager.resource.cpu-vcores";
-
-  public static final String KEY_NM_RESOURCE_MEM_MB = 
"nodemanager.resource.memory-mb";
-
-  /**
    * Allot 10% more memory to account for JVM overhead.
    */
   public static final double JVM_OVERHEAD = 0.1;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
index 0615ebd..8c028f1 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java
@@ -18,13 +18,17 @@
  */
 package org.apache.myriad;
 
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.health.HealthCheckRegistry;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -43,9 +47,6 @@ import org.apache.myriad.scheduler.NMProfile;
 import org.apache.myriad.scheduler.Rebalancer;
 import org.apache.myriad.scheduler.ServiceProfileManager;
 import org.apache.myriad.scheduler.ServiceResourceProfile;
-import org.apache.myriad.scheduler.ServiceTaskConstraints;
-import org.apache.myriad.scheduler.TaskConstraintsManager;
-import org.apache.myriad.scheduler.TaskFactory;
 import org.apache.myriad.scheduler.TaskTerminator;
 import org.apache.myriad.scheduler.TaskUtils;
 import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
@@ -55,21 +56,15 @@ import org.apache.myriad.webapp.WebAppGuiceModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.JmxReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.health.HealthCheckRegistry;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
 /**
  * Main is the bootstrap class for the Myriad scheduler, managing the 
lifecycles of
  * the following components:
- * 
+ *
  *  1. MyriadDriverManager
  *  2. MyriadWebServer
  *  3. TaskTerminator
  *  4. HealthCheckRegistry
- *  
+ *
  *  Main uses the Guice Injector framework to manage the Myriad object graph 
and is
  *  configured by myriad-config-default.yml
  */
@@ -87,36 +82,36 @@ public class Main {
   /**
   * Main is the bootstrap class for the Myriad scheduler, managing the 
lifecycles of
   * the following components:
-  * 
+  *
   *  1. MyriadDriverManager
   *  2. MyriadWebServer
   *  3. TaskTerminator
   *  4. HealthCheckRegistry
-  *  
+  *
   *  Main uses the Guice Injector framework to manage the Myriad object graph 
and is
   *  configured by myriad-config-default.yml
   */
   public static void initialize(Configuration hadoopConf, 
AbstractYarnScheduler yarnScheduler, RMContext rmContext,
-                                InterceptorRegistry registry) throws Exception 
{       
+                                InterceptorRegistry registry) throws Exception 
{
     MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", 
hadoopConf, yarnScheduler, rmContext, registry);
     MesosModule mesosModule = new MesosModule();
     injector = Guice.createInjector(myriadModule, mesosModule, new 
WebAppGuiceModule());
 
     new Main().run(injector.getInstance(MyriadConfiguration.class));
   }
-  
+
   // TODO (Kannan Rajah) Hack to get injector in unit test.
   public static Injector getInjector() {
     return injector;
   }
 
-  /** 
+  /**
    * Initializes the Myriad object graph via MyriadConfiguration and starts
    * the Mesos interface (MyriadDriverManager) as well as all Myriad services
-   * 
+   *
    *@param cfg MyriadConfiguration
    * @throws Exception
-   */  
+   */
   public void run(MyriadConfiguration cfg) throws Exception {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Bindings: " + injector.getAllBindings());
@@ -171,8 +166,6 @@ public class Main {
   private void initProfiles(Injector injector) {
     LOGGER.info("Initializing Profiles");
     ServiceProfileManager profileManager = 
injector.getInstance(ServiceProfileManager.class);
-    TaskConstraintsManager taskConstraintsManager = 
injector.getInstance(TaskConstraintsManager.class);
-    
taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX,
 new TaskFactory.NMTaskConstraints());
     Map<String, Map<String, String>> profiles = 
injector.getInstance(MyriadConfiguration.class).getProfiles();
     TaskUtils taskUtils = injector.getInstance(TaskUtils.class);
     if (MapUtils.isNotEmpty(profiles)) {
@@ -181,10 +174,8 @@ public class Main {
         if (MapUtils.isNotEmpty(profiles) && 
profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) 
{
           Long cpu = Long.parseLong(profileResourceMap.get("cpu"));
           Long mem = Long.parseLong(profileResourceMap.get("mem"));
-
-          ServiceResourceProfile serviceProfile = new 
ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), 
taskUtils.getExecutorCpus(), taskUtils.getExecutorMemory(),
-              taskUtils.getNodeManagerCpus(), 
taskUtils.getNodeManagerMemory());
-
+          ServiceResourceProfile serviceProfile = new 
ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem),
+              taskUtils.getNodeManagerCpus(), 
taskUtils.getNodeManagerMemory(), taskUtils.getNodeManagerPorts());
           profileManager.add(serviceProfile);
         } else {
           LOGGER.error("Invalid definition for profile: " + profile.getKey());
@@ -259,7 +250,6 @@ public class Main {
   private void initServiceConfigurations(MyriadConfiguration cfg, Injector 
injector) {
     LOGGER.info("Initializing initServiceConfigurations");
     ServiceProfileManager profileManager = 
injector.getInstance(ServiceProfileManager.class);
-    TaskConstraintsManager taskConstraintsManager = 
injector.getInstance(TaskConstraintsManager.class);
 
     Map<String, ServiceConfiguration> servicesConfigs = 
injector.getInstance(MyriadConfiguration.class).getServiceConfigurations();
  
@@ -268,9 +258,8 @@ public class Main {
       ServiceConfiguration config = entry.getValue();
       final Double cpu = config.getCpus();
       final Double mem = config.getJvmMaxMemoryMB();
-
-      profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem));
-      taskConstraintsManager.addTaskConstraints(taskPrefix, new 
ServiceTaskConstraints(cfg, taskPrefix));
+      final Map<String, Long> ports = config.getPorts();
+      profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem, 
ports));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
index 8748dcb..bb560a4 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java
@@ -32,22 +32,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.MyriadExecutorConfiguration;
 import org.apache.myriad.configuration.NodeManagerConfiguration;
 import org.apache.myriad.configuration.ServiceConfiguration;
 import org.apache.myriad.policy.LeastAMNodesFirstPolicy;
 import org.apache.myriad.policy.NodeScaleDownPolicy;
-import org.apache.myriad.scheduler.DownloadNMExecutorCLGenImpl;
 import org.apache.myriad.scheduler.ExecutorCommandLineGenerator;
 import org.apache.myriad.scheduler.MyriadDriverManager;
-import org.apache.myriad.scheduler.NMExecutorCLGenImpl;
+import org.apache.myriad.scheduler.NMExecutorCommandLineGenerator;
 import org.apache.myriad.scheduler.NMTaskFactoryAnnotation;
 import org.apache.myriad.scheduler.ReconcileService;
 import org.apache.myriad.scheduler.ServiceProfileManager;
-import org.apache.myriad.scheduler.ServiceTaskFactoryImpl;
-import org.apache.myriad.scheduler.TaskConstraintsManager;
+import org.apache.myriad.scheduler.ServiceTaskFactory;
 import org.apache.myriad.scheduler.TaskFactory;
-import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
+import org.apache.myriad.scheduler.NMTaskFactory;
 import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler;
 import org.apache.myriad.scheduler.fgs.NodeStore;
 import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
@@ -96,17 +93,16 @@ public class MyriadModule extends AbstractModule {
     bind(ReconcileService.class).in(Scopes.SINGLETON);
     bind(HttpConnectorProvider.class).in(Scopes.SINGLETON);
     bind(MyriadWebServer.class).in(Scopes.SINGLETON);
-    bind(TaskConstraintsManager.class).in(Scopes.SINGLETON);
-    // add special binding between TaskFactory and NMTaskFactoryImpl to ease up
+    // add special binding between TaskFactory and NMTaskFactory to ease up
     // usage of TaskFactory
-    
bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class);
+    
bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactory.class);
     bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON);
     bind(NodeStore.class).in(Scopes.SINGLETON);
     bind(OfferLifecycleManager.class).in(Scopes.SINGLETON);
     bind(NMHeartBeatHandler.class).asEagerSingleton();
 
     MapBinder<String, TaskFactory> mapBinder = 
MapBinder.newMapBinder(binder(), String.class, TaskFactory.class);
-    
mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON);
+    
mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON);
 
     Map<String, ServiceConfiguration> auxServicesConfigs = 
cfg.getServiceConfigurations();
     for (Map.Entry<String, ServiceConfiguration> entry : 
auxServicesConfigs.entrySet()) {
@@ -119,8 +115,8 @@ public class MyriadModule extends AbstractModule {
           LOGGER.error("ClassNotFoundException", e);
         }
       } else {
-        //TODO (kjyost) Confirm if this else statement and logic should still 
be here
-        
mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON);
+        //TODO (hokiegeek2) Confirm if this else statement and logic should 
still be here
+        
mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactory.class).in(Scopes.SINGLETON);
       }
     }
 
@@ -163,14 +159,7 @@ public class MyriadModule extends AbstractModule {
   @Provides
   @Singleton
   ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) {
-    ExecutorCommandLineGenerator cliGenerator = null;
-    MyriadExecutorConfiguration myriadExecutorConfiguration = 
cfg.getMyriadExecutorConfiguration();
-    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-      cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, 
myriadExecutorConfiguration.getNodeManagerUri().get());
-    } else {
-      cliGenerator = new NMExecutorCLGenImpl(cfg);
-    }
-    return cliGenerator;
+    return new NMExecutorCommandLineGenerator(cfg);
   }
   
   protected MyriadConfiguration generateMyriadConfiguration(String configFile) 
{

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
index fa8dca2..8a6e5f3 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java
@@ -110,7 +110,12 @@ public class MyriadConfiguration {
   public static final String DEFAULT_ZK_SERVERS = "localhost:2181";
   
   public static final String DEFAULT_CGROUP_PATH = "/sys/fs/cgroup";
-  
+
+  /**
+   * By default cgroups are turned off.
+   */
+  public static final Boolean DEFAULT_CGROUPS_ENABLED = false;
+
   public static final Map<String, ServiceConfiguration> 
EMPTY_SERVICE_CONFIGURATION = Collections.emptyMap();
 
   @JsonProperty
@@ -194,6 +199,8 @@ public class MyriadConfiguration {
   @JsonProperty
   private String cgroupPath;
 
+  @JsonProperty
+  private Boolean cgroupEnabled;
 
   public MyriadConfiguration() {
   }
@@ -301,4 +308,8 @@ public class MyriadConfiguration {
   public String getCGroupPath() {
     return Optional.fromNullable(cgroupPath).or(DEFAULT_CGROUP_PATH);
   }
-}
\ No newline at end of file
+
+  public Boolean isCgroupEnabled() {
+    return Optional.fromNullable(cgroupEnabled).or(DEFAULT_CGROUPS_ENABLED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
index e60a5d5..56ea43d 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java
@@ -21,10 +21,18 @@ package org.apache.myriad.configuration;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 
+import java.util.Map;
+import java.util.TreeMap;
+
 /**
  * YARN NodeManager Configuration
  */
 public class NodeManagerConfiguration {
+
+  public static final String KEY_NM_ADDRESS = 
"myriad.yarn.nodemanager.address";
+  public static final String KEY_NM_LOCALIZER_ADDRESS = 
"myriad.yarn.nodemanager.localizer.address";
+  public static final String KEY_NM_WEBAPP_ADDRESS = 
"myriad.yarn.nodemanager.webapp.address";
+  public static final String KEY_NM_SHUFFLE_PORT = 
"myriad.mapreduce.shuffle.port";
   /**
    * Allot 10% more memory to account for JVM overhead.
    */
@@ -68,7 +76,10 @@ public class NodeManagerConfiguration {
    */
   @JsonProperty
   private String jvmOpts;
-  
+
+  @JsonProperty
+  private Map<String, Long> ports;
+
   /**
    * Determines if cgroups are enabled for the NodeManager
    */
@@ -91,6 +102,18 @@ public class NodeManagerConfiguration {
     return Optional.fromNullable(cpus).or(DEFAULT_NM_CPUS);
   }
 
+  public synchronized Map<String, Long> getPorts() {
+    if (ports == null) {
+      //Good idea to have deterministic order
+      ports = new TreeMap<>();
+      ports.put(KEY_NM_ADDRESS, 0L);
+      ports.put(KEY_NM_WEBAPP_ADDRESS, 0L);
+      ports.put(KEY_NM_LOCALIZER_ADDRESS, 0L);
+      ports.put(KEY_NM_SHUFFLE_PORT, 0L);
+    }
+    return ports;
+  }
+
   public boolean getCgroups() {
     return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS);
   }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
index 0f733a9..0b00bca 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.myriad.configuration;
 
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.hibernate.validator.constraints.NotEmpty;
 
@@ -86,6 +87,7 @@ public class ServiceConfiguration {
   protected Integer maxInstances;
 
   @JsonProperty
+  @NotEmpty
   protected String command;
 
   @JsonProperty
@@ -123,8 +125,8 @@ public class ServiceConfiguration {
     return envSettings;
   }
 
-  public Optional<Map<String, Long>> getPorts() {
-    return Optional.fromNullable(ports);
+  public Map<String, Long> getPorts() {
+    return Optional.fromNullable(ports).or(new TreeMap<String, Long>());
   }
 
   public Optional<Integer> getMaxInstances() {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
deleted file mode 100644
index 74deda3..0000000
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.myriad.scheduler;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation assumes NM binaries will be downloaded
- */
-public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
-
-  private static final Logger LOGGER = LoggerFactory.
-      getLogger(DownloadNMExecutorCLGenImpl.class);
-
-  private final String nodeManagerUri;
-
-  public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, String 
nodeManagerUri) {
-    super(cfg);
-    this.nodeManagerUri = nodeManagerUri;
-  }
-
-  @Override
-  public String generateCommandLine(ServiceResourceProfile profile, Ports 
ports) {
-    StringBuilder cmdLine = new StringBuilder();
-    LOGGER.info("Using remote distribution");
-    generateEnvironment(profile, (NMPorts) ports);
-    appendDistroExtractionCommands(cmdLine);
-    appendCgroupsCmds(cmdLine);
-    appendYarnHomeExport(cmdLine);
-    appendUserSudo(cmdLine);
-    appendEnvForNM(cmdLine);
-    cmdLine.append(YARN_NM_CMD);
-    return cmdLine.toString();
-  }
-
-  protected void appendDistroExtractionCommands(StringBuilder cmdLine) {
-    /*
-    TODO(darinj): Overall this is messier than I'd like. We can't let mesos 
untar the distribution, since
-    it will change the permissions.  Instead we simply download the tarball 
and execute tar -xvpf. We also
-    pull the config from the resource manager and put them in the conf dir.  
This is also why we need
-    frameworkSuperUser. This will be refactored after Mesos-1790 is resolved.
-   */
-
-    //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 
1760 may not get to it.
-    //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
-    appendSudo(cmdLine);
-    cmdLine.append("tar -zxpf ").append(getFileName(nodeManagerUri));
-    //Place the hadoop config where in the HADOOP_CONF_DIR where it will be 
read by the NodeManager
-    //The url for the resource manager config is: http(s)://hostname:port/conf 
so fetcher.cpp downloads the
-    //config file to conf, It's an xml file with the parameters of 
yarn-site.xml, core-site.xml and hdfs.xml.
-    cmdLine.append(" && ");
-    appendSudo(cmdLine);
-    cmdLine.append(" cp conf ");
-    cmdLine.append(cfg.getYarnEnvironment().get("YARN_HOME"));
-    cmdLine.append("/etc/hadoop/yarn-site.xml;");
-  }
-
-  private static String getFileName(String uri) {
-    int lastSlash = uri.lastIndexOf('/');
-    if (lastSlash == -1) {
-      return uri;
-    } else {
-      String fileName = uri.substring(lastSlash + 1);
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI 
should not have a slash at the end");
-      return fileName;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
index 82782f2..c2a1c68 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java
@@ -19,11 +19,153 @@
 
 package org.apache.myriad.scheduler;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 /**
  * Interface to plugin multiple implementations for executor command generation
  */
-public interface ExecutorCommandLineGenerator {
-  String generateCommandLine(ServiceResourceProfile profile, Ports ports);
+public abstract class ExecutorCommandLineGenerator {
+
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(NMExecutorCommandLineGenerator.class);
+
+  public static final String KEY_YARN_RM_HOSTNAME = 
"yarn.resourcemanager.hostname";
+  public static final String KEY_YARN_HOME = "yarn.home";
+
+  protected static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
+
+  protected static final String PROPERTY_FORMAT = " -D%s=%s ";
+  protected static final String CMD_FORMAT = "echo \"%1$s\" && %1$s";
+
+  protected Protos.CommandInfo staticCommandInfo;
+
+  protected MyriadConfiguration myriadConfiguration;
+  protected MyriadExecutorConfiguration myriadExecutorConfiguration;
+  protected YarnConfiguration yarnConfiguration = new YarnConfiguration();
+
+  abstract Protos.CommandInfo generateCommandLine(ServiceResourceProfile 
profile, ServiceConfiguration serviceConfiguration, Collection<Long> ports);
+
+  protected void appendDistroExtractionCommands(StringBuilder cmdLine) {
+    /*
+    TODO(darinj): Overall this is messier than I'd like. We can't let mesos 
untar the distribution, since
+    it will change the permissions.  Instead we simply download the tarball 
and execute tar -xvpf. We also
+    pull the config from the resource manager and put them in the 
yarnConfiguration dir.  This is also why we need
+    frameworkSuperUser. This will be refactored after Mesos-1790 is resolved.
+   */
+
+    //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 
1760 may not get to it.
+    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
+      //If SudoUser not enable LinuxExecutor will not work
+      appendSudo(cmdLine);
+      cmdLine.append("tar -zxpf 
").append(getFileName(myriadExecutorConfiguration.getNodeManagerUri().get()));
+      cmdLine.append(" && ");
+      //Place the hadoop config where in the HADOOP_CONF_DIR where it will be 
read by the NodeManager
+      //The url for the resource manager config is: 
http(s)://hostname:port/yarnConfiguration so fetcher.cpp downloads the
+      //config file to yarnConfiguration, It's an xml file with the parameters 
of yarn-site.xml, core-site.xml and hdfs.xml.
+      if (!myriadExecutorConfiguration.getConfigUri().isPresent()) {
+        appendSudo(cmdLine);
+        cmdLine.append(" cp yarnConfiguration ");
+        
cmdLine.append(myriadConfiguration.getYarnEnvironment().get("YARN_HOME"));
+        cmdLine.append("/etc/hadoop/yarn-site.xml && ");
+      }
+    }
+  }
+
+  protected void addJavaOpt(StringBuilder opts, String propertyName, String 
propertyValue) {
+    String envOpt = String.format(PROPERTY_FORMAT, propertyName, 
propertyValue);
+    opts.append(envOpt);
+  }
+
+  protected void appendSudo(StringBuilder cmdLine) {
+    if (myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+      cmdLine.append(" sudo ");
+    }
+  }
+
+  protected void appendUserSudo(StringBuilder cmdLine) {
+    if (myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+      cmdLine.append(" sudo -E -u ");
+      cmdLine.append(myriadConfiguration.getFrameworkUser().get());
+      cmdLine.append(" -H ");
+    }
+  }
+
+  public String getConfigurationUrl() {
+    String httpPolicy = yarnConfiguration.get(TaskFactory.YARN_HTTP_POLICY);
+    String address = StringUtils.EMPTY;
+    if (httpPolicy != null && 
httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
+      address = 
yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
+      if (StringUtils.isEmpty(address)) {
+        address = 
yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
+      }
+      return "https://" + address + "/yarnConfiguration";
+    } else {
+      address = 
yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
+      if (StringUtils.isEmpty(address)) {
+        address = 
yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
+      }
+      return "http://" + address + "/yarnConfiguration";
+    }
+  }
+
+  private static String getFileName(String uri) {
+    int lastSlash = uri.lastIndexOf('/');
+    if (lastSlash == -1) {
+      return uri;
+    } else {
+      String fileName = uri.substring(lastSlash + 1);
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI 
should not have a slash at the end");
+      return fileName;
+    }
+  }
+
+  protected String getUser() {
+    if (myriadConfiguration.getFrameworkSuperUser().isPresent()) {
+      return myriadConfiguration.getFrameworkSuperUser().get();
+    } else {
+      return myriadConfiguration.getFrameworkUser().get();
+    }
+  }
 
-  String getConfigurationUrl();
+  protected List<Protos.CommandInfo.URI> getUris() {
+    List<Protos.CommandInfo.URI> uris = new ArrayList<>();
+    if (myriadExecutorConfiguration.getJvmUri().isPresent()) {
+      final String jvmRemoteUri = 
myriadExecutorConfiguration.getJvmUri().get();
+      LOGGER.info("Getting JRE distribution from:" + jvmRemoteUri);
+      
uris.add(Protos.CommandInfo.URI.newBuilder().setValue(jvmRemoteUri).build());
+    }
+    if (myriadExecutorConfiguration.getConfigUri().isPresent()) {
+      String configURI = myriadExecutorConfiguration.getConfigUri().get();
+      LOGGER.info("Getting Hadoop configuration from: {}", configURI);
+      
uris.add(Protos.CommandInfo.URI.newBuilder().setValue(configURI).build());
+    } else if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      String configURI = getConfigurationUrl();
+      LOGGER.info("Getting Hadoop configuration from: {}", configURI);
+      
uris.add(Protos.CommandInfo.URI.newBuilder().setValue(configURI).build());
+    }
+    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
+      //Both FrameworkUser and FrameworkSuperuser to get all of the directory 
permissions correct.
+      if (!(myriadConfiguration.getFrameworkUser().isPresent() && 
myriadConfiguration.getFrameworkSuperUser().isPresent())) {
+        LOGGER.warn("Trying to use remote distribution, but frameworkUser 
and/or frameworkSuperUser not set!" +
+            "Some features may not work");
+      }
+      String nodeManagerUri = 
myriadExecutorConfiguration.getNodeManagerUri().get();
+      LOGGER.info("Getting Hadoop distribution from: {}", nodeManagerUri);
+      
uris.add(Protos.CommandInfo.URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build());
+    }
+    return uris;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
index 6232258..59e2cb0 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java
@@ -20,6 +20,8 @@ package org.apache.myriad.scheduler;
 
 import com.google.gson.Gson;
 
+import java.util.Map;
+
 /**
  * Extended ServiceResourceProfile for services that need to pass set of 
resources downstream
  * currently the only such service is NodeManager
@@ -33,33 +35,12 @@ public class ExtendedResourceProfile extends 
ServiceResourceProfile {
    * @param cpu
    * @param mem          will throw NullPoiterException if childProfile is null
    */
-  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double 
mem, Double execCpu, Double execMemory) {
-    super(childProfile.getName(), cpu, mem, execCpu, execMemory);
-
+  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double 
mem, Map<String, Long> ports) {
+    super(childProfile.getName(), cpu, mem, ports);
     this.childProfile = childProfile;
     this.className = ExtendedResourceProfile.class.getName();
   }
 
- /**
-  * @param childProfile - should be null
-  * @param cpu
-  * @param mem          will throw NullPoiterException if childProfile is null
-  */
-  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double 
mem) {
-    super(childProfile.getName(), cpu, mem);
-
-    this.childProfile = childProfile;
-    this.className = ExtendedResourceProfile.class.getName();
-  }
-
-  public NMProfile getChildProfile() {
-    return childProfile;
-  }
-
-  public void setChildProfile(NMProfile nmProfile) {
-    this.childProfile = nmProfile;
-  }
-
   @Override
   public String getName() {
     return childProfile.getName();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
index 99f7e78..fb1f1bb 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java
@@ -136,7 +136,7 @@ public class MyriadOperations {
 
     Collection<NodeTask> nodes = new HashSet<>();
     for (int i = 0; i < instances; i++) {
-      NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, 
cpu, mem), null);
+      NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, 
cpu, mem, auxTaskConf.getPorts()), null);
       nodeTask.setTaskPrefix(serviceName);
       nodes.add(nodeTask);
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
deleted file mode 100644
index db945ec..0000000
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.myriad.scheduler;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation assumes NM binaries already deployed
- */
-public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
-
-  public static final String ENV_YARN_NODEMANAGER_OPTS = 
"YARN_NODEMANAGER_OPTS";
-  public static final String KEY_YARN_NM_CGROUPS_PATH = 
"yarn.nodemanager.cgroups.path";
-  public static final String KEY_YARN_RM_HOSTNAME = 
"yarn.resourcemanager.hostname";
-
-  /**
-   * YARN class to help handle LCE resources
-   */
-  // TODO (mohit): Should it be configurable ?
-  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = 
"yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
-  public static final String KEY_YARN_HOME = "yarn.home";
-  public static final String KEY_NM_RESOURCE_CPU_VCORES = 
"nodemanager.resource.cpu-vcores";
-  public static final String KEY_NM_RESOURCE_MEM_MB = 
"nodemanager.resource.memory-mb";
-  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
-  public static final String KEY_NM_ADDRESS = 
"myriad.yarn.nodemanager.address";
-  public static final String KEY_NM_LOCALIZER_ADDRESS = 
"myriad.yarn.nodemanager.localizer.address";
-  public static final String KEY_NM_WEBAPP_ADDRESS = 
"myriad.yarn.nodemanager.webapp.address";
-  public static final String KEY_NM_SHUFFLE_PORT = 
"myriad.mapreduce.shuffle.port";
-
-  private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
-  private static final String PROPERTY_FORMAT = "-D%s=%s";
-
-  private Map<String, String> environment = new HashMap<>();
-  protected MyriadConfiguration cfg;
-  protected YarnConfiguration conf = new YarnConfiguration();
-
-  public NMExecutorCLGenImpl(MyriadConfiguration cfg) {
-    this.cfg = cfg;
-  }
-
-  @Override
-  public String generateCommandLine(ServiceResourceProfile profile, Ports 
ports) {
-    StringBuilder cmdLine = new StringBuilder();
-
-    generateEnvironment(profile, (NMPorts) ports);
-    appendCgroupsCmds(cmdLine);
-    appendYarnHomeExport(cmdLine);
-    appendEnvForNM(cmdLine);
-    cmdLine.append(YARN_NM_CMD);
-    return cmdLine.toString();
-  }
-
-  protected void generateEnvironment(ServiceResourceProfile profile, NMPorts 
ports) {
-    //yarnEnvironemnt configuration from yaml file
-    Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
-    if (yarnEnvironmentMap != null) {
-      environment.putAll(yarnEnvironmentMap);
-    }
-
-    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
-    if (StringUtils.isNotEmpty(rmHostName)) {
-      addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
-    }
-
-    if (cfg.getNodeManagerConfiguration().getCgroups()) {
-      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, 
"mesos/$TASK_DIR");
-      if (environment.containsKey("YARN_HOME")) {
-        addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
-      }
-    }
-    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, 
Integer.toString(profile.getCpus().intValue()));
-    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, 
Integer.toString(profile.getMemory().intValue()));
-    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + 
Long.valueOf(ports.getRpcPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + 
Long.valueOf(ports.getLocalizerPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + 
Long.valueOf(ports.getWebAppHttpPort()).toString());
-    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, 
Long.valueOf(ports.getShufflePort()).toString());
-  }
-
-  protected void appendEnvForNM(StringBuilder cmdLine) {
-    cmdLine.append(" env ");
-    for (Map.Entry<String, String> env : environment.entrySet()) {
-      
cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\"
 ");
-    }
-  }
-
-  protected void appendCgroupsCmds(StringBuilder cmdLine) {
-    if (cfg.getFrameworkSuperUser().isPresent()) {
-      cmdLine.append(" export TASK_DIR=`basename $PWD`&&");
-      //The container executor script expects mount-path to exist and owned by 
the yarn user
-      //See: 
https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html
-      //If YARN ever moves to cgroup/mem it will be necessary to add a mem 
version.
-      appendSudo(cmdLine);
-      cmdLine.append("chown " + cfg.getFrameworkUser().get() + " ");
-      cmdLine.append(cfg.getCGroupPath());
-      cmdLine.append("/cpu/mesos/$TASK_DIR &&");
-    } else {
-      LOGGER.info("frameworkSuperUser not enabled ignoring cgroup 
configuration");
-    }
-  }
-
-  protected void appendYarnHomeExport(StringBuilder cmdLine) {
-    if (environment.containsKey("YARN_HOME")) {
-      cmdLine.append(" export YARN_HOME=");
-      cmdLine.append(environment.get("YARN_HOME"));
-      cmdLine.append(";");
-    }
-  }
-
-  protected void appendSudo(StringBuilder cmdLine) {
-    if (cfg.getFrameworkSuperUser().isPresent()) {
-      cmdLine.append(" sudo ");
-    }
-  }
-
-  protected void appendUserSudo(StringBuilder cmdLine) {
-    if (cfg.getFrameworkSuperUser().isPresent()) {
-      cmdLine.append(" sudo -E -u ");
-      cmdLine.append(cfg.getFrameworkUser().get());
-      cmdLine.append(" -H ");
-    }
-  }
-
-  protected void addYarnNodemanagerOpt(String propertyName, String 
propertyValue) {
-    String envOpt = String.format(PROPERTY_FORMAT, propertyName, 
propertyValue);
-    if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
-      String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
-      environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
-    } else {
-      environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
-    }
-  }
-
-  @Override
-  public String getConfigurationUrl() {
-    String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
-    String address = StringUtils.EMPTY;
-    if (httpPolicy != null && 
httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
-      address = 
conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
-      if (StringUtils.isEmpty(address)) {
-        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + 
":8090";
-      }
-    } else {
-      address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
-      if (StringUtils.isEmpty(address)) {
-        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + 
":8088";
-      }
-    }
-    
-    return "http://" + address + "/conf";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
new file mode 100644
index 0000000..035da3f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+
+import org.apache.mesos.Protos.CommandInfo;
+
+/**
+ * Implementation assumes NM binaries already deployed
+ */
+public class NMExecutorCommandLineGenerator extends 
ExecutorCommandLineGenerator {
+
+  /**
+   * YARN class to help handle LCE resources
+   */
+  public static final String ENV_YARN_NODEMANAGER_OPTS = 
"YARN_NODEMANAGER_OPTS";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = 
"yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+  public static final String KEY_NM_RESOURCE_CPU_VCORES = 
"nodemanager.resource.cpu-vcores";
+  public static final String KEY_NM_RESOURCE_MEM_MB = 
"nodemanager.resource.memory-mb";
+
+  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
+
+  public NMExecutorCommandLineGenerator(MyriadConfiguration cfg) {
+    this.myriadConfiguration = cfg;
+    this.myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+    generateStaticCommandLine();
+  }
+
+  @Override
+  CommandInfo generateCommandLine(ServiceResourceProfile profile,
+                                  ServiceConfiguration serviceConfiguration, 
Collection<Long> ports) {
+    CommandInfo.Builder builder = CommandInfo.newBuilder();
+    builder.mergeFrom(staticCommandInfo);
+    builder.setEnvironment(generateEnvironment(profile, ports));
+    builder.setUser(getUser());
+    return builder.build();
+  }
+
+  protected void generateStaticCommandLine() {
+    CommandInfo.Builder builder = CommandInfo.newBuilder();
+    StringBuilder cmdLine = new StringBuilder();
+    appendCgroupsCmds(cmdLine);
+    appendDistroExtractionCommands(cmdLine);
+    appendUserSudo(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    builder.setValue(String.format(CMD_FORMAT, cmdLine.toString()));
+    builder.addAllUris(getUris());
+    staticCommandInfo = builder.build();
+  }
+
+  protected Protos.Environment generateEnvironment(ServiceResourceProfile 
profile, Collection<Long> ports) {
+    Map<String, String> yarnEnv = myriadConfiguration.getYarnEnvironment();
+    Protos.Environment.Builder builder = Protos.Environment.newBuilder();
+    builder.addAllVariables(Iterables.transform(yarnEnv.entrySet(), new 
Function<Map.Entry<String, String>, Protos.Environment.Variable>() {
+      public Protos.Environment.Variable apply(Map.Entry<String, String> x) {
+        return 
Protos.Environment.Variable.newBuilder().setName(x.getKey()).setValue(x.getValue()).build();
+      }
+    }));
+
+    StringBuilder yarnOpts = new StringBuilder();
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+
+
+    if (StringUtils.isNotEmpty(rmHostName)) {
+      addJavaOpt(yarnOpts, KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    if (yarnEnv.containsKey(KEY_YARN_HOME)) {
+      addJavaOpt(yarnOpts, KEY_YARN_HOME, yarnEnv.get("YARN_HOME"));
+    }
+
+    addJavaOpt(yarnOpts, KEY_NM_RESOURCE_CPU_VCORES, 
Integer.toString(profile.getCpus().intValue()));
+    addJavaOpt(yarnOpts, KEY_NM_RESOURCE_MEM_MB, 
Integer.toString(profile.getMemory().intValue()));
+    Map<String, Long> portsMap = profile.getPorts();
+    Preconditions.checkState(portsMap.size() == ports.size());
+
+    Iterator itr = ports.iterator();
+    for (String portProperty : portsMap.keySet()) {
+      if (portProperty.endsWith("address")) {
+        addJavaOpt(yarnOpts, portProperty, ALL_LOCAL_IPV4ADDR + itr.next());
+      } else if (portProperty.endsWith("port")) {
+        addJavaOpt(yarnOpts, portProperty, itr.next().toString());
+      } else {
+        LOGGER.warn("{} propery isn't an address or port!", portProperty);
+      }
+    }
+
+
+    if 
(myriadConfiguration.getYarnEnvironment().containsKey(ENV_YARN_NODEMANAGER_OPTS))
 {
+      yarnOpts.append(" ").append(yarnEnv.get(ENV_YARN_NODEMANAGER_OPTS));
+    }
+    builder.addAllVariables(Collections.singleton(
+            Protos.Environment.Variable.newBuilder()
+                .setName(ENV_YARN_NODEMANAGER_OPTS)
+                .setValue(yarnOpts.toString()).build())
+    );
+    return builder.build();
+  }
+
+  protected void appendCgroupsCmds(StringBuilder cmdLine) {
+    //These can't be set in the environment as they require commands to be run 
on the host
+    if (myriadConfiguration.getFrameworkSuperUser().isPresent() && 
myriadConfiguration.isCgroupEnabled()) {
+      cmdLine.append(" export TASK_DIR=`cat /proc/self/cgroup | grep :cpu: | 
cut -d: -f3` &&");
+      //The container executor script expects mount-path to exist and owned by 
the yarn user
+      //See: 
https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html
+      //If YARN ever moves to cgroup/mem it will be necessary to add a mem 
version.
+      appendSudo(cmdLine);
+      cmdLine.append("chown " + myriadConfiguration.getFrameworkUser().get() + 
" ");
+      cmdLine.append(myriadConfiguration.getCGroupPath());
+      cmdLine.append("/cpu$TASK_DIR &&");
+      cmdLine.append(String.format("export %s=\"$%s -D%s=%s\" && ", 
ENV_YARN_NODEMANAGER_OPTS, ENV_YARN_NODEMANAGER_OPTS,
+          KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "$TASK_DIR"));
+    } else if (myriadConfiguration.isCgroupEnabled()) {
+      LOGGER.info("frameworkSuperUser not set ignoring cgroup configuration, 
this will likely can the nodemanager to crash");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
deleted file mode 100644
index 12490a7..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Helper class for dynamically assigning ports to nodemanager
- */
-public class NMPorts implements Ports {
-  private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
-  private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
-  private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
-  private static final String NM_HTTP_SHUFFLE_PORT_KEY = 
"nm.http.shuffle.port";
-
-  private static final String[] NM_PORT_KEYS =
-      {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, 
NM_HTTP_SHUFFLE_PORT_KEY};
-
-  private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
-
-  public NMPorts(Long[] ports) {
-    Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: 
array \"ports\" is of unexpected length");
-    for (int i = 0; i < NM_PORT_KEYS.length; i++) {
-      portsMap.put(NM_PORT_KEYS[i], ports[i]);
-    }
-  }
-
-  public long getRpcPort() {
-    return portsMap.get(NM_RPC_PORT_KEY);
-  }
-
-  public long getLocalizerPort() {
-    return portsMap.get(NM_LOCALIZER_PORT_KEY);
-  }
-
-  public long getWebAppHttpPort() {
-    return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY);
-  }
-
-  public long getShufflePort() {
-    return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY);
-  }
-
-  public static int expectedNumPorts() {
-    return NM_PORT_KEYS.length;
-  }
-
-  /**
-   * @return a string representation of NMPorts
-   */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder().append("{");
-    for (String key : NM_PORT_KEYS) {
-      sb.append(key).append(": 
").append(portsMap.get(key).toString()).append(", ");
-    }
-    sb.replace(sb.length() - 2, sb.length(), "}");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java
new file mode 100644
index 0000000..6c7e209
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import com.google.inject.Inject;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
+import org.apache.myriad.state.NodeTask;
+
+import java.util.List;
+
+/**
+ * Creates Node Manager Tasks based upon Mesos offers
+ */
+public class NMTaskFactory extends TaskFactory {
+
+
+  @Inject
+  NMTaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, 
ExecutorCommandLineGenerator clGenerator) {
+    super(cfg, taskUtils, clGenerator);
+  }
+
+  @Override
+  public Protos.TaskInfo createTask(ResourceOfferContainer 
resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.TaskID taskId, 
NodeTask nodeTask) {
+    ServiceResourceProfile serviceProfile = nodeTask.getProfile();
+    Double taskMemory = serviceProfile.getAggregateMemory();
+    Double taskCpus = serviceProfile.getAggregateCpu();
+    List<Protos.Resource> portResources = 
resourceOfferContainer.consumePorts(serviceProfile.getPorts().values());
+    Protos.CommandInfo commandInfo = 
clGenerator.generateCommandLine(serviceProfile, null, 
rangesConverter(portResources));
+    Protos.ExecutorInfo executorInfo = 
getExecutorInfoForSlave(resourceOfferContainer, frameworkId, commandInfo);
+    Protos.TaskInfo.Builder taskBuilder = 
Protos.TaskInfo.newBuilder().setName(cfg.getFrameworkName() + "-" + 
taskId.getValue()).setTaskId(taskId).setSlaveId(
+        resourceOfferContainer.getSlaveId());
+
+    return taskBuilder
+        .addAllResources(resourceOfferContainer.consumeCpus(taskCpus))
+        .addAllResources(resourceOfferContainer.consumeMem(taskMemory))
+        .addAllResources(portResources)
+        .setExecutor(executorInfo)
+        .build();
+  }
+
+  @Override
+  public Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer 
resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo 
commandInfo) {
+    Protos.ExecutorID executorId = Protos.ExecutorID.newBuilder()
+        .setValue(EXECUTOR_PREFIX + frameworkId.getValue() + 
resourceOfferContainer.getOfferId() +
+            resourceOfferContainer.getSlaveId().getValue())
+        .build();
+    Protos.ExecutorInfo.Builder executorInfo = 
Protos.ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).setExecutorId(executorId)
+        
.addAllResources(resourceOfferContainer.consumeCpus(taskUtils.getExecutorCpus()))
+        
.addAllResources(resourceOfferContainer.consumeMem(taskUtils.getExecutorMemory()));
+    if (cfg.getContainerInfo().isPresent()) {
+      executorInfo.setContainer(getContainerInfo());
+    }
+    return executorInfo.build();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
deleted file mode 100644
index 03150fb..0000000
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-/**
- * Generic interface to represent ports
- */
-public interface Ports {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
index 8765226..083d54d 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
@@ -17,24 +17,94 @@
  */
 package org.apache.myriad.scheduler;
 
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mesos.Protos;
 import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * CommandLineGenerator for any aux service launched by Myriad as binary distro
  */
-public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl {
+public class ServiceCommandLineGenerator extends ExecutorCommandLineGenerator {
+
+  public static final String ENV_HADOOP_OPTS = "HADOOP_OPTS";
 
+  private String baseCmd;
 
-  public ServiceCommandLineGenerator(MyriadConfiguration cfg, String 
nodeManagerUri) {
-    super(cfg, nodeManagerUri);
+  /**
+   * Stores the configuration and its executor configuration, then prepares the
+   * static part of the command line.
+   *
+   * @param cfg the Myriad configuration this generator reads from
+   */
+  public ServiceCommandLineGenerator(MyriadConfiguration cfg) {
+    this.myriadConfiguration = cfg;
+    this.myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration();
+    generateStaticCommandLine();
+  }
+
+  /**
+   * Prepares the parts of the command that do not change between tasks: a
+   * CommandInfo carrying the URIs and the user, and the command prefix built by
+   * the distro-extraction and user-sudo helpers.
+   */
+  protected void generateStaticCommandLine() {
+    staticCommandInfo = Protos.CommandInfo.newBuilder()
+        .addAllUris(getUris())
+        .setUser(getUser())
+        .build();
+
+    StringBuilder prefix = new StringBuilder();
+    appendDistroExtractionCommands(prefix);
+    appendUserSudo(prefix);
+    baseCmd = prefix.toString();
   }
 
   @Override
-  public String generateCommandLine(ServiceResourceProfile profile, Ports 
ports) {
-    StringBuilder strB = new StringBuilder();
-    appendDistroExtractionCommands(strB);
-    appendUserSudo(strB);
-    return strB.toString();
+  public Protos.CommandInfo generateCommandLine(ServiceResourceProfile profile,
+                                                ServiceConfiguration serviceConfiguration,
+                                                Collection<Long> ports) {
+    // Full command = static prefix + the command configured for the service.
+    String serviceCommand = baseCmd + " " + serviceConfiguration.getCommand().get();
+
+    // Start from the static CommandInfo (URIs, user) and add the per-task value and environment.
+    return Protos.CommandInfo.newBuilder()
+        .mergeFrom(staticCommandInfo)
+        .setValue(String.format(CMD_FORMAT, serviceCommand))
+        .setEnvironment(generateEnvironment(profile, ports))
+        .build();
+  }
+
+  /**
+   * Builds the environment for a service task.
+   *
+   * <p>Every entry of the configured YARN environment is copied as a variable, and
+   * one additional {@code HADOOP_OPTS} variable is appended. Its value is assembled
+   * through {@code addJavaOpt} from the resource manager host name system property
+   * (when non-empty), the YARN home entry (when present) and one entry per port
+   * property of the profile, followed by any {@code HADOOP_OPTS} value already
+   * present in the YARN environment.
+   *
+   * <p>Note: when the YARN environment already defines {@code HADOOP_OPTS}, that
+   * entry has been copied too, so the variable name then appears twice in the result.
+   *
+   * @param serviceResourceProfile profile whose port property names are used
+   * @param ports                  port numbers, matched to the profile's port
+   *                               properties by iteration position
+   * @return the assembled environment
+   * @throws IllegalStateException if the number of ports differs from the number
+   *                               of port properties in the profile
+   */
+  protected Protos.Environment generateEnvironment(ServiceResourceProfile serviceResourceProfile, Collection<Long> ports) {
+    Map<String, String> yarnEnv = myriadConfiguration.getYarnEnvironment();
+    Protos.Environment.Builder builder = Protos.Environment.newBuilder();
+
+    builder.addAllVariables(Iterables.transform(yarnEnv.entrySet(), new Function<Map.Entry<String, String>, Protos.Environment.Variable>() {
+      @Override
+      public Protos.Environment.Variable apply(Map.Entry<String, String> x) {
+        return Protos.Environment.Variable.newBuilder().setName(x.getKey()).setValue(x.getValue()).build();
+      }
+    }));
+
+    StringBuilder hadoopOpts = new StringBuilder();
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+
+    if (StringUtils.isNotEmpty(rmHostName)) {
+      addJavaOpt(hadoopOpts, KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    // NOTE(review): the presence check uses KEY_YARN_HOME while the value is read with the
+    // literal "YARN_HOME" - confirm both are meant to address the same entry.
+    if (yarnEnv.containsKey(KEY_YARN_HOME)) {
+      addJavaOpt(hadoopOpts, KEY_YARN_HOME, yarnEnv.get("YARN_HOME"));
+    }
+
+    Map<String, Long> portsMap = serviceResourceProfile.getPorts();
+    Preconditions.checkState(portsMap.size() == ports.size(),
+        "Profile declares %s port properties but %s ports were supplied", portsMap.size(), ports.size());
+    // Ports are paired with property names purely by iteration order of both collections.
+    Iterator<Long> itr = ports.iterator();
+    for (String portProperty : portsMap.keySet()) {
+      addJavaOpt(hadoopOpts, portProperty, ALL_LOCAL_IPV4ADDR + itr.next());
+    }
+
+    if (yarnEnv.containsKey(ENV_HADOOP_OPTS)) {
+      hadoopOpts.append(" ").append(yarnEnv.get(ENV_HADOOP_OPTS));
+    }
+    builder.addAllVariables(Collections.singleton(
+            Protos.Environment.Variable.newBuilder()
+                .setName(ENV_HADOOP_OPTS)
+                .setValue(hadoopOpts.toString()).build())
+    );
+    return builder.build();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
index 146a80c..147ed47 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java
@@ -24,11 +24,13 @@ import com.google.gson.JsonDeserializer;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParseException;
 import java.lang.reflect.Type;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Resource Profile for any service 
+ * Resource Profile for any service
  */
 public class ServiceResourceProfile {
 
@@ -50,18 +52,14 @@ public class ServiceResourceProfile {
 
   protected String className = ServiceResourceProfile.class.getName();
 
-  public ServiceResourceProfile(String name, Double cpus, Double mem) {
-    this.name = name;
-    this.cpus = cpus;
-    this.memory = mem;
-  }
+  protected Map<String, Long> ports;
 
-  public ServiceResourceProfile(String name, Double cpus, Double mem, Double 
execCpus, Double execMemory) {
+  public ServiceResourceProfile(String name, Double cpus, Double mem, 
Map<String, Long> ports) {
     this.name = name;
     this.cpus = cpus;
     this.memory = mem;
-    this.executorCpu = execCpus;
-    this.executorMemory = execMemory;
+    this.ports = ports;
+    this.className = ServiceResourceProfile.class.getName();
   }
 
   public String getName() {
@@ -84,12 +82,8 @@ public class ServiceResourceProfile {
     return cpus;
   }
 
-  public Double getExecutorCpu() {
-    return executorCpu;
-  }
-
-  public Double getExecutorMemory() {
-    return executorMemory;
+  /**
+   * @return the ports map held by this profile
+   */
+  public Map<String, Long> getPorts() {
+    return this.ports;
   }
 
   @Override
@@ -144,10 +138,10 @@ public class ServiceResourceProfile {
     }
     if (obj == null) {
       return false;
-    }      
+    }
     if (getClass() != obj.getClass()) {
       return false;
-    } 
+    }
     ServiceResourceProfile other = (ServiceResourceProfile) obj;
     if (className == null) {
       if (other.className != null) {
@@ -190,7 +184,7 @@ public class ServiceResourceProfile {
       }
     } else if (!name.equals(other.name)) {
       return false;
-    }  
+    }
     return true;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
deleted file mode 100644
index 60d4c44..0000000
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.myriad.scheduler;
-
-import java.util.Map;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.ServiceConfiguration;
-
-/**
- * ServiceTaskConstraints is an implementation of TaskConstraints for a service
- * at this point constraints are on ports
- * Later on there may be other types of constraints added
- */
-public class ServiceTaskConstraints implements TaskConstraints {
-
-  private int portsCount = 0;
-
-  public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) {
-    Map<String, ServiceConfiguration> auxConfigs = 
cfg.getServiceConfigurations();
-
-    ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix);
-    if (serviceConfig != null) {
-      if (serviceConfig.getPorts().isPresent()) {
-        portsCount = serviceConfig.getPorts().get().size();
-      }
-    }
-  }
-
-  @Override
-  public int portsCount() {
-    return portsCount;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
new file mode 100644
index 0000000..57e104f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import com.google.inject.Inject;
+import org.apache.mesos.Protos;
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.myriad.configuration.ServiceConfiguration;
+import org.apache.myriad.scheduler.resource.ResourceOfferContainer;
+import org.apache.myriad.state.NodeTask;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Generic service task factory that creates a service task based solely on the
+ * configuration.
+ * Main properties of configuration are:
+ * 1. command to run
+ * 2. Additional env. variables to set (serviceOpts)
+ * 3. ports to use with names of the properties
+ * 4. TODO (yufeldman) executor info
+ */
+public class ServiceTaskFactory extends TaskFactory {
+
+  /**
+   * Creates the factory.
+   *
+   * @param cfg         the Myriad configuration
+   * @param taskUtils   task utilities, passed to the superclass
+   * @param clGenerator injected generator; it is passed to the superclass and then
+   *                    replaced by a {@link ServiceCommandLineGenerator} built from {@code cfg}
+   */
+  @Inject
+  ServiceTaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) {
+    super(cfg, taskUtils, clGenerator);
+    this.clGenerator = new ServiceCommandLineGenerator(cfg);
+  }
+
+  /**
+   * Builds the TaskInfo for a configured service. The task carries its command
+   * directly (no executor is set) together with the ports, cpus and memory taken
+   * from the offer container.
+   *
+   * @param resourceOfferContainer container of offered resources to draw from
+   * @param frameworkId            framework id (not used by this implementation)
+   * @param taskId                 id assigned to the new task
+   * @param nodeTask               task description providing the prefix and profile
+   * @return the assembled task
+   * @throws NullPointerException if {@code resourceOfferContainer}, {@code nodeTask},
+   *                              or the service's command is null
+   */
+  @Override
+  public Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.TaskID taskId, NodeTask nodeTask) {
+    Objects.requireNonNull(resourceOfferContainer, "ResourceOfferContainer should be non-null");
+    Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
+
+    // NOTE(review): get() is called before the null check below, so a missing service
+    // configuration presumably fails inside get() rather than with this message - confirm.
+    ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get();
+
+    Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
+    Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null");
+
+    // Ports are taken first because the command line is generated from them.
+    List<Protos.Resource> portResources = resourceOfferContainer.consumePorts(nodeTask.getProfile().getPorts().values());
+    Protos.CommandInfo commandInfo = clGenerator.generateCommandLine(nodeTask.getProfile(), serviceConfig, rangesConverter(portResources));
+
+    // Two placeholders, two arguments: service prefix first, then the generated command.
+    LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), commandInfo.getValue());
+
+    Protos.TaskInfo.Builder taskBuilder = Protos.TaskInfo.newBuilder();
+
+    taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(resourceOfferContainer.getSlaveId())
+        .addAllResources(resourceOfferContainer.consumeCpus(nodeTask.getProfile().getCpus()))
+        .addAllResources(resourceOfferContainer.consumeMem(nodeTask.getProfile().getMemory()))
+        .addAllResources(portResources);
+
+    taskBuilder.setCommand(commandInfo);
+    if (cfg.getContainerInfo().isPresent()) {
+      taskBuilder.setContainer(getContainerInfo());
+    }
+    return taskBuilder.build();
+  }
+
+  /**
+   * Service tasks do not define their own executor.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
deleted file mode 100644
index 42f698a..0000000
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.myriad.scheduler;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import javax.inject.Inject;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.CommandInfo.URI;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.Value;
-import org.apache.myriad.configuration.MyriadConfiguration;
-import org.apache.myriad.configuration.MyriadExecutorConfiguration;
-import org.apache.myriad.configuration.ServiceConfiguration;
-import org.apache.myriad.state.NodeTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Generic Service Class that allows to create a service solely base don the 
configuration
- * Main properties of configuration are:
- * 1. command to run
- * 2. Additional env. variables to set (serviceOpts)
- * 3. ports to use with names of the properties
- * 4. TODO (yufeldman) executor info
- */
-public class ServiceTaskFactoryImpl implements TaskFactory {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServiceTaskFactoryImpl.class);
-
-  public static final long DEFAULT_PORT_NUMBER = 0;
-
-  private MyriadConfiguration cfg;
-  @SuppressWarnings("unused")
-  private TaskUtils taskUtils;
-  private ServiceCommandLineGenerator clGenerator;
-
-  @Inject
-  public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) {
-    this.cfg = cfg;
-    this.taskUtils = taskUtils;
-    this.clGenerator = new ServiceCommandLineGenerator(cfg, 
cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull());
-  }
-  
-  @Override
-  public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID 
taskId, NodeTask nodeTask) {
-    Objects.requireNonNull(offer, "Offer should be non-null");
-    Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
-
-    ServiceConfiguration serviceConfig = 
cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get();
-
-    Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null");
-    Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for 
ServiceConfig should be non-null");
-
-    final String serviceHostName = "0.0.0.0";
-    final String serviceEnv = serviceConfig.getEnvSettings();
-    final String rmHostName = 
System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
-    List<Long> additionalPortsNumbers = null;
-
-    final StringBuilder strB = new StringBuilder("env ");
-    if (serviceConfig.getServiceOpts().isPresent()) {
-      strB.append(serviceConfig.getServiceOpts().get()).append("=");
-
-      strB.append("\"");
-      if (StringUtils.isNotEmpty(rmHostName)) {
-        strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + 
" ");
-      }
-
-      Map<String, Long> ports = serviceConfig.getPorts().orNull();
-      
-      if (MapUtils.isNotEmpty(ports)) {
-        int neededPortsCount = 0;
-        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
-          Long port = portEntry.getValue();
-          if (port == DEFAULT_PORT_NUMBER) {
-            neededPortsCount++;
-          }
-        }
-        // use provided ports
-        additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount);
-        LOGGER.info("No specified ports found or number of specified ports is 
not enough. Using ports from Mesos Offers: {}",
-            additionalPortsNumbers);
-        int index = 0;
-        for (Map.Entry<String, Long> portEntry : ports.entrySet()) {
-          String portProperty = portEntry.getKey();
-          Long port = portEntry.getValue();
-          if (port == DEFAULT_PORT_NUMBER) {
-            port = additionalPortsNumbers.get(index++);
-          }
-          strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port 
+ " ");
-        }
-      }
-      strB.append(serviceEnv);
-      strB.append("\"");
-    }
-
-    strB.append(" ");
-    strB.append(serviceConfig.getCommand().get());
-
-    CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(), 
strB.toString());
-
-    LOGGER.info("Command line for service: {} is: {}", 
nodeTask.getTaskPrefix(), strB.toString());
-
-    TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
-
-    
taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId())
-        .addAllResources(taskUtils.getScalarResource(offer, "cpus", 
nodeTask.getProfile().getCpus(), 0.0))
-        .addAllResources(taskUtils.getScalarResource(offer, "mem", 
nodeTask.getProfile().getMemory(), 0.0));
-
-    if (CollectionUtils.isNotEmpty(additionalPortsNumbers)) {
-      // set ports
-      Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder();
-      for (Long port : additionalPortsNumbers) {
-        
valueRanger.addRange(Value.Range.newBuilder().setBegin(port).setEnd(port));
-      }
-
-      
taskBuilder.addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(valueRanger.build()));
-    }
-    taskBuilder.setCommand(commandInfo);
-    if (cfg.getContainerInfo().isPresent()) {
-      taskBuilder.setContainer(taskUtils.getContainerInfo());
-    }
-    return taskBuilder.build();
-  }
-
-  @VisibleForTesting
-  CommandInfo createCommandInfo(ServiceResourceProfile profile, String 
executorCmd) {
-    MyriadExecutorConfiguration myriadExecutorConfiguration = 
cfg.getMyriadExecutorConfiguration();
-    CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
-    Map<String, String> envVars = cfg.getYarnEnvironment();
-    if (MapUtils.isNotEmpty(envVars)) {
-      Protos.Environment.Builder yarnHomeB = Protos.Environment.newBuilder();
-      for (Map.Entry<String, String> envEntry : envVars.entrySet()) {
-        Protos.Environment.Variable.Builder yarnEnvB = 
Protos.Environment.Variable.newBuilder();
-        yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue());
-        yarnHomeB.addVariables(yarnEnvB.build());
-      }
-      commandInfo.mergeEnvironment(yarnHomeB.build());
-    }
-
-    if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-      //Both FrameworkUser and FrameworkSuperuser to get all of the directory 
permissions correct.
-      if (!minimumUserSet(cfg)) {
-        throw new RuntimeException("Trying to use remote distribution, but 
frameworkUser" + "and/or frameworkSuperUser not set!");
-      }
-
-      LOGGER.info("Using remote distribution");
-      String clGeneratedCommand = clGenerator.generateCommandLine(profile, 
null);
-
-      String nmURIString = 
myriadExecutorConfiguration.getNodeManagerUri().get();
-
-      //Concatenate all the subcommands
-      String cmd = clGeneratedCommand + " " + executorCmd;
-
-      //get the nodemanagerURI
-      //We're going to extract ourselves, so setExtract is false
-      LOGGER.info("Getting Hadoop distribution from:" + nmURIString);
-      URI nmUri = 
URI.newBuilder().setValue(nmURIString).setExtract(false).build();
-
-      //get configs directly from resource manager
-      String configUrlString = clGenerator.getConfigurationUrl();
-      LOGGER.info("Getting config from:" + configUrlString);
-      URI configUri = URI.newBuilder().setValue(configUrlString).build();
-
-      LOGGER.info("Slave will execute command:" + cmd);
-      commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + 
"\";" + cmd);
-      commandInfo.setUser(cfg.getFrameworkSuperUser().get());
-
-    } else {
-      commandInfo.setValue(executorCmd);
-    }
-    return commandInfo.build();
-  }
-
-  private Boolean minimumUserSet(MyriadConfiguration conf) {
-    return (cfg.getFrameworkUser().isPresent() || 
cfg.getFrameworkSuperUser().isPresent());
-  }
-  
-  @Override
-  public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer 
offer, CommandInfo commandInfo) {
-    // TODO (yufeldman) if executor specified use it , otherwise return null
-    // nothing to implement here, since we are using default slave executor
-    return null;
-  }
-
-  /**
-   * Helper method to reserve ports
-   *
-   * @param offer
-   * @param requestedPorts
-   * @return
-   */
-  private List<Long> getAvailablePorts(Offer offer, int requestedPorts) {
-    if (requestedPorts == 0) {
-      return null;
-    }
-    
-    final List<Long> returnedPorts = new ArrayList<>();
-    for (Resource resource : offer.getResourcesList()) {
-      if (resource.getName().equals("ports") && (isDefaultRole(resource))) {
-        Iterator<Value.Range> itr = 
resource.getRanges().getRangeList().iterator();
-        while (itr.hasNext()) {
-          Value.Range range = itr.next();
-          if (range.getBegin() <= range.getEnd()) {
-            long i = range.getBegin();
-            while (i <= range.getEnd() && returnedPorts.size() < 
requestedPorts) {
-              returnedPorts.add(i);
-              i++;
-            }
-            if (returnedPorts.size() >= requestedPorts) {
-              return returnedPorts;
-            }
-          }
-        }
-      }
-    }
-    //this is actually an error condition - we did not have enough ports to use
-    //TODO (hokiegeek2) does this need error handling then?
-    return returnedPorts;
-  }
-  
-  private boolean isDefaultRole(Resource resource) {
-    return !resource.hasRole() || resource.getRole().equals("*");
-  }
-}
\ No newline at end of file


Reply via email to