Repository: incubator-myriad Updated Branches: refs/heads/master 63b58698e -> cbdcbbe03
Bulk commit of MYRIAD-198 changes We overuse Optionals in the config and then use an or method in various factories later. In many cases having the configuration return a default when the parameter was specified would create cleaner code. Pull Request: Closes #76 Author: hokiegeek2 <hokiege...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/cbdcbbe0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/cbdcbbe0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/cbdcbbe0 Branch: refs/heads/master Commit: cbdcbbe0349f7a10a3fba98ccd120e228982a193 Parents: 63b5869 Author: hokiegeek2 <hokiege...@gmail.com> Authored: Fri May 20 12:11:32 2016 -0400 Committer: darinj <dar...@apache.org> Committed: Wed Jun 8 13:10:44 2016 -0400 ---------------------------------------------------------------------- .../src/main/java/org/apache/myriad/Main.java | 21 ++--- .../java/org/apache/myriad/MesosModule.java | 23 +++-- .../java/org/apache/myriad/MyriadModule.java | 25 ++--- .../configuration/MyriadConfiguration.java | 61 ++++++------ .../MyriadExecutorConfiguration.java | 12 ++- .../configuration/NodeManagerConfiguration.java | 40 +++++--- .../configuration/ServiceConfiguration.java | 51 ++++++---- .../myriad/scheduler/MyriadOperations.java | 40 ++++---- .../myriad/scheduler/NMExecutorCLGenImpl.java | 18 ++-- .../scheduler/ServiceTaskConstraints.java | 9 +- .../scheduler/ServiceTaskFactoryImpl.java | 39 +++++--- .../apache/myriad/scheduler/TaskFactory.java | 32 +++++-- .../org/apache/myriad/scheduler/TaskUtils.java | 63 +++++++------ .../handlers/ResourceOffersEventHandler.java | 2 +- .../org/apache/myriad/state/SchedulerState.java | 9 +- .../main/resources/myriad-config-default.yml | 4 +- .../configuration/MyriadConfigurationTest.java | 97 +++++++++++++++++--- .../apache/myriad/scheduler/TestTaskUtils.java | 5 +- .../fgs/YarnNodeCapacityManagerSpec.groovy | 6 +- ...iad-config-test-default-with-docker-info.yml | 2 +- ...-config-test-default-with-framework-role.yml | 2 +- .../resources/myriad-config-test-default.yml | 14 +-- 22 files changed, 363 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java index 0f0703e..e825256 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java @@ -240,16 +240,15 @@ public class Main { TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); - if (servicesConfigs != null) { - for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) { - final String taskPrefix = entry.getKey(); - ServiceConfiguration config = entry.getValue(); - final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU); - final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY); - - profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); - taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix)); - } + + for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) { + final String taskPrefix = entry.getKey(); + ServiceConfiguration config = entry.getValue(); + final Double cpu = config.getCpus(); + final Double mem = config.getJvmMaxMemoryMB(); + + profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); + taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix)); } } @@ -262,7 +261,7 @@ public class Main { } private void initRebalancerService(MyriadConfiguration cfg, Injector injector) { - if (cfg.isRebalancer()) { + if (cfg.isRebalancerEnabled()) { LOGGER.info("Initializing Rebalancer"); rebalancerService = Executors.newScheduledThreadPool(1); final int initialDelay = 100; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java index 8390749..7ca3962 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java @@ -18,15 +18,11 @@ */ package org.apache.myriad; -import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.Scopes; -import com.google.inject.Singleton; -import com.google.protobuf.ByteString; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.StringUtils; import org.apache.mesos.MesosSchedulerDriver; import org.apache.mesos.Protos.Credential; @@ -43,6 +39,13 @@ import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import com.google.protobuf.ByteString; + /** * Guice Module for Mesos objects. */ @@ -64,12 +67,12 @@ public class MesosModule extends AbstractModule { Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint( cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout()); - if (StringUtils.isNotEmpty(cfg.getFrameworkRole())) { - frameworkInfoBuilder.setRole(cfg.getFrameworkRole()); - } + frameworkInfoBuilder.setRole(cfg.getFrameworkRole()); - FrameworkID frameworkId = schedulerState.getFrameworkID(); - if (frameworkId != null) { + Optional<FrameworkID> optFrameId = schedulerState.getFrameworkID(); + + if (optFrameId.isPresent()) { + FrameworkID frameworkId = optFrameId.get(); LOGGER.info("Attempting to re-register with frameworkId: {}", frameworkId.getValue()); frameworkInfoBuilder.setId(frameworkId); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java index 41abb9a..92add9a 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java @@ -103,22 +103,23 @@ public class MyriadModule extends AbstractModule { MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON); + Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); - if (auxServicesConfigs != null) { - for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { - String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull(); - if (taskFactoryClass != null) { - try { - Class<? extends TaskFactory> implClass = getTaskFactoryClass(taskFactoryClass); - mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON); - } catch (ClassNotFoundException e) { - LOGGER.error("ClassNotFoundException", e); - } - } else { - mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON); + for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { + if (entry.getValue().getTaskFactoryImplName().isPresent()) { + String taskFactoryClass = entry.getValue().getTaskFactoryImplName().get(); + try { + Class<? extends TaskFactory> implClass = getTaskFactoryClass(taskFactoryClass); + mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON); + } catch (ClassNotFoundException e) { + LOGGER.error("ClassNotFoundException", e); } + } else { + //TODO (kjyost) Confirm if this else statement and logic should still be here + mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON); } } + //TODO(Santosh): Should be configurable as well bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java index 3de72a6..cf6cc18 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java @@ -18,14 +18,15 @@ */ package org.apache.myriad.configuration; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; -import com.google.common.base.Strings; +import java.util.Collections; import java.util.Map; import org.codehaus.jackson.map.annotate.JsonSerialize; import org.hibernate.validator.constraints.NotEmpty; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; + /** * Myriad Configuration commonly defined in the YML file * mesosMaster: 10.0.2.15:5050 @@ -85,7 +86,7 @@ public class MyriadConfiguration { /** * By default rebalancer is turned off. */ - public static final Boolean DEFAULT_REBALANCER = true; + public static final Boolean DEFAULT_REBALANCER_ENABLED = false; /** * By default ha is turned off. @@ -104,6 +105,14 @@ public class MyriadConfiguration { public static final Integer DEFAULT_ZK_TIMEOUT = 20000; public static final Integer DEFAULT_REST_API_PORT = 8192; + + public static final String DEFAULT_ROLE = "*"; + + public static final String DEFAULT_ZK_SERVERS = "localhost:2181"; + + public static final String DEFAULT_CGROUP_PATH = "/sys/fs/cgroup"; + + public static final Map<String, ServiceConfiguration> EMPTY_SERVICE_CONFIGURATION = Collections.emptyMap(); @JsonProperty @NotEmpty @@ -188,6 +197,7 @@ public class MyriadConfiguration { @JsonProperty private String cgroupPath; + public MyriadConfiguration() { } @@ -196,23 +206,23 @@ public class MyriadConfiguration { } public Boolean isCheckpoint() { - return this.checkpoint != null ? checkpoint : DEFAULT_CHECKPOINT; + return Optional.fromNullable(checkpoint).or(DEFAULT_CHECKPOINT); } public Optional<MyriadContainerConfiguration> getContainerInfo() { return Optional.fromNullable(containerInfo); } - public Double getFrameworkFailoverTimeout() { - return this.frameworkFailoverTimeout != null ? this.frameworkFailoverTimeout : DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS; + public String getFrameworkRole() { + return Optional.fromNullable(frameworkRole).or(DEFAULT_ROLE); } public String getFrameworkName() { - return Strings.isNullOrEmpty(this.frameworkName) ? DEFAULT_FRAMEWORK_NAME : this.frameworkName; + return Optional.fromNullable(frameworkName).or(DEFAULT_FRAMEWORK_NAME); } - - public String getFrameworkRole() { - return frameworkRole; + + public Double getFrameworkFailoverTimeout() { + return Optional.fromNullable(frameworkFailoverTimeout).or(DEFAULT_FRAMEWORK_FAILOVER_TIMEOUT_MS); } public Optional<String> getFrameworkUser() { @@ -231,27 +241,24 @@ public class MyriadConfiguration { return nmInstances; } - public Boolean isRebalancer() { - return rebalancer != null ? rebalancer : DEFAULT_REBALANCER; + public Boolean isRebalancerEnabled() { + return Optional.fromNullable(rebalancer).or(DEFAULT_REBALANCER_ENABLED); } public Boolean isHAEnabled() { - return haEnabled != null ? haEnabled : DEFAULT_HA_ENABLED; + return Optional.fromNullable(haEnabled).or(DEFAULT_HA_ENABLED); } public NodeManagerConfiguration getNodeManagerConfiguration() { - return this.nodemanager; + return nodemanager; } public Map<String, ServiceConfiguration> getServiceConfigurations() { - return this.services; + return Optional.fromNullable(services).or(EMPTY_SERVICE_CONFIGURATION); } - public ServiceConfiguration getServiceConfiguration(String taskName) { - if (services == null) { - return null; - } - return this.services.get(taskName); + public Optional<ServiceConfiguration> getServiceConfiguration(String taskName) { + return Optional.fromNullable(services.get(taskName)); } public MyriadExecutorConfiguration getMyriadExecutorConfiguration() { @@ -259,19 +266,19 @@ public class MyriadConfiguration { } public String getNativeLibrary() { - return Strings.isNullOrEmpty(this.nativeLibrary) ? DEFAULT_NATIVE_LIBRARY : this.nativeLibrary; + return Optional.fromNullable(nativeLibrary).or(DEFAULT_NATIVE_LIBRARY); } public String getZkServers() { - return this.zkServers; + return Optional.fromNullable(zkServers).or(DEFAULT_ZK_SERVERS); } public Integer getZkTimeout() { - return this.zkTimeout != null ? this.zkTimeout : DEFAULT_ZK_TIMEOUT; + return Optional.fromNullable(zkTimeout).or(DEFAULT_ZK_TIMEOUT); } public Integer getRestApiPort() { - return this.restApiPort != null ? this.restApiPort : DEFAULT_REST_API_PORT; + return Optional.fromNullable(restApiPort).or(DEFAULT_REST_API_PORT); } public Map<String, String> getYarnEnvironment() { @@ -295,6 +302,6 @@ public class MyriadConfiguration { } public String getCGroupPath() { - return cgroupPath == null ? "/sys/fs/cgroup" : cgroupPath; + return Optional.fromNullable(cgroupPath).or(DEFAULT_CGROUP_PATH); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java index 1096e16..7f80ccb 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble; import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString; +import org.apache.myriad.executor.MyriadExecutorDefaults; import org.codehaus.jackson.map.annotate.JsonSerialize; import org.hibernate.validator.constraints.NotEmpty; @@ -58,8 +59,12 @@ public class MyriadExecutorConfiguration { @JsonSerialize(using = OptionalSerializerString.class) private String jvmUri; - public Optional<Double> getJvmMaxMemoryMB() { - return Optional.fromNullable(jvmMaxMemoryMB); + private Double generateMaxMemory() { + return (MyriadExecutorDefaults.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + MyriadExecutorDefaults.JVM_OVERHEAD); + } + + public Double getJvmMaxMemoryMB() { + return Optional.fromNullable(jvmMaxMemoryMB).or(generateMaxMemory()); } public String getPath() { @@ -77,5 +82,4 @@ public class MyriadExecutorConfiguration { public Optional<String> getJvmUri() { return Optional.fromNullable(jvmUri); } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java index de709ca..6104c2c 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java @@ -26,7 +26,7 @@ import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerStri import org.codehaus.jackson.map.annotate.JsonSerialize; /** - * Node Manager Configuration + * YARN NodeManager Configuration */ public class NodeManagerConfiguration { /** @@ -40,10 +40,18 @@ public class NodeManagerConfiguration { public static final double DEFAULT_JVM_MAX_MEMORY_MB = 2048; /** - * Default cpu for NodeManager JVM. + * Default CPU cores for NodeManager JVM. */ public static final double DEFAULT_NM_CPUS = 1; + /** + * CGroups disabled by default + */ + public static final Boolean DEFAULT_NM_CGROUPS = false; + + /** + * Default NodeManager Mesos task prefix + */ public static final String NM_TASK_PREFIX = "nm"; /** @@ -54,7 +62,7 @@ public class NodeManagerConfiguration { private Double jvmMaxMemoryMB; /** - * Amount of CPU share given to NodeManger JVM. This is critical specially + * Amount of CPU share given to NodeManger JVM. This is critical especially * for NodeManager auxiliary services. */ @JsonProperty @@ -67,27 +75,31 @@ public class NodeManagerConfiguration { @JsonProperty @JsonSerialize(using = OptionalSerializerString.class) private String jvmOpts; - + /** - * Determines if cgroups are enabled for NM or not. + * Determines if cgroups are enabled for the NodeManager */ @JsonProperty @JsonSerialize(using = OptionalSerializerBoolean.class) private Boolean cgroups; - public Optional<Double> getJvmMaxMemoryMB() { - return Optional.fromNullable(jvmMaxMemoryMB); + private Double generateNodeManagerMemory() { + return (NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + NodeManagerConfiguration.JVM_OVERHEAD); } - + + public Double getJvmMaxMemoryMB() { + return Optional.fromNullable(jvmMaxMemoryMB).or(generateNodeManagerMemory()); + } + public Optional<String> getJvmOpts() { return Optional.fromNullable(jvmOpts); } - - public Optional<Double> getCpus() { - return Optional.fromNullable(cpus); + + public Double getCpus() { + return Optional.fromNullable(cpus).or(DEFAULT_NM_CPUS); } - public Optional<Boolean> getCgroups() { - return Optional.fromNullable(cgroups); + public boolean getCgroups() { + return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java index 146df42..e888e05 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java @@ -18,41 +18,51 @@ package org.apache.myriad.configuration; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; import java.util.Map; + import org.codehaus.jackson.map.annotate.JsonSerialize; import org.hibernate.validator.constraints.NotEmpty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; /** * Configuration for any service/task to be started from Myriad Scheduler */ public class ServiceConfiguration { + /** + * Translates to -Xmx for the Mesos executor JVM. + * Default number of CPU cores per Mesos executor JVM. + */ + public static final Double DEFAULT_CPU_CORES = 0.1; - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConfiguration.class); - - public static final Double DEFAULT_CPU = 0.1; - - public static final Double DEFAULT_MEMORY = 256.0; - /** - * Translates to -Xmx for the JVM. + * Default amount of RAM per Mesos executor JVM. + */ + public static final Double DEFAULT_JVM_MAX_MEMORY_MB = 256.0; + + /** + * Translates to -Xmx for the Mesos executor JVM. + * Allot 10% more memory to account for JVM overhead. + */ + public static final double JVM_OVERHEAD = 0.1; + + /** + * Translates to -Xmx for the Mesos executor JVM. */ @JsonProperty @JsonSerialize(using = OptionalSerializer.OptionalSerializerDouble.class) protected Double jvmMaxMemoryMB; /** - * Amount of CPU share given to JVM. + * Amount of CPU share given to Mesos executor JVM. */ @JsonProperty @JsonSerialize(using = OptionalSerializer.OptionalSerializerDouble.class) protected Double cpus; /** - * Translates to jvm opts for the JVM. + * Translates to JVM opts for the Mesos executor JVM. */ @JsonProperty @JsonSerialize(using = OptionalSerializer.OptionalSerializerString.class) @@ -89,17 +99,20 @@ public class ServiceConfiguration { @JsonProperty protected String serviceOptsName; + private Double generateMaxMemory() { + return (DEFAULT_JVM_MAX_MEMORY_MB) * (1 + JVM_OVERHEAD); + } - public Optional<Double> getJvmMaxMemoryMB() { - return Optional.fromNullable(jvmMaxMemoryMB); + public Double getJvmMaxMemoryMB() { + return Optional.fromNullable(jvmMaxMemoryMB).or(generateMaxMemory()); } public Optional<String> getJvmOpts() { return Optional.fromNullable(jvmOpts); } - public Optional<Double> getCpus() { - return Optional.fromNullable(cpus); + public Double getCpus() { + return Optional.fromNullable(cpus).or(DEFAULT_CPU_CORES); } public String getTaskName() { @@ -130,7 +143,7 @@ public class ServiceConfiguration { return Optional.fromNullable(command); } - public String getServiceOpts() { - return serviceOptsName; + public Optional<String> getServiceOpts() { + return Optional.fromNullable(serviceOptsName); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java index 2250eee..99f7e78 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java @@ -18,8 +18,6 @@ */ package org.apache.myriad.scheduler; -import com.google.common.collect.Lists; -import com.google.inject.Inject; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -42,6 +40,9 @@ import org.apache.myriad.webapp.MyriadWebServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.inject.Inject; + /** * Myriad scheduler operations */ @@ -108,26 +109,30 @@ public class MyriadOperations { * * @param instances * @param serviceName + * + * @throws MyriadBadConfigurationException if total number of instances in active, staging, and pending + * states exceeds the ServiceConfiguration.maxInstances */ public void flexUpAService(int instances, String serviceName) throws MyriadBadConfigurationException { - final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName); - - int totalflexInstances = instances + getFlexibleInstances(serviceName); - Integer maxInstances = auxTaskConf.getMaxInstances().orNull(); - if (maxInstances != null && maxInstances > 0) { - // check number of instances - // sum of active, staging, pending should be < maxInstances - if (totalflexInstances > maxInstances) { - LOGGER.error("Current number of active, staging, pending and requested instances: {}" + - ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances); - throw new MyriadBadConfigurationException( - "Current number of active, staging, pending instances and requested: " + totalflexInstances + - ", while it is greater then max instances allowed: " + maxInstances); + final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName).get(); + + if (auxTaskConf.getMaxInstances().isPresent()) { + //If total number of current and flex instances exceed maxInstances, throw an exception + int totalflexInstances = instances + getFlexibleInstances(serviceName); + Integer maxInstances = auxTaskConf.getMaxInstances().get(); + if (maxInstances > 0) { + if (totalflexInstances > maxInstances) { + LOGGER.error("Current number of active, staging, pending and requested instances: {}" + + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances); + throw new MyriadBadConfigurationException( + "Current number of active, staging, pending instances and requested: " + totalflexInstances + + ", while it is greater then max instances allowed: " + maxInstances); + } } } - final Double cpu = auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU); - final Double mem = auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY); + final Double cpu = auxTaskConf.getCpus(); + final Double mem = auxTaskConf.getJvmMaxMemoryMB(); Collection<NodeTask> nodes = new HashSet<>(); for (int i = 0; i < instances; i++) { @@ -177,6 +182,7 @@ public class MyriadOperations { } } } + int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown; Set<NodeTask> activeTasks = this.schedulerState.getActiveTasksByType(serviceName); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java index 18c3fce..db945ec 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java @@ -21,6 +21,7 @@ package org.apache.myriad.scheduler; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.myriad.configuration.MyriadConfiguration; import org.slf4j.Logger; @@ -82,11 +83,11 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { } String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME); - if (rmHostName != null && !rmHostName.isEmpty()) { + if (StringUtils.isNotEmpty(rmHostName)) { addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName); } - if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) { + if (cfg.getNodeManagerConfiguration().getCgroups()) { addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, "mesos/$TASK_DIR"); if (environment.containsKey("YARN_HOME")) { addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME")); @@ -157,18 +158,19 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { @Override public String getConfigurationUrl() { String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY); + String address = StringUtils.EMPTY; if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) { - String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); - if (address == null || address.isEmpty()) { + address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS); + if (StringUtils.isEmpty(address)) { address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090"; } - return "https://" + address + "/conf"; } else { - String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); - if (address == null || address.isEmpty()) { + address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS); + if (StringUtils.isEmpty(address)) { address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088"; } - return "http://" + address + "/conf"; } + + return "http://" + address + "/conf"; } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java index b7bccb7..60d4c44 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java @@ -29,18 +29,15 @@ import org.apache.myriad.configuration.ServiceConfiguration; */ public class ServiceTaskConstraints implements TaskConstraints { - private int portsCount; + private int portsCount = 0; public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) { - this.portsCount = 0; Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations(); - if (auxConfigs == null) { - return; - } + ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix); if (serviceConfig != null) { if (serviceConfig.getPorts().isPresent()) { - this.portsCount = serviceConfig.getPorts().get().size(); + portsCount = serviceConfig.getPorts().get().size(); } } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java index dd86ac0..42f698a 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java @@ -24,6 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; import javax.inject.Inject; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.mesos.Protos; import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.CommandInfo.URI; @@ -65,13 +69,13 @@ public class ServiceTaskFactoryImpl implements TaskFactory { this.taskUtils = taskUtils; this.clGenerator = new ServiceCommandLineGenerator(cfg, cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull()); } - + @Override public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) { Objects.requireNonNull(offer, "Offer should be non-null"); Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); - ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()); + ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()).get(); Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null"); Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null"); @@ -82,16 +86,17 @@ public class ServiceTaskFactoryImpl implements TaskFactory { List<Long> additionalPortsNumbers = null; final StringBuilder strB = new StringBuilder("env "); - if (serviceConfig.getServiceOpts() != null) { - strB.append(serviceConfig.getServiceOpts()).append("="); + if (serviceConfig.getServiceOpts().isPresent()) { + strB.append(serviceConfig.getServiceOpts().get()).append("="); strB.append("\""); - if (rmHostName != null && !rmHostName.isEmpty()) { + if (StringUtils.isNotEmpty(rmHostName)) { strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + " "); } Map<String, Long> ports = serviceConfig.getPorts().orNull(); - if (ports != null && !ports.isEmpty()) { + + if (MapUtils.isNotEmpty(ports)) { int neededPortsCount = 0; for (Map.Entry<String, Long> portEntry : ports.entrySet()) { Long port = portEntry.getValue(); @@ -130,7 +135,7 @@ public class ServiceTaskFactoryImpl implements TaskFactory { .addAllResources(taskUtils.getScalarResource(offer, "cpus", nodeTask.getProfile().getCpus(), 0.0)) .addAllResources(taskUtils.getScalarResource(offer, "mem", nodeTask.getProfile().getMemory(), 0.0)); - if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) { + if (CollectionUtils.isNotEmpty(additionalPortsNumbers)) { // set ports Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder(); for (Long port : additionalPortsNumbers) { @@ -151,7 +156,7 @@ public class ServiceTaskFactoryImpl implements TaskFactory { MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); Map<String, String> envVars = cfg.getYarnEnvironment(); - if (envVars != null && !envVars.isEmpty()) { + if (MapUtils.isNotEmpty(envVars)) { Protos.Environment.Builder yarnHomeB = Protos.Environment.newBuilder(); for (Map.Entry<String, String> envEntry : envVars.entrySet()) { Protos.Environment.Variable.Builder yarnEnvB = Protos.Environment.Variable.newBuilder(); @@ -163,7 +168,7 @@ public class ServiceTaskFactoryImpl implements TaskFactory { if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. - if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { + if (!minimumUserSet(cfg)) { throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!"); } @@ -195,6 +200,10 @@ public class ServiceTaskFactoryImpl implements TaskFactory { return commandInfo.build(); } + private Boolean minimumUserSet(MyriadConfiguration conf) { + return (cfg.getFrameworkUser().isPresent() || cfg.getFrameworkSuperUser().isPresent()); + } + @Override public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) { // TODO (yufeldman) if executor specified use it , otherwise return null @@ -213,9 +222,10 @@ public class ServiceTaskFactoryImpl implements TaskFactory { if (requestedPorts == 0) { return null; } + final List<Long> returnedPorts = new ArrayList<>(); for (Resource resource : offer.getResourcesList()) { - if (resource.getName().equals("ports") && (!resource.hasRole() || resource.getRole().equals("*"))) { + if (resource.getName().equals("ports") && (isDefaultRole(resource))) { Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator(); while (itr.hasNext()) { Value.Range range = itr.next(); @@ -232,7 +242,12 @@ public class ServiceTaskFactoryImpl implements TaskFactory { } } } - // this is actually an error condition - we did not have enough ports to use + //this is actually an error condition - we did not have enough ports to use + //TODO (hokiegeek2) does this need error handling then? return returnedPorts; } -} + + private boolean isDefaultRole(Resource resource) { + return !resource.hasRole() || resource.getRole().equals("*"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java index 92e4cf7..6b398a3 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java @@ -18,10 +18,25 @@ */ package org.apache.myriad.scheduler; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.mesos.Protos.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; + +import javax.inject.Inject; + +import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.CommandInfo.URI; +import org.apache.mesos.Protos.ExecutorID; +import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.FrameworkID; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.Resource; +import org.apache.mesos.Protos.TaskID; +import org.apache.mesos.Protos.TaskInfo; +import org.apache.mesos.Protos.Value; import org.apache.mesos.Protos.Value.Range; import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.configuration.MyriadExecutorConfiguration; @@ -29,12 +44,11 @@ import org.apache.myriad.state.NodeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import java.util.*; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** - * Creates Tasks based on mesos offers + * Creates Tasks based upon Mesos offers */ public interface TaskFactory { static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname"; @@ -65,14 +79,12 @@ public interface TaskFactory { private MyriadConfiguration cfg; private TaskUtils taskUtils; private ExecutorCommandLineGenerator clGenerator; - private TaskConstraints constraints; @Inject public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) { this.cfg = cfg; this.taskUtils = taskUtils; this.clGenerator = clGenerator; - this.constraints = new NMTaskConstraints(); } @VisibleForTesting @@ -108,7 +120,7 @@ public interface TaskFactory { //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer @VisibleForTesting protected static NMPorts getPorts(Offer offer) { - HashSet<Long> ports = new HashSet<>(); + Set<Long> ports = new HashSet<>(); for (Resource resource : offer.getResourcesList()) { if (resource.getName().equals("ports") && (!resource.hasRole() || resource.getRole().equals("*"))) { ports = getNMPorts(resource); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java index 1d9c518..d73a467 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java @@ -18,14 +18,14 @@ */ package org.apache.myriad.scheduler; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; - import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + import javax.annotation.Nullable; import javax.inject.Inject; import javax.xml.parsers.DocumentBuilder; @@ -44,9 +44,13 @@ import javax.xml.xpath.XPathExpression; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; -import com.google.common.base.Preconditions; import org.apache.mesos.Protos; -import org.apache.myriad.configuration.*; +import org.apache.myriad.configuration.MyriadBadConfigurationException; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.MyriadContainerConfiguration; +import org.apache.myriad.configuration.MyriadDockerConfiguration; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.executor.MyriadExecutorDefaults; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +60,11 @@ import org.w3c.dom.NodeList; import org.xml.sax.InputSource; import org.xml.sax.SAXException; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + /** * utility class for working with tasks and node manager profiles */ @@ -158,14 +167,11 @@ public class TaskUtils { } public double getNodeManagerMemory() { - NodeManagerConfiguration nmCfg = this.cfg.getNodeManagerConfiguration(); - return (nmCfg.getJvmMaxMemoryMB().isPresent() ? nmCfg.getJvmMaxMemoryMB() - .get() : NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + NodeManagerConfiguration.JVM_OVERHEAD); + return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB(); } - + public double getNodeManagerCpus() { - Optional<Double> cpus = this.cfg.getNodeManagerConfiguration().getCpus(); - return cpus.isPresent() ? cpus.get() : NodeManagerConfiguration.DEFAULT_NM_CPUS; + return cfg.getNodeManagerConfiguration().getCpus(); } public double getExecutorCpus() { @@ -174,9 +180,7 @@ public class TaskUtils { } public double getExecutorMemory() { - MyriadExecutorConfiguration executorCfg = this.cfg.getMyriadExecutorConfiguration(); - return (executorCfg.getJvmMaxMemoryMB().isPresent() ? executorCfg.getJvmMaxMemoryMB() - .get() : MyriadExecutorDefaults.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + MyriadExecutorDefaults.JVM_OVERHEAD); + return cfg.getMyriadExecutorConfiguration().getJvmMaxMemoryMB(); } public double getTaskCpus(NMProfile profile) { @@ -193,28 +197,26 @@ public class TaskUtils { if (taskName.startsWith(NodeManagerConfiguration.NM_TASK_PREFIX)) { return getAggregateCpus(profile); } - ServiceConfiguration auxConf = cfg.getServiceConfiguration(taskName); - if (auxConf == null) { + + Optional<ServiceConfiguration> auxConf = cfg.getServiceConfiguration(taskName); + if (!auxConf.isPresent()) { throw new MyriadBadConfigurationException("Can not find profile for task name: " + taskName); } - if (!auxConf.getCpus().isPresent()) { - throw new MyriadBadConfigurationException("cpu is not defined for task with name: " + taskName); - } - return auxConf.getCpus().get(); + + return auxConf.get().getCpus(); } public double getAuxTaskMemory(NMProfile profile, String taskName) throws MyriadBadConfigurationException { if (taskName.startsWith(NodeManagerConfiguration.NM_TASK_PREFIX)) { return getAggregateMemory(profile); } - ServiceConfiguration auxConf = cfg.getServiceConfiguration(taskName); - if (auxConf == null) { - throw new MyriadBadConfigurationException("Can not find profile for task name: " + taskName); - } - if (!auxConf.getJvmMaxMemoryMB().isPresent()) { - throw new MyriadBadConfigurationException("memory is not defined for task with name: " + taskName); + + Optional<ServiceConfiguration> auxConf = cfg.getServiceConfiguration(taskName); + if (!auxConf.isPresent()) { + throw new MyriadBadConfigurationException("Cannot find profile for task name: " + taskName); } - return auxConf.getJvmMaxMemoryMB().get(); + + return auxConf.get().getJvmMaxMemoryMB(); } public TaskUtils() { @@ -299,7 +301,8 @@ public class TaskUtils { */ public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) { String role = cfg.getFrameworkRole(); - ArrayList<Protos.Resource> resources = new ArrayList<>(); + List<Protos.Resource> resources = new ArrayList<Protos.Resource>(); + double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role * //Find role by name, must loop through resources for (Protos.Resource r : offer.getResourcesList()) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 1df68a5..8d1cd03 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -130,7 +130,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) { try { - final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), + final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID().get(), pendingTaskId, taskToLaunch); List<OfferID> offerIds = new ArrayList<>(); offerIds.add(offer.getId()); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java index 0e2a18e..8a531ea 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java @@ -18,7 +18,6 @@ */ package org.apache.myriad.state; -import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -30,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; + import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos; import org.apache.mesos.Protos.SlaveID; @@ -38,6 +38,9 @@ import org.apache.myriad.state.utils.StoreContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + /** * Represents the state of the Myriad scheduler */ @@ -353,8 +356,8 @@ public class SchedulerState { return this.tasks.containsKey(taskID); } - public synchronized Protos.FrameworkID getFrameworkID() { - return this.frameworkId; + public synchronized Optional<Protos.FrameworkID> getFrameworkID() { + return Optional.fromNullable(frameworkId); } public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/main/resources/myriad-config-default.yml ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/resources/myriad-config-default.yml b/myriad-scheduler/src/main/resources/myriad-config-default.yml index f985054..066f734 100644 --- a/myriad-scheduler/src/main/resources/myriad-config-default.yml +++ b/myriad-scheduler/src/main/resources/myriad-config-default.yml @@ -20,10 +20,10 @@ mesosMaster: 10.0.2.15:5050 checkpoint: false frameworkFailoverTimeout: 43200000 frameworkName: MyriadAlpha -frameworkRole: +frameworkRole: "*" frameworkUser: hduser # User the Node Manager runs as, required if nodeManagerURI set, otherwise defaults to the user # running the resource manager. -frameworkSuperUser: root # To be depricated, currently permissions need set by a superuser due to Mesos-1790. Must be +frameworkSuperUser: root # To be deprecated, currently permissions need set by a superuser due to Mesos-1790. Must be # root or have passwordless sudo. Required if nodeManagerURI set, ignored otherwise. nativeLibrary: /usr/local/lib/libmesos.so zkServers: localhost:2181 http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java index 85fe5e6..22df23d 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java @@ -18,14 +18,17 @@ package org.apache.myriad.configuration; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.Map; -import org.junit.AfterClass; + import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; /** * AuxServices/tasks test @@ -42,16 +45,10 @@ public class MyriadConfigurationTest { } - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - @Test - public void additionalPropertiestest() throws Exception { - + public void serviceConfigurationTest() throws Exception { Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations(); - assertNotNull(auxConfigs); assertEquals(auxConfigs.size(), 2); for (Map.Entry<String, ServiceConfiguration> entry : auxConfigs.entrySet()) { @@ -62,4 +59,78 @@ public class MyriadConfigurationTest { } } -} + @Test + public void coreConfigurationTest() throws Exception { + assertEquals("MyriadTest", cfg.getFrameworkName()); + + //authorization parameters + assertEquals("*", cfg.getFrameworkRole()); + assertEquals("hduser", cfg.getFrameworkUser().get()); + assertEquals("root", cfg.getFrameworkSuperUser().get()); + + //ports and directory paths + assertEquals("10.0.2.15:5050", cfg.getMesosMaster()); + assertEquals("/usr/local/lib/libmesos.so", cfg.getNativeLibrary()); + assertEquals(new Integer(8192), cfg.getRestApiPort()); + assertEquals("10.0.2.15:2181", cfg.getZkServers()); + + //timeouts + assertEquals(new Double(44200000), cfg.getFrameworkFailoverTimeout()); + assertEquals(new Integer(25000), cfg.getZkTimeout()); + + //checkpoints + assertEquals(false, cfg.isCheckpoint()); + assertEquals(true, cfg.isHAEnabled()); + assertEquals(false, cfg.isRebalancerEnabled()); + } + + @Test + public void executorConfigurationTest() throws Exception { + MyriadExecutorConfiguration conf = cfg.getMyriadExecutorConfiguration(); + + assertEquals(new Double(256), conf.getJvmMaxMemoryMB()); + assertEquals("hdfs://namenode:port/dist/hadoop-2.7.0.tar.gz", conf.getNodeManagerUri().get()); + assertEquals("file:///usr/local/libexec/mesos/myriad-executor-runnable-0.1.0.jar", conf.getPath()); + } + + @Test + public void nodeManagerConfigurationTest() throws Exception { + NodeManagerConfiguration config = cfg.getNodeManagerConfiguration(); + + assertFalse(config.getCgroups()); + assertEquals(new Double(0.2), config.getCpus()); + assertEquals(new Double(1024.0), config.getJvmMaxMemoryMB()); + } + + @Test + public void profilesConfigurationTest() throws Exception { + Map<String, Map<String, String>> profiles = cfg.getProfiles(); + + for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) { + assertTrue(validateProfile(profile)); + } + } + + private boolean validateProfile(Map.Entry<String, Map<String, String>> entry) { + String key = entry.getKey(); + Map<String, String> value = entry.getValue(); + + switch (key) { + case "small" : { + return value.get("cpu").equals("1") && value.get("mem").equals("1100"); + } + + case "medium" : { + return value.get("cpu").equals("2") && value.get("mem").equals("2048"); + } + + case "large" : { + return value.get("cpu").equals("4") && value.get("mem").equals("4096"); + } + + default : { + return true; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java index 14e0a00..06fa698 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestTaskUtils.java @@ -134,8 +134,9 @@ public class TestTaskUtils { private void checkResourceList(Iterable<Protos.Resource> resources, String name, Double roleVal, Double defaultVal) { int i = 0; - Range defaultValueRange = Ranges.closed(defaultVal - epsilon, defaultVal + epsilon); - Range roleValueRange = Ranges.closed(roleVal - epsilon, roleVal + epsilon); + + Range<Double> defaultValueRange = Ranges.closed(defaultVal - epsilon, defaultVal + epsilon); + Range<Double> roleValueRange = Ranges.closed(roleVal - epsilon, roleVal + epsilon); for (Protos.Resource resource: resources) { if (resource.hasRole() && resource.getRole().equals("test")) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy index f7d8c43..4e3dc50 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy @@ -119,7 +119,7 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec { zeroNM.getTotalCapability().getVirtualCores() == 2 1 * rmContext.getDispatcher().getEventHandler().handle(_ as NodeResourceUpdateSchedulerEvent) } - + YarnNodeCapacityManager getYarnNodeCapacityManager() { def registry = Mock(InterceptorRegistry) def executorInfo = Protos.ExecutorInfo.newBuilder() @@ -133,9 +133,9 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec { getNodeTask(_, NodeManagerConfiguration.NM_TASK_PREFIX) >> nodeTask } def cfg = Mock(MyriadConfiguration) { - getFrameworkRole() >> "some_role" + getFrameworkName() >> "MyriadTest" } - print(cfg.getFrameworkRole()) + def taskUtils = new TaskUtils(cfg) return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext, myriadDriver, offerLifecycleManager, nodeStore, state, taskUtils) http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/test/resources/myriad-config-test-default-with-docker-info.yml ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/resources/myriad-config-test-default-with-docker-info.yml b/myriad-scheduler/src/test/resources/myriad-config-test-default-with-docker-info.yml index 221607a..4bdb6e2 100644 --- a/myriad-scheduler/src/test/resources/myriad-config-test-default-with-docker-info.yml +++ b/myriad-scheduler/src/test/resources/myriad-config-test-default-with-docker-info.yml @@ -22,7 +22,7 @@ frameworkName: MyriadTest frameworkRole: test frameworkUser: hduser # User the Node Manager runs as, required if nodeManagerURI set, otherwise defaults to the user # running the resource manager. -frameworkSuperUser: root # To be depricated, currently permissions need set by a superuser due to Mesos-1790. Must be +frameworkSuperUser: root # To be deprecated, currently permissions need set by a superuser due to Mesos-1790. Must be # root or have passwordless sudo. Required if nodeManagerURI set, ignored otherwise. nativeLibrary: /usr/local/lib/libmesos.so zkServers: localhost:2181 http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/test/resources/myriad-config-test-default-with-framework-role.yml ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/resources/myriad-config-test-default-with-framework-role.yml b/myriad-scheduler/src/test/resources/myriad-config-test-default-with-framework-role.yml index ab7f137..87700fe 100644 --- a/myriad-scheduler/src/test/resources/myriad-config-test-default-with-framework-role.yml +++ b/myriad-scheduler/src/test/resources/myriad-config-test-default-with-framework-role.yml @@ -22,7 +22,7 @@ frameworkName: MyriadTest frameworkRole: test frameworkUser: hduser # User the Node Manager runs as, required if nodeManagerURI set, otherwise defaults to the user # running the resource manager. -frameworkSuperUser: root # To be depricated, currently permissions need set by a superuser due to Mesos-1790. Must be +frameworkSuperUser: root # To be deprecated, currently permissions need set by a superuser due to Mesos-1790. Must be # root or have passwordless sudo. Required if nodeManagerURI set, ignored otherwise. nativeLibrary: /usr/local/lib/libmesos.so zkServers: localhost:2181 http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/cbdcbbe0/myriad-scheduler/src/test/resources/myriad-config-test-default.yml ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/resources/myriad-config-test-default.yml b/myriad-scheduler/src/test/resources/myriad-config-test-default.yml index 241aac9..2945bcd 100644 --- a/myriad-scheduler/src/test/resources/myriad-config-test-default.yml +++ b/myriad-scheduler/src/test/resources/myriad-config-test-default.yml @@ -16,17 +16,18 @@ # under the License. mesosMaster: 10.0.2.15:5050 +haEnabled: true checkpoint: false -frameworkFailoverTimeout: 43200000 +frameworkFailoverTimeout: 44200000 frameworkName: MyriadTest -frameworkRole: +frameworkRole: "*" frameworkUser: hduser # User the Node Manager runs as, required if nodeManagerURI set, otherwise defaults to the user # running the resource manager. -frameworkSuperUser: root # To be depricated, currently permissions need set by a superuser due to Mesos-1790. Must be +frameworkSuperUser: root # To be deprecated, currently permissions need set by a superuser due to Mesos-1790. Must be # root or have passwordless sudo. Required if nodeManagerURI set, ignored otherwise. nativeLibrary: /usr/local/lib/libmesos.so -zkServers: localhost:2181 -zkTimeout: 20000 +zkServers: 10.0.2.15:2181 +zkTimeout: 25000 restApiPort: 8192 profiles: small: @@ -38,7 +39,8 @@ profiles: large: cpu: 4 mem: 4096 -rebalancer: false + +rebalancer: nodemanager: jvmMaxMemoryMB: 1024 cpus: 0.2