Changes for merging MyriadExecutor with NodeManager.

* NM command line generation is now done on the Myriad scheduler side.
* Add two classes that generate the NM command line:
  * DownloadNMExecutorCLGenImpl (downloads the NM binaries)
  * NMExecutorCLGenImpl (assumes the NM binaries are already present)
* Myriad Executor now runs as a YARN auxiliary service.

Tested by doing a flex up and running a YARN Teragen job.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/1a2f8a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/1a2f8a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/1a2f8a05

Branch: refs/heads/master
Commit: 1a2f8a051973652a2704bddc4720962b1c57b5e7
Parents: 96d6f02
Author: Swapnil Daingade <sdaing...@maprtech.com>
Authored: Thu Jul 16 03:03:59 2015 -0700
Committer: Swapnil Daingade <sdaing...@maprtech.com>
Committed: Sat Aug 29 11:41:33 2015 -0700

----------------------------------------------------------------------
 myriad-executor/build.gradle                    |   4 +-
 .../ebay/myriad/executor/MyriadExecutor.java    | 226 +------------------
 .../executor/MyriadExecutorAuxService.java      |  75 ++++++
 .../scheduler/DownloadNMExecutorCLGenImpl.java  | 101 +++++++++
 .../scheduler/ExecutorCommandLineGenerator.java |  26 +++
 .../myriad/scheduler/NMExecutorCLGenImpl.java   | 196 ++++++++++++++++
 .../com/ebay/myriad/scheduler/TaskFactory.java  | 147 +++---------
 .../scheduler/fgs/YarnNodeCapacityManager.java  |   2 +-
 8 files changed, 439 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-executor/build.gradle
----------------------------------------------------------------------
diff --git a/myriad-executor/build.gradle b/myriad-executor/build.gradle
index 1b4260d..4d41c64 100644
--- a/myriad-executor/build.gradle
+++ b/myriad-executor/build.gradle
@@ -2,6 +2,8 @@ dependencies {
     compile project(':myriad-commons')
     compile 'org.slf4j:slf4j-log4j12:1.7.7'
 
+    compile "org.apache.hadoop:hadoop-yarn-api:${hadoopVer}"
+    compile "org.apache.hadoop:hadoop-common:${hadoopVer}"
 }
 
 
@@ -26,4 +28,4 @@ task capsule(type: Jar, dependsOn: jar) {
     }
 }
 
-build.dependsOn capsule
\ No newline at end of file
+build.dependsOn capsule

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
----------------------------------------------------------------------
diff --git 
a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
index b7e7ea4..633777b 100644
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
@@ -15,108 +15,27 @@
  */
 package com.ebay.myriad.executor;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
 import org.apache.mesos.Executor;
 import org.apache.mesos.ExecutorDriver;
-import org.apache.mesos.MesosExecutorDriver;
 import org.apache.mesos.Protos.ExecutorInfo;
 import org.apache.mesos.Protos.FrameworkInfo;
 import org.apache.mesos.Protos.SlaveInfo;
-import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskState;
 import org.apache.mesos.Protos.TaskStatus;
-import org.apache.mesos.Protos.TaskStatus.Builder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.nio.charset.Charset;
-import java.nio.file.Paths;
-import java.util.Map;
 
 /**
  * Myriad's Executor
  */
 public class MyriadExecutor implements Executor {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class);
-    public static final String ENV_YARN_NODEMANAGER_OPTS = 
"YARN_NODEMANAGER_OPTS";
-
-    /**
-     * YARN container executor class.
-     */
-    public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = 
"yarn.nodemanager.container-executor.class";
-
-    // TODO (mohit): Should it be configurable ?
-    public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = 
"org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
-
-    public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = 
"org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
-
-    /**
-     * YARN class to help handle LCE resources
-     */
-    public static final String KEY_YARN_NM_LCE_RH_CLASS = 
"yarn.nodemanager.linux-container-executor.resources-handler.class";
-
-    // TODO (mohit): Should it be configurable ?
-    public static final String VAL_YARN_NM_LCE_RH_CLASS = 
"org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
-
-    public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = 
"yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
-
-    public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = 
"yarn.nodemanager.linux-container-executor.cgroups.mount";
-
-    public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = 
"yarn.nodemanager.linux-container-executor.cgroups.mount-path";
-
-    public static final String KEY_YARN_NM_LCE_GROUP = 
"yarn.nodemanager.linux-container-executor.group";
-
-    public static final String KEY_YARN_NM_LCE_PATH = 
"yarn.nodemanager.linux-container-executor.path";
-
-    public static final String KEY_YARN_HOME = "yarn.home";
-
-    public static final String KEY_NM_RESOURCE_CPU_VCORES = 
"nodemanager.resource.cpu-vcores";
-
-    public static final String KEY_NM_RESOURCE_MEM_MB = 
"nodemanager.resource.memory-mb";
-
-    public static final String KEY_NM_ADDRESS = 
"myriad.yarn.nodemanager.address";
-
-    public static final String KEY_NM_LOCALIZER_ADDRESS = 
"myriad.yarn.nodemanager.localizer.address";
-
-    public static final String KEY_NM_WEBAPP_ADDRESS = 
"myriad.yarn.nodemanager.webapp.address";
-
-    public static final String KEY_NM_SHUFFLE_PORT = 
"myriad.mapreduce.shuffle.port";
-
-
-
-    /**
-     * Allot 10% more memory to account for JVM overhead.
-     */
-    public static final double JVM_OVERHEAD = 0.1;
-
-    /**
-     * Default -Xmx for executor JVM.
-     */
-
-    public static final double DEFAULT_JVM_MAX_MEMORY_MB = 256;
-    /**
-     * Default cpus for executor JVM.
-     */
-    public static final double DEFAULT_CPUS = 0.2;
-
-    public static final Gson GSON = new Gson();
-
-    private static final String PROPERTY_FORMAT = "-D%s=%s ";
-
-    private SlaveInfo slaveInfo;
-
-    private Process process;
 
-    public static void main(String[] args) throws Exception {
-        LOGGER.info("Starting MyriadExecutor...");
-        MesosExecutorDriver driver = new MesosExecutorDriver(new 
MyriadExecutor());
-        System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
-    }
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class);
 
     @Override
     public void registered(ExecutorDriver driver,
@@ -124,7 +43,6 @@ public class MyriadExecutor implements Executor {
                            FrameworkInfo frameworkInfo,
                            SlaveInfo slaveInfo) {
         LOGGER.debug("Registered ", executorInfo, " for framework ", 
frameworkInfo, " on mesos slave ", slaveInfo);
-        this.slaveInfo = slaveInfo;
     }
 
     @Override
@@ -139,152 +57,28 @@ public class MyriadExecutor implements Executor {
 
     @Override
     public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
-        // "task id beginning with "yarn" is a mesos task for yarn container
-        if (task.getTaskId().getValue().startsWith("yarn")) {
-            TaskStatus status = 
TaskStatus.newBuilder().setTaskId(task.getTaskId())
-                .setState(TaskState.TASK_RUNNING).build();
-            driver.sendStatusUpdate(status);
-            return;
-        }
-
-        // Launch NM as a task and return status to framework
-        new Thread(new Runnable() {
-            public void run() {
-                TaskStatus.Builder statusBuilder = TaskStatus.newBuilder()
-                        .setTaskId(task.getTaskId());
-                try {
-                    NMTaskConfig taskConfig = 
GSON.fromJson(task.getData().toStringUtf8(), NMTaskConfig.class);
-                    LOGGER.info("TaskConfig: ", taskConfig);
-                    ProcessBuilder processBuilder = buildProcessBuilder(task, 
taskConfig);
-                    MyriadExecutor.this.process = processBuilder.start();
-
-                    int waitFor = MyriadExecutor.this.process.waitFor();
-
-                    if (waitFor == 0) {
-                        statusBuilder.setState(TaskState.TASK_FINISHED);
-                    } else {
-                        statusBuilder.setState(TaskState.TASK_FAILED);
-                    }
-                } catch (InterruptedException | IOException e) {
-                    LOGGER.error("launchTask", e);
-                    statusBuilder.setState(TaskState.TASK_FAILED);
-                } catch (RuntimeException e) {
-                    LOGGER.error("launchTask", e);
-                    statusBuilder.setState(TaskState.TASK_FAILED);
-                    throw e;
-                } finally {
-                    driver.sendStatusUpdate(statusBuilder.build());
-                }
-            }
-        }).start();
-
-        TaskStatus status = TaskStatus.newBuilder()
-                .setTaskId(task.getTaskId())
-                .setState(TaskState.TASK_RUNNING)
-                .build();
-        driver.sendStatusUpdate(status);
-    }
-
-    private ProcessBuilder buildProcessBuilder(TaskInfo task, NMTaskConfig 
taskConfig) {
-        ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", 
"$YARN_HOME/bin/yarn nodemanager");
-
-        Map<String, String> environment = processBuilder.environment();
-
-        Map<String, String> yarnEnvironmentMap = 
taskConfig.getYarnEnvironment();
-        if (yarnEnvironmentMap != null) {
-            for (Map.Entry<String, String> yarnEnvironment : 
yarnEnvironmentMap.entrySet()) {
-                environment.put(yarnEnvironment.getKey(), 
yarnEnvironment.getValue());
-            }
-        }
-
-        String envNMOptions = getNMOpts(taskConfig);
-        LOGGER.info(ENV_YARN_NODEMANAGER_OPTS, ": ", envNMOptions);
-
-        if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
-            String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
-            environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + 
envNMOptions);
-        } else {
-            environment.put(ENV_YARN_NODEMANAGER_OPTS, envNMOptions);
-        }
-
-        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
-        processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
-        return processBuilder;
-    }
-
-    private void makeWritable(String path) {
-        File file = new File(path);
-        if (!file.setWritable(true, false)) {
-            LOGGER.error(path, " is not writable");
-        }
-    }
-
-    private String getNMOpts(NMTaskConfig taskConfig) {
-        String envNMOptions = "";
-
-        // If cgroups are enabled then configure
-        if (taskConfig.getCgroups()) {
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS);
-
-            String containerId = getContainerId();
-
-            makeWritable("/sys/fs/cgroup/cpu/mesos/" + containerId);
-
-            // TODO: Configure hierarchy
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "mesos/" + containerId);
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
-            // TODO: Make it configurable
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, "/sys/fs/cgroup");
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_LCE_GROUP, "root");
-            envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_HOME, 
taskConfig.getYarnEnvironment().get("YARN_HOME"));
-        } else {
-            // Otherwise configure to use Default
-            envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
-        }
-        envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_NM_RESOURCE_CPU_VCORES, taskConfig.getAdvertisableCpus() + "");
-        envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_RESOURCE_MEM_MB, 
taskConfig.getAdvertisableMem() + "");
-        envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_ADDRESS, 
"0.0.0.0:" + taskConfig.getRpcPort().toString() + "");
-        envNMOptions += String.format(PROPERTY_FORMAT, 
KEY_NM_LOCALIZER_ADDRESS, "0.0.0.0:" + taskConfig.getLocalizerPort().toString() 
+ "");
-        envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_WEBAPP_ADDRESS, 
"0.0.0.0:" + taskConfig.gettWebAppHttpPort().toString() + "");
-        envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_SHUFFLE_PORT, 
taskConfig.getShufflePort().toString() + "");
-
-        return envNMOptions;
-    }
-
-    public String getContainerId() {
-        String cwd = Paths.get(".").toAbsolutePath().normalize().toString();
-        String[] split = cwd.split("/");
-        return split[split.length - 1];
+      TaskStatus status = TaskStatus.newBuilder()
+        .setTaskId(task.getTaskId())
+        .setState(TaskState.TASK_RUNNING)
+        .build();
+      driver.sendStatusUpdate(status);
     }
 
     @Override
     public void killTask(ExecutorDriver driver, TaskID taskId) {
         LOGGER.debug("KillTask received for taskId: " + taskId.getValue());
-        this.process.destroy();
+
         TaskStatus status = TaskStatus.newBuilder()
                 .setTaskId(taskId)
                 .setState(TaskState.TASK_KILLED)
                 .build();
         driver.sendStatusUpdate(status);
+        throw new RuntimeException("NodeManager shutdown after receiving" +
+          " KillTask for taskId " + taskId.getValue());
     }
 
     @Override
     public void frameworkMessage(ExecutorDriver driver, byte[] data) {
-        // TODO(Santosh): Currently ContainerTaskStatusRequest is the only
-        // message a framework sends to the executor.
-        // Change this to handle other types of framework->executor messages.
-        try {
-            ContainerTaskStatusRequest request = GSON.fromJson(new 
String(data, Charset.defaultCharset()), ContainerTaskStatusRequest.class);
-            Builder statusBuilder = TaskStatus.newBuilder()
-                
.setTaskId(TaskID.newBuilder().setValue(request.getMesosTaskId()))
-                .setState(TaskState.valueOf(request.getState()));
-            driver.sendStatusUpdate(statusBuilder.build());
-            System.out.println("Sent out status update for task id: " +
-                request.getMesosTaskId() + ", status: " + request.getState());
-        } catch (JsonSyntaxException jse) {
-            jse.printStackTrace(); // spit it out to stderr
-        }
         LOGGER.info("Framework message received: ", new String(data, 
Charset.defaultCharset()));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
----------------------------------------------------------------------
diff --git 
a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
new file mode 100644
index 0000000..2c7d87d
--- /dev/null
+++ 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
@@ -0,0 +1,75 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package com.ebay.myriad.executor;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.Status;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Auxillary service wrapper for MyriadExecutor 
+ */
+public class MyriadExecutorAuxService  extends AuxiliaryService {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class);
+  private static final String SERVICE_NAME = "myriad_service";
+
+  protected MyriadExecutorAuxService() {
+    super(SERVICE_NAME);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOGGER.info("Starting MyriadExecutor...");
+
+    new Thread(new Runnable() {
+      public void run() {
+        MesosExecutorDriver driver = new MesosExecutorDriver(new 
MyriadExecutor());
+        LOGGER.error("MyriadExecutor exit with status " +
+        Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
+      }
+    }).start();
+  }
+
+  @Override
+  public void initializeApplication(
+    ApplicationInitializationContext initAppContext) {
+    LOGGER.debug("initializeApplication");
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext stopAppContext) {
+    LOGGER.debug("stopApplication");
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    LOGGER.debug("getMetaData");
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
new file mode 100644
index 0000000..c70d96d
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ebay.myriad.scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ebay.myriad.configuration.MyriadConfiguration;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Implementation assumes NM binaries will be downloaded 
+ */
+public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl {
+
+  private static final Logger LOGGER = LoggerFactory.
+    getLogger(DownloadNMExecutorCLGenImpl.class);
+
+  private final String nodeManagerUri;
+
+  public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, NMProfile 
profile,
+    NMPorts ports, String nodeManagerUri) {
+    super(cfg, profile, ports);
+    this.nodeManagerUri = nodeManagerUri;
+  }
+
+@Override
+  public String generateCommandLine() {
+
+    StringBuffer cmdLine = new StringBuffer();
+    LOGGER.info("Using remote distribution");
+
+    generateEnvironment();
+    appendNMExtractionCommands(cmdLine);
+    appendCgroupsCmds(cmdLine);
+    appendYarnHomeExport(cmdLine);
+    appendUser(cmdLine);
+    appendEnvForNM(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    return cmdLine.toString();
+  }
+
+  private void appendNMExtractionCommands(StringBuffer cmdLine) {
+    /*
+    TODO(darinj): Overall this is messier than I'd like. We can't let mesos 
untar the distribution, since
+    it will change the permissions.  Instead we simply download the tarball 
and execute tar -xvpf. We also
+    pull the config from the resource manager and put them in the conf dir.  
This is also why we need
+    frameworkSuperUser. This will be refactored after Mesos-1790 is resolved.
+   */
+
+    //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 
1760 may not get to it.
+    //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
+    cmdLine.append("sudo tar -zxpf ").append(getFileName(nodeManagerUri));
+
+    //We need the current directory to be writable by frameworkUser for 
capsuleExecutor to create directories.
+    //Best to simply give owenership to the user running the executor but we 
don't want to use -R as this
+    //will silently remove the suid bit on container executor.
+   cmdLine.append(" && sudo chown 
").append(cfg.getFrameworkUser().get()).append(" .");
+
+    //Place the hadoop config where in the HADOOP_CONF_DIR where it will be 
read by the NodeManager
+    //The url for the resource manager config is: http(s)://hostname:port/conf 
so fetcher.cpp downloads the
+    //config file to conf, It's an xml file with the parameters of 
yarn-site.xml, core-site.xml and hdfs.xml.
+    cmdLine.append(" && cp conf ")
+      .append(cfg.getYarnEnvironment().get("YARN_HOME"))
+      .append("/etc/hadoop/yarn-site.xml;");
+  }
+
+  private void appendUser(StringBuffer cmdLine) {
+    cmdLine.append(" sudo -E -u 
").append(cfg.getFrameworkUser().get()).append(" -H");
+  }
+
+  private static String getFileName(String uri) {
+    int lastSlash = uri.lastIndexOf('/');
+    if (lastSlash == -1) {
+        return uri;
+    } else {
+        String fileName = uri.substring(lastSlash + 1);
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName),
+          "URI should not have a slash at the end");
+        return fileName;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
new file mode 100644
index 0000000..82e9d0e
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ebay.myriad.scheduler;
+
+/**
+ * Interface to plugin multiple implementations for executor command 
generation  
+ */
+public interface ExecutorCommandLineGenerator {
+    String generateCommandLine();
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
new file mode 100644
index 0000000..572c359
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ebay.myriad.scheduler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ebay.myriad.configuration.MyriadConfiguration;
+
+/**
+ * Implementation assumes NM binaries already deployed 
+ */
+public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
+
+  public static final String ENV_YARN_NODEMANAGER_OPTS =
+    "YARN_NODEMANAGER_OPTS";
+  public static final String KEY_YARN_NM_CGROUPS_PATH =
+    "yarn.nodemanager.cgroups.path";
+  public static final String KEY_YARN_RM_HOSTNAME =
+    "yarn.resourcemanager.hostname";
+
+  /**
+   * YARN container executor class.
+   */
+  public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS =
+    "yarn.nodemanager.container-executor.class";
+  // TODO (mohit): Should it be configurable ?
+  public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS =
+    "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
+  public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS =
+    "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
+
+  /**
+   * YARN class to help handle LCE resources
+   */
+  public static final String KEY_YARN_NM_LCE_RH_CLASS =
+    "yarn.nodemanager.linux-container-executor.resources-handler.class";
+
+  // TODO (mohit): Should it be configurable ?
+  public static final String VAL_YARN_NM_LCE_RH_CLASS =
+    
"org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY =
+    "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+  public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY =
+    "mesos/$TASK_DIR";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT =
+    "yarn.nodemanager.linux-container-executor.cgroups.mount";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH =
+    "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
+  public static final String VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH = 
"/sys/fs/cgroup";
+  public static final String KEY_YARN_NM_LCE_GROUP =
+    "yarn.nodemanager.linux-container-executor.group";
+  public static final String KEY_YARN_NM_LCE_PATH =
+    "yarn.nodemanager.linux-container-executor.path";
+  public static final String KEY_YARN_HOME = "yarn.home";
+  public static final String KEY_NM_RESOURCE_CPU_VCORES =
+    "nodemanager.resource.cpu-vcores";
+  public static final String KEY_NM_RESOURCE_MEM_MB =
+    "nodemanager.resource.memory-mb";
+  public static final String YARN_NM_CMD =
+      " $YARN_HOME/bin/yarn nodemanager";
+  public static final String KEY_NM_ADDRESS = 
"myriad.yarn.nodemanager.address";
+  public static final String KEY_NM_LOCALIZER_ADDRESS =
+    "myriad.yarn.nodemanager.localizer.address";
+  public static final String KEY_NM_WEBAPP_ADDRESS =
+    "myriad.yarn.nodemanager.webapp.address";
+  public static final String KEY_NM_SHUFFLE_PORT =
+    "myriad.mapreduce.shuffle.port";
+
+  private static final String ALL_LOCAL_IPV4ADDR =  "0.0.0.0:";
+  private static final String PROPERTY_FORMAT = "-D%s=%s";
+
+  private Map<String, String> environment = new HashMap<>();
+  protected MyriadConfiguration cfg;
+  private NMProfile profile;
+  private NMPorts ports;
+
+  public NMExecutorCLGenImpl(MyriadConfiguration cfg, NMProfile profile,
+    NMPorts ports) {
+    this.cfg = cfg;
+    this.profile = profile;
+    this.ports = ports;
+  }
+
+  @Override
+  public String generateCommandLine() {
+    StringBuffer cmdLine = new StringBuffer();
+
+    generateEnvironment();
+    appendCgroupsCmds(cmdLine);
+    appendYarnHomeExport(cmdLine);
+    appendEnvForNM(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    return cmdLine.toString();
+  }
+
+  protected void generateEnvironment() {
+    //yarnEnvironemnt configuration from yaml file 
+    Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
+    if (yarnEnvironmentMap != null) {
+      environment.putAll(yarnEnvironmentMap);
+    }
+
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+    if (rmHostName != null && !rmHostName.isEmpty()) {
+      addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS,
+          VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_RH_CLASS, 
VAL_YARN_NM_LCE_RH_CLASS);
+
+        // TODO: Configure hierarchy
+        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY,
+          VAL_YARN_NM_LCE_CGROUPS_HIERARCHY);
+        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
+        // TODO: Make it configurable
+        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH,
+          VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH);
+        addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root");
+        if (environment.containsKey("YARN_HOME")) {
+          addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
+        }
+    } else {
+        // Otherwise configure to use Default
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS,
+          DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+    }
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES,
+      Integer.toString(profile.getCpus().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB,
+      Integer.toString(profile.getMemory().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR +
+      Long.valueOf(ports.getRpcPort()).toString());
+    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS,
+      ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString()); 
+    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS,
+      ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString());
+    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT,
+      ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getShufflePort()).toString());
+  }
+
+  protected void appendEnvForNM(StringBuffer cmdLine) {
+    cmdLine.append(" env ");
+    for (Map.Entry<String, String> env : environment.entrySet()) {
+      cmdLine.append(env.getKey()).append("=").append("\"")
+          .append(env.getValue()).append("\" ");
+    }
+  }
+
+  protected void appendCgroupsCmds(StringBuffer cmdLine) { 
+    if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
+      cmdLine.append(" export TASK_DIR=`basename $PWD`;");
+      cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;");
+    }
+  }
+
+  protected void appendYarnHomeExport(StringBuffer cmdLine) {
+    if (environment.containsKey("YARN_HOME")) {
+      cmdLine.append(" export YARN_HOME=" + environment.get("YARN_HOME") + 
";");
+    }
+  }
+
+  protected void addYarnNodemanagerOpt(String propertyName, String 
propertyValue) {
+    String envOpt = String.format(PROPERTY_FORMAT, propertyName, 
propertyValue);
+      if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
+          String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
+          environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + 
envOpt);
+      } else {
+          environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
+      }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
index 723d4cf..4c51c93 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java
@@ -2,13 +2,8 @@ package com.ebay.myriad.scheduler;
 
 import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.configuration.MyriadExecutorConfiguration;
-import com.ebay.myriad.configuration.NodeManagerConfiguration;
-import com.ebay.myriad.executor.NMTaskConfig;
 import com.ebay.myriad.state.NodeTask;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.gson.Gson;
-import com.google.protobuf.ByteString;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.mesos.Protos.CommandInfo;
 import org.apache.mesos.Protos.CommandInfo.URI;
@@ -25,7 +20,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
-import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
@@ -41,7 +35,7 @@ public interface TaskFactory {
   // yarn containers (for fine grained scaling).
   // If mesos supports just specifying the 'ExecutorId' without the full
   // ExecutorInfo, we wouldn't need this interface method.
-  ExecutorInfo getExecutorInfoForSlave(SlaveID slave);
+  ExecutorInfo getExecutorInfoForSlave(SlaveID slave, CommandInfo commandInfo);
 
   /**
    * Creates TaskInfo objects to launch NMs as mesos tasks.
@@ -71,11 +65,11 @@ public interface TaskFactory {
       HashSet<Long> ports = new HashSet<>();
       for (Resource resource : offer.getResourcesList()){
         if (resource.getName().equals("ports")){
-                    /*
-                    ranges.getRangeList() returns a list of ranges, each range 
specifies a begin and end only.
-                    so must loop though each range until we get all ports 
needed.  We exit each loop as soon as all
-                    ports are found so bounded by NMPorts.expectedNumPorts.
-                    */
+          /*
+          ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only.
+          so must loop through each range until we get all ports needed.  We exit each loop as soon as all
+          ports are found so bounded by NMPorts.expectedNumPorts.
+          */
           Iterator<Value.Range> itr = 
resource.getRanges().getRangeList().iterator();
           while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) {
             Value.Range range = itr.next();
@@ -95,18 +89,6 @@ public interface TaskFactory {
       return new NMPorts(portArray);
     }
 
-    private static String getFileName(String uri) {
-      int lastSlash = uri.lastIndexOf('/');
-      if (lastSlash == -1) {
-        return uri;
-      } else {
-        String fileName = uri.substring(lastSlash + 1);
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName),
-            "URI should not have a slash at the end");
-        return fileName;
-      }
-    }
-
     private String getConfigurationUrl() {
       YarnConfiguration conf = new YarnConfiguration();
       String httpPolicy = conf.get(YARN_HTTP_POLICY);
@@ -125,82 +107,40 @@ public interface TaskFactory {
       }
     }
 
-    private CommandInfo getCommandInfo() {
+    private CommandInfo getCommandInfo(NMProfile profile, NMPorts ports) {
       MyriadExecutorConfiguration myriadExecutorConfiguration = 
cfg.getMyriadExecutorConfiguration();
       CommandInfo.Builder commandInfo = CommandInfo.newBuilder();
-      if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
-                /*
-                 TODO(darinj): Overall this is messier than I'd like. We can't 
let mesos untar the distribution, since
-                 it will change the permissions.  Instead we simply download 
the tarball and execute tar -xvpf. We also
-                 pull the config from the resource manager and put them in the 
conf dir.  This is also why we need
-                 frameworkSuperUser. This will be refactored after Mesos-1790 
is resolved.
-                */
+      ExecutorCommandLineGenerator clGenerator;
+      String cmd;
 
+      if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
         //Both FrameworkUser and FrameworkSuperuser to get all of the 
directory permissions correct.
         if (!(cfg.getFrameworkUser().isPresent() && 
cfg.getFrameworkSuperUser().isPresent())) {
           throw new RuntimeException("Trying to use remote distribution, but 
frameworkUser" +
-              "and/or frameworkSuperUser not set!");
+            "and/or frameworkSuperUser not set!");
         }
-
-        LOGGER.info("Using remote distribution");
-
-        String nmURIString = 
myriadExecutorConfiguration.getNodeManagerUri().get();
-
-        //TODO(DarinJ) support other compression, as this is a temp fix for 
Mesos 1760 may not get to it.
-        //Extract tarball keeping permissions, necessary to keep 
HADOOP_HOME/bin/container-executor suidbit set.
-        String tarCmd = "sudo tar -zxpf " + getFileName(nmURIString);
-
-        //We need the current directory to be writable by frameworkUser for 
capsuleExecutor to create directories.
-        //Best to simply give owenership to the user running the executor but 
we don't want to use -R as this
-        //will silently remove the suid bit on container executor.
-        String chownCmd = "sudo chown " + cfg.getFrameworkUser().get() + " .";
-
-        //Place the hadoop config where in the HADOOP_CONF_DIR where it will 
be read by the NodeManager
-        //The url for the resource manager config is: 
http(s)://hostname:port/conf so fetcher.cpp downloads the
-        //config file to conf, It's an xml file with the parameters of 
yarn-site.xml, core-site.xml and hdfs.xml.
-        String configCopyCmd = "cp conf " + 
cfg.getYarnEnvironment().get("YARN_HOME") +
-            "/etc/hadoop/yarn-site.xml";
-
-        //Command to run the executor
-        String executorPathString = myriadExecutorConfiguration.getPath();
-        String executorCmd = "export CAPSULE_CACHE_DIR=`pwd`;echo 
$CAPSULE_CACHE_DIR; " +
-            "sudo -E -u " + cfg.getFrameworkUser().get() + " -H " +
-            "java -Dcapsule.log=verbose -jar " + 
getFileName(executorPathString);
-
-        //Concatenate all the subcommands
-        String cmd = tarCmd + "&&" + chownCmd + "&&" + configCopyCmd + "&&" + 
executorCmd;
+        String nodeManagerUri = 
myriadExecutorConfiguration.getNodeManagerUri().get();
+        clGenerator = new DownloadNMExecutorCLGenImpl(cfg, profile, ports, 
nodeManagerUri);
+        cmd = clGenerator.generateCommandLine();
 
         //get the nodemanagerURI
         //We're going to extract ourselves, so setExtract is false
-        LOGGER.info("Getting Hadoop distribution from:" + nmURIString);
-        URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false)
-            .build();
+        LOGGER.info("Getting Hadoop distribution from:" + nodeManagerUri);
+        URI nmUri = 
URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build();
 
         //get configs directly from resource manager
         String configUrlString = getConfigurationUrl();
         LOGGER.info("Getting config from:" + configUrlString);
         URI configUri = URI.newBuilder().setValue(configUrlString)
-            .build();
-
-        //get the executor URI
-        LOGGER.info("Getting executor from:" + executorPathString);
-        URI executorUri = 
URI.newBuilder().setValue(executorPathString).setExecutable(true)
-            .build();
-
+          .build();
         LOGGER.info("Slave will execute command:" + cmd);
-        
commandInfo.addUris(nmUri).addUris(configUri).addUris(executorUri).setValue("echo
 \"" + cmd + "\";" + cmd);
-
+        commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd 
+ "\";" + cmd);
         commandInfo.setUser(cfg.getFrameworkSuperUser().get());
 
       } else {
-        String cmdPrefix = "export CAPSULE_CACHE_DIR=`pwd` ;" +
-            "echo $CAPSULE_CACHE_DIR; java -Dcapsule.log=verbose -jar ";
-        String executorPath = myriadExecutorConfiguration.getPath();
-        String cmd = cmdPrefix + getFileName(executorPath);
-        URI executorURI = URI.newBuilder().setValue(executorPath)
-            .setExecutable(true).build();
-        commandInfo.addUris(executorURI)
-            .setValue("echo \"" + cmd + "\";" + cmd);
+        clGenerator = new NMExecutorCLGenImpl(cfg, profile, ports);
+        cmd = clGenerator.generateCommandLine();
+        commandInfo.setValue("echo \"" + cmd + "\";" + cmd);
 
         if (cfg.getFrameworkUser().isPresent()) {
           commandInfo.setUser(cfg.getFrameworkUser().get());
@@ -218,53 +158,21 @@ public interface TaskFactory {
       LOGGER.debug(ports.toString());
 
       NMProfile profile = nodeTask.getProfile();
-      NMTaskConfig nmTaskConfig = new NMTaskConfig();
-      nmTaskConfig.setAdvertisableCpus(profile.getCpus());
-      nmTaskConfig.setAdvertisableMem(profile.getMemory());
-      NodeManagerConfiguration nodeManagerConfiguration = 
this.cfg.getNodeManagerConfiguration();
-      nmTaskConfig.setJvmOpts(nodeManagerConfiguration.getJvmOpts().orNull());
-      
nmTaskConfig.setCgroups(nodeManagerConfiguration.getCgroups().or(Boolean.FALSE));
-      nmTaskConfig.setRpcPort(ports.getRpcPort());
-      nmTaskConfig.setLocalizerPort(ports.getLocalizerPort());
-      nmTaskConfig.setWebAppHttpPort(ports.getWebAppHttpPort());
-      nmTaskConfig.setShufflePort(ports.getShufflePort());
-      nmTaskConfig.setYarnEnvironment(cfg.getYarnEnvironment());
-
-      // if RM's hostname is passed in as a system property, pass it along
-      // to Node Managers launched via Myriad
-      String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME);
-      if (rmHostName != null && !rmHostName.isEmpty()) {
-
-        String nmOpts = 
nmTaskConfig.getYarnEnvironment().get(YARN_NODEMANAGER_OPTS_KEY);
-        if (nmOpts == null) {
-          nmOpts = "";
-        }
-        nmOpts += " " + "-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + 
rmHostName;
-        nmTaskConfig.getYarnEnvironment().put(YARN_NODEMANAGER_OPTS_KEY, 
nmOpts);
-        LOGGER.info(YARN_RESOURCEMANAGER_HOSTNAME + " is set to " + rmHostName 
+
-            " via YARN_RESOURCEMANAGER_OPTS. Passing it into 
YARN_NODEMANAGER_OPTS.");
-      }
-//            else {
-      // TODO(Santosh): Handle this case. Couple of options:
-      // 1. Lookup a hostname here and use it as "RM's hostname"
-      // 2. Abort here.. RM cannot start unless a hostname is passed in as it 
requires it to pass to NMs.
-
-      String taskConfigJSON = new Gson().toJson(nmTaskConfig);
-
       Scalar taskMemory = Scalar.newBuilder()
           .setValue(taskUtils.getTaskMemory(profile))
           .build();
       Scalar taskCpus = Scalar.newBuilder()
           .setValue(taskUtils.getTaskCpus(profile))
           .build();
-      ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId());
+
+      CommandInfo commandInfo = getCommandInfo(profile, ports);
+      ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId(), 
commandInfo);
 
       TaskInfo.Builder taskBuilder = TaskInfo.newBuilder()
           .setName("task-" + taskId.getValue())
           .setTaskId(taskId)
           .setSlaveId(offer.getSlaveId());
 
-      ByteString data = 
ByteString.copyFrom(taskConfigJSON.getBytes(Charset.defaultCharset()));
       return taskBuilder
           .addResources(
               Resource.newBuilder().setName("cpus")
@@ -296,18 +204,17 @@ public interface TaskFactory {
                           .setBegin(ports.getShufflePort())
                           .setEnd(ports.getShufflePort())
                           .build())))
-          .setExecutor(executorInfo).setData(data).build();
+          .setExecutor(executorInfo).build();
     }
 
     @Override
-    public ExecutorInfo getExecutorInfoForSlave(SlaveID slave) {
+    public ExecutorInfo getExecutorInfoForSlave(SlaveID slave,
+      CommandInfo commandInfo) {
       Scalar executorMemory = Scalar.newBuilder()
           .setValue(taskUtils.getExecutorMemory()).build();
       Scalar executorCpus = Scalar.newBuilder()
           .setValue(taskUtils.getExecutorCpus()).build();
 
-      CommandInfo commandInfo = getCommandInfo();
-
       ExecutorID executorId = ExecutorID.newBuilder()
           .setValue(EXECUTOR_PREFIX + slave.getValue())
           .build();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 2784e03..497b43d 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -216,7 +216,7 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
         Protos.ExecutorInfo executorInfo = node.getExecInfo();
         if (executorInfo == null) {
             executorInfo = Protos.ExecutorInfo.newBuilder(
-                taskFactory.getExecutorInfoForSlave(offer.getSlaveId()))
+                taskFactory.getExecutorInfoForSlave(offer.getSlaveId(), null))
                 .setFrameworkId(offer.getFrameworkId()).build();
             node.setExecInfo(executorInfo);
         }


Reply via email to