http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java deleted file mode 100644 index ace9928..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraints.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.myriad.scheduler; - -/** - * Generic interface to represent some constraints that task can impose - * while figuring out whether to accept or reject the offer - * We may start small and then eventually add more constraints - */ -public interface TaskConstraints { - - /** - * Required number of ports - * - * @return portsNumber - */ - public int portsCount(); - -}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java deleted file mode 100644 index 0665190..0000000 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskConstraintsManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.myriad.scheduler; - -import java.util.HashMap; -import java.util.Map; - -/** - * Factory class to keep map of the constraints - */ -public class TaskConstraintsManager { - - /** - * Since all the additions will happen during init time, there is no need to make this map Concurrent - * if/when later on it will change we may need to change HashMap to Concurrent one - */ - private Map<String, TaskConstraints> taskConstraintsMap = new HashMap<>(); - - public TaskConstraints getConstraints(String taskPrefix) { - return taskConstraintsMap.get(taskPrefix); - } - - public void addTaskConstraints(final String taskPrefix, final TaskConstraints taskConstraints) { - if (taskConstraints != null) { - taskConstraintsMap.put(taskPrefix, taskConstraints); - } - } - - public boolean exists(String taskPrefix) { - return taskConstraintsMap.containsKey(taskPrefix); - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java index 6b398a3..7e63e0d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java @@ -9,233 +9,151 @@ * <p/> * http://www.apache.org/licenses/LICENSE-2.0 * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.apache.myriad.scheduler; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Random; -import java.util.Set; -import javax.inject.Inject; +package org.apache.myriad.scheduler; -import org.apache.mesos.Protos.CommandInfo; -import org.apache.mesos.Protos.CommandInfo.URI; -import org.apache.mesos.Protos.ExecutorID; -import org.apache.mesos.Protos.ExecutorInfo; -import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.TaskID; -import org.apache.mesos.Protos.TaskInfo; -import org.apache.mesos.Protos.Value; -import org.apache.mesos.Protos.Value.Range; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.apache.mesos.Protos; import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.MyriadExecutorConfiguration; +import org.apache.myriad.configuration.MyriadContainerConfiguration; +import org.apache.myriad.configuration.MyriadDockerConfiguration; +import org.apache.myriad.scheduler.resource.ResourceOfferContainer; import org.apache.myriad.state.NodeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * Creates Tasks based upon Mesos offers + * Base class to create Tasks based upon 
Mesos offers */ -public interface TaskFactory { +public abstract class TaskFactory { + public static final String EXECUTOR_NAME = "myriad_task"; + public static final String EXECUTOR_PREFIX = "myriad_executor"; + + protected static final Logger LOGGER = LoggerFactory.getLogger(TaskFactory.class); + static final String YARN_RESOURCEMANAGER_HOSTNAME = "yarn.resourcemanager.hostname"; static final String YARN_RESOURCEMANAGER_WEBAPP_ADDRESS = "yarn.resourcemanager.webapp.address"; static final String YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS = "yarn.resourcemanager.webapp.https.address"; static final String YARN_HTTP_POLICY = "yarn.http.policy"; static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY"; - TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask); + private static final String CONTAINER_PATH_KEY = "containerPath"; + private static final String HOST_PATH_KEY = "hostPath"; + private static final String RW_MODE = "mode"; + private static final String PARAMETER_KEY_KEY = "key"; + private static final String PARAMETER_VALUE_KEY = "value"; - // TODO(Santosh): This is needed because the ExecutorInfo constructed - // to launch NM needs to be specified to launch placeholder tasks for - // yarn containers (for fine grained scaling). - // If mesos supports just specifying the 'ExecutorId' without the full - // ExecutorInfo, we wouldn't need this interface method. - ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo); + protected MyriadConfiguration cfg; + protected TaskUtils taskUtils; + protected ExecutorCommandLineGenerator clGenerator; - /** - * Creates TaskInfo objects to launch NMs as mesos tasks. 
- */ - class NMTaskFactoryImpl implements TaskFactory { - public static final String EXECUTOR_NAME = "myriad_task"; - public static final String EXECUTOR_PREFIX = "myriad_executor"; - public static final String YARN_NODEMANAGER_OPTS_KEY = "YARN_NODEMANAGER_OPTS"; - - private static final Logger LOGGER = LoggerFactory.getLogger(NMTaskFactoryImpl.class); - private static final Random rand = new Random(); - private MyriadConfiguration cfg; - private TaskUtils taskUtils; - private ExecutorCommandLineGenerator clGenerator; - - @Inject - public NMTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) { - this.cfg = cfg; - this.taskUtils = taskUtils; - this.clGenerator = clGenerator; - } + public TaskFactory() { - @VisibleForTesting - protected static HashSet<Long> getNMPorts(Resource resource) { - HashSet<Long> ports = new HashSet<>(); - if (resource.getName().equals("ports")) { - /* - ranges.getRangeList() returns a list of ranges, each range specifies a begin and end only. - so must loop though each range until we get all ports needed. We exit each loop as soon as all - ports are found so bounded by NMPorts.expectedNumPorts. 
- */ - final List<Range> ranges = resource.getRanges().getRangeList(); - final List<Long> allAvailablePorts = new ArrayList<>(); - for (Range range : ranges) { - if (range.hasBegin() && range.hasEnd()) { - for (long i = range.getBegin(); i <= range.getEnd(); i++) { - allAvailablePorts.add(i); - } - } - } + } - Preconditions.checkState(allAvailablePorts.size() >= NMPorts.expectedNumPorts(), "Not enough ports in offer"); + @Inject + public TaskFactory(MyriadConfiguration cfg, TaskUtils taskUtils, ExecutorCommandLineGenerator clGenerator) { + this.cfg = cfg; + this.taskUtils = taskUtils; + this.clGenerator = clGenerator; + } - while (ports.size() < NMPorts.expectedNumPorts()) { - int portIndex = rand.nextInt(allAvailablePorts.size()); - ports.add(allAvailablePorts.get(portIndex)); - allAvailablePorts.remove(portIndex); - } - } - return ports; - } + public abstract Protos.TaskInfo createTask(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, + Protos.TaskID taskId, NodeTask nodeTask); - //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer - @VisibleForTesting - protected static NMPorts getPorts(Offer offer) { - Set<Long> ports = new HashSet<>(); - for (Resource resource : offer.getResourcesList()) { - if (resource.getName().equals("ports") && (!resource.hasRole() || resource.getRole().equals("*"))) { - ports = getNMPorts(resource); - break; + // TODO(Santosh): This is needed because the ExecutorInfo constructed + // to launch NM needs to be specified to launch placeholder tasks for + // yarn containers (for fine grained scaling). + // If mesos supports just specifying the 'ExecutorId' without the full + // ExecutorInfo, we wouldn't need this interface method. 
+ public abstract Protos.ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, Protos.FrameworkID frameworkId, Protos.CommandInfo commandInfo); + + protected Iterable<Protos.Volume> getVolumes(Iterable<Map<String, String>> volume) { + return Iterables.transform(volume, new Function<Map<String, String>, Protos.Volume>() { + @Nullable + @Override + public Protos.Volume apply(Map<String, String> map) { + Preconditions.checkArgument(map.containsKey(HOST_PATH_KEY) && map.containsKey(CONTAINER_PATH_KEY)); + Protos.Volume.Mode mode = Protos.Volume.Mode.RO; + if (map.containsKey(RW_MODE) && map.get(RW_MODE).toLowerCase().equals("rw")) { + mode = Protos.Volume.Mode.RW; } + return Protos.Volume.newBuilder() + .setContainerPath(map.get(CONTAINER_PATH_KEY)) + .setHostPath(map.get(HOST_PATH_KEY)) + .setMode(mode) + .build(); } + }); + } - Long [] portArray = ports.toArray(new Long [ports.size()]); - return new NMPorts(portArray); - } - - @VisibleForTesting - CommandInfo getCommandInfo(ServiceResourceProfile profile, NMPorts ports) { - MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); - CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); - String cmd; - - if (myriadExecutorConfiguration.getJvmUri().isPresent()) { - final String jvmRemoteUri = myriadExecutorConfiguration.getJvmUri().get(); - LOGGER.info("Getting JRE distribution from:" + jvmRemoteUri); - URI jvmUri = URI.newBuilder().setValue(jvmRemoteUri).setExtract(true).build(); - commandInfo.addUris(jvmUri); - } - - if (myriadExecutorConfiguration.getConfigUri().isPresent()) { - String configURI = myriadExecutorConfiguration.getConfigUri().get(); - LOGGER.info("Getting Hadoop distribution from: {}", configURI); - commandInfo.addUris(URI.newBuilder().setValue(configURI).build()); - } - - if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { - //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions 
correct. - if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { - throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!"); - } - String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get(); - cmd = clGenerator.generateCommandLine(profile, ports); - - //get the nodemanagerURI - //We're going to extract ourselves, so setExtract is false - LOGGER.info("Getting Hadoop distribution from: {}", nodeManagerUri); - URI nmUri = URI.newBuilder().setValue(nodeManagerUri).setExtract(false).build(); - - //get configs directly from resource manager - String configUrlString = clGenerator.getConfigurationUrl(); - LOGGER.info("Getting config from:" + configUrlString); - URI configUri = URI.newBuilder().setValue(configUrlString).build(); - LOGGER.info("Slave will execute command: {}", cmd); - commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd); - commandInfo.setUser(cfg.getFrameworkSuperUser().get()); - - } else { - cmd = clGenerator.generateCommandLine(profile, ports); - commandInfo.setValue("echo \"" + cmd + "\";" + cmd); - - if (cfg.getFrameworkUser().isPresent()) { - commandInfo.setUser(cfg.getFrameworkUser().get()); - } + protected Iterable<Protos.Parameter> getParameters(Iterable<Map<String, String>> params) { + Preconditions.checkNotNull(params); + return Iterables.transform(params, new Function<Map<String, String>, Protos.Parameter>() { + @Override + public Protos.Parameter apply(Map<String, String> parameter) { + Preconditions.checkNotNull(parameter, "Null parameter"); + Preconditions.checkState(parameter.containsKey(PARAMETER_KEY_KEY), "Missing key"); + Preconditions.checkState(parameter.containsKey(PARAMETER_VALUE_KEY), "Missing value"); + return Protos.Parameter.newBuilder() + .setKey(parameter.get(PARAMETER_KEY_KEY)) + .setValue(PARAMETER_VALUE_KEY) + .build(); } - return commandInfo.build(); - } + }); + } - @Override - 
public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) { - Objects.requireNonNull(offer, "Offer should be non-null"); - Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); - - NMPorts ports = getPorts(offer); - LOGGER.debug(ports.toString()); - - ServiceResourceProfile serviceProfile = nodeTask.getProfile(); - Double taskMemory = serviceProfile.getAggregateMemory(); - Double taskCpus = serviceProfile.getAggregateCpu(); - - CommandInfo commandInfo = getCommandInfo(serviceProfile, ports); - ExecutorInfo executorInfo = getExecutorInfoForSlave(frameworkId, offer, commandInfo); - - TaskInfo.Builder taskBuilder = TaskInfo.newBuilder().setName(cfg.getFrameworkName() + "-" + taskId.getValue()).setTaskId(taskId).setSlaveId( - offer.getSlaveId()); - - return taskBuilder - .addAllResources(taskUtils.getScalarResource(offer, "cpus", taskCpus, taskUtils.getExecutorCpus())) - .addAllResources(taskUtils.getScalarResource(offer, "mem", taskMemory, taskUtils.getExecutorMemory())) - .addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(Value.Ranges.newBuilder() - .addRange(Range.newBuilder().setBegin(ports.getRpcPort()).setEnd(ports.getRpcPort()).build()) - .addRange(Range.newBuilder().setBegin(ports.getLocalizerPort()).setEnd(ports.getLocalizerPort()).build()) - .addRange(Range.newBuilder().setBegin(ports.getWebAppHttpPort()).setEnd(ports.getWebAppHttpPort()).build()) - .addRange(Range.newBuilder().setBegin(ports.getShufflePort()).setEnd(ports.getShufflePort()).build()))) - .setExecutor(executorInfo) - .build(); - } + protected Protos.ContainerInfo.DockerInfo getDockerInfo(MyriadDockerConfiguration dockerConfiguration) { + Preconditions.checkArgument(dockerConfiguration.getNetwork().equals("HOST"), "Currently only host networking supported"); + Protos.ContainerInfo.DockerInfo.Builder dockerBuilder = Protos.ContainerInfo.DockerInfo.newBuilder() + .setImage(dockerConfiguration.getImage()) + 
.setForcePullImage(dockerConfiguration.getForcePullImage()) + .setNetwork(Protos.ContainerInfo.DockerInfo.Network.valueOf(dockerConfiguration.getNetwork())) + .setPrivileged(dockerConfiguration.getPrivledged()) + .addAllParameters(getParameters(dockerConfiguration.getParameters())); + return dockerBuilder.build(); + } - @Override - public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) { - ExecutorID executorId = ExecutorID.newBuilder() - .setValue(EXECUTOR_PREFIX + frameworkId.getValue() + offer.getId().getValue() + offer.getSlaveId().getValue()) - .build(); - ExecutorInfo.Builder executorInfo = ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).setExecutorId(executorId) - .addAllResources(taskUtils.getScalarResource(offer, "cpus", taskUtils.getExecutorCpus(), 0.0)) - .addAllResources(taskUtils.getScalarResource(offer, "mem", taskUtils.getExecutorMemory(), 0.0)); - if (cfg.getContainerInfo().isPresent()) { - executorInfo.setContainer(taskUtils.getContainerInfo()); - } - return executorInfo.build(); + /** + * Builds a ContainerInfo Object + * + * @return ContainerInfo + */ + protected Protos.ContainerInfo getContainerInfo() { + Preconditions.checkArgument(cfg.getContainerInfo().isPresent(), "ContainerConfiguration doesn't exist!"); + MyriadContainerConfiguration containerConfiguration = cfg.getContainerInfo().get(); + Protos.ContainerInfo.Builder containerBuilder = Protos.ContainerInfo.newBuilder() + .setType(Protos.ContainerInfo.Type.valueOf(containerConfiguration.getType())) + .addAllVolumes(getVolumes(containerConfiguration.getVolumes())); + if (containerConfiguration.getDockerInfo().isPresent()) { + MyriadDockerConfiguration dockerConfiguration = containerConfiguration.getDockerInfo().get(); + containerBuilder.setDocker(getDockerInfo(dockerConfiguration)); } + return containerBuilder.build(); } /** - * Implement NM Task Constraints + * Simple helper to convert Mesos Range Resource to a 
list of longs. */ - public static class NMTaskConstraints implements TaskConstraints { - - @Override - public int portsCount() { - return NMPorts.expectedNumPorts(); + protected List<Long> rangesConverter(List<Protos.Resource> rangeResources) { + List<Long> ret = new ArrayList(); + for (Protos.Resource range : rangeResources) { + ret.add(range.getRanges().getRange(0).getBegin()); } + return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java index 6ab6ee0..4bd60bc 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java @@ -18,18 +18,11 @@ */ package org.apache.myriad.scheduler; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; import org.apache.mesos.Protos; import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.MyriadContainerConfiguration; -import org.apache.myriad.configuration.MyriadDockerConfiguration; import org.apache.myriad.executor.MyriadExecutorDefaults; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; import javax.inject.Inject; import java.util.ArrayList; import java.util.List; @@ -40,11 +33,7 @@ import java.util.Map; */ public class TaskUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TaskUtils.class); - private static final String CONTAINER_PATH_KEY = "containerPath"; - private static final String HOST_PATH_KEY = "hostPath"; - private static final String RW_MODE = "mode"; - private static final String PARAMETER_KEY_KEY = 
"key"; - private static final String PARAMETER_VALUE_KEY = "value"; + private MyriadConfiguration cfg; @@ -53,6 +42,7 @@ public class TaskUtils { this.cfg = cfg; } + public double getNodeManagerMemory() { return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB(); } @@ -61,8 +51,11 @@ public class TaskUtils { return cfg.getNodeManagerConfiguration().getCpus(); } - public double getExecutorCpus() { + public Map<String, Long> getNodeManagerPorts() { + return cfg.getNodeManagerConfiguration().getPorts(); + } + public double getExecutorCpus() { return MyriadExecutorDefaults.DEFAULT_CPUS; } @@ -74,71 +67,6 @@ public class TaskUtils { super(); } - public Iterable<Protos.Volume> getVolumes(Iterable<Map<String, String>> volume) { - return Iterables.transform(volume, new Function<Map<String, String>, Protos.Volume>() { - @Nullable - @Override - public Protos.Volume apply(Map<String, String> map) { - Preconditions.checkArgument(map.containsKey(HOST_PATH_KEY) && map.containsKey(CONTAINER_PATH_KEY)); - Protos.Volume.Mode mode = Protos.Volume.Mode.RO; - if (map.containsKey(RW_MODE) && map.get(RW_MODE).toLowerCase().equals("rw")) { - mode = Protos.Volume.Mode.RW; - } - return Protos.Volume.newBuilder() - .setContainerPath(map.get(CONTAINER_PATH_KEY)) - .setHostPath(map.get(HOST_PATH_KEY)) - .setMode(mode) - .build(); - } - }); - } - - public Iterable<Protos.Parameter> getParameters(Iterable<Map<String, String>> params) { - Preconditions.checkNotNull(params); - return Iterables.transform(params, new Function<Map<String, String>, Protos.Parameter>() { - @Override - public Protos.Parameter apply(Map<String, String> parameter) { - Preconditions.checkNotNull(parameter, "Null parameter"); - Preconditions.checkState(parameter.containsKey(PARAMETER_KEY_KEY), "Missing key"); - Preconditions.checkState(parameter.containsKey(PARAMETER_VALUE_KEY), "Missing value"); - return Protos.Parameter.newBuilder() - .setKey(parameter.get(PARAMETER_KEY_KEY)) - .setValue(PARAMETER_VALUE_KEY) - 
.build(); - } - }); - } - - private Protos.ContainerInfo.DockerInfo getDockerInfo(MyriadDockerConfiguration dockerConfiguration) { - Preconditions.checkArgument(dockerConfiguration.getNetwork().equals("HOST"), "Currently only host networking supported"); - Protos.ContainerInfo.DockerInfo.Builder dockerBuilder = Protos.ContainerInfo.DockerInfo.newBuilder() - .setImage(dockerConfiguration.getImage()) - .setForcePullImage(dockerConfiguration.getForcePullImage()) - .setNetwork(Protos.ContainerInfo.DockerInfo.Network.valueOf(dockerConfiguration.getNetwork())) - .setPrivileged(dockerConfiguration.getPrivledged()) - .addAllParameters(getParameters(dockerConfiguration.getParameters())); - return dockerBuilder.build(); - } - - /** - * Builds a ContainerInfo Object - * - * @return ContainerInfo - */ - public Protos.ContainerInfo getContainerInfo() { - Preconditions.checkArgument(cfg.getContainerInfo().isPresent(), "ContainerConfiguration doesn't exist!"); - MyriadContainerConfiguration containerConfiguration = cfg.getContainerInfo().get(); - Protos.ContainerInfo.Builder containerBuilder = Protos.ContainerInfo.newBuilder() - .setType(Protos.ContainerInfo.Type.valueOf(containerConfiguration.getType())) - .addAllVolumes(getVolumes(containerConfiguration.getVolumes())); - if (containerConfiguration.getDockerInfo().isPresent()) { - MyriadDockerConfiguration dockerConfiguration = containerConfiguration.getDockerInfo().get(); - containerBuilder.setDocker(getDockerInfo(dockerConfiguration)); - } - return containerBuilder.build(); - } - - /** * Helper function that returns all scalar resources of a given name in an offer up to a given value. Attempts to * take resource from the prescribed role first and then from the default role. 
The variable used indicated any @@ -152,7 +80,7 @@ public class TaskUtils { */ public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) { String role = cfg.getFrameworkRole(); - List<Protos.Resource> resources = new ArrayList<Protos.Resource>(); + List<Protos.Resource> resources = new ArrayList<>(); double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role * //Find role by name, must loop through resources http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index f0e80e9..c65ad4a 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -20,34 +20,24 @@ package org.apache.myriad.scheduler.event.handlers; import com.google.common.collect.Sets; import com.lmax.disruptor.EventHandler; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; + +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.inject.Inject; import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskInfo; -import 
org.apache.mesos.Protos.Value; import org.apache.mesos.SchedulerDriver; +import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.scheduler.SchedulerUtils; import org.apache.myriad.scheduler.ServiceResourceProfile; -import org.apache.myriad.scheduler.TaskConstraints; -import org.apache.myriad.scheduler.TaskConstraintsManager; import org.apache.myriad.scheduler.TaskFactory; -import org.apache.myriad.scheduler.TaskUtils; import org.apache.myriad.scheduler.constraints.Constraint; -import org.apache.myriad.scheduler.constraints.LikeConstraint; import org.apache.myriad.scheduler.event.ResourceOffersEvent; import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; +import org.apache.myriad.scheduler.resource.ResourceOfferContainer; import org.apache.myriad.state.NodeTask; import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; @@ -61,26 +51,20 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv private static final Lock driverOperationLock = new ReentrantLock(); - private static final String RESOURCES_CPU_KEY = "cpus"; - private static final String RESOURCES_MEM_KEY = "mem"; - private static final String RESOURCES_PORTS_KEY = "ports"; - private static final String RESOURCES_DISK_KEY = "disk"; - - - @Inject private SchedulerState schedulerState; - - @Inject - private TaskUtils taskUtils; - - @Inject + private MyriadConfiguration cfg; private Map<String, TaskFactory> taskFactoryMap; - - @Inject private OfferLifecycleManager offerLifecycleMgr; + private String role; @Inject - private TaskConstraintsManager taskConstraintsManager; + public ResourceOffersEventHandler(SchedulerState schedulerState, MyriadConfiguration cfg, Map<String, TaskFactory> taskFactoryMap, OfferLifecycleManager offerLifecycleManager) { + this.schedulerState = schedulerState; + this.cfg = cfg; + this.taskFactoryMap = taskFactoryMap; + this.offerLifecycleMgr = offerLifecycleManager; + this.role = cfg.getFrameworkRole(); + } 
@Override public void onEvent(ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception { @@ -128,19 +112,16 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix)); launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix)); - if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) { + ResourceOfferContainer resourceOfferContainer = new ResourceOfferContainer(offer, taskToLaunch.getProfile(), role); + if (SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks) + && resourceOfferContainer.satisfies(taskToLaunch.getProfile(), constraint)) { try { - final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID().get(), - pendingTaskId, taskToLaunch); - List<OfferID> offerIds = new ArrayList<>(); - offerIds.add(offer.getId()); - List<TaskInfo> tasks = new ArrayList<>(); - tasks.add(task); + final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(resourceOfferContainer, + schedulerState.getFrameworkID().get(), pendingTaskId, taskToLaunch); LOGGER.info("Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId()); LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer); - driver.launchTasks(offerIds, tasks); + driver.launchTasks(Collections.singleton(offer.getId()), Collections.singleton(task)); schedulerState.makeTaskStaging(pendingTaskId); - // For every NM Task that we launch, we currently // need to backup the ExecutorInfo for that NM Task in the State Store. 
// Without this, we will not be able to launch tasks corresponding to yarn @@ -179,122 +160,4 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv driverOperationLock.unlock(); } } - - private boolean matches(Offer offer, NodeTask taskToLaunch, Constraint constraint) { - if (!meetsConstraint(offer, constraint)) { - return false; - } - Map<String, Object> results = new HashMap<String, Object>(5); - //Assign default values to avoid NPE - results.put(RESOURCES_CPU_KEY, Double.valueOf(0.0)); - results.put(RESOURCES_MEM_KEY, Double.valueOf(0.0)); - results.put(RESOURCES_DISK_KEY, Double.valueOf(0.0)); - results.put(RESOURCES_PORTS_KEY, Integer.valueOf(0)); - - for (Resource resource : offer.getResourcesList()) { - if (resourceEvaluators.containsKey(resource.getName())) { - resourceEvaluators.get(resource.getName()).eval(resource, results); - } else { - LOGGER.warn("Ignoring unknown resource type: {}", resource.getName()); - } - } - double cpus = (Double) results.get(RESOURCES_CPU_KEY); - double mem = (Double) results.get(RESOURCES_MEM_KEY); - int ports = (Integer) results.get(RESOURCES_PORTS_KEY); - - checkResource(cpus <= 0, RESOURCES_CPU_KEY); - checkResource(mem <= 0, RESOURCES_MEM_KEY); - checkResource(ports <= 0, RESOURCES_PORTS_KEY); - - return checkAggregates(offer, taskToLaunch, ports, cpus, mem); - } - - private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) { - final ServiceResourceProfile profile = taskToLaunch.getProfile(); - final String taskPrefix = taskToLaunch.getTaskPrefix(); - final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu(); - final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory(); - final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix); - if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) { - return true; - } else { - LOGGER.debug("Offer insufficient for 
task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports); - return false; - } - } - - private boolean meetsConstraint(Offer offer, Constraint constraint) { - if (constraint != null) { - switch (constraint.getType()) { - case LIKE: { - LikeConstraint likeConstraint = (LikeConstraint) constraint; - if (likeConstraint.isConstraintOnHostName()) { - return likeConstraint.matchesHostName(offer.getHostname()); - } else { - return likeConstraint.matchesSlaveAttributes(offer.getAttributesList()); - } - } - default: - return false; - } - } - return true; - } - - private void checkResource(boolean fail, String resource) { - if (fail) { - LOGGER.debug("No " + resource + " resources present"); - } - } - - private static Double scalarToDouble(Resource resource, String id) { - Double value = new Double(0.0); - if (resource.getType().equals(Value.Type.SCALAR)) { - value = new Double(resource.getScalar().getValue()); - } else { - LOGGER.error(id + " resource was not a scalar: {}", resource.getType().toString()); - } - return value; - } - - private interface EvalResources { - public void eval(Resource resource, Map<String, Object> results); - } - - private static Map<String, EvalResources> resourceEvaluators; - - static { - resourceEvaluators = new HashMap<String, EvalResources>(4); - resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + scalarToDouble(resource, RESOURCES_CPU_KEY)); - } - }); - resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + scalarToDouble(resource, RESOURCES_MEM_KEY)); - } - }); - resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - } - }); - 
resourceEvaluators.put(RESOURCES_PORTS_KEY, new EvalResources() { - public void eval(Resource resource, Map<String, Object> results) { - int ports = 0; - if (resource.getType().equals(Value.Type.RANGES)) { - Value.Ranges ranges = resource.getRanges(); - for (Value.Range range : ranges.getRangeList()) { - if (range.getBegin() < range.getEnd()) { - ports += range.getEnd() - range.getBegin() + 1; - } - } - } else { - LOGGER.error("ports resource was not Ranges: {}", resource.getType().toString()); - - } - results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports)); - } - }); - } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java new file mode 100644 index 0000000..45258f0 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/RangeResource.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.resource; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.mesos.Protos; + +import java.util.*; + +/** + * Mutable POJO for handling RangeResources, specifically ports. + */ + +public class RangeResource { + private String name; + private List<Range> ranges = new ArrayList<>(); + private Long numValues = 0L; + private Long numDefaultValues = 0L; + + private String role; + @VisibleForTesting //This way we can set a seed to get deterministic values + private Random random = new Random(102); + + public RangeResource(String name, String role) { + this.name = name; + this.role = role; + } + + public boolean satisfies(Collection<Long> requestedValues) { + if (requestedValues.size() > numValues) { + return false; + } + List<Long> tmp = new ArrayList<>(); + tmp.addAll(requestedValues); + tmp.removeAll(Collections.singleton(0L)); + for (Long val : tmp) { + if (!contains(val)) { + return false; + } + } + return true; + } + + public boolean contains(Long value) { + for (Range range: ranges) { + if (range.contains(value)) { + return true; + } + } + return false; + } + + public List<Long> getValues() { + List<Long> ret = new ArrayList<>(); + for (Range range: ranges) { + ret.addAll(range.allValues()); + } + return ret; + } + + public void addRanges(List <Protos.Value.Range> ranges, Boolean withRole) { + for (Protos.Value.Range range : ranges) { + long tb = range.getBegin(); + long te = range.getEnd(); + this.ranges.add(new Range(tb, te, withRole)); + numValues += (te - tb + 1); + if (!withRole) { + numDefaultValues += (te - tb + 1); + } + } + } + + public List<Protos.Resource> consumeResource(Collection<Long> requestedValues) { + Preconditions.checkState(satisfies(requestedValues)); + List<Protos.Resource> resources = new ArrayList<>(); + List<Long> nonZeros 
= new ArrayList<>(); + nonZeros.addAll(requestedValues); + nonZeros.removeAll(Collections.singleton(0L)); + for (Long value : nonZeros) { + resources.add(createResource(value, hasRole(value))); + } + List<Long> randomValues = getRandomValues(requestedValues.size() - nonZeros.size()); + for (Long value: randomValues) { + resources.add(createResource(value, false)); + } + return resources; + } + + private Protos.Resource createResource(Long value, Boolean withRole) { + Preconditions.checkState(removeValue(value), "Value " + value + " doesn't exist"); + Protos.Resource.Builder builder = Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.RANGES) + .setRanges(Protos.Value.Ranges.newBuilder() + .addRange(Protos.Value.Range.newBuilder() + .setBegin(value) + .setEnd(value) + .build() + ) + ); + if (withRole) { + builder.setRole(role); + } + return builder.build(); + } + + private List<Long> getRandomValues(int size) { + //can improve this + List<Integer> sample = new ArrayList<>(size); + while (sample.size() < size) { + int rand = random.nextInt(numDefaultValues.intValue()); + if (!sample.contains(rand)) { + sample.add(rand); + } + } + Collections.sort(sample); + + long location = 0; + long lastLocation = 0; + int j = 0; + List<Long> elems = new ArrayList<>(); + for (Range range : ranges) { + if (!range.role) { + long tb = range.begin; + long te = range.end; + location += te - tb + 1; + for (int i = j; i < sample.size(); i++) { + long val = sample.get(i); + if (val < location) { + elems.add(tb + val - lastLocation); + j++; + } else { + lastLocation = location; + break; + } + } + } + } + return elems; + } + + private boolean removeValue(Long value) { + for (Range range : ranges) { + if (range.contains(value)) { + ranges.remove(range); + long begin = range.begin; + long end = range.end; + if (value != begin && value != end) { + ranges.add(new Range(begin, value - 1, range.role)); + ranges.add(new Range(value + 1, end, range.role)); + return true; + } 
else if (value == begin && value != end) { + ranges.add(new Range(value + 1, end, range.role)); + return true; + } else if (value == end && value != begin) { + ranges.add(new Range(begin, value - 1, range.role)); + return true; + } else { + return true; + } + } + } + return false; + } + + private boolean hasRole(Long value) { + for (Range range : ranges) { + if (range.contains(value)) { + return range.role; + } + } + return false; + } + + private static class Range { + Long begin; + Long end; + Boolean role; + + public Range(Long begin, Long end, Boolean role){ + this.begin = begin; + this.end = end; + this.role = role; + } + public Collection<Long> allValues() { + List<Long> ret = new ArrayList<>(); + for (long i = begin; i <= end; i++) { + ret.add(i); + } + return ret; + } + public Boolean contains(Long value) { + return (value >= begin && value <= end); + } + + public String toString() { + return "(" + begin + "," + end + ")"; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java new file mode 100644 index 0000000..13efbc2 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ResourceOfferContainer.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.resource; + +import com.google.common.base.Preconditions; +import org.apache.mesos.Protos; +import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.constraints.Constraint; +import org.apache.myriad.scheduler.constraints.LikeConstraint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Container class to get and keep track of mesos resources + */ +public class ResourceOfferContainer { + private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOfferContainer.class); + private static final String RESOURCE_CPUS = "cpus"; + private static final String RESOURCE_MEM = "mem"; + private static final String RESOURCE_PORTS = "ports"; + + private HashMap<String, ScalarResource> scalarValues = new HashMap<>(); + private HashMap<String, RangeResource> rangeValues = new HashMap<>(); + + private Protos.Offer offer; + private String role; + + /** + * Constructor takes an offer and profile and constructs a mutable POJO to handle resource offers. 
+ * + * @param offer Mesos.Protos.Offer + * @param profile ServiceResourceProfile + */ + public ResourceOfferContainer(Protos.Offer offer, ServiceResourceProfile profile, String role) { + this.offer = offer; + this.role = role; + setScalarValues(); + //ports = new RangeResource(offer, RESOURCE_PORTS, profile.getPorts().values(), role); + } + + /** + * returns the hostname contained in the offer + * + * @return hostname + */ + public String getHostName() { + return offer.getHostname(); + } + + public String getOfferId() { + return offer.getId().getValue(); + } + + public Protos.SlaveID getSlaveId() { + return offer.getSlaveId(); + } + + public double getScalarValue(String name) { + return scalarValues.get(name).getTotalValue(); + } + + public double getCpus() { + return getScalarValue(RESOURCE_CPUS); + } + + public double getMem() { + return getScalarValue(RESOURCE_MEM); + } + + public List<Long> getPorts() { + return rangeValues.get(RESOURCE_PORTS).getValues(); + } + + /** + * Returns true if the offer meets the profile resource needs + * + * @param profile + * @return + */ + public boolean satisfies(ServiceResourceProfile profile) { + return scalarValues.containsKey(RESOURCE_CPUS) && scalarValues.get(RESOURCE_CPUS).satisfies(profile.getAggregateCpu()) && + scalarValues.containsKey(RESOURCE_MEM) && scalarValues.get(RESOURCE_MEM).satisfies(profile.getAggregateMemory()) && + rangeValues.containsKey(RESOURCE_PORTS) && rangeValues.get(RESOURCE_PORTS).satisfies(profile.getPorts().values()); + } + + /** + * Returns true if offer meets the profile resource needs AND the task constraint (an attribute or hostname) + * + * @param profile + * @param constraint + * @return + */ + public boolean satisfies(ServiceResourceProfile profile, Constraint constraint) { + return satisfies(profile) && meetsConstraint(constraint); + } + + private boolean meetsConstraint(Constraint constraint) { + if (constraint != null) { + switch (constraint.getType()) { + case LIKE: { + LikeConstraint 
likeConstraint = (LikeConstraint) constraint; + if (likeConstraint.isConstraintOnHostName()) { + return likeConstraint.matchesHostName(offer.getHostname()); + } else { + return likeConstraint.matchesSlaveAttributes(offer.getAttributesList()); + } + } + default: + return false; + } + } + return true; + } + + private List<Protos.Resource> consumeScalarResource(String name, Double value) { + Preconditions.checkState(scalarValues.containsKey(name)); + return scalarValues.get(name).consumeResource(value); + } + + /** + * Returns a list of CPU Resources meeting the requested value. + * Decrements the CPU resources available in the offer. + * Uses Preconditions to ensure the value is not more than the amount the offer has. + * + * @param value + * @return List<Protos.Resource> + */ + public List<Protos.Resource> consumeCpus(Double value) { + return consumeScalarResource(RESOURCE_CPUS, value); + } + + /** + * Returns a list of MEM Resources meeting the requested value. + * Decrements the MEM resources available in the offer. + * Uses Preconditions to ensure the value is not more than the amount the offer has. + * + * @param value + * @return List<Protos.Resource> + */ + public List<Protos.Resource> consumeMem(Double value) { + return consumeScalarResource(RESOURCE_MEM, value); + } + + /** + * Returns a list of Range Resources meeting the requested values. + * Removes the requested values from the range resources available in the offer. + * Uses Preconditions to ensure the values are contained in the offer. 
+ * + * @param requestedValues + * @return List<Protos.Resource> + */ + public List<Protos.Resource> consumePorts(Collection<Long> requestedValues) { + return rangeValues.get(RESOURCE_PORTS).consumeResource(requestedValues); + } + + private void setScalarValues() { + for (Protos.Resource r : offer.getResourcesList()) { + if (r.hasScalar() && r.hasName() && r.hasRole() && r.getRole().equals(role)) { + addToScalarResource(r.getName(), r.getScalar().getValue(), true); + } else if (r.hasName() && r.hasScalar()) { + addToScalarResource(r.getName(), r.getScalar().getValue(), false); + } else if (r.hasRanges() && r.hasName() && r.hasRole() && r.getRole().equals(role)) { + addToRangeResource(r.getName(), r.getRanges().getRangeList(), true); + } else if (r.hasRanges() && r.hasName()) { + addToRangeResource(r.getName(), r.getRanges().getRangeList(), false); + } + } + } + + private void addToScalarResource(String name, Double value, Boolean hasRole) { + if (scalarValues.containsKey(name)) { + scalarValues.get(name).incrementValue(value, hasRole); + } else { + scalarValues.put(name, new ScalarResource(name, role)); + scalarValues.get(name).incrementValue(value, hasRole); + } + } + + private void addToRangeResource(String name, List<Protos.Value.Range> values , Boolean hasRole) { + if (rangeValues.containsKey(name)) { + rangeValues.get(name).addRanges(values, hasRole); + } else { + rangeValues.put(name, new RangeResource(name, role)); + rangeValues.get(name).addRanges(values, hasRole); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java new file mode 100644 index 0000000..a93ecee --- /dev/null +++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/resource/ScalarResource.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler.resource; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.mesos.Protos; + +import java.util.ArrayList; +import java.util.List; + +/** + * Mutable POJO for handling Scalar Resources + */ +@VisibleForTesting +class ScalarResource { + double defaultValue = 0.0; + double roleValue = 0.0; + String name; + String role; + + public ScalarResource(String name, String role) { + this.name = name; + this.role = role; + } + + public void incrementValue(Double value, Boolean role) { + if (role) { + roleValue += value; + } else { + defaultValue += value; + } + } + + public Double getTotalValue() { + return defaultValue + roleValue; + } + + public Boolean satisfies(Double value) { + return defaultValue + roleValue >= value; + } + + public List<Protos.Resource> consumeResource(Double value) { + Preconditions.checkState(roleValue + defaultValue >= value, String.format("%s value requested: %f, greater " + + "than amount held %f", name, value, roleValue 
+ defaultValue)); + List<Protos.Resource> resources = new ArrayList<>(); + if (roleValue >= value) { + roleValue -= value; + resources.add(createResource(name, value, true)); + } else if (roleValue + defaultValue >= value && roleValue > 0) { + resources.add(createResource(name, roleValue, true)); + resources.add(createResource(name, value - roleValue, false)); + defaultValue -= (value - roleValue); + roleValue = 0; + } else if (roleValue + defaultValue >= value) { + resources.add(createResource(name, value, false)); + defaultValue -= value; + } + return resources; + } + + private Protos.Resource createResource(String name, Double value, boolean withRole) { + Protos.Resource.Builder builder = Protos.Resource.newBuilder() + .setName(name) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)) + .setType(Protos.Value.Type.SCALAR); + if (withRole) { + builder.setRole(role); + } + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java index 9069c1a..5251f19 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java @@ -66,7 +66,6 @@ public class MyriadFairScheduler extends FairScheduler { rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); super.setRMContext(rmContext); } - /** * ******** Methods overridden from YARN {@link FairScheduler} ********************* */ http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java 
---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java index fbfb0d2..dc7a00b 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java @@ -26,7 +26,6 @@ import java.io.*; import java.util.*; import org.apache.myriad.configuration.*; import org.apache.myriad.scheduler.*; -import org.apache.myriad.scheduler.TaskFactory.*; import org.slf4j.*; /** @@ -58,7 +57,7 @@ public class MyriadTestModule extends AbstractModule { bind(MyriadConfiguration.class).toInstance(cfg); MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); - mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON); + mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON); Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull(); @@ -70,7 +69,7 @@ public class MyriadTestModule extends AbstractModule { e.printStackTrace(); } } else { - mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON); + mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactory.class).in(Scopes.SINGLETON); } } } @@ -78,14 +77,7 @@ public class MyriadTestModule extends AbstractModule { @Provides @Singleton ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) { - ExecutorCommandLineGenerator cliGenerator = null; - MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); - if 
(myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { - cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get()); - } else { - cliGenerator = new NMExecutorCLGenImpl(cfg); - } - return cliGenerator; + return new NMExecutorCommandLineGenerator(cfg); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java index e57b128..e0eda0f 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java @@ -16,6 +16,8 @@ import org.apache.myriad.state.SchedulerState; import org.junit.Before; import org.junit.Test; +import java.util.TreeMap; + /** * Unit tests for SchedulerStateResource */ @@ -34,10 +36,11 @@ public class SchedulerStateResourceTest extends BaseConfigurableTest { idOne = Protos.TaskID.newBuilder().setValue("nt-1").build(); idTwo = Protos.TaskID.newBuilder().setValue("nt-2").build(); idThree = Protos.TaskID.newBuilder().setValue("nt-3").build(); + TreeMap<String, Long> ports = new TreeMap<>(); - state.addTask(idOne, new NodeTask(new ServiceResourceProfile("profile1", 0.2, 1024.0), new LikeConstraint("localhost", "host-[0-9]*.example.com"))); - state.addTask(idTwo, new NodeTask(new ServiceResourceProfile("profile2", 0.4, 2048.0), new LikeConstraint("localhost", "host-[0-9]*.example.com"))); - state.addTask(idThree, new NodeTask(new ServiceResourceProfile("profile3", 0.6, 3072.0), new LikeConstraint("localhost", "host-[0-9]*.example.com"))); + state.addTask(idOne, new NodeTask(new ServiceResourceProfile("profile1", 0.2, 1024.0, 
ports), new LikeConstraint("localhost", "host-[0-9]*.example.com"))); + state.addTask(idTwo, new NodeTask(new ServiceResourceProfile("profile2", 0.4, 2048.0, ports), new LikeConstraint("localhost", "host-[0-9]*.example.com"))); + state.addTask(idThree, new NodeTask(new ServiceResourceProfile("profile3", 0.6, 3072.0, ports), new LikeConstraint("localhost", "host-[0-9]*.example.com"))); state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); state.makeTaskActive(idOne); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java index 2a60e58..80c4b89 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MockSchedulerDriver.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.myriad.scheduler; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java index d384150..3538a34 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadDriverTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.myriad.scheduler; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java index 78f3627..29087e7 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.myriad.scheduler; import static org.junit.Assert.assertEquals; @@ -19,6 +36,8 @@ import org.apache.myriad.webapp.MyriadWebServer; import org.junit.Before; import org.junit.Test; +import java.util.TreeMap; + /** * Unit tests for MyriadOperations class */ @@ -52,7 +71,8 @@ public class MyriadOperationsTest extends BaseConfigurableTest { } private void generateProfiles() { - small = new ServiceResourceProfile("small", 0.1, 512.0); + TreeMap<String, Long> ports = new TreeMap<>(); + small = new ServiceResourceProfile("small", 0.1, 512.0, ports); } @Test http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java index a0aab74..d102090 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/NMProfileManagerTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad.scheduler; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy index e394dd8..6555f04 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy @@ -53,8 +53,8 @@ class SchedulerUtilsSpec extends Specification { given: def state = Mock(SchedulerState) def tasks = [] - def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0), null) - def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0), null) + def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0, new HashMap<String, Long>()), null) + def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0, new HashMap<String, Long>()), null) fgsNMTask.setHostname("test_fgs_hostname") cgsNMTask.setHostname("test_cgs_hostname") tasks << fgsNMTask << cgsNMTask @@ -82,7 +82,7 @@ class SchedulerUtilsSpec extends Specification { NodeTask createNodeTask(String hostname) { - def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0), null) + def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0, new HashMap<String, Long>()), null) node.hostname = hostname node.taskPrefix = "nm" node 
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java index 72e0092..7623cba 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/ServiceResourceProfileTest.java @@ -1,9 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.myriad.scheduler; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.TreeMap; + /** * Unit test cases for ServiceResourceProfile * @@ -13,15 +32,14 @@ public class ServiceResourceProfileTest { @Before public void setUp() throws Exception { - profile = new ServiceResourceProfile("ServiceResourceProfile", 0.1, 1024.0, 0.1, 512.0); + TreeMap<String, Long> ports = new TreeMap<>(); + profile = new ServiceResourceProfile("ServiceResourceProfile", 0.1, 1024.0, ports); } @Test public void testRequestedResources() throws Exception { Assert.assertEquals(new Double(0.1), profile.getCpus()); Assert.assertEquals(new Double(1024.0), profile.getMemory()); - Assert.assertEquals(new Double(0.1), profile.getExecutorCpu()); - Assert.assertEquals(new Double(512.0), profile.getExecutorMemory()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java index 17e043f..4ad11a5 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TMSTaskFactoryImpl.java @@ -22,15 +22,15 @@ import javax.inject.Inject; import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.ExecutorInfo; import org.apache.mesos.Protos.FrameworkID; -import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskInfo; import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.scheduler.resource.ResourceOfferContainer; /** * Test implementation of TaskFactory */ -public class TMSTaskFactoryImpl implements TaskFactory 
{ +public class TMSTaskFactoryImpl extends TaskFactory { private MyriadConfiguration cfg; private TaskUtils taskUtils; @@ -42,7 +42,7 @@ public class TMSTaskFactoryImpl implements TaskFactory { } @Override - public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, org.apache.myriad.state.NodeTask nodeTask) { + public TaskInfo createTask(ResourceOfferContainer offer, FrameworkID frameworkId, TaskID taskId, org.apache.myriad.state.NodeTask nodeTask) { return null; } @@ -63,7 +63,8 @@ public class TMSTaskFactoryImpl implements TaskFactory { } @Override - public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) { + public ExecutorInfo getExecutorInfoForSlave(ResourceOfferContainer resourceOfferContainer, FrameworkID frameworkId, + CommandInfo commandInfo) { return null; } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java deleted file mode 100644 index 94946ce..0000000 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TaskConstraintsManagerTest.java +++ /dev/null @@ -1,32 +0,0 @@ -package org.apache.myriad.scheduler; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.myriad.BaseConfigurableTest; -import org.junit.Before; -import org.junit.Test; - -/** - * Unit tests for TaskConstraintsManager - */ -public class TaskConstraintsManagerTest extends BaseConfigurableTest { - TaskConstraintsManager manager = new TaskConstraintsManager(); - - @Before - public void setUp() throws Exception { - super.setUp(); - manager.addTaskConstraints("jobhistory", new 
ServiceTaskConstraints(cfg, "jobhistory")); - } - - @Test - public void testAddConstraints() throws Exception { - assertTrue(manager.exists("jobhistory")); - } - - @Test - public void testGetConstraints() throws Exception { - TaskConstraints tCon = manager.getConstraints("jobhistory"); - assertEquals(3, tCon.portsCount()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java new file mode 100644 index 0000000..39d6052 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestNMTaskFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.myriad.scheduler; + +import org.apache.mesos.Protos; +import org.apache.myriad.BaseConfigurableTest; +import org.apache.myriad.scheduler.offer.OfferBuilder; +import org.apache.myriad.scheduler.resource.ResourceOfferContainer; +import org.apache.myriad.state.NodeTask; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +/** + * Tests for NMTaskFactory Class + */ +public class TestNMTaskFactory extends BaseConfigurableTest { + static Protos.FrameworkID frameworkId = Protos.FrameworkID.newBuilder().setValue("test").build(); + + @Test + public void testNMTaskFactory() { + NMExecutorCommandLineGenerator clGenerator = new NMExecutorCommandLineGenerator(cfgWithDocker); + TaskUtils taskUtils = new TaskUtils(cfgWithDocker); + Protos.Offer offer = new OfferBuilder("test.com") + .addScalarResource("cpus", 10.0) + .addScalarResource("mem", 16000) + .addRangeResource("ports", 3500, 3505) + .build(); + ServiceResourceProfile profile = new ExtendedResourceProfile(new NMProfile("tooMuchCpu", 7L, 8000L), taskUtils.getNodeManagerCpus(), + taskUtils.getNodeManagerMemory(), taskUtils.getNodeManagerPorts()); + NodeTask nodeTask = new NodeTask(profile, null); + ResourceOfferContainer roc = new ResourceOfferContainer(offer, profile, null); + NMTaskFactory taskFactory = new NMTaskFactory(cfgWithDocker, taskUtils, clGenerator); + Protos.TaskInfo taskInfo = taskFactory.createTask(roc, frameworkId, makeTaskId("nm.zero"), nodeTask); + assertFalse("taskInfo should not have a container", taskInfo.hasContainer()); + assertTrue("The container should have an executor", taskInfo.hasExecutor()); + Protos.ExecutorInfo executorInfo = taskInfo.getExecutor(); + assertTrue("executorInfo should have container", executorInfo.hasContainer()); + Protos.ContainerInfo containerInfo = executorInfo.getContainer(); + assertTrue("There should be two volumes", containerInfo.getVolumesCount() == 2); + assertTrue("The first 
volume should be read only", containerInfo.getVolumes(0).getMode().equals(Protos.Volume.Mode.RO)); + assertTrue("The first volume should be read write", containerInfo.getVolumes(1).getMode().equals(Protos.Volume.Mode.RW)); + assertTrue("There should be a docker image", containerInfo.getDocker().hasImage()); + assertTrue("The docker image should be mesos/myraid", containerInfo.getDocker().getImage().equals("mesos/myriad")); + assertTrue("Should be using host networking", containerInfo.getDocker().getNetwork().equals(Protos.ContainerInfo.DockerInfo.Network.HOST)); + assertTrue("There should be two parameters", containerInfo.getDocker().getParametersList().size() == 2); + assertTrue("Privledged mode should be false", !containerInfo.getDocker().getPrivileged()); + } + + private Protos.TaskID makeTaskId(String taskId) { + return Protos.TaskID.newBuilder().setValue(taskId).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7aea259c/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java deleted file mode 100644 index dd36436..0000000 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.myriad.scheduler; - - -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.Value.Range; -import org.apache.mesos.Protos.Value.Ranges; -import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Test Class to test NM ports randomization - * - */ -public class TestRandomPorts { - - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - @Test - public void testRandomPorts() { - Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build(); - Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build(); - Range range3 = Range.newBuilder().setBegin(310).setEnd(500).build(); - Range range4 = Range.newBuilder().setBegin(520).setEnd(720).build(); - Range range5 = Range.newBuilder().setBegin(750).setEnd(1000).build(); - - Ranges ranges = Ranges.newBuilder().addRange(range1) - .addRange(range2) - .addRange(range3) - .addRange(range4) - .addRange(range5).build(); - - - Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); - - Set<Long> ports = 
NMTaskFactoryImpl.getNMPorts(resource); - - assertEquals(NMPorts.expectedNumPorts(), ports.size()); - List<Long> sortedList = Lists.newArrayList(ports); - - Collections.sort(sortedList); - - for (Long port : sortedList) { - assertTrue((port >= 100 && port <= 200) || - (port >= 250 && port <= 300) || - (port >= 310 && port <= 500) || - (port >= 520 && port <= 720) || - (port >= 750 && port <= 1000)); - } - } - - @Test - public void testRandomPortsNotEnough() { - Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build(); - Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build(); - - Ranges ranges = Ranges.newBuilder().addRange(range1) - .addRange(range2) - .build(); - - - Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); - - Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource); - - assertEquals(NMPorts.expectedNumPorts(), ports.size()); - List<Long> sortedList = Lists.newArrayList(ports); - - Collections.sort(sortedList); - - for (Long port : sortedList) { - assertTrue((port >= 100 && port <= 200) || - (port >= 250 && port <= 300)); - } - } - - @Test - public void testRandomPortsNotEnoughPercentKickIn() { - Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build(); - Range range2 = Range.newBuilder().setBegin(250).setEnd(335).build(); - - Ranges ranges = Ranges.newBuilder().addRange(range1) - .addRange(range2) - .build(); - - - Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); - - Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource); - - assertEquals(NMPorts.expectedNumPorts(), ports.size()); - List<Long> sortedList = Lists.newArrayList(ports); - - Collections.sort(sortedList); - - for (int i = 0; i < sortedList.size(); i++) { - assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 200) || - (sortedList.get(i) >= 250 && sortedList.get(i) <= 335)); - } - } - - @Test - public void 
testRandomPortsLargeRange() { - Range range1 = Range.newBuilder().setBegin(100).setEnd(500).build(); - Range range2 = Range.newBuilder().setBegin(550).setEnd(835).build(); - - Ranges ranges = Ranges.newBuilder().addRange(range1) - .addRange(range2) - .build(); - - - Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); - - Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource); - - assertEquals(NMPorts.expectedNumPorts(), ports.size()); - List<Long> sortedList = Lists.newArrayList(ports); - - Collections.sort(sortedList); - - for (int i = 0; i < sortedList.size(); i++) { - assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 500) || - (sortedList.get(i) >= 550 && sortedList.get(i) <= 835)); - } - } - - @Test - public void testRandomPortsSmallRange() { - Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build(); - Range range2 = Range.newBuilder().setBegin(110).setEnd(115).build(); - - Ranges ranges = Ranges.newBuilder().addRange(range1) - .addRange(range2) - .build(); - - Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); - - Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource); - - assertEquals(NMPorts.expectedNumPorts(), ports.size()); - List<Long> sortedList = Lists.newArrayList(ports); - - Collections.sort(sortedList); - - for (int i = 0; i < sortedList.size(); i++) { - assertTrue(sortedList.get(i) == 100 || (sortedList.get(i) <= 115 && sortedList.get(i) >= 110)); - } - } - - @Test - public void notEnoughPorts() throws Exception { - Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build(); - Range range2 = Range.newBuilder().setBegin(110).setEnd(111).build(); - - Ranges ranges = Ranges.newBuilder().addRange(range1) - .addRange(range2) - .build(); - - Resource resource = Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build(); - - try { - 
NMTaskFactoryImpl.getNMPorts(resource); - fail("Should fail, as number of ports is not enough"); - } catch (IllegalStateException ise) { - // should get here - } - - } -}