Changes for merging MyriadExecutor with NodeManager. * NM command-line generation is now done on the Myriad scheduler side. * Add two classes that generate the NM command line: ** DownloadNMExecutorCLGenImpl (downloads the binaries) ** NMExecutorCLGenImpl (assumes the binaries are already present) * Myriad Executor now runs as a YARN auxiliary service
Tested by doing a flex up and running a YARN Teragen job. Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/1a2f8a05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/1a2f8a05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/1a2f8a05 Branch: refs/heads/master Commit: 1a2f8a051973652a2704bddc4720962b1c57b5e7 Parents: 96d6f02 Author: Swapnil Daingade <sdaing...@maprtech.com> Authored: Thu Jul 16 03:03:59 2015 -0700 Committer: Swapnil Daingade <sdaing...@maprtech.com> Committed: Sat Aug 29 11:41:33 2015 -0700 ---------------------------------------------------------------------- myriad-executor/build.gradle | 4 +- .../ebay/myriad/executor/MyriadExecutor.java | 226 +------------------ .../executor/MyriadExecutorAuxService.java | 75 ++++++ .../scheduler/DownloadNMExecutorCLGenImpl.java | 101 +++++++++ .../scheduler/ExecutorCommandLineGenerator.java | 26 +++ .../myriad/scheduler/NMExecutorCLGenImpl.java | 196 ++++++++++++++++ .../com/ebay/myriad/scheduler/TaskFactory.java | 147 +++--------- .../scheduler/fgs/YarnNodeCapacityManager.java | 2 +- 8 files changed, 439 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-executor/build.gradle ---------------------------------------------------------------------- diff --git a/myriad-executor/build.gradle b/myriad-executor/build.gradle index 1b4260d..4d41c64 100644 --- a/myriad-executor/build.gradle +++ b/myriad-executor/build.gradle @@ -2,6 +2,8 @@ dependencies { compile project(':myriad-commons') compile 'org.slf4j:slf4j-log4j12:1.7.7' + compile "org.apache.hadoop:hadoop-yarn-api:${hadoopVer}" + compile "org.apache.hadoop:hadoop-common:${hadoopVer}" } @@ -26,4 +28,4 @@ task capsule(type: Jar, dependsOn: jar) { } } -build.dependsOn capsule \ No newline at end of file 
+build.dependsOn capsule http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java index b7e7ea4..633777b 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java @@ -15,108 +15,27 @@ */ package com.ebay.myriad.executor; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; -import org.apache.mesos.MesosExecutorDriver; import org.apache.mesos.Protos.ExecutorInfo; import org.apache.mesos.Protos.FrameworkInfo; import org.apache.mesos.Protos.SlaveInfo; -import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; -import org.apache.mesos.Protos.TaskStatus.Builder; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.nio.charset.Charset; -import java.nio.file.Paths; -import java.util.Map; /** * Myriad's Executor */ public class MyriadExecutor implements Executor { - private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); - public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS"; - - /** - * YARN container executor class. - */ - public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class"; - - // TODO (mohit): Should it be configurable ? 
- public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; - - public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; - - /** - * YARN class to help handle LCE resources - */ - public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class"; - - // TODO (mohit): Should it be configurable ? - public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler"; - - public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; - - public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount"; - - public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path"; - - public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group"; - - public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path"; - - public static final String KEY_YARN_HOME = "yarn.home"; - - public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores"; - - public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb"; - - public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address"; - - public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address"; - - public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address"; - - public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port"; - - - - /** - * Allot 10% more memory to account for JVM overhead. 
- */ - public static final double JVM_OVERHEAD = 0.1; - - /** - * Default -Xmx for executor JVM. - */ - - public static final double DEFAULT_JVM_MAX_MEMORY_MB = 256; - /** - * Default cpus for executor JVM. - */ - public static final double DEFAULT_CPUS = 0.2; - - public static final Gson GSON = new Gson(); - - private static final String PROPERTY_FORMAT = "-D%s=%s "; - - private SlaveInfo slaveInfo; - - private Process process; - public static void main(String[] args) throws Exception { - LOGGER.info("Starting MyriadExecutor..."); - MesosExecutorDriver driver = new MesosExecutorDriver(new MyriadExecutor()); - System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1); - } + private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); @Override public void registered(ExecutorDriver driver, @@ -124,7 +43,6 @@ public class MyriadExecutor implements Executor { FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) { LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo); - this.slaveInfo = slaveInfo; } @Override @@ -139,152 +57,28 @@ public class MyriadExecutor implements Executor { @Override public void launchTask(final ExecutorDriver driver, final TaskInfo task) { - // "task id beginning with "yarn" is a mesos task for yarn container - if (task.getTaskId().getValue().startsWith("yarn")) { - TaskStatus status = TaskStatus.newBuilder().setTaskId(task.getTaskId()) - .setState(TaskState.TASK_RUNNING).build(); - driver.sendStatusUpdate(status); - return; - } - - // Launch NM as a task and return status to framework - new Thread(new Runnable() { - public void run() { - TaskStatus.Builder statusBuilder = TaskStatus.newBuilder() - .setTaskId(task.getTaskId()); - try { - NMTaskConfig taskConfig = GSON.fromJson(task.getData().toStringUtf8(), NMTaskConfig.class); - LOGGER.info("TaskConfig: ", taskConfig); - ProcessBuilder processBuilder = buildProcessBuilder(task, taskConfig); - MyriadExecutor.this.process = 
processBuilder.start(); - - int waitFor = MyriadExecutor.this.process.waitFor(); - - if (waitFor == 0) { - statusBuilder.setState(TaskState.TASK_FINISHED); - } else { - statusBuilder.setState(TaskState.TASK_FAILED); - } - } catch (InterruptedException | IOException e) { - LOGGER.error("launchTask", e); - statusBuilder.setState(TaskState.TASK_FAILED); - } catch (RuntimeException e) { - LOGGER.error("launchTask", e); - statusBuilder.setState(TaskState.TASK_FAILED); - throw e; - } finally { - driver.sendStatusUpdate(statusBuilder.build()); - } - } - }).start(); - - TaskStatus status = TaskStatus.newBuilder() - .setTaskId(task.getTaskId()) - .setState(TaskState.TASK_RUNNING) - .build(); - driver.sendStatusUpdate(status); - } - - private ProcessBuilder buildProcessBuilder(TaskInfo task, NMTaskConfig taskConfig) { - ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", "$YARN_HOME/bin/yarn nodemanager"); - - Map<String, String> environment = processBuilder.environment(); - - Map<String, String> yarnEnvironmentMap = taskConfig.getYarnEnvironment(); - if (yarnEnvironmentMap != null) { - for (Map.Entry<String, String> yarnEnvironment : yarnEnvironmentMap.entrySet()) { - environment.put(yarnEnvironment.getKey(), yarnEnvironment.getValue()); - } - } - - String envNMOptions = getNMOpts(taskConfig); - LOGGER.info(ENV_YARN_NODEMANAGER_OPTS, ": ", envNMOptions); - - if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) { - String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS); - environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envNMOptions); - } else { - environment.put(ENV_YARN_NODEMANAGER_OPTS, envNMOptions); - } - - processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT); - processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT); - return processBuilder; - } - - private void makeWritable(String path) { - File file = new File(path); - if (!file.setWritable(true, false)) { - LOGGER.error(path, " is not writable"); - } - } - - 
private String getNMOpts(NMTaskConfig taskConfig) { - String envNMOptions = ""; - - // If cgroups are enabled then configure - if (taskConfig.getCgroups()) { - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS); - - String containerId = getContainerId(); - - makeWritable("/sys/fs/cgroup/cpu/mesos/" + containerId); - - // TODO: Configure hierarchy - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "mesos/" + containerId); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true"); - // TODO: Make it configurable - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, "/sys/fs/cgroup"); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_LCE_GROUP, "root"); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_HOME, taskConfig.getYarnEnvironment().get("YARN_HOME")); - } else { - // Otherwise configure to use Default - envNMOptions += String.format(PROPERTY_FORMAT, KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS); - } - envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_RESOURCE_CPU_VCORES, taskConfig.getAdvertisableCpus() + ""); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_RESOURCE_MEM_MB, taskConfig.getAdvertisableMem() + ""); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_ADDRESS, "0.0.0.0:" + taskConfig.getRpcPort().toString() + ""); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_LOCALIZER_ADDRESS, "0.0.0.0:" + taskConfig.getLocalizerPort().toString() + ""); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_WEBAPP_ADDRESS, "0.0.0.0:" + taskConfig.gettWebAppHttpPort().toString() + ""); - envNMOptions += String.format(PROPERTY_FORMAT, KEY_NM_SHUFFLE_PORT, taskConfig.getShufflePort().toString() + ""); - - return envNMOptions; 
- } - - public String getContainerId() { - String cwd = Paths.get(".").toAbsolutePath().normalize().toString(); - String[] split = cwd.split("/"); - return split[split.length - 1]; + TaskStatus status = TaskStatus.newBuilder() + .setTaskId(task.getTaskId()) + .setState(TaskState.TASK_RUNNING) + .build(); + driver.sendStatusUpdate(status); } @Override public void killTask(ExecutorDriver driver, TaskID taskId) { LOGGER.debug("KillTask received for taskId: " + taskId.getValue()); - this.process.destroy(); + TaskStatus status = TaskStatus.newBuilder() .setTaskId(taskId) .setState(TaskState.TASK_KILLED) .build(); driver.sendStatusUpdate(status); + throw new RuntimeException("NodeManager shutdown after receiving" + + " KillTask for taskId " + taskId.getValue()); } @Override public void frameworkMessage(ExecutorDriver driver, byte[] data) { - // TODO(Santosh): Currently ContainerTaskStatusRequest is the only - // message a framework sends to the executor. - // Change this to handle other types of framework->executor messages. 
- try { - ContainerTaskStatusRequest request = GSON.fromJson(new String(data, Charset.defaultCharset()), ContainerTaskStatusRequest.class); - Builder statusBuilder = TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(request.getMesosTaskId())) - .setState(TaskState.valueOf(request.getState())); - driver.sendStatusUpdate(statusBuilder.build()); - System.out.println("Sent out status update for task id: " + - request.getMesosTaskId() + ", status: " + request.getState()); - } catch (JsonSyntaxException jse) { - jse.printStackTrace(); // spit it out to stderr - } LOGGER.info("Framework message received: ", new String(data, Charset.defaultCharset())); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java new file mode 100644 index 0000000..2c7d87d --- /dev/null +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java @@ -0,0 +1,75 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package com.ebay.myriad.executor; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; +import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; +import org.apache.hadoop.yarn.server.api.AuxiliaryService; + +import org.apache.mesos.MesosExecutorDriver; +import org.apache.mesos.Protos.Status; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Auxillary service wrapper for MyriadExecutor + */ +public class MyriadExecutorAuxService extends AuxiliaryService { + + private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); + private static final String SERVICE_NAME = "myriad_service"; + + protected MyriadExecutorAuxService() { + super(SERVICE_NAME); + } + + @Override + protected void serviceStart() throws Exception { + LOGGER.info("Starting MyriadExecutor..."); + + new Thread(new Runnable() { + public void run() { + MesosExecutorDriver driver = new MesosExecutorDriver(new MyriadExecutor()); + LOGGER.error("MyriadExecutor exit with status " + + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 
0 : 1)); + } + }).start(); + } + + @Override + public void initializeApplication( + ApplicationInitializationContext initAppContext) { + LOGGER.debug("initializeApplication"); + } + + @Override + public void stopApplication(ApplicationTerminationContext stopAppContext) { + LOGGER.debug("stopApplication"); + } + + @Override + public ByteBuffer getMetaData() { + LOGGER.debug("getMetaData"); + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java new file mode 100644 index 0000000..c70d96d --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/DownloadNMExecutorCLGenImpl.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ebay.myriad.scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ebay.myriad.configuration.MyriadConfiguration; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +/** + * Implementation assumes NM binaries will be downloaded + */ +public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl { + + private static final Logger LOGGER = LoggerFactory. + getLogger(DownloadNMExecutorCLGenImpl.class); + + private final String nodeManagerUri; + + public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, NMProfile profile, + NMPorts ports, String nodeManagerUri) { + super(cfg, profile, ports); + this.nodeManagerUri = nodeManagerUri; + } + +@Override + public String generateCommandLine() { + + StringBuffer cmdLine = new StringBuffer(); + LOGGER.info("Using remote distribution"); + + generateEnvironment(); + appendNMExtractionCommands(cmdLine); + appendCgroupsCmds(cmdLine); + appendYarnHomeExport(cmdLine); + appendUser(cmdLine); + appendEnvForNM(cmdLine); + cmdLine.append(YARN_NM_CMD); + return cmdLine.toString(); + } + + private void appendNMExtractionCommands(StringBuffer cmdLine) { + /* + TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since + it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also + pull the config from the resource manager and put them in the conf dir. This is also why we need + frameworkSuperUser. This will be refactored after Mesos-1790 is resolved. + */ + + //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. + //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. + cmdLine.append("sudo tar -zxpf ").append(getFileName(nodeManagerUri)); + + //We need the current directory to be writable by frameworkUser for capsuleExecutor to create directories. 
+ //Best to simply give owenership to the user running the executor but we don't want to use -R as this + //will silently remove the suid bit on container executor. + cmdLine.append(" && sudo chown ").append(cfg.getFrameworkUser().get()).append(" ."); + + //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager + //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the + //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. + cmdLine.append(" && cp conf ") + .append(cfg.getYarnEnvironment().get("YARN_HOME")) + .append("/etc/hadoop/yarn-site.xml;"); + } + + private void appendUser(StringBuffer cmdLine) { + cmdLine.append(" sudo -E -u ").append(cfg.getFrameworkUser().get()).append(" -H"); + } + + private static String getFileName(String uri) { + int lastSlash = uri.lastIndexOf('/'); + if (lastSlash == -1) { + return uri; + } else { + String fileName = uri.substring(lastSlash + 1); + Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), + "URI should not have a slash at the end"); + return fileName; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java new file mode 100644 index 0000000..82e9d0e --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/ExecutorCommandLineGenerator.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ebay.myriad.scheduler; + +/** + * Interface to plugin multiple implementations for executor command generation + */ +public interface ExecutorCommandLineGenerator { + String generateCommandLine(); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java new file mode 100644 index 0000000..572c359 --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/NMExecutorCLGenImpl.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ebay.myriad.scheduler; + +import java.util.Map; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ebay.myriad.configuration.MyriadConfiguration; + +/** + * Implementation assumes NM binaries already deployed + */ +public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class); + + public static final String ENV_YARN_NODEMANAGER_OPTS = + "YARN_NODEMANAGER_OPTS"; + public static final String KEY_YARN_NM_CGROUPS_PATH = + "yarn.nodemanager.cgroups.path"; + public static final String KEY_YARN_RM_HOSTNAME = + "yarn.resourcemanager.hostname"; + + /** + * YARN container executor class. + */ + public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "yarn.nodemanager.container-executor.class"; + // TODO (mohit): Should it be configurable ? + public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; + public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; + + /** + * YARN class to help handle LCE resources + */ + public static final String KEY_YARN_NM_LCE_RH_CLASS = + "yarn.nodemanager.linux-container-executor.resources-handler.class"; + + // TODO (mohit): Should it be configurable ? 
+ public static final String VAL_YARN_NM_LCE_RH_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler"; + public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = + "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; + public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY = + "mesos/$TASK_DIR"; + public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = + "yarn.nodemanager.linux-container-executor.cgroups.mount"; + public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = + "yarn.nodemanager.linux-container-executor.cgroups.mount-path"; + public static final String VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "/sys/fs/cgroup"; + public static final String KEY_YARN_NM_LCE_GROUP = + "yarn.nodemanager.linux-container-executor.group"; + public static final String KEY_YARN_NM_LCE_PATH = + "yarn.nodemanager.linux-container-executor.path"; + public static final String KEY_YARN_HOME = "yarn.home"; + public static final String KEY_NM_RESOURCE_CPU_VCORES = + "nodemanager.resource.cpu-vcores"; + public static final String KEY_NM_RESOURCE_MEM_MB = + "nodemanager.resource.memory-mb"; + public static final String YARN_NM_CMD = + " $YARN_HOME/bin/yarn nodemanager"; + public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address"; + public static final String KEY_NM_LOCALIZER_ADDRESS = + "myriad.yarn.nodemanager.localizer.address"; + public static final String KEY_NM_WEBAPP_ADDRESS = + "myriad.yarn.nodemanager.webapp.address"; + public static final String KEY_NM_SHUFFLE_PORT = + "myriad.mapreduce.shuffle.port"; + + private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:"; + private static final String PROPERTY_FORMAT = "-D%s=%s"; + + private Map<String, String> environment = new HashMap<>(); + protected MyriadConfiguration cfg; + private NMProfile profile; + private NMPorts ports; + + public NMExecutorCLGenImpl(MyriadConfiguration cfg, NMProfile profile, + NMPorts ports) { + this.cfg = cfg; + this.profile = 
profile; + this.ports = ports; + } + + @Override + public String generateCommandLine() { + StringBuffer cmdLine = new StringBuffer(); + + generateEnvironment(); + appendCgroupsCmds(cmdLine); + appendYarnHomeExport(cmdLine); + appendEnvForNM(cmdLine); + cmdLine.append(YARN_NM_CMD); + return cmdLine.toString(); + } + + protected void generateEnvironment() { + //yarnEnvironemnt configuration from yaml file + Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment(); + if (yarnEnvironmentMap != null) { + environment.putAll(yarnEnvironmentMap); + } + + String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME); + if (rmHostName != null && !rmHostName.isEmpty()) { + addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName); + } + + if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) { + addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, + VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS); + addYarnNodemanagerOpt(KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS); + + // TODO: Configure hierarchy + addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, + VAL_YARN_NM_LCE_CGROUPS_HIERARCHY); + addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true"); + // TODO: Make it configurable + addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, + VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH); + addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root"); + if (environment.containsKey("YARN_HOME")) { + addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME")); + } + } else { + // Otherwise configure to use Default + addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, + DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS); + } + addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, + Integer.toString(profile.getCpus().intValue())); + addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, + Integer.toString(profile.getMemory().intValue())); + addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + + Long.valueOf(ports.getRpcPort()).toString()); + 
addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, + ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString()); + addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, + ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString()); + addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, + ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getShufflePort()).toString()); + } + + protected void appendEnvForNM(StringBuffer cmdLine) { + cmdLine.append(" env "); + for (Map.Entry<String, String> env : environment.entrySet()) { + cmdLine.append(env.getKey()).append("=").append("\"") + .append(env.getValue()).append("\" "); + } + } + + protected void appendCgroupsCmds(StringBuffer cmdLine) { + if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) { + cmdLine.append(" export TASK_DIR=`basename $PWD`;"); + cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;"); + } + } + + protected void appendYarnHomeExport(StringBuffer cmdLine) { + if (environment.containsKey("YARN_HOME")) { + cmdLine.append(" export YARN_HOME=" + environment.get("YARN_HOME") + ";"); + } + } + + protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) { + String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue); + if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) { + String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS); + environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt); + } else { + environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java index 723d4cf..4c51c93 100644 --- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskFactory.java @@ -2,13 +2,8 @@ package com.ebay.myriad.scheduler; import com.ebay.myriad.configuration.MyriadConfiguration; import com.ebay.myriad.configuration.MyriadExecutorConfiguration; -import com.ebay.myriad.configuration.NodeManagerConfiguration; -import com.ebay.myriad.executor.NMTaskConfig; import com.ebay.myriad.state.NodeTask; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.gson.Gson; -import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.CommandInfo.URI; @@ -25,7 +20,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; -import java.nio.charset.Charset; import java.util.HashSet; import java.util.Iterator; import java.util.Objects; @@ -41,7 +35,7 @@ public interface TaskFactory { // yarn containers (for fine grained scaling). // If mesos supports just specifying the 'ExecutorId' without the full // ExecutorInfo, we wouldn't need this interface method. - ExecutorInfo getExecutorInfoForSlave(SlaveID slave); + ExecutorInfo getExecutorInfoForSlave(SlaveID slave, CommandInfo commandInfo); /** * Creates TaskInfo objects to launch NMs as mesos tasks. @@ -71,11 +65,11 @@ public interface TaskFactory { HashSet<Long> ports = new HashSet<>(); for (Resource resource : offer.getResourcesList()){ if (resource.getName().equals("ports")){ - /* - ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only. - so must loop though each range until we get all ports needed. We exit each loop as soon as all - ports are found so bounded by NMPorts.expectedNumPorts. - */ + /* + ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only. 
+ so must loop though each range until we get all ports needed. We exit each loop as soon as all + ports are found so bounded by NMPorts.expectedNumPorts. + */ Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator(); while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) { Value.Range range = itr.next(); @@ -95,18 +89,6 @@ public interface TaskFactory { return new NMPorts(portArray); } - private static String getFileName(String uri) { - int lastSlash = uri.lastIndexOf('/'); - if (lastSlash == -1) { - return uri; - } else { - String fileName = uri.substring(lastSlash + 1); - Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), - "URI should not have a slash at the end"); - return fileName; - } - } - private String getConfigurationUrl() { YarnConfiguration conf = new YarnConfiguration(); String httpPolicy = conf.get(YARN_HTTP_POLICY); @@ -125,82 +107,40 @@ public interface TaskFactory { } } - private CommandInfo getCommandInfo() { + private CommandInfo getCommandInfo(NMProfile profile, NMPorts ports) { MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); - if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { - /* - TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since - it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also - pull the config from the resource manager and put them in the conf dir. This is also why we need - frameworkSuperUser. This will be refactored after Mesos-1790 is resolved. - */ + ExecutorCommandLineGenerator clGenerator; + String cmd; + if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. 
if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + - "and/or frameworkSuperUser not set!"); + "and/or frameworkSuperUser not set!"); } - - LOGGER.info("Using remote distribution"); - - String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get(); - - //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. - //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. - String tarCmd = "sudo tar -zxpf " + getFileName(nmURIString); - - //We need the current directory to be writable by frameworkUser for capsuleExecutor to create directories. - //Best to simply give owenership to the user running the executor but we don't want to use -R as this - //will silently remove the suid bit on container executor. - String chownCmd = "sudo chown " + cfg.getFrameworkUser().get() + " ."; - - //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager - //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the - //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. 
- String configCopyCmd = "cp conf " + cfg.getYarnEnvironment().get("YARN_HOME") + - "/etc/hadoop/yarn-site.xml"; - - //Command to run the executor - String executorPathString = myriadExecutorConfiguration.getPath(); - String executorCmd = "export CAPSULE_CACHE_DIR=`pwd`;echo $CAPSULE_CACHE_DIR; " + - "sudo -E -u " + cfg.getFrameworkUser().get() + " -H " + - "java -Dcapsule.log=verbose -jar " + getFileName(executorPathString); - - //Concatenate all the subcommands - String cmd = tarCmd + "&&" + chownCmd + "&&" + configCopyCmd + "&&" + executorCmd; + String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get(); + clGenerator = new DownloadNMExecutorCLGenImpl(cfg, profile, ports, nodeManagerUri); + cmd = clGenerator.generateCommandLine(); //get the nodemanagerURI //We're going to extract ourselves, so setExtract is false - LOGGER.info("Getting Hadoop distribution from:" + nmURIString); - URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false) - .build(); + LOGGER.info("Getting Hadoop distribution from:" + nodeManagerUri); + URI nmUri = URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build(); //get configs directly from resource manager String configUrlString = getConfigurationUrl(); LOGGER.info("Getting config from:" + configUrlString); URI configUri = URI.newBuilder().setValue(configUrlString) - .build(); - - //get the executor URI - LOGGER.info("Getting executor from:" + executorPathString); - URI executorUri = URI.newBuilder().setValue(executorPathString).setExecutable(true) - .build(); - + .build(); LOGGER.info("Slave will execute command:" + cmd); - commandInfo.addUris(nmUri).addUris(configUri).addUris(executorUri).setValue("echo \"" + cmd + "\";" + cmd); - + commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd); commandInfo.setUser(cfg.getFrameworkSuperUser().get()); } else { - String cmdPrefix = "export CAPSULE_CACHE_DIR=`pwd` ;" + - "echo $CAPSULE_CACHE_DIR; java -Dcapsule.log=verbose 
-jar "; - String executorPath = myriadExecutorConfiguration.getPath(); - String cmd = cmdPrefix + getFileName(executorPath); - URI executorURI = URI.newBuilder().setValue(executorPath) - .setExecutable(true).build(); - commandInfo.addUris(executorURI) - .setValue("echo \"" + cmd + "\";" + cmd); + clGenerator = new NMExecutorCLGenImpl(cfg, profile, ports); + cmd = clGenerator.generateCommandLine(); + commandInfo.setValue("echo \"" + cmd + "\";" + cmd); if (cfg.getFrameworkUser().isPresent()) { commandInfo.setUser(cfg.getFrameworkUser().get()); @@ -218,53 +158,21 @@ public interface TaskFactory { LOGGER.debug(ports.toString()); NMProfile profile = nodeTask.getProfile(); - NMTaskConfig nmTaskConfig = new NMTaskConfig(); - nmTaskConfig.setAdvertisableCpus(profile.getCpus()); - nmTaskConfig.setAdvertisableMem(profile.getMemory()); - NodeManagerConfiguration nodeManagerConfiguration = this.cfg.getNodeManagerConfiguration(); - nmTaskConfig.setJvmOpts(nodeManagerConfiguration.getJvmOpts().orNull()); - nmTaskConfig.setCgroups(nodeManagerConfiguration.getCgroups().or(Boolean.FALSE)); - nmTaskConfig.setRpcPort(ports.getRpcPort()); - nmTaskConfig.setLocalizerPort(ports.getLocalizerPort()); - nmTaskConfig.setWebAppHttpPort(ports.getWebAppHttpPort()); - nmTaskConfig.setShufflePort(ports.getShufflePort()); - nmTaskConfig.setYarnEnvironment(cfg.getYarnEnvironment()); - - // if RM's hostname is passed in as a system property, pass it along - // to Node Managers launched via Myriad - String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME); - if (rmHostName != null && !rmHostName.isEmpty()) { - - String nmOpts = nmTaskConfig.getYarnEnvironment().get(YARN_NODEMANAGER_OPTS_KEY); - if (nmOpts == null) { - nmOpts = ""; - } - nmOpts += " " + "-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName; - nmTaskConfig.getYarnEnvironment().put(YARN_NODEMANAGER_OPTS_KEY, nmOpts); - LOGGER.info(YARN_RESOURCEMANAGER_HOSTNAME + " is set to " + rmHostName + - " via 
YARN_RESOURCEMANAGER_OPTS. Passing it into YARN_NODEMANAGER_OPTS."); - } -// else { - // TODO(Santosh): Handle this case. Couple of options: - // 1. Lookup a hostname here and use it as "RM's hostname" - // 2. Abort here.. RM cannot start unless a hostname is passed in as it requires it to pass to NMs. - - String taskConfigJSON = new Gson().toJson(nmTaskConfig); - Scalar taskMemory = Scalar.newBuilder() .setValue(taskUtils.getTaskMemory(profile)) .build(); Scalar taskCpus = Scalar.newBuilder() .setValue(taskUtils.getTaskCpus(profile)) .build(); - ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId()); + + CommandInfo commandInfo = getCommandInfo(profile, ports); + ExecutorInfo executorInfo = getExecutorInfoForSlave(offer.getSlaveId(), commandInfo); TaskInfo.Builder taskBuilder = TaskInfo.newBuilder() .setName("task-" + taskId.getValue()) .setTaskId(taskId) .setSlaveId(offer.getSlaveId()); - ByteString data = ByteString.copyFrom(taskConfigJSON.getBytes(Charset.defaultCharset())); return taskBuilder .addResources( Resource.newBuilder().setName("cpus") @@ -296,18 +204,17 @@ public interface TaskFactory { .setBegin(ports.getShufflePort()) .setEnd(ports.getShufflePort()) .build()))) - .setExecutor(executorInfo).setData(data).build(); + .setExecutor(executorInfo).build(); } @Override - public ExecutorInfo getExecutorInfoForSlave(SlaveID slave) { + public ExecutorInfo getExecutorInfoForSlave(SlaveID slave, + CommandInfo commandInfo) { Scalar executorMemory = Scalar.newBuilder() .setValue(taskUtils.getExecutorMemory()).build(); Scalar executorCpus = Scalar.newBuilder() .setValue(taskUtils.getExecutorCpus()).build(); - CommandInfo commandInfo = getCommandInfo(); - ExecutorID executorId = ExecutorID.newBuilder() .setValue(EXECUTOR_PREFIX + slave.getValue()) .build(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/1a2f8a05/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java 
---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 2784e03..497b43d 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -216,7 +216,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { Protos.ExecutorInfo executorInfo = node.getExecInfo(); if (executorInfo == null) { executorInfo = Protos.ExecutorInfo.newBuilder( - taskFactory.getExecutorInfoForSlave(offer.getSlaveId())) + taskFactory.getExecutorInfoForSlave(offer.getSlaveId(), null)) .setFrameworkId(offer.getFrameworkId()).build(); node.setExecInfo(executorInfo); }