Refactor which addresses Myriad 213, 214, and 136 in the process. -Refactored ExecutorCommandLineGenerator classes to use this class (resolves Myriad-214 in the process). -Refactor TackFactory classes as necessary to work with this. This adds a removes several methods to TaskUtils to get port Resources. -Created a ResourceOfferContainer moving the work of checking offers and creating the resources from them out of the ResourceOfferEventHandler and TaskFactory Classes. -Remove NMPorts and Ports classes. JIRA: [MYRIAD-136] https://issues.apache.org/jira/browse/MYRIAD-136 [MYRIAD-213] https://issues.apache.org/jira/browse/MYRIAD-213 [MYRIAD-214] https://issues.apache.org/jira/browse/MYRIAD-214 Pull Request: Closes #79
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/7aea259c Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/7aea259c Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/7aea259c Branch: refs/heads/master Commit: 7aea259cf231655bdbd033f61584eec7715fc97f Parents: df4cbc0 Author: DarinJ <dar...@apache.org> Authored: Thu Jun 9 10:08:46 2016 -0400 Committer: darinj <dar...@apache.org> Committed: Fri Aug 12 09:27:14 2016 -0400 ---------------------------------------------------------------------- build.gradle | 4 + .../myriad/executor/MyriadExecutorDefaults.java | 23 -- .../src/main/java/org/apache/myriad/Main.java | 47 ++- .../java/org/apache/myriad/MyriadModule.java | 29 +- .../configuration/MyriadConfiguration.java | 15 +- .../configuration/NodeManagerConfiguration.java | 25 +- .../configuration/ServiceConfiguration.java | 6 +- .../scheduler/DownloadNMExecutorCLGenImpl.java | 90 ------ .../scheduler/ExecutorCommandLineGenerator.java | 148 ++++++++- .../scheduler/ExtendedResourceProfile.java | 27 +- .../myriad/scheduler/MyriadOperations.java | 2 +- .../myriad/scheduler/NMExecutorCLGenImpl.java | 176 ----------- .../NMExecutorCommandLineGenerator.java | 150 +++++++++ .../org/apache/myriad/scheduler/NMPorts.java | 78 ----- .../apache/myriad/scheduler/NMTaskFactory.java | 74 +++++ .../java/org/apache/myriad/scheduler/Ports.java | 26 -- .../scheduler/ServiceCommandLineGenerator.java | 86 +++++- .../scheduler/ServiceResourceProfile.java | 30 +- .../scheduler/ServiceTaskConstraints.java | 49 --- .../myriad/scheduler/ServiceTaskFactory.java | 76 +++++ .../scheduler/ServiceTaskFactoryImpl.java | 253 --------------- .../myriad/scheduler/TaskConstraints.java | 35 --- .../scheduler/TaskConstraintsManager.java | 48 --- .../apache/myriad/scheduler/TaskFactory.java | 304 +++++++------------ .../org/apache/myriad/scheduler/TaskUtils.java | 86 +----- .../handlers/ResourceOffersEventHandler.java | 175 ++--------- .../scheduler/resource/RangeResource.java | 218 +++++++++++++ .../resource/ResourceOfferContainer.java | 207 +++++++++++++ .../scheduler/resource/ScalarResource.java | 88 ++++++ .../scheduler/yarn/MyriadFairScheduler.java | 1 - .../org/apache/myriad/MyriadTestModule.java | 14 +- .../myriad/api/SchedulerStateResourceTest.java | 9 +- .../myriad/scheduler/MockSchedulerDriver.java | 17 ++ .../myriad/scheduler/MyriadDriverTest.java | 17 ++ .../myriad/scheduler/MyriadOperationsTest.java | 22 +- .../myriad/scheduler/NMProfileManagerTest.java | 17 ++ .../myriad/scheduler/SchedulerUtilsSpec.groovy | 6 +- .../scheduler/ServiceResourceProfileTest.java | 24 +- .../myriad/scheduler/TMSTaskFactoryImpl.java | 9 +- .../scheduler/TaskConstraintsManagerTest.java | 32 -- .../myriad/scheduler/TestNMTaskFactory.java | 72 +++++ .../myriad/scheduler/TestRandomPorts.java | 203 ------------- .../scheduler/TestServiceCommandLine.java | 97 ++++-- .../scheduler/TestServiceTaskFactory.java | 77 +++++ .../apache/myriad/scheduler/TestTaskUtils.java | 37 +-- .../fgs/YarnNodeCapacityManagerTest.java | 4 +- .../myriad/scheduler/offer/OfferBuilder.java | 117 +++++++ .../resource/TestResourceOfferContainer.java | 166 ++++++++++ .../org/apache/myriad/state/ClusterTest.java | 9 +- .../org/apache/myriad/state/NodeTaskTest.java | 9 +- .../apache/myriad/state/SchedulerStateTest.java | 5 +- .../state/utils/ByteBufferSupportTest.java | 5 +- ...iad-config-test-default-with-docker-info.yml | 1 + .../resources/myriad-config-test-default.yml | 32 +- 54 files changed, 1928 insertions(+), 1649 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 55e8d85..b48cfc9 100644 --- a/build.gradle +++ b/build.gradle @@ -21,6 +21,10 @@ allprojects { apply plugin: 'eclipse' } +tasks.withType(JavaCompile) { + options.compilerArgs << "-Xlint:unchecked" << "-Werror" +} + buildscript { repositories { jcenter() http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java index c7e4515..250c812 100644 --- a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java +++ b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java @@ -36,29 +36,6 @@ public class MyriadExecutorDefaults { "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; /** - * YARN class to help handle LCE resources - */ - public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class"; - - public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler"; - - public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; - - public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount"; - - public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path"; - - public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group"; - - public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path"; - - public static final String KEY_YARN_HOME = "yarn.home"; - - public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores"; - - public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb"; - - /** * Allot 10% more memory to account for JVM overhead. */ public static final double JVM_OVERHEAD = 0.1; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java index 0615ebd..8c028f1 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java @@ -18,13 +18,17 @@ */ package org.apache.myriad; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.health.HealthCheckRegistry; +import com.google.inject.Guice; +import com.google.inject.Injector; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.commons.collections.MapUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -43,9 +47,6 @@ import org.apache.myriad.scheduler.NMProfile; import org.apache.myriad.scheduler.Rebalancer; import org.apache.myriad.scheduler.ServiceProfileManager; import org.apache.myriad.scheduler.ServiceResourceProfile; -import org.apache.myriad.scheduler.ServiceTaskConstraints; -import org.apache.myriad.scheduler.TaskConstraintsManager; -import org.apache.myriad.scheduler.TaskFactory; import org.apache.myriad.scheduler.TaskTerminator; import org.apache.myriad.scheduler.TaskUtils; import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; @@ -55,21 +56,15 @@ import org.apache.myriad.webapp.WebAppGuiceModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.JmxReporter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.health.HealthCheckRegistry; -import com.google.inject.Guice; -import com.google.inject.Injector; - /** * Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of * the following components: - * + * * 1. MyriadDriverManager * 2. MyriadWebServer * 3. TaskTerminator * 4. HealthCheckRegistry - * + * * Main uses the Guice Injector framework to manage the Myriad object graph and is * configured by myriad-config-default.yml */ @@ -87,36 +82,36 @@ public class Main { /** * Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of * the following components: - * + * * 1. MyriadDriverManager * 2. MyriadWebServer * 3. TaskTerminator * 4. HealthCheckRegistry - * + * * Main uses the Guice Injector framework to manage the Myriad object graph and is * configured by myriad-config-default.yml */ public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, - InterceptorRegistry registry) throws Exception { + InterceptorRegistry registry) throws Exception { MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", hadoopConf, yarnScheduler, rmContext, registry); MesosModule mesosModule = new MesosModule(); injector = Guice.createInjector(myriadModule, mesosModule, new WebAppGuiceModule()); new Main().run(injector.getInstance(MyriadConfiguration.class)); } - + // TODO (Kannan Rajah) Hack to get injector in unit test. public static Injector getInjector() { return injector; } - /** + /** * Initializes the Myriad object graph via MyriadConfiguration and starts * the Mesos interface (MyriadDriverManager) as well as all Myriad services - * + * *@param cfg MyriadConfiguration * @throws Exception - */ + */ public void run(MyriadConfiguration cfg) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Bindings: " + injector.getAllBindings()); @@ -171,8 +166,6 @@ public class Main { private void initProfiles(Injector injector) { LOGGER.info("Initializing Profiles"); ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); - taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints()); Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles(); TaskUtils taskUtils = injector.getInstance(TaskUtils.class); if (MapUtils.isNotEmpty(profiles)) { @@ -181,10 +174,8 @@ public class Main { if (MapUtils.isNotEmpty(profiles) && profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) { Long cpu = Long.parseLong(profileResourceMap.get("cpu")); Long mem = Long.parseLong(profileResourceMap.get("mem")); - - ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), taskUtils.getExecutorCpus(), taskUtils.getExecutorMemory(), - taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory()); - + ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), + taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory(), taskUtils.getNodeManagerPorts()); profileManager.add(serviceProfile); } else { LOGGER.error("Invalid definition for profile: " + profile.getKey()); @@ -259,7 +250,6 @@ public class Main { private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) { LOGGER.info("Initializing initServiceConfigurations"); ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); @@ -268,9 +258,8 @@ public class Main { ServiceConfiguration config = entry.getValue(); final Double cpu = config.getCpus(); final Double mem = config.getJvmMaxMemoryMB(); - - profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); - taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix)); + final Map<String, Long> ports = config.getPorts(); + profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem, ports)); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java index 8748dcb..bb560a4 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java @@ -32,22 +32,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.MyriadExecutorConfiguration; import org.apache.myriad.configuration.NodeManagerConfiguration; import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.policy.LeastAMNodesFirstPolicy; import org.apache.myriad.policy.NodeScaleDownPolicy; -import org.apache.myriad.scheduler.DownloadNMExecutorCLGenImpl; import org.apache.myriad.scheduler.ExecutorCommandLineGenerator; import org.apache.myriad.scheduler.MyriadDriverManager; -import org.apache.myriad.scheduler.NMExecutorCLGenImpl; +import org.apache.myriad.scheduler.NMExecutorCommandLineGenerator; import org.apache.myriad.scheduler.NMTaskFactoryAnnotation; import org.apache.myriad.scheduler.ReconcileService; import org.apache.myriad.scheduler.ServiceProfileManager; -import org.apache.myriad.scheduler.ServiceTaskFactoryImpl; -import org.apache.myriad.scheduler.TaskConstraintsManager; +import org.apache.myriad.scheduler.ServiceTaskFactory; import org.apache.myriad.scheduler.TaskFactory; -import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl; +import org.apache.myriad.scheduler.NMTaskFactory; import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler; import org.apache.myriad.scheduler.fgs.NodeStore; import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; @@ -96,17 +93,16 @@ public class MyriadModule extends AbstractModule { bind(ReconcileService.class).in(Scopes.SINGLETON); bind(HttpConnectorProvider.class).in(Scopes.SINGLETON); bind(MyriadWebServer.class).in(Scopes.SINGLETON); - bind(TaskConstraintsManager.class).in(Scopes.SINGLETON); - // add special binding between TaskFactory and NMTaskFactoryImpl to ease up + // add special binding between TaskFactory and NMTaskFactory to ease up // usage of TaskFactory - bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class); + bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactory.class); bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON); bind(NodeStore.class).in(Scopes.SINGLETON); bind(OfferLifecycleManager.class).in(Scopes.SINGLETON); bind(NMHeartBeatHandler.class).asEagerSingleton(); MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); - mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON); + mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON); Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { @@ -119,8 +115,8 @@ public class MyriadModule extends AbstractModule { LOGGER.error("ClassNotFoundException", e); } } else { - //TODO (kjyost) Confirm if this else statement and logic should still be here - mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON); + //TODO (hokiegeek2) Confirm if this else statement and logic should still be here + mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactory.class).in(Scopes.SINGLETON); } } @@ -163,14 +159,7 @@ public class MyriadModule extends AbstractModule { @Provides @Singleton ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) { - ExecutorCommandLineGenerator cliGenerator = null; - MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); - if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { - cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get()); - } else { - cliGenerator = new NMExecutorCLGenImpl(cfg); - } - return cliGenerator; + return new NMExecutorCommandLineGenerator(cfg); } protected MyriadConfiguration generateMyriadConfiguration(String configFile) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java index fa8dca2..8a6e5f3 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java @@ -110,7 +110,12 @@ public class MyriadConfiguration { public static final String DEFAULT_ZK_SERVERS = "localhost:2181"; public static final String DEFAULT_CGROUP_PATH = "/sys/fs/cgroup"; - + + /** + * By default ha is turned off. + */ + public static final Boolean DEFAULT_CGROUPS_ENABLED = false; + public static final Map<String, ServiceConfiguration> EMPTY_SERVICE_CONFIGURATION = Collections.emptyMap(); @JsonProperty @@ -194,6 +199,8 @@ public class MyriadConfiguration { @JsonProperty private String cgroupPath; + @JsonProperty + private Boolean cgroupEnabled; public MyriadConfiguration() { } @@ -301,4 +308,8 @@ public class MyriadConfiguration { public String getCGroupPath() { return Optional.fromNullable(cgroupPath).or(DEFAULT_CGROUP_PATH); } -} \ No newline at end of file + + public Boolean isCgroupEnabled() { + return Optional.fromNullable(cgroupEnabled).or(DEFAULT_CGROUPS_ENABLED); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java index e60a5d5..56ea43d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java @@ -21,10 +21,18 @@ package org.apache.myriad.configuration; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; +import java.util.Map; +import java.util.TreeMap; + /** * YARN NodeManager Configuration */ public class NodeManagerConfiguration { + + public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address"; + public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address"; + public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address"; + public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port"; /** * Allot 10% more memory to account for JVM overhead. */ @@ -68,7 +76,10 @@ public class NodeManagerConfiguration { */ @JsonProperty private String jvmOpts; - + + @JsonProperty + private Map<String, Long> ports; + /** * Determines if cgroups are enabled for the NodeManager */ @@ -91,6 +102,18 @@ public class NodeManagerConfiguration { return Optional.fromNullable(cpus).or(DEFAULT_NM_CPUS); } + public synchronized Map<String, Long> getPorts() { + if (ports == null) { + //Good idea to have deterministic order + ports = new TreeMap<>(); + ports.put(KEY_NM_ADDRESS, 0L); + ports.put(KEY_NM_WEBAPP_ADDRESS, 0L); + ports.put(KEY_NM_LOCALIZER_ADDRESS, 0L); + ports.put(KEY_NM_SHUFFLE_PORT, 0L); + } + return ports; + } + public boolean getCgroups() { return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java index 0f733a9..0b00bca 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java @@ -19,6 +19,7 @@ package org.apache.myriad.configuration; import java.util.Map; +import java.util.TreeMap; import org.hibernate.validator.constraints.NotEmpty; @@ -86,6 +87,7 @@ public class ServiceConfiguration { protected Integer maxInstances; @JsonProperty + @NotEmpty protected String command; @JsonProperty @@ -123,8 +125,8 @@ public class ServiceConfiguration { return envSettings; } - public Optional<Map<String, Long>> getPorts() { - return Optional.fromNullable(ports); + public Map<String, Long> getPorts() { + return Optional.fromNullable(ports).or(new TreeMap<String, Long>()); } public Optional<Integer> getMaxInstances() { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java deleted file mode 100644 index 74deda3..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.myriad.scheduler; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation assumes NM binaries will be downloaded - */ -public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl { - - private static final Logger LOGGER = LoggerFactory. - getLogger(DownloadNMExecutorCLGenImpl.class); - - private final String nodeManagerUri; - - public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, String nodeManagerUri) { - super(cfg); - this.nodeManagerUri = nodeManagerUri; - } - - @Override - public String generateCommandLine(ServiceResourceProfile profile, Ports ports) { - StringBuilder cmdLine = new StringBuilder(); - LOGGER.info("Using remote distribution"); - generateEnvironment(profile, (NMPorts) ports); - appendDistroExtractionCommands(cmdLine); - appendCgroupsCmds(cmdLine); - appendYarnHomeExport(cmdLine); - appendUserSudo(cmdLine); - appendEnvForNM(cmdLine); - cmdLine.append(YARN_NM_CMD); - return cmdLine.toString(); - } - - protected void appendDistroExtractionCommands(StringBuilder cmdLine) { - /* - TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since - it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also - pull the config from the resource manager and put them in the conf dir. This is also why we need - frameworkSuperUser. This will be refactored after Mesos-1790 is resolved. - */ - - //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. - //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. - appendSudo(cmdLine); - cmdLine.append("tar -zxpf ").append(getFileName(nodeManagerUri)); - //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager - //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the - //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. - cmdLine.append(" && "); - appendSudo(cmdLine); - cmdLine.append(" cp conf "); - cmdLine.append(cfg.getYarnEnvironment().get("YARN_HOME")); - cmdLine.append("/etc/hadoop/yarn-site.xml;"); - } - - private static String getFileName(String uri) { - int lastSlash = uri.lastIndexOf('/'); - if (lastSlash == -1) { - return uri; - } else { - String fileName = uri.substring(lastSlash + 1); - Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI should not have a slash at the end"); - return fileName; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java index 82782f2..c2a1c68 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java @@ -19,11 +19,153 @@ package org.apache.myriad.scheduler; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.mesos.Protos; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.MyriadExecutorConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + /** * Interface to plugin multiple implementations for executor command generation */ -public interface ExecutorCommandLineGenerator { - String generateCommandLine(ServiceResourceProfile profile, Ports ports); +public abstract class ExecutorCommandLineGenerator { + + protected static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCommandLineGenerator.class); + + public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname"; + public static final String KEY_YARN_HOME = "yarn.home"; + + protected static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:"; + + protected static final String PROPERTY_FORMAT = " -D%s=%s "; + protected static final String CMD_FORMAT = "echo \"%1$s\" && %1$s"; + + protected Protos.CommandInfo staticCommandInfo; + + protected MyriadConfiguration myriadConfiguration; + protected MyriadExecutorConfiguration myriadExecutorConfiguration; + protected YarnConfiguration yarnConfiguration = new YarnConfiguration(); + + abstract Protos.CommandInfo generateCommandLine(ServiceResourceProfile profile, ServiceConfiguration serviceConfiguration, Collection<Long> ports); + + protected void appendDistroExtractionCommands(StringBuilder cmdLine) { + /* + TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since + it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also + pull the config from the resource manager and put them in the yarnConfiguration dir. This is also why we need + frameworkSuperUser. This will be refactored after Mesos-1790 is resolved. + */ + + //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. + if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { + //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. + //If SudoUser not enable LinuxExecutor will not work + appendSudo(cmdLine); + cmdLine.append("tar -zxpf ").append(getFileName(myriadExecutorConfiguration.getNodeManagerUri().get())); + cmdLine.append(" && "); + //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager + //The url for the resource manager config is: http(s)://hostname:port/yarnConfiguration so fetcher.cpp downloads the + //config file to yarnConfiguration, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. + if (!myriadExecutorConfiguration.getConfigUri().isPresent()) { + appendSudo(cmdLine); + cmdLine.append(" cp yarnConfiguration "); + cmdLine.append(myriadConfiguration.getYarnEnvironment().get("YARN_HOME")); + cmdLine.append("/etc/hadoop/yarn-site.xml && "); + } + } + } + + protected void addJavaOpt(StringBuilder opts, String propertyName, String propertyValue) { + String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue); + opts.append(envOpt); + } + + protected void appendSudo(StringBuilder cmdLine) { + if (myriadConfiguration.getFrameworkSuperUser().isPresent()) { + cmdLine.append(" sudo "); + } + } + + protected void appendUserSudo(StringBuilder cmdLine) { + if (myriadConfiguration.getFrameworkSuperUser().isPresent()) { + cmdLine.append(" sudo -E -u "); + cmdLine.append(myriadConfiguration.getFrameworkUser().get()); + cmdLine.append(" -H "); + } + } + + public String getConfigurationUrl() { + String httpPolicy = yarnConfiguration.get(TaskFactory.YARN_HTTP_POLICY); + String address = StringUtils.EMPTY; + if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) { + address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); + if (StringUtils.isEmpty(address)) { + address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090"; + } + return "https://" + address + "/yarnConfiguration"; + } else { + address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); + if (StringUtils.isEmpty(address)) { + address = yarnConfiguration.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088"; + } + return "http://" + address + "/yarnConfiguration"; + } + } + + private static String getFileName(String uri) { + int lastSlash = uri.lastIndexOf('/'); + if (lastSlash == -1) { + return uri; + } else { + String fileName = uri.substring(lastSlash + 1); + Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI should not have a slash at the end"); + return fileName; + } + } + + protected String getUser() { + if (myriadConfiguration.getFrameworkSuperUser().isPresent()) { + return myriadConfiguration.getFrameworkSuperUser().get(); + } else { + return myriadConfiguration.getFrameworkUser().get(); + } + } - String getConfigurationUrl(); + protected List<Protos.CommandInfo.URI> getUris() { + List<Protos.CommandInfo.URI> uris = new ArrayList<>(); + if (myriadExecutorConfiguration.getJvmUri().isPresent()) { + final String jvmRemoteUri = myriadExecutorConfiguration.getJvmUri().get(); + LOGGER.info("Getting JRE distribution from:" + jvmRemoteUri); + uris.add(Protos.CommandInfo.URI.newBuilder().setValue(jvmRemoteUri).build()); + } + if (myriadExecutorConfiguration.getConfigUri().isPresent()) { + String configURI = myriadExecutorConfiguration.getConfigUri().get(); + LOGGER.info("Getting Hadoop configuration from: {}", configURI); + uris.add(Protos.CommandInfo.URI.newBuilder().setValue(configURI).build()); + } else if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { + String configURI = getConfigurationUrl(); + LOGGER.info("Getting Hadoop configuration from: {}", configURI); + uris.add(Protos.CommandInfo.URI.newBuilder().setValue(configURI).build()); + } + if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { + //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. + if (!(myriadConfiguration.getFrameworkUser().isPresent() && myriadConfiguration.getFrameworkSuperUser().isPresent())) { + LOGGER.warn("Trying to use remote distribution, but frameworkUser and/or frameworkSuperUser not set!" + + "Some features may not work"); + } + String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get(); + LOGGER.info("Getting Hadoop distribution from: {}", nodeManagerUri); + uris.add(Protos.CommandInfo.URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build()); + } + return uris; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java index 6232258..59e2cb0 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java @@ -20,6 +20,8 @@ package org.apache.myriad.scheduler; import com.google.gson.Gson; +import java.util.Map; + /** * Extended ServiceResourceProfile for services that need to pass set of resources downstream * currently the only such service is NodeManager @@ -33,33 +35,12 @@ public class ExtendedResourceProfile extends ServiceResourceProfile { * @param cpu * @param mem will throw NullPoiterException if childProfile is null */ - public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem, Double execCpu, Double execMemory) { - super(childProfile.getName(), cpu, mem, execCpu, execMemory); - + public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem, Map<String, Long> ports) { + super(childProfile.getName(), cpu, mem, ports); this.childProfile = childProfile; this.className = ExtendedResourceProfile.class.getName(); } - /** - * @param childProfile - should be null - * @param cpu - * @param mem will throw NullPoiterException if childProfile is null - */ - public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem) { - super(childProfile.getName(), cpu, mem); - - this.childProfile = childProfile; - this.className = ExtendedResourceProfile.class.getName(); - } - - public NMProfile getChildProfile() { - return childProfile; - } - - public void setChildProfile(NMProfile nmProfile) { - this.childProfile = nmProfile; - } - @Override public String getName() { return childProfile.getName(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java index 99f7e78..fb1f1bb 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java @@ -136,7 +136,7 @@ public class MyriadOperations { Collection<NodeTask> nodes = new HashSet<>(); for (int i = 0; i < instances; i++) { - NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null); + NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem, auxTaskConf.getPorts()), null); nodeTask.setTaskPrefix(serviceName); nodes.add(nodeTask); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java deleted file mode 100644 index db945ec..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.myriad.scheduler; - -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation assumes NM binaries already deployed - */ -public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { - - private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class); - - public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS"; - public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path"; - public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname"; - - /** - * YARN class to help handle LCE resources - */ - // TODO (mohit): Should it be configurable ? - public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; - public static final String KEY_YARN_HOME = "yarn.home"; - public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores"; - public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb"; - public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager"; - public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address"; - public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address"; - public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address"; - public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port"; - - private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:"; - private static final String PROPERTY_FORMAT = "-D%s=%s"; - - private Map<String, String> environment = new HashMap<>(); - protected MyriadConfiguration cfg; - protected YarnConfiguration conf = new YarnConfiguration(); - - public NMExecutorCLGenImpl(MyriadConfiguration cfg) { - this.cfg = cfg; - } - - @Override - public String generateCommandLine(ServiceResourceProfile profile, Ports ports) { - StringBuilder cmdLine = new StringBuilder(); - - generateEnvironment(profile, (NMPorts) ports); - appendCgroupsCmds(cmdLine); - appendYarnHomeExport(cmdLine); - appendEnvForNM(cmdLine); - cmdLine.append(YARN_NM_CMD); - return cmdLine.toString(); - } - - protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) { - //yarnEnvironemnt configuration from yaml file - Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment(); - if (yarnEnvironmentMap != null) { - environment.putAll(yarnEnvironmentMap); - } - - String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME); - if (StringUtils.isNotEmpty(rmHostName)) { - addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName); - } - - if (cfg.getNodeManagerConfiguration().getCgroups()) { - addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "mesos/$TASK_DIR"); - if (environment.containsKey("YARN_HOME")) { - addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME")); - } - } - addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue())); - addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue())); - addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getRpcPort()).toString()); - addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getLocalizerPort()).toString()); - addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.valueOf(ports.getWebAppHttpPort()).toString()); - addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.valueOf(ports.getShufflePort()).toString()); - } - - protected void appendEnvForNM(StringBuilder cmdLine) { - cmdLine.append(" env "); - for (Map.Entry<String, String> env : environment.entrySet()) { - cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" "); - } - } - - protected void appendCgroupsCmds(StringBuilder cmdLine) { - if (cfg.getFrameworkSuperUser().isPresent()) { - cmdLine.append(" export TASK_DIR=`basename $PWD`&&"); - //The container executor script expects mount-path to exist and owned by the yarn user - //See: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html - //If YARN ever moves to cgroup/mem it will be necessary to add a mem version. - appendSudo(cmdLine); - cmdLine.append("chown " + cfg.getFrameworkUser().get() + " "); - cmdLine.append(cfg.getCGroupPath()); - cmdLine.append("/cpu/mesos/$TASK_DIR &&"); - } else { - LOGGER.info("frameworkSuperUser not enabled ignoring cgroup configuration"); - } - } - - protected void appendYarnHomeExport(StringBuilder cmdLine) { - if (environment.containsKey("YARN_HOME")) { - cmdLine.append(" export YARN_HOME="); - cmdLine.append(environment.get("YARN_HOME")); - cmdLine.append(";"); - } - } - - protected void appendSudo(StringBuilder cmdLine) { - if (cfg.getFrameworkSuperUser().isPresent()) { - cmdLine.append(" sudo "); - } - } - - protected void appendUserSudo(StringBuilder cmdLine) { - if (cfg.getFrameworkSuperUser().isPresent()) { - cmdLine.append(" sudo -E -u "); - cmdLine.append(cfg.getFrameworkUser().get()); - cmdLine.append(" -H "); - } - } - - protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) { - String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue); - if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) { - String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS); - environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt); - } else { - environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt); - } - } - - @Override - public String getConfigurationUrl() { - String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY); - String address = StringUtils.EMPTY; - if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) { - address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); - if (StringUtils.isEmpty(address)) { - address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090"; - } - } else { - address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); - if (StringUtils.isEmpty(address)) { - address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088"; - } - } - - return "http://" + address + "/conf"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java new file mode 100644 index 0000000..035da3f --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCommandLineGenerator.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.myriad.scheduler; + + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + + + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.commons.lang3.StringUtils; +import org.apache.mesos.Protos; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; + +import org.apache.mesos.Protos.CommandInfo; + +/** + * Implementation assumes NM binaries already deployed + */ +public class NMExecutorCommandLineGenerator extends ExecutorCommandLineGenerator { + + /** + * YARN class to help handle LCE resources + */ + public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS"; + public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; + public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores"; + public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb"; + + public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager"; + + public NMExecutorCommandLineGenerator(MyriadConfiguration cfg) { + this.myriadConfiguration = cfg; + this.myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); + generateStaticCommandLine(); + } + + @Override + CommandInfo generateCommandLine(ServiceResourceProfile profile, + ServiceConfiguration serviceConfiguration, Collection<Long> ports) { + CommandInfo.Builder builder = CommandInfo.newBuilder(); + builder.mergeFrom(staticCommandInfo); + builder.setEnvironment(generateEnvironment(profile, ports)); + builder.setUser(getUser()); + return builder.build(); + } + + protected void generateStaticCommandLine() { + CommandInfo.Builder builder = CommandInfo.newBuilder(); + StringBuilder cmdLine = new StringBuilder(); + appendCgroupsCmds(cmdLine); + appendDistroExtractionCommands(cmdLine); + appendUserSudo(cmdLine); + cmdLine.append(YARN_NM_CMD); + builder.setValue(String.format(CMD_FORMAT, cmdLine.toString())); + builder.addAllUris(getUris()); + staticCommandInfo = builder.build(); + } + + protected Protos.Environment generateEnvironment(ServiceResourceProfile profile, Collection<Long> ports) { + Map<String, String> yarnEnv = myriadConfiguration.getYarnEnvironment(); + Protos.Environment.Builder builder = Protos.Environment.newBuilder(); + builder.addAllVariables(Iterables.transform(yarnEnv.entrySet(), new Function<Map.Entry<String, String>, Protos.Environment.Variable>() { + public Protos.Environment.Variable apply(Map.Entry<String, String> x) { + return Protos.Environment.Variable.newBuilder().setName(x.getKey()).setValue(x.getValue()).build(); + } + })); + + StringBuilder yarnOpts = new StringBuilder(); + String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME); + + + if (StringUtils.isNotEmpty(rmHostName)) { + addJavaOpt(yarnOpts, KEY_YARN_RM_HOSTNAME, rmHostName); + } + + if (yarnEnv.containsKey(KEY_YARN_HOME)) { + addJavaOpt(yarnOpts, KEY_YARN_HOME, yarnEnv.get("YARN_HOME")); + } + + addJavaOpt(yarnOpts, KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue())); + addJavaOpt(yarnOpts, KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue())); + Map<String, Long> portsMap = profile.getPorts(); + Preconditions.checkState(portsMap.size() == ports.size()); + + Iterator itr = ports.iterator(); + for (String portProperty : portsMap.keySet()) { + if (portProperty.endsWith("address")) { + addJavaOpt(yarnOpts, portProperty, ALL_LOCAL_IPV4ADDR + itr.next()); + } else if (portProperty.endsWith("port")) { + addJavaOpt(yarnOpts, portProperty, itr.next().toString()); + } else { + LOGGER.warn("{} propery isn't an address or port!", portProperty); + } + } + + + if (myriadConfiguration.getYarnEnvironment().containsKey(ENV_YARN_NODEMANAGER_OPTS)) { + yarnOpts.append(" ").append(yarnEnv.get(ENV_YARN_NODEMANAGER_OPTS)); + } + builder.addAllVariables(Collections.singleton( + Protos.Environment.Variable.newBuilder() + .setName(ENV_YARN_NODEMANAGER_OPTS) + .setValue(yarnOpts.toString()).build()) + ); + return builder.build(); + } + + protected void appendCgroupsCmds(StringBuilder cmdLine) { + //These can't be set in the environment as they require commands to be run on the host + if (myriadConfiguration.getFrameworkSuperUser().isPresent() && myriadConfiguration.isCgroupEnabled()) { + cmdLine.append(" export TASK_DIR=`cat /proc/self/cgroup | grep :cpu: | cut -d: -f3` &&"); + //The container executor script expects mount-path to exist and owned by the yarn user + //See: https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html + //If YARN ever moves to cgroup/mem it will be necessary to add a mem version. + appendSudo(cmdLine); + cmdLine.append("chown " + myriadConfiguration.getFrameworkUser().get() + " "); + cmdLine.append(myriadConfiguration.getCGroupPath()); + cmdLine.append("/cpu$TASK_DIR &&"); + cmdLine.append(String.format("export %s=\"$%s -D%s=%s\" && ", ENV_YARN_NODEMANAGER_OPTS, ENV_YARN_NODEMANAGER_OPTS, + KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "$TASK_DIR")); + } else if (myriadConfiguration.isCgroupEnabled()) { + LOGGER.info("frameworkSuperUser not set ignoring cgroup configuration, this will likely can the nodemanager to crash"); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java deleted file mode 100644 index 12490a7..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.myriad.scheduler; - -import com.google.common.base.Preconditions; -import java.util.HashMap; -import java.util.Map; - -/** - * Helper class for dynamically assigning ports to nodemanager - */ -public class NMPorts implements Ports { - private static final String NM_RPC_PORT_KEY = "nm.rpc.port"; - private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port"; - private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port"; - private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port"; - - private static final String[] NM_PORT_KEYS = - {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY}; - - private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length); - - public NMPorts(Long[] ports) { - Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length"); - for (int i = 0; i < NM_PORT_KEYS.length; i++) { - portsMap.put(NM_PORT_KEYS[i], ports[i]); - } - } - - public long getRpcPort() { - return portsMap.get(NM_RPC_PORT_KEY); - } - - public long getLocalizerPort() { - return portsMap.get(NM_LOCALIZER_PORT_KEY); - } - - public long getWebAppHttpPort() { - return portsMap.get(NM_WEBAPP_HTTP_PORT_KEY); - } - - public long getShufflePort() { - return portsMap.get(NM_HTTP_SHUFFLE_PORT_KEY); - } - - public static int expectedNumPorts() { - return NM_PORT_KEYS.length; - } - - /** - * @return a string representation of NMPorts - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder().append("{"); - for (String key : NM_PORT_KEYS) { - sb.append(key).append(": ").append(portsMap.get(key).toString()).append(", "); - } - sb.replace(sb.length() - 2, sb.length(), "}"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java new file mode 100644 index 0000000..6c7e209 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactory.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.myriad.scheduler; + +import com.google.inject.Inject; +import org.apache.mesos.Protos; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.scheduler.resource.ResourceOfferContainer; +import org.apache.myriad.state.NodeTask; + +import java.util.List; + +/** + * Creates Node Manager Tasks based upon Mesos offers + */ +public class NMTaskFactory extends TaskFactory { + + + @Inject + NMTaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) { + super(cfg, taskUtils, clGenerator); + } + + @Override + public Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.TaskID taskId, NodeTask nodeTask) { + ServiceResourceProfile serviceProfile = nodeTask.getProfile(); + Double taskMemory = serviceProfile.getAggregateMemory(); + Double taskCpus = serviceProfile.getAggregateCpu(); + List<Protos.Resource> portResources = resourceOfferContainer.consumePorts(serviceProfile.getPorts().values()); + Protos.CommandInfo commandInfo = clGenerator.generateCommandLine(serviceProfile, null, rangesConverter(portResources)); + Protos.ExecutorInfo executorInfo = getExecutorInfoForSlave(resourceOfferContainer, frameworkId, commandInfo); + Protos.TaskInfo.Builder taskBuilder = Protos.TaskInfo.newBuilder().setName(cfg.getFrameworkName() + "-" + taskId.getValue()).setTaskId(taskId).setSlaveId( + resourceOfferContainer.getSlaveId()); + + return taskBuilder + .addAllResources(resourceOfferContainer.consumeCpus(taskCpus)) + .addAllResources(resourceOfferContainer.consumeMem(taskMemory)) + .addAllResources(portResources) + .setExecutor(executorInfo) + .build(); + } + + @Override + public Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo) { + Protos.ExecutorID executorId = Protos.ExecutorID.newBuilder() + .setValue(EXECUTOR_PREFIX + frameworkId.getValue() + resourceOfferContainer.getOfferId() + + resourceOfferContainer.getSlaveId().getValue()) + .build(); + Protos.ExecutorInfo.Builder executorInfo = Protos.ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).setExecutorId(executorId) + .addAllResources(resourceOfferContainer.consumeCpus(taskUtils.getExecutorCpus())) + .addAllResources(resourceOfferContainer.consumeMem(taskUtils.getExecutorMemory())); + if (cfg.getContainerInfo().isPresent()) { + executorInfo.setContainer(getContainerInfo()); + } + return executorInfo.build(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java deleted file mode 100644 index 03150fb..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.myriad.scheduler; - -/** - * Generic interface to represent ports - */ -public interface Ports { - -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java index 8765226..083d54d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java @@ -17,24 +17,94 @@ */ package org.apache.myriad.scheduler; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.commons.lang3.StringUtils; +import org.apache.mesos.Protos; import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; /** * CommandLineGenerator for any aux service launched by Myriad as binary distro */ -public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl { +public class ServiceCommandLineGenerator extends ExecutorCommandLineGenerator { + + public static final String ENV_HADOOP_OPTS = "HADOOP_OPTS"; + private String baseCmd; - public ServiceCommandLineGenerator(MyriadConfiguration cfg, String nodeManagerUri) { - super(cfg, nodeManagerUri); + public ServiceCommandLineGenerator(MyriadConfiguration cfg) { + this.myriadConfiguration = cfg; + myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); + generateStaticCommandLine(); + } + + protected void generateStaticCommandLine() { + Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder(); + builder.addAllUris(getUris()); + builder.setUser(getUser()); + staticCommandInfo = builder.build(); + + StringBuilder cmdLine = new StringBuilder(); + appendDistroExtractionCommands(cmdLine); + appendUserSudo(cmdLine); + baseCmd = cmdLine.toString(); } @Override - public String generateCommandLine(ServiceResourceProfile profile, Ports ports) { - StringBuilder strB = new StringBuilder(); - appendDistroExtractionCommands(strB); - appendUserSudo(strB); - return strB.toString(); + public Protos.CommandInfo generateCommandLine(ServiceResourceProfile profile, + ServiceConfiguration serviceConfiguration, + Collection<Long> ports) { + Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder(); + builder.mergeFrom(staticCommandInfo); + builder.setValue(String.format(CMD_FORMAT, baseCmd + " " + serviceConfiguration.getCommand().get())); + builder.setEnvironment(generateEnvironment(profile, ports)); + return builder.build(); + } + + protected Protos.Environment generateEnvironment(ServiceResourceProfile serviceResourceProfile, Collection<Long> ports) { + Map<String, String> yarnEnv = myriadConfiguration.getYarnEnvironment(); + Protos.Environment.Builder builder = Protos.Environment.newBuilder(); + + builder.addAllVariables(Iterables.transform(yarnEnv.entrySet(), new Function<Map.Entry<String, String>, Protos.Environment.Variable>() { + public Protos.Environment.Variable apply(Map.Entry<String, String> x) { + return Protos.Environment.Variable.newBuilder().setName(x.getKey()).setValue(x.getValue()).build(); + } + })); + + StringBuilder hadoopOpts = new StringBuilder(); + String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME); + + if (StringUtils.isNotEmpty(rmHostName)) { + addJavaOpt(hadoopOpts, KEY_YARN_RM_HOSTNAME, rmHostName); + } + + if (yarnEnv.containsKey(KEY_YARN_HOME)) { + addJavaOpt(hadoopOpts, KEY_YARN_HOME, yarnEnv.get("YARN_HOME")); + } + + Map<String, Long> portsMap = serviceResourceProfile.getPorts(); + Preconditions.checkState(portsMap.size() == ports.size()); + Iterator itr = ports.iterator(); + for (String portProperty : portsMap.keySet()) { + addJavaOpt(hadoopOpts, portProperty, ALL_LOCAL_IPV4ADDR + itr.next()); + } + + if (myriadConfiguration.getYarnEnvironment().containsKey(ENV_HADOOP_OPTS)) { + hadoopOpts.append(" ").append(yarnEnv.get(ENV_HADOOP_OPTS)); + } + builder.addAllVariables(Collections.singleton( + Protos.Environment.Variable.newBuilder() + .setName(ENV_HADOOP_OPTS) + .setValue(hadoopOpts.toString()).build()) + ); + return builder.build(); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java index 146a80c..147ed47 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java @@ -24,11 +24,13 @@ import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonParseException; import java.lang.reflect.Type; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Resource Profile for any service + * Resource Profile for any service */ public class ServiceResourceProfile { @@ -50,18 +52,14 @@ public class ServiceResourceProfile { protected String className = ServiceResourceProfile.class.getName(); - public ServiceResourceProfile(String name, Double cpus, Double mem) { - this.name = name; - this.cpus = cpus; - this.memory = mem; - } + protected Map<String, Long> ports; - public ServiceResourceProfile(String name, Double cpus, Double mem, Double execCpus, Double execMemory) { + public ServiceResourceProfile(String name, Double cpus, Double mem, Map<String, Long> ports) { this.name = name; this.cpus = cpus; this.memory = mem; - this.executorCpu = execCpus; - this.executorMemory = execMemory; + this.ports = ports; + this.className = ServiceResourceProfile.class.getName(); } public String getName() { @@ -84,12 +82,8 @@ public class ServiceResourceProfile { return cpus; } - public Double getExecutorCpu() { - return executorCpu; - } - - public Double getExecutorMemory() { - return executorMemory; + public Map<String, Long> getPorts() { + return ports; } @Override @@ -144,10 +138,10 @@ public class ServiceResourceProfile { } if (obj == null) { return false; - } + } if (getClass() != obj.getClass()) { return false; - } + } ServiceResourceProfile other = (ServiceResourceProfile) obj; if (className == null) { if (other.className != null) { @@ -190,7 +184,7 @@ public class ServiceResourceProfile { } } else if (!name.equals(other.name)) { return false; - } + } return true; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java deleted file mode 100644 index 60d4c44..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.myriad.scheduler; - -import java.util.Map; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.ServiceConfiguration; - -/** - * ServiceTaskConstraints is an implementation of TaskConstraints for a service - * at this point constraints are on ports - * Later on there may be other types of constraints added - */ -public class ServiceTaskConstraints implements TaskConstraints { - - private int portsCount = 0; - - public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) { - Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations(); - - ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix); - if (serviceConfig != null) { - if (serviceConfig.getPorts().isPresent()) { - portsCount = serviceConfig.getPorts().get().size(); - } - } - } - - @Override - public int portsCount() { - return portsCount; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java new file mode 100644 index 0000000..57e104f --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactory.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.myriad.scheduler; + +import com.google.inject.Inject; +import org.apache.mesos.Protos; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.apache.myriad.scheduler.resource.ResourceOfferContainer; +import org.apache.myriad.state.NodeTask; + +import java.util.List; +import java.util.Objects; + +/** + * Generic Service Class that allows to create a service solely base don the configuration + * Main properties of configuration are: + * 1. command to run + * 2. Additional env. variables to set (serviceOpts) + * 3. ports to use with names of the properties + * 4. TODO (yufeldman) executor info + */ +public class ServiceTaskFactory extends TaskFactory { + + @Inject + ServiceTaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) { + super(cfg, taskUtils, clGenerator); + this.clGenerator = new ServiceCommandLineGenerator(cfg); + } + + @Override + public Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.TaskID taskId, NodeTask nodeTask) { + ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get(); + + Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null"); + Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null"); + List<Protos.Resource> portResources = resourceOfferContainer.consumePorts(nodeTask.getProfile().getPorts().values()); + Protos.CommandInfo commandInfo = clGenerator.generateCommandLine(nodeTask.getProfile(), serviceConfig, rangesConverter(portResources)); + + LOGGER.info("Command line for service: {} is: {}", commandInfo.getValue()); + + Protos.TaskInfo.Builder taskBuilder = Protos.TaskInfo.newBuilder(); + + taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(resourceOfferContainer.getSlaveId()) + .addAllResources(resourceOfferContainer.consumeCpus(nodeTask.getProfile().getCpus())) + .addAllResources(resourceOfferContainer.consumeMem(nodeTask.getProfile().getMemory())) + .addAllResources(portResources); + + taskBuilder.setCommand(commandInfo); + if (cfg.getContainerInfo().isPresent()) { + taskBuilder.setContainer(getContainerInfo()); + } + return taskBuilder.build(); + } + + @Override + public Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java deleted file mode 100644 index 42f698a..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.myriad.scheduler; - -import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import javax.inject.Inject; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.CommandInfo; -import org.apache.mesos.Protos.CommandInfo.URI; -import org.apache.mesos.Protos.ExecutorInfo; -import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.TaskID; -import org.apache.mesos.Protos.TaskInfo; -import org.apache.mesos.Protos.Value; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.MyriadExecutorConfiguration; -import org.apache.myriad.configuration.ServiceConfiguration; -import org.apache.myriad.state.NodeTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Generic Service Class that allows to create a service solely base don the configuration - * Main properties of configuration are: - * 1. command to run - * 2. Additional env. variables to set (serviceOpts) - * 3. ports to use with names of the properties - * 4. TODO (yufeldman) executor info - */ -public class ServiceTaskFactoryImpl implements TaskFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskFactoryImpl.class); - - public static final long DEFAULT_PORT_NUMBER = 0; - - private MyriadConfiguration cfg; - @SuppressWarnings("unused") - private TaskUtils taskUtils; - private ServiceCommandLineGenerator clGenerator; - - @Inject - public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) { - this.cfg = cfg; - this.taskUtils = taskUtils; - this.clGenerator = new ServiceCommandLineGenerator(cfg, cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull()); - } - - @Override - public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) { - Objects.requireNonNull(offer, "Offer should be non-null"); - Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); - - ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get(); - - Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null"); - Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null"); - - final String serviceHostName = "0.0.0.0"; - final String serviceEnv = serviceConfig.getEnvSettings(); - final String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME); - List<Long> additionalPortsNumbers = null; - - final StringBuilder strB = new StringBuilder("env "); - if (serviceConfig.getServiceOpts().isPresent()) { - strB.append(serviceConfig.getServiceOpts().get()).append("="); - - strB.append("\""); - if (StringUtils.isNotEmpty(rmHostName)) { - strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + " "); - } - - Map<String, Long> ports = serviceConfig.getPorts().orNull(); - - if (MapUtils.isNotEmpty(ports)) { - int neededPortsCount = 0; - for (Map.Entry<String, Long> portEntry : ports.entrySet()) { - Long port = portEntry.getValue(); - if (port == DEFAULT_PORT_NUMBER) { - neededPortsCount++; - } - } - // use provided ports - additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount); - LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}", - additionalPortsNumbers); - int index = 0; - for (Map.Entry<String, Long> portEntry : ports.entrySet()) { - String portProperty = portEntry.getKey(); - Long port = portEntry.getValue(); - if (port == DEFAULT_PORT_NUMBER) { - port = additionalPortsNumbers.get(index++); - } - strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port + " "); - } - } - strB.append(serviceEnv); - strB.append("\""); - } - - strB.append(" "); - strB.append(serviceConfig.getCommand().get()); - - CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(), strB.toString()); - - LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), strB.toString()); - - TaskInfo.Builder taskBuilder = TaskInfo.newBuilder(); - - taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId()) - .addAllResources(taskUtils.getScalarResource(offer, "cpus", nodeTask.getProfile().getCpus(), 0.0)) - .addAllResources(taskUtils.getScalarResource(offer, "mem", nodeTask.getProfile().getMemory(), 0.0)); - - if (CollectionUtils.isNotEmpty(additionalPortsNumbers)) { - // set ports - Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder(); - for (Long port : additionalPortsNumbers) { - valueRanger.addRange(Value.Range.newBuilder().setBegin(port).setEnd(port)); - } - - taskBuilder.addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(valueRanger.build())); - } - taskBuilder.setCommand(commandInfo); - if (cfg.getContainerInfo().isPresent()) { - taskBuilder.setContainer(taskUtils.getContainerInfo()); - } - return taskBuilder.build(); - } - - @VisibleForTesting - CommandInfo createCommandInfo(ServiceResourceProfile profile, String executorCmd) { - MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); - CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); - Map<String, String> envVars = cfg.getYarnEnvironment(); - if (MapUtils.isNotEmpty(envVars)) { - Protos.Environment.Builder yarnHomeB = Protos.Environment.newBuilder(); - for (Map.Entry<String, String> envEntry : envVars.entrySet()) { - Protos.Environment.Variable.Builder yarnEnvB = Protos.Environment.Variable.newBuilder(); - yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue()); - yarnHomeB.addVariables(yarnEnvB.build()); - } - commandInfo.mergeEnvironment(yarnHomeB.build()); - } - - if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { - //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. - if (!minimumUserSet(cfg)) { - throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!"); - } - - LOGGER.info("Using remote distribution"); - String clGeneratedCommand = clGenerator.generateCommandLine(profile, null); - - String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get(); - - //Concatenate all the subcommands - String cmd = clGeneratedCommand + " " + executorCmd; - - //get the nodemanagerURI - //We're going to extract ourselves, so setExtract is false - LOGGER.info("Getting Hadoop distribution from:" + nmURIString); - URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false).build(); - - //get configs directly from resource manager - String configUrlString = clGenerator.getConfigurationUrl(); - LOGGER.info("Getting config from:" + configUrlString); - URI configUri = URI.newBuilder().setValue(configUrlString).build(); - - LOGGER.info("Slave will execute command:" + cmd); - commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd); - commandInfo.setUser(cfg.getFrameworkSuperUser().get()); - - } else { - commandInfo.setValue(executorCmd); - } - return commandInfo.build(); - } - - private Boolean minimumUserSet(MyriadConfiguration conf) { - return (cfg.getFrameworkUser().isPresent() || cfg.getFrameworkSuperUser().isPresent()); - } - - @Override - public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) { - // TODO (yufeldman) if executor specified use it , otherwise return null - // nothing to implement here, since we are using default slave executor - return null; - } - - /** - * Helper method to reserve ports - * - * @param offer - * @param requestedPorts - * @return - */ - private List<Long> getAvailablePorts(Offer offer, int requestedPorts) { - if (requestedPorts == 0) { - return null; - } - - final List<Long> returnedPorts = new ArrayList<>(); - for (Resource resource : offer.getResourcesList()) { - if (resource.getName().equals("ports") && (isDefaultRole(resource))) { - Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator(); - while (itr.hasNext()) { - Value.Range range = itr.next(); - if (range.getBegin() <= range.getEnd()) { - long i = range.getBegin(); - while (i <= range.getEnd() && returnedPorts.size() < requestedPorts) { - returnedPorts.add(i); - i++; - } - if (returnedPorts.size() >= requestedPorts) { - return returnedPorts; - } - } - } - } - } - //this is actually an error condition - we did not have enough ports to use - //TODO (hokiegeek2) does this need error handling then? - return returnedPorts; - } - - private boolean isDefaultRole(Resource resource) { - return !resource.hasRole() || resource.getRole().equals("*"); - } -} \ No newline at end of file