Cleaned up flexdown code

- Fixed a bunch of edge cases, especially when no constraint is specified in
  the flex down request
- Refactored the code to make it more readable; flexdown now happens in 3
  (readable) steps:
   - flexdown pending tasks matching the given profile, constraints
   - flexdown staging tasks matching the given profile, constraints
   - flexdown active tasks matching the given profile, constraints
 - Minor cleanups of other classes wrt logging and removal of unused methods.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/34e3958c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/34e3958c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/34e3958c

Branch: refs/heads/master
Commit: 34e3958c4134b079fbc766f4fe4d2f88b42840f4
Parents: e8ec517
Author: Santosh Marella <smare...@maprtech.com>
Authored: Tue Oct 13 13:17:05 2015 -0700
Committer: Santosh Marella <mare...@gmail.com>
Committed: Thu Oct 15 12:56:46 2015 -0700

----------------------------------------------------------------------
 .../com/ebay/myriad/api/ClustersResource.java   |  16 ++-
 .../myriad/policy/LeastAMNodesFirstPolicy.java  |  47 ++++---
 .../ebay/myriad/policy/NodeScaleDownPolicy.java |  10 +-
 .../ebay/myriad/scheduler/MyriadOperations.java | 132 ++++++++-----------
 .../ebay/myriad/scheduler/SchedulerUtils.java   |  24 ----
 .../scheduler/constraints/LikeConstraint.java   |   4 +-
 .../handlers/ResourceOffersEventHandler.java    |  11 +-
 .../com/ebay/myriad/state/SchedulerState.java   |  53 +++++---
 .../constraints/LikeConstraintSpec.groovy       |  12 ++
 9 files changed, 148 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
index f2a3018..9f47b51 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
@@ -19,6 +19,7 @@ import com.codahale.metrics.annotation.Timed;
 import com.ebay.myriad.api.model.FlexDownClusterRequest;
 import com.ebay.myriad.api.model.FlexUpClusterRequest;
 import com.ebay.myriad.scheduler.MyriadOperations;
+import com.ebay.myriad.scheduler.NMProfile;
 import com.ebay.myriad.scheduler.NMProfileManager;
 import com.ebay.myriad.scheduler.constraints.ConstraintFactory;
 import com.ebay.myriad.state.SchedulerState;
@@ -108,11 +109,11 @@ public class ClustersResource {
         isValidRequest = isValidRequest && validateInstances(instances, 
response);
         isValidRequest = isValidRequest && validateConstraints(constraints, 
response);
 
-        Integer numFlexedUp = this.getNumFlexedupNMs();
+        Integer numFlexedUp = this.getNumFlexedupNMs(profile);
         if (isValidRequest && numFlexedUp < instances)  {
             String message = String.format("Number of requested instances for 
flexdown is greater than the number of " +
-                "Node Managers previously flexed up. Requested: %d, Previously 
flexed Up: %d. " +
-                "Only %d Node Managers will be flexed down", instances, 
numFlexedUp, numFlexedUp);
+                "Node Managers previously flexed up for profile '%s'. 
Requested: %d, Previously flexed Up: %d. " +
+                "Only %d Node Managers will be flexed down.", profile, 
instances, numFlexedUp, numFlexedUp);
             response.entity(message);
             LOGGER.warn(message);
         }
@@ -203,10 +204,11 @@ public class ClustersResource {
     }
 
 
-    private Integer getNumFlexedupNMs() {
-        return this.schedulerState.getActiveTaskIds().size()
-                + this.schedulerState.getStagingTaskIds().size()
-                + this.schedulerState.getPendingTaskIds().size();
+    private Integer getNumFlexedupNMs(String profile) {
+      NMProfile nmProfile = profileManager.get(profile);
+      return this.schedulerState.getActiveTaskIDsForProfile(nmProfile).size()
+                + 
this.schedulerState.getStagingTaskIDsForProfile(nmProfile).size()
+                + 
this.schedulerState.getPendingTaskIDsForProfile(nmProfile).size();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
index 38b14a7..568247b 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/LeastAMNodesFirstPolicy.java
@@ -2,7 +2,7 @@ package com.ebay.myriad.policy;
 
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
-import com.google.common.collect.Lists;
+import com.ebay.myriad.state.SchedulerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -10,11 +10,11 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -28,6 +28,7 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor 
implements NodeScal
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LeastAMNodesFirstPolicy.class);
 
     private final AbstractYarnScheduler yarnScheduler;
+    private final SchedulerState schedulerState;
 
     //TODO(Santosh): Should figure out the right values for the hashmap 
properties.
     // currently it's tuned for 200 nodes and 50 RM RPC threads (Yarn's 
default).
@@ -35,20 +36,26 @@ public class LeastAMNodesFirstPolicy extends 
BaseInterceptor implements NodeScal
     private static final int EXPECTED_CONCURRENT_ACCCESS_COUNT = 50;
     private static final float LOAD_FACTOR_DEFAULT = 0.75f;
 
-    private Map<String, SchedulerNode> schedulerNodes = new 
ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, 
EXPECTED_CONCURRENT_ACCCESS_COUNT);
+    private Map<String, SchedulerNode> schedulerNodes =
+        new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, 
EXPECTED_CONCURRENT_ACCCESS_COUNT);
 
     @Inject
-    public LeastAMNodesFirstPolicy(InterceptorRegistry registry, 
AbstractYarnScheduler yarnScheduler) {
+    public LeastAMNodesFirstPolicy(InterceptorRegistry registry,
+                                   AbstractYarnScheduler yarnScheduler,
+                                   SchedulerState schedulerState) {
         registry.register(this);
         this.yarnScheduler = yarnScheduler;
+        this.schedulerState = schedulerState;
     }
 
+    /**
+     *  Sort the given list of tasks by the number of App Master containers 
running on the corresponding NM node.
+     * @param taskIDs
+     */
     @Override
-    public List<String> getNodesToScaleDown() {
-        List<SchedulerNode> nodes = 
Lists.newArrayList(this.schedulerNodes.values());
-
+    public void apply(List<Protos.TaskID> taskIDs) {
         if (LOGGER.isDebugEnabled()) {
-            for (SchedulerNode node : nodes) {
+            for (SchedulerNode node : schedulerNodes.values()) {
                 LOGGER.debug("Host {} is running {} containers including {} 
App Masters",
                         node.getNodeID().getHost(), 
node.getRunningContainers().size(),
                         getNumAMContainers(node.getRunningContainers()));
@@ -58,9 +65,22 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor 
implements NodeScal
         // process HBs from NodeManagers and the state of SchedulerNode 
objects might change while we
         // are in the middle of sorting them based on the least number of AM 
containers.
         synchronized (yarnScheduler) {
-            Collections.sort(nodes, new Comparator<SchedulerNode>() {
+            Collections.sort(taskIDs, new Comparator<Protos.TaskID>() {
                 @Override
-                public int compare(SchedulerNode o1, SchedulerNode o2) {
+                public int compare(Protos.TaskID t1, Protos.TaskID t2) {
+                    SchedulerNode o1 = 
schedulerNodes.get(schedulerState.getTask(t1).getHostname());
+                    SchedulerNode o2 = 
schedulerNodes.get(schedulerState.getTask(t2).getHostname());
+
+                    if (o1 == null) { // a NM was launched by Myriad, but it 
hasn't yet registered with RM
+                      if (o2 == null) {
+                        return 0;
+                      } else {
+                        return -1;
+                      }
+                    } else if (o2 == null) {
+                      return 1;
+                    } // else, both the NMs have registered with RM
+
                     List<RMContainer> runningContainers1 = 
o1.getRunningContainers();
                     List<RMContainer> runningContainers2 = 
o2.getRunningContainers();
 
@@ -78,13 +98,6 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor 
implements NodeScal
                 }
             });
         }
-
-        List<String> hosts = new ArrayList<>(nodes.size());
-        for (SchedulerNode node : nodes) {
-            hosts.add(node.getNodeID().getHost());
-        }
-
-        return hosts;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
index db80761..f40d360 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/policy/NodeScaleDownPolicy.java
@@ -1,5 +1,7 @@
 package com.ebay.myriad.policy;
 
+import org.apache.mesos.Protos;
+
 import java.util.List;
 
 /**
@@ -8,11 +10,9 @@ import java.util.List;
 public interface NodeScaleDownPolicy {
 
     /**
-     * Get a list of host names of the nodes that needs to be scaled down.
-     * The implementation of the policy should populate this list in a way that
-     * the most preferred nodes to be scaled down should occur first in the 
list.
-     * @return
+     * Apply a scale down policy to the given list of taskIDs.
+     * @param taskIDs
      */
-    public List<String> getNodesToScaleDown();
+    public void apply(List<Protos.TaskID> taskIDs);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index 540d3e7..27fe406 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -20,17 +20,16 @@ import com.ebay.myriad.scheduler.constraints.Constraint;
 import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Lists;
 import com.google.inject.Inject;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
 /**
  * Myriad scheduler operations
  */
@@ -57,68 +56,61 @@ public class MyriadOperations {
 
     public void flexDownCluster(NMProfile profile, Constraint constraint, int 
numInstancesToScaleDown) {
         // Flex down Pending tasks, if any
-        int numPendingTasksScaledDown = 0;
-          Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds());
-
-          for (Protos.TaskID taskId : pendingTasks) {
-            NodeTask nodeTask = schedulerState.getTask(taskId);
-            if (nodeTask != null && 
nodeTask.getProfile().getName().equals(profile.getName()) &&
-                meetsConstraint(nodeTask, constraint)) {
-              this.schedulerState.makeTaskKillable(taskId);
-              numPendingTasksScaledDown++;
-              if (numPendingTasksScaledDown == numInstancesToScaleDown) {
-                break;
-              }
-            }
-          }
+        int numPendingTasksScaledDown = flexDownPendingTasks(
+            profile, constraint, numInstancesToScaleDown);
 
         // Flex down Staging tasks, if any
-        int numStagingTasksScaledDown = 0;
-        if (numPendingTasksScaledDown < numInstancesToScaleDown) {
-          Set<Protos.TaskID> stagingTasks = 
Sets.newHashSet(this.schedulerState.getStagingTaskIds());
-
-          for (Protos.TaskID taskId : stagingTasks) {
-            NodeTask nodeTask = schedulerState.getTask(taskId);
-            if (nodeTask != null && 
nodeTask.getProfile().getName().equals(profile.getName()) &&
-                meetsConstraint(nodeTask, constraint)) {
-              this.schedulerState.makeTaskKillable(taskId);
-              numStagingTasksScaledDown++;
-              if (numStagingTasksScaledDown + numPendingTasksScaledDown == 
numInstancesToScaleDown) {
-                break;
-              }
-            }
-          }
-        }
+        int numStagingTasksScaledDown = flexDownStagingTasks(
+            profile, constraint, numInstancesToScaleDown - 
numPendingTasksScaledDown);
 
-        int numActiveTasksScaledDown = 0;
-        if (numPendingTasksScaledDown + numStagingTasksScaledDown < 
numInstancesToScaleDown) {
-          Set<NodeTask> activeTasksForProfile = 
Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile));
-          List<String> nodesToScaleDown = 
nodeScaleDownPolicy.getNodesToScaleDown();
-          filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown);
-
-          for (int i = 0; i < numInstancesToScaleDown - 
(numPendingTasksScaledDown + numStagingTasksScaledDown); i++) {
-            for (NodeTask nodeTask : activeTasksForProfile) {
-              if (nodesToScaleDown.size() > i &&
-                  nodesToScaleDown.get(i).equals(nodeTask.getHostname()) &&
-                  meetsConstraint(nodeTask, constraint)) {
-                
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
-                numActiveTasksScaledDown++;
-                if (LOGGER.isDebugEnabled()) {
-                  LOGGER.debug("Marked NodeTask {} on host {} for kill.",
-                      nodeTask.getTaskStatus().getTaskId(), 
nodeTask.getHostname());
-                }
-              }
-            }
-          }
-        }
+        // Flex down Active tasks, if any
+        int numActiveTasksScaledDown = flexDownActiveTasks(
+            profile, constraint, numInstancesToScaleDown - 
numPendingTasksScaledDown - numStagingTasksScaledDown);
 
         if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown == 0) {
-          LOGGER.info("No Node Managers with profile '{}' and constraint {} 
found for scaling down.",
-              profile.getName(), constraint.toString());
+          LOGGER.info("No Node Managers with profile '{}' and constraint '{}' 
found for scaling down.",
+              profile.getName(), constraint == null ? "null" : 
constraint.toString());
         } else {
-          LOGGER.info("Flexed down {} active, {} staging  and {} pending Node 
Managers with '{}' profile.",
-              numActiveTasksScaledDown, numStagingTasksScaledDown, 
numPendingTasksScaledDown, profile.getName());
+          LOGGER.info("Flexed down {} active, {} staging  and {} pending Node 
Managers with " +
+              "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, 
numStagingTasksScaledDown,
+              numPendingTasksScaledDown, profile.getName(), constraint == null 
? "null" : constraint.toString());
+        }
+    }
+
+    private int flexDownPendingTasks(NMProfile profile, Constraint constraint, 
int numInstancesToScaleDown) {
+      return numInstancesToScaleDown > 0 ? 
flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile),
+          profile, constraint, numInstancesToScaleDown) : 0;
+    }
+
+  private int flexDownStagingTasks(NMProfile profile, Constraint constraint, 
int numInstancesToScaleDown) {
+      return numInstancesToScaleDown > 0 ? 
flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile),
+          profile, constraint, numInstancesToScaleDown) : 0;
+    }
+
+    private int flexDownActiveTasks(NMProfile profile, Constraint constraint, 
int numInstancesToScaleDown) {
+      if (numInstancesToScaleDown > 0) {
+        List<Protos.TaskID> activeTasksForProfile = 
Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
+        nodeScaleDownPolicy.apply(activeTasksForProfile);
+        return flexDownTasks(activeTasksForProfile, profile, constraint, 
numInstancesToScaleDown);
+      }
+      return 0;
+    }
+
+  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, NMProfile 
profile,
+                              Constraint constraint, int 
numInstancesToScaleDown) {
+      int numInstancesScaledDown = 0;
+      for (Protos.TaskID taskID : taskIDs) {
+        NodeTask nodeTask = schedulerState.getTask(taskID);
+        if (nodeTask.getProfile().getName().equals(profile.getName()) &&
+            meetsConstraint(nodeTask, constraint)) {
+          this.schedulerState.makeTaskKillable(taskID);
+          numInstancesScaledDown++;
+          if (numInstancesScaledDown == numInstancesToScaleDown) {
+            break;
+          }
         }
+      }
+      return numInstancesScaledDown;
     }
 
   private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
@@ -142,22 +134,4 @@ public class MyriadOperations {
     return true;
   }
 
-  private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, 
List<String> registeredNMHosts) {
-    // If a NM is flexed down it takes time for the RM to realize the NM is no 
longer up
-    // We need to make sure we filter out nodes that have already been flexed 
down
-    // but have not disappeared from the RM's view of the cluster
-    for (Iterator<String> iterator = registeredNMHosts.iterator(); 
iterator.hasNext();) {
-        String nodeToScaleDown = iterator.next();
-        boolean nodePresentInMyriad = false;
-        for (NodeTask nodeTask : activeTasksForProfile) {
-            if (nodeTask.getHostname().equals(nodeToScaleDown)) {
-                nodePresentInMyriad = true;
-                break;
-            }
-        }
-        if (!nodePresentInMyriad) {
-            iterator.remove();
-        }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
index 36da5b1..46a3d89 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/SchedulerUtils.java
@@ -19,16 +19,11 @@ import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.Attribute;
-import org.apache.mesos.Protos.Offer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Provides utilities for scheduling with the mesos offers
@@ -36,25 +31,6 @@ import java.util.Map;
 public class SchedulerUtils {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SchedulerUtils.class);
 
-    public static boolean isMatchSlaveAttributes(Offer offer, Map<String, 
String> requestAttributes) {
-        boolean match = true;
-
-        Map<String, String> offerAttributes = new HashMap<>();
-        for (Attribute attribute : offer.getAttributesList()) {
-            offerAttributes.put(attribute.getName(), 
attribute.getText().getValue());
-        }
-
-        // Match with offer attributes only if request has attributes.
-        if (!MapUtils.isEmpty(requestAttributes)) {
-            match = offerAttributes.equals(requestAttributes);
-        }
-
-        LOGGER.debug("Match status: {} for offer: {} and requestAttributes: 
{}",
-                match, offer, requestAttributes);
-
-        return match;
-    }
-
     public static boolean isUniqueHostname(Protos.OfferOrBuilder offer,
                                            Collection<NodeTask> tasks) {
         Preconditions.checkArgument(offer != null);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
index 5092783..727a19c 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
@@ -80,7 +80,7 @@ public class LikeConstraint implements Constraint {
     if (lhs != null ? !lhs.equals(that.lhs) : that.lhs != null) {
       return false;
     }
-    if (pattern != null ? !pattern.equals(that.pattern) : that.pattern != 
null) {
+    if (pattern != null ? !pattern.pattern().equals(that.pattern.pattern()) : 
that.pattern != null) {
       return false;
     }
 
@@ -90,7 +90,7 @@ public class LikeConstraint implements Constraint {
   @Override
   public int hashCode() {
     int result = lhs != null ? lhs.hashCode() : 0;
-    result = 31 * result + (pattern != null ? pattern.hashCode() : 0);
+    result = 31 * result + (pattern != null ? pattern.pattern().hashCode() : 
0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 1ce647f..cd90f56 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -177,7 +177,7 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     checkResource(mem < 0, "mem");
     checkResource(ports < 0, "port");
 
-    return checkAggregates(offer, profile, ports, cpus, mem);
+    return checkAggregates(profile, ports, cpus, mem);
   }
 
   private boolean meetsConstraint(Offer offer, Constraint constraint) {
@@ -203,17 +203,16 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     }
   }
 
-  private boolean checkAggregates(Offer offer, NMProfile profile, int ports, 
double cpus, double mem) {
-    Map<String, String> requestAttributes = new HashMap<>();
+  private boolean checkAggregates(NMProfile profile, int ports, double cpus, 
double mem) {
 
     if (taskUtils.getAggregateCpus(profile) <= cpus
         && taskUtils.getAggregateMemory(profile) <= mem
-        && SchedulerUtils.isMatchSlaveAttributes(offer, requestAttributes)
         && NMPorts.expectedNumPorts() <= ports) {
       return true;
     } else {
-      LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, 
ports: {}",
-          taskUtils.getAggregateCpus(profile), 
taskUtils.getAggregateMemory(profile), ports);
+      LOGGER.info("Offer not sufficient for launching task. Task requires cpu: 
{}, memory: {}, # of ports: {}. " +
+          "Offer has cpu: {}, memory: {}, # of ports: {}", 
taskUtils.getAggregateCpus(profile),
+          taskUtils.getAggregateMemory(profile), NMPorts.expectedNumPorts(), 
cpus, mem, ports);
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index e428a1d..28aa17d 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -16,26 +16,15 @@
 package com.ebay.myriad.state;
 
 import com.ebay.myriad.scheduler.NMProfile;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.ebay.myriad.state.utils.StoreContext;
 import org.apache.commons.collections.CollectionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.SlaveID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.ebay.myriad.state.utils.StoreContext;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Represents the state of the Myriad scheduler
@@ -175,6 +164,17 @@ public class SchedulerState {
         return Collections.unmodifiableSet(this.pendingTasks);
     }
 
+    public synchronized Collection<Protos.TaskID> 
getPendingTaskIDsForProfile(NMProfile profile) {
+      List<Protos.TaskID> pendingTaskIds = new ArrayList<>();
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        NodeTask nodeTask = entry.getValue();
+        if (pendingTasks.contains(entry.getKey()) && 
nodeTask.getProfile().getName().equals(profile.getName())) {
+          pendingTaskIds.add(entry.getKey());
+        }
+      }
+      return Collections.unmodifiableCollection(pendingTaskIds);
+    }
+
     public synchronized Set<Protos.TaskID> getActiveTaskIds() {
         return Collections.unmodifiableSet(this.activeTasks);
     }
@@ -192,18 +192,18 @@ public class SchedulerState {
         return Collections.unmodifiableCollection(activeNodeTasks);
     }
 
-    public synchronized Collection<NodeTask> 
getActiveTasksForProfile(NMProfile profile) {
-      List<NodeTask> activeNodeTasks = new ArrayList<>();
+    public synchronized Collection<Protos.TaskID> 
getActiveTaskIDsForProfile(NMProfile profile) {
+      List<Protos.TaskID> activeTaskIDs = new ArrayList<>();
       if (CollectionUtils.isNotEmpty(activeTasks)
           && CollectionUtils.isNotEmpty(tasks.values())) {
         for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
           NodeTask nodeTask = entry.getValue();
           if (activeTasks.contains(entry.getKey()) && 
nodeTask.getProfile().getName().equals(profile.getName())) {
-            activeNodeTasks.add(nodeTask);
+            activeTaskIDs.add(entry.getKey());
           }
         }
       }
-      return Collections.unmodifiableCollection(activeNodeTasks);
+      return Collections.unmodifiableCollection(activeTaskIDs);
     }
 
   // TODO (sdaingade) Clone NodeTask
@@ -221,7 +221,18 @@ public class SchedulerState {
         return Collections.unmodifiableSet(this.stagingTasks);
     }
 
-    public synchronized Set<Protos.TaskID> getLostTaskIds() {
+    public synchronized Collection<Protos.TaskID> 
getStagingTaskIDsForProfile(NMProfile profile) {
+      List<Protos.TaskID> stagingTaskIDs = new ArrayList<>();
+      for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+        NodeTask nodeTask = entry.getValue();
+        if (stagingTasks.contains(entry.getKey()) && 
nodeTask.getProfile().getName().equals(profile.getName())) {
+          stagingTaskIDs.add(entry.getKey());
+        }
+      }
+      return Collections.unmodifiableCollection(stagingTaskIDs);
+    }
+
+  public synchronized Set<Protos.TaskID> getLostTaskIds() {
         return Collections.unmodifiableSet(this.lostTasks);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/34e3958c/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
 
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
index f2972a7..5504f33 100644
--- 
a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
+++ 
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
@@ -50,6 +50,18 @@ class LikeConstraintSpec extends Specification {
         getTextAttribute("random", "random value"))                | true
   }
 
+  def "equals"() {
+    given:
+    def constraint1 = new LikeConstraint("hostname", 
"perfnode13[3-4].perf.lab")
+    def constraint2 = new LikeConstraint("hostname", 
"perfnode13[3-4].perf.lab")
+    def constraint3 = new LikeConstraint("hostname", "perfnode133.perf.lab")
+
+    expect:
+    constraint1.equals(constraint2)
+    !constraint1.equals(constraint3)
+    !constraint2.equals(constraint3)
+  }
+
   private static Protos.Attribute getTextAttribute(String name, String value) {
     Protos.Attribute.newBuilder()
         .setName(name)

Reply via email to