Cleaned up flexdown code - Fixed a bunch of edge cases esp. when constraint is not specified in the flex down request - Refactored a code to make it more readable; flexdown now happens in 3 (readable) steps: - flexdown pending tasks matching the given profile, constraints - flexdown staging tasks matching the given profile, constraints - flexdown active tasks matching the given profile, constraints - Minor cleanups of other classes wrt logging and removal of unused methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/34e3958c Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/34e3958c Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/34e3958c Branch: refs/heads/master Commit: 34e3958c4134b079fbc766f4fe4d2f88b42840f4 Parents: e8ec517 Author: Santosh Marella <smare...@maprtech.com> Authored: Tue Oct 13 13:17:05 2015 -0700 Committer: Santosh Marella <mare...@gmail.com> Committed: Thu Oct 15 12:56:46 2015 -0700 ---------------------------------------------------------------------- .../com/ebay/myriad/api/ClustersResource.java | 16 ++- .../myriad/policy/LeastAMNodesFirstPolicy.java | 47 ++++--- .../ebay/myriad/policy/NodeScaleDownPolicy.java | 10 +- .../ebay/myriad/scheduler/MyriadOperations.java | 132 ++++++++----------- .../ebay/myriad/scheduler/SchedulerUtils.java | 24 ---- .../scheduler/constraints/LikeConstraint.java | 4 +- .../handlers/ResourceOffersEventHandler.java | 11 +- .../com/ebay/myriad/state/SchedulerState.java | 53 +++++--- .../constraints/LikeConstraintSpec.groovy | 12 ++ 9 files changed, 148 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java index f2a3018..9f47b51 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java @@ -19,6 +19,7 @@ import com.codahale.metrics.annotation.Timed; import com.ebay.myriad.api.model.FlexDownClusterRequest; import com.ebay.myriad.api.model.FlexUpClusterRequest; import com.ebay.myriad.scheduler.MyriadOperations; +import com.ebay.myriad.scheduler.NMProfile; import com.ebay.myriad.scheduler.NMProfileManager; import com.ebay.myriad.scheduler.constraints.ConstraintFactory; import com.ebay.myriad.state.SchedulerState; @@ -108,11 +109,11 @@ public class ClustersResource { isValidRequest = isValidRequest && validateInstances(instances, response); isValidRequest = isValidRequest && validateConstraints(constraints, response); - Integer numFlexedUp = this.getNumFlexedupNMs(); + Integer numFlexedUp = this.getNumFlexedupNMs(profile); if (isValidRequest && numFlexedUp < instances) { String message = String.format("Number of requested instances for flexdown is greater than the number of " + - "Node Managers previously flexed up. Requested: %d, Previously flexed Up: %d. " + - "Only %d Node Managers will be flexed down", instances, numFlexedUp, numFlexedUp); + "Node Managers previously flexed up for profile '%s'. Requested: %d, Previously flexed Up: %d. " + + "Only %d Node Managers will be flexed down.", profile, instances, numFlexedUp, numFlexedUp); response.entity(message); LOGGER.warn(message); } @@ -203,10 +204,11 @@ public class ClustersResource { } - private Integer getNumFlexedupNMs() { - return this.schedulerState.getActiveTaskIds().size() - + this.schedulerState.getStagingTaskIds().size() - + this.schedulerState.getPendingTaskIds().size(); + private Integer getNumFlexedupNMs(String profile) { + NMProfile nmProfile = profileManager.get(profile); + return this.schedulerState.getActiveTaskIDsForProfile(nmProfile).size() + + this.schedulerState.getStagingTaskIDsForProfile(nmProfile).size() + + this.schedulerState.getPendingTaskIDsForProfile(nmProfile).size(); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java index 38b14a7..568247b 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java @@ -2,7 +2,7 @@ package com.ebay.myriad.policy; import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor; import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry; -import com.google.common.collect.Lists; +import com.ebay.myriad.state.SchedulerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -10,11 +10,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -28,6 +28,7 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal private static final Logger LOGGER = LoggerFactory.getLogger(LeastAMNodesFirstPolicy.class); private final AbstractYarnScheduler yarnScheduler; + private final SchedulerState schedulerState; //TODO(Santosh): Should figure out the right values for the hashmap properties. // currently it's tuned for 200 nodes and 50 RM RPC threads (Yarn's default). @@ -35,20 +36,26 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal private static final int EXPECTED_CONCURRENT_ACCCESS_COUNT = 50; private static final float LOAD_FACTOR_DEFAULT = 0.75f; - private Map<String, SchedulerNode> schedulerNodes = new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT); + private Map<String, SchedulerNode> schedulerNodes = + new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT); @Inject - public LeastAMNodesFirstPolicy(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler) { + public LeastAMNodesFirstPolicy(InterceptorRegistry registry, + AbstractYarnScheduler yarnScheduler, + SchedulerState schedulerState) { registry.register(this); this.yarnScheduler = yarnScheduler; + this.schedulerState = schedulerState; } + /** + * Sort the given list of tasks by the number of App Master containers running on the corresponding NM node. + * @param taskIDs + */ @Override - public List<String> getNodesToScaleDown() { - List<SchedulerNode> nodes = Lists.newArrayList(this.schedulerNodes.values()); - + public void apply(List<Protos.TaskID> taskIDs) { if (LOGGER.isDebugEnabled()) { - for (SchedulerNode node : nodes) { + for (SchedulerNode node : schedulerNodes.values()) { LOGGER.debug("Host {} is running {} containers including {} App Masters", node.getNodeID().getHost(), node.getRunningContainers().size(), getNumAMContainers(node.getRunningContainers())); @@ -58,9 +65,22 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal // process HBs from NodeManagers and the state of SchedulerNode objects might change while we // are in the middle of sorting them based on the least number of AM containers. synchronized (yarnScheduler) { - Collections.sort(nodes, new Comparator<SchedulerNode>() { + Collections.sort(taskIDs, new Comparator<Protos.TaskID>() { @Override - public int compare(SchedulerNode o1, SchedulerNode o2) { + public int compare(Protos.TaskID t1, Protos.TaskID t2) { + SchedulerNode o1 = schedulerNodes.get(schedulerState.getTask(t1).getHostname()); + SchedulerNode o2 = schedulerNodes.get(schedulerState.getTask(t2).getHostname()); + + if (o1 == null) { // a NM was launched by Myriad, but it hasn't yet registered with RM + if (o2 == null) { + return 0; + } else { + return -1; + } + } else if (o2 == null) { + return 1; + } // else, both the NMs have registered with RM + List<RMContainer> runningContainers1 = o1.getRunningContainers(); List<RMContainer> runningContainers2 = o2.getRunningContainers(); @@ -78,13 +98,6 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal } }); } - - List<String> hosts = new ArrayList<>(nodes.size()); - for (SchedulerNode node : nodes) { - hosts.add(node.getNodeID().getHost()); - } - - return hosts; } @Override http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java index db80761..f40d360 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java @@ -1,5 +1,7 @@ package com.ebay.myriad.policy; +import org.apache.mesos.Protos; + import java.util.List; /** @@ -8,11 +10,9 @@ import java.util.List; public interface NodeScaleDownPolicy { /** - * Get a list of host names of the nodes that needs to be scaled down. - * The implementation of the policy should populate this list in a way that - * the most preferred nodes to be scaled down should occur first in the list. - * @return + * Apply a scale down policy to the given list of taskIDs. + * @param taskIDs */ - public List<String> getNodesToScaleDown(); + public void apply(List<Protos.TaskID> taskIDs); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index 540d3e7..27fe406 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -20,17 +20,16 @@ import com.ebay.myriad.scheduler.constraints.Constraint; import com.ebay.myriad.scheduler.constraints.LikeConstraint; import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; -import com.google.common.collect.Sets; +import com.google.common.collect.Lists; import com.google.inject.Inject; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + /** * Myriad scheduler operations */ @@ -57,68 +56,61 @@ public class MyriadOperations { public void flexDownCluster(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) { // Flex down Pending tasks, if any - int numPendingTasksScaledDown = 0; - Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds()); - - for (Protos.TaskID taskId : pendingTasks) { - NodeTask nodeTask = schedulerState.getTask(taskId); - if (nodeTask != null && nodeTask.getProfile().getName().equals(profile.getName()) && - meetsConstraint(nodeTask, constraint)) { - this.schedulerState.makeTaskKillable(taskId); - numPendingTasksScaledDown++; - if (numPendingTasksScaledDown == numInstancesToScaleDown) { - break; - } - } - } + int numPendingTasksScaledDown = flexDownPendingTasks( + profile, constraint, numInstancesToScaleDown); // Flex down Staging tasks, if any - int numStagingTasksScaledDown = 0; - if (numPendingTasksScaledDown < numInstancesToScaleDown) { - Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds()); - - for (Protos.TaskID taskId : stagingTasks) { - NodeTask nodeTask = schedulerState.getTask(taskId); - if (nodeTask != null && nodeTask.getProfile().getName().equals(profile.getName()) && - meetsConstraint(nodeTask, constraint)) { - this.schedulerState.makeTaskKillable(taskId); - numStagingTasksScaledDown++; - if (numStagingTasksScaledDown + numPendingTasksScaledDown == numInstancesToScaleDown) { - break; - } - } - } - } + int numStagingTasksScaledDown = flexDownStagingTasks( + profile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown); - int numActiveTasksScaledDown = 0; - if (numPendingTasksScaledDown + numStagingTasksScaledDown < numInstancesToScaleDown) { - Set<NodeTask> activeTasksForProfile = Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile)); - List<String> nodesToScaleDown = nodeScaleDownPolicy.getNodesToScaleDown(); - filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown); - - for (int i = 0; i < numInstancesToScaleDown - (numPendingTasksScaledDown + numStagingTasksScaledDown); i++) { - for (NodeTask nodeTask : activeTasksForProfile) { - if (nodesToScaleDown.size() > i && - nodesToScaleDown.get(i).equals(nodeTask.getHostname()) && - meetsConstraint(nodeTask, constraint)) { - this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId()); - numActiveTasksScaledDown++; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Marked NodeTask {} on host {} for kill.", - nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname()); - } - } - } - } - } + // Flex down Active tasks, if any + int numActiveTasksScaledDown = flexDownActiveTasks( + profile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown); if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) { - LOGGER.info("No Node Managers with profile '{}' and constraint {} found for scaling down.", - profile.getName(), constraint.toString()); + LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.", + profile.getName(), constraint == null ? "null" : constraint.toString()); } else { - LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with '{}' profile.", - numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, profile.getName()); + LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with " + + "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown, + numPendingTasksScaledDown, profile.getName(), constraint == null ? "null" : constraint.toString()); + } + } + + private int flexDownPendingTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) { + return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), + profile, constraint, numInstancesToScaleDown) : 0; + } + + private int flexDownStagingTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) { + return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), + profile, constraint, numInstancesToScaleDown) : 0; + } + + private int flexDownActiveTasks(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) { + if (numInstancesToScaleDown > 0) { + List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile)); + nodeScaleDownPolicy.apply(activeTasksForProfile); + return flexDownTasks(activeTasksForProfile, profile, constraint, numInstancesToScaleDown); + } + return 0; + } + + private int flexDownTasks(Collection<Protos.TaskID> taskIDs, NMProfile profile, + Constraint constraint, int numInstancesToScaleDown) { + int numInstancesScaledDown = 0; + for (Protos.TaskID taskID : taskIDs) { + NodeTask nodeTask = schedulerState.getTask(taskID); + if (nodeTask.getProfile().getName().equals(profile.getName()) && + meetsConstraint(nodeTask, constraint)) { + this.schedulerState.makeTaskKillable(taskID); + numInstancesScaledDown++; + if (numInstancesScaledDown == numInstancesToScaleDown) { + break; + } } + } + return numInstancesScaledDown; } private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) { @@ -142,22 +134,4 @@ public class MyriadOperations { return true; } - private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, List<String> registeredNMHosts) { - // If a NM is flexed down it takes time for the RM to realize the NM is no longer up - // We need to make sure we filter out nodes that have already been flexed down - // but have not disappeared from the RM's view of the cluster - for (Iterator<String> iterator = registeredNMHosts.iterator(); iterator.hasNext();) { - String nodeToScaleDown = iterator.next(); - boolean nodePresentInMyriad = false; - for (NodeTask nodeTask : activeTasksForProfile) { - if (nodeTask.getHostname().equals(nodeToScaleDown)) { - nodePresentInMyriad = true; - break; - } - } - if (!nodePresentInMyriad) { - iterator.remove(); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java index 36da5b1..46a3d89 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java @@ -19,16 +19,11 @@ import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; import com.google.common.base.Preconditions; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.mesos.Protos; -import org.apache.mesos.Protos.Attribute; -import org.apache.mesos.Protos.Offer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; /** * Provides utilities for scheduling with the mesos offers @@ -36,25 +31,6 @@ import java.util.Map; public class SchedulerUtils { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class); - public static boolean isMatchSlaveAttributes(Offer offer, Map<String, String> requestAttributes) { - boolean match = true; - - Map<String, String> offerAttributes = new HashMap<>(); - for (Attribute attribute : offer.getAttributesList()) { - offerAttributes.put(attribute.getName(), attribute.getText().getValue()); - } - - // Match with offer attributes only if request has attributes. - if (!MapUtils.isEmpty(requestAttributes)) { - match = offerAttributes.equals(requestAttributes); - } - - LOGGER.debug("Match status: {} for offer: {} and requestAttributes: {}", - match, offer, requestAttributes); - - return match; - } - public static boolean isUniqueHostname(Protos.OfferOrBuilder offer, Collection<NodeTask> tasks) { Preconditions.checkArgument(offer != null); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java index 5092783..727a19c 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java @@ -80,7 +80,7 @@ public class LikeConstraint implements Constraint { if (lhs != null ? !lhs.equals(that.lhs) : that.lhs != null) { return false; } - if (pattern != null ? !pattern.equals(that.pattern) : that.pattern != null) { + if (pattern != null ? !pattern.pattern().equals(that.pattern.pattern()) : that.pattern != null) { return false; } @@ -90,7 +90,7 @@ public class LikeConstraint implements Constraint { @Override public int hashCode() { int result = lhs != null ? lhs.hashCode() : 0; - result = 31 * result + (pattern != null ? pattern.hashCode() : 0); + result = 31 * result + (pattern != null ? pattern.pattern().hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 1ce647f..cd90f56 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -177,7 +177,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv checkResource(mem < 0, "mem"); checkResource(ports < 0, "port"); - return checkAggregates(offer, profile, ports, cpus, mem); + return checkAggregates(profile, ports, cpus, mem); } private boolean meetsConstraint(Offer offer, Constraint constraint) { @@ -203,17 +203,16 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv } } - private boolean checkAggregates(Offer offer, NMProfile profile, int ports, double cpus, double mem) { - Map<String, String> requestAttributes = new HashMap<>(); + private boolean checkAggregates(NMProfile profile, int ports, double cpus, double mem) { if (taskUtils.getAggregateCpus(profile) <= cpus && taskUtils.getAggregateMemory(profile) <= mem - && SchedulerUtils.isMatchSlaveAttributes(offer, requestAttributes) && NMPorts.expectedNumPorts() <= ports) { return true; } else { - LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", - taskUtils.getAggregateCpus(profile), taskUtils.getAggregateMemory(profile), ports); + LOGGER.info("Offer not sufficient for launching task. Task requires cpu: {}, memory: {}, # of ports: {}. " + + "Offer has cpu: {}, memory: {}, # of ports: {}", taskUtils.getAggregateCpus(profile), + taskUtils.getAggregateMemory(profile), NMPorts.expectedNumPorts(), cpus, mem, ports); return false; } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index e428a1d..28aa17d 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -16,26 +16,15 @@ package com.ebay.myriad.state; import com.ebay.myriad.scheduler.NMProfile; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - +import com.ebay.myriad.state.utils.StoreContext; import org.apache.commons.collections.CollectionUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.mesos.Protos; import org.apache.mesos.Protos.SlaveID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.ebay.myriad.state.utils.StoreContext; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; /** * Represents the state of the Myriad scheduler @@ -175,6 +164,17 @@ public class SchedulerState { return Collections.unmodifiableSet(this.pendingTasks); } + public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(NMProfile profile) { + List<Protos.TaskID> pendingTaskIds = new ArrayList<>(); + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + NodeTask nodeTask = entry.getValue(); + if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) { + pendingTaskIds.add(entry.getKey()); + } + } + return Collections.unmodifiableCollection(pendingTaskIds); + } + public synchronized Set<Protos.TaskID> getActiveTaskIds() { return Collections.unmodifiableSet(this.activeTasks); } @@ -192,18 +192,18 @@ public class SchedulerState { return Collections.unmodifiableCollection(activeNodeTasks); } - public synchronized Collection<NodeTask> getActiveTasksForProfile(NMProfile profile) { - List<NodeTask> activeNodeTasks = new ArrayList<>(); + public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(NMProfile profile) { + List<Protos.TaskID> activeTaskIDs = new ArrayList<>(); if (CollectionUtils.isNotEmpty(activeTasks) && CollectionUtils.isNotEmpty(tasks.values())) { for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { NodeTask nodeTask = entry.getValue(); if (activeTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) { - activeNodeTasks.add(nodeTask); + activeTaskIDs.add(entry.getKey()); } } } - return Collections.unmodifiableCollection(activeNodeTasks); + return Collections.unmodifiableCollection(activeTaskIDs); } // TODO (sdaingade) Clone NodeTask @@ -221,7 +221,18 @@ public class SchedulerState { return Collections.unmodifiableSet(this.stagingTasks); } - public synchronized Set<Protos.TaskID> getLostTaskIds() { + public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(NMProfile profile) { + List<Protos.TaskID> stagingTaskIDs = new ArrayList<>(); + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + NodeTask nodeTask = entry.getValue(); + if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) { + stagingTaskIDs.add(entry.getKey()); + } + } + return Collections.unmodifiableCollection(stagingTaskIDs); + } + + public synchronized Set<Protos.TaskID> getLostTaskIds() { return Collections.unmodifiableSet(this.lostTasks); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy index f2972a7..5504f33 100644 --- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy +++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy @@ -50,6 +50,18 @@ class LikeConstraintSpec extends Specification { getTextAttribute("random", "random value")) | true } + def "equals"() { + given: + def constraint1 = new LikeConstraint("hostname", "perfnode13[3-4].perf.lab") + def constraint2 = new LikeConstraint("hostname", "perfnode13[3-4].perf.lab") + def constraint3 = new LikeConstraint("hostname", "perfnode133.perf.lab") + + expect: + constraint1.equals(constraint2) + !constraint1.equals(constraint3) + !constraint2.equals(constraint3) + } + private static Protos.Attribute getTextAttribute(String name, String value) { Protos.Attribute.newBuilder() .setName(name)