Implemented hostname constraint for flexup/flexdown

Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/b3d9f0cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/b3d9f0cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/b3d9f0cc

Branch: refs/heads/master
Commit: b3d9f0ccb2cbffb88d64c38767be3fad23011b2e
Parents: 10b9e6a
Author: Santosh Marella <smare...@maprtech.com>
Authored: Fri Sep 18 12:04:48 2015 -0700
Committer: Santosh Marella <mare...@gmail.com>
Committed: Thu Oct 15 12:56:45 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/com/ebay/myriad/Main.java     |  2 +-
 .../com/ebay/myriad/api/ClustersResource.java   | 21 ++++-
 .../ebay/myriad/scheduler/MyriadOperations.java | 48 +++++++---
 .../com/ebay/myriad/scheduler/Rebalancer.java   |  2 +-
 .../ebay/myriad/scheduler/TaskTerminator.java   | 21 +++--
 .../scheduler/constraints/Constraint.java       | 17 ++++
 .../constraints/ConstraintFactory.java          | 18 ++++
 .../scheduler/constraints/LikeConstraint.java   | 94 ++++++++++++++++++++
 .../handlers/ResourceOffersEventHandler.java    | 38 +++++++-
 .../java/com/ebay/myriad/state/NodeTask.java    | 21 ++++-
 .../myriad/state/utils/ByteBufferSupport.java   | 35 +++++++-
 .../myriad/scheduler/SchedulerUtilsSpec.groovy  |  2 +-
 .../constraints/LikeConstraintSpec.groovy       | 63 +++++++++++++
 13 files changed, 349 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
index cd8b90e..1bba999 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
@@ -187,7 +187,7 @@ public class Main {
       MyriadOperations myriadOperations = 
injector.getInstance(MyriadOperations.class);
       NMProfileManager profileManager = 
injector.getInstance(NMProfileManager.class);
       for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
-        myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), 
entry.getValue());
+        myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), 
entry.getValue(), null);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
index 2b8fe0e..1cdb522 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
@@ -20,9 +20,12 @@ import com.ebay.myriad.api.model.FlexDownClusterRequest;
 import com.ebay.myriad.api.model.FlexUpClusterRequest;
 import com.ebay.myriad.scheduler.MyriadOperations;
 import com.ebay.myriad.scheduler.NMProfileManager;
+import com.ebay.myriad.scheduler.constraints.ConstraintFactory;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.PUT;
@@ -78,7 +81,9 @@ public class ClustersResource {
 
         Response returnResponse = response.build();
         if (returnResponse.getStatus() == 
Response.Status.ACCEPTED.getStatusCode()) {
-            
this.myriadOperations.flexUpCluster(this.profileManager.get(profile), 
instances);
+          String constraint = constraints != null && !constraints.isEmpty() ? 
constraints.get(0) : null;
+          
this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances,
+              ConstraintFactory.createConstraint(constraint));
         }
 
         return returnResponse;
@@ -114,7 +119,9 @@ public class ClustersResource {
 
         Response returnResponse = response.build();
         if (returnResponse.getStatus() == 
Response.Status.ACCEPTED.getStatusCode()) {
-            this.myriadOperations.flexDownCluster(profileManager.get(profile), 
instances);
+            String constraint = constraints != null && !constraints.isEmpty() 
? constraints.get(0) : null;
+            this.myriadOperations.flexDownCluster(profileManager.get(profile),
+                ConstraintFactory.createConstraint(constraint), instances);
         }
         return returnResponse;
     }
@@ -177,12 +184,20 @@ public class ClustersResource {
 
       String[] splits = constraint.split(" LIKE "); // "<key> LIKE <val_regex>"
       if (splits.length != 2) {
-        String message = String.format("Invalid regex for LIKE operator in 
constraint: %s. Format: %s",
+        String message = String.format("Invalid format for LIKE operator in 
constraint: %s. Format: %s",
             constraint, CONSTRAINT_FORMAT);
         response.status(Status.BAD_REQUEST).entity(message);
         LOGGER.error(message);
         return false;
       }
+      try {
+        Pattern.compile(splits[1]);
+      } catch (PatternSyntaxException e) {
+        String message = String.format("Invalid regex for LIKE operator in 
constraint: %s", constraint);
+        response.status(Status.BAD_REQUEST).entity(message);
+        LOGGER.error(message, e);
+        return false;
+      }
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index 84ec723..bb1a9fc 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -16,19 +16,20 @@
 package com.ebay.myriad.scheduler;
 
 import com.ebay.myriad.policy.NodeScaleDownPolicy;
+import com.ebay.myriad.scheduler.constraints.Constraint;
+import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Myriad scheduler operations
@@ -45,22 +46,24 @@ public class MyriadOperations {
       this.nodeScaleDownPolicy = nodeScaleDownPolicy;
     }
 
-    public void flexUpCluster(NMProfile profile, int instances) {
+    public void flexUpCluster(NMProfile profile, int instances, Constraint 
constraint) {
         Collection<NodeTask> nodes = new HashSet<>();
         for (int i = 0; i < instances; i++) {
-            nodes.add(new NodeTask(profile));
+            nodes.add(new NodeTask(profile, constraint));
         }
 
         this.schedulerState.addNodes(nodes);
     }
 
-    public void flexDownCluster(NMProfile profile, int 
numInstancesToScaleDown) {
+    public void flexDownCluster(NMProfile profile, Constraint constraint, int 
numInstancesToScaleDown) {
         // Flex down Pending tasks, if any
         int numPendingTasksScaledDown = 0;
           Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds());
 
           for (Protos.TaskID taskId : pendingTasks) {
-            if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
+            NodeTask nodeTask = schedulerState.getTask(taskId);
+            if (nodeTask.getProfile().getName().equals(profile.getName()) &&
+                meetsConstraint(nodeTask, constraint)) {
               this.schedulerState.makeTaskKillable(taskId);
               numPendingTasksScaledDown++;
               if (numPendingTasksScaledDown == numInstancesToScaleDown) {
@@ -75,7 +78,9 @@ public class MyriadOperations {
           Set<Protos.TaskID> stagingTasks = 
Sets.newHashSet(this.schedulerState.getStagingTaskIds());
 
           for (Protos.TaskID taskId : stagingTasks) {
-            if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
+            NodeTask nodeTask = schedulerState.getTask(taskId);
+            if (nodeTask.getProfile().getName().equals(profile.getName()) &&
+                meetsConstraint(nodeTask, constraint)) {
               this.schedulerState.makeTaskKillable(taskId);
               numStagingTasksScaledDown++;
               if (numStagingTasksScaledDown + numPendingTasksScaledDown == 
numInstancesToScaleDown) {
@@ -93,7 +98,9 @@ public class MyriadOperations {
 
           for (int i = 0; i < numInstancesToScaleDown - 
(numPendingTasksScaledDown + numStagingTasksScaledDown); i++) {
             for (NodeTask nodeTask : activeTasksForProfile) {
-              if (nodesToScaleDown.size() > i && 
nodesToScaleDown.get(i).equals(nodeTask.getHostname())) {
+              if (nodesToScaleDown.size() > i &&
+                  nodesToScaleDown.get(i).equals(nodeTask.getHostname()) &&
+                  meetsConstraint(nodeTask, constraint)) {
                 
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
                 numActiveTasksScaledDown++;
                 if (LOGGER.isDebugEnabled()) {
@@ -106,13 +113,32 @@ public class MyriadOperations {
         }
 
         if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown == 0) {
-          LOGGER.info("No Node Managers with profile '{}' found for scaling 
down.", profile.getName());
+          // 'constraint' is null when the flex down request carried no constraint, so it is
+          // handed to the logger as-is instead of being dereferenced with toString().
+          LOGGER.info("No Node Managers with profile '{}' and constraint {} found for scaling down.",
+              profile.getName(), constraint);
         } else {
           LOGGER.info("Flexed down {} active, {} staging  and {} pending Node 
Managers with '{}' profile.",
               numActiveTasksScaledDown, numStagingTasksScaledDown, 
numPendingTasksScaledDown, profile.getName());
         }
     }
 
+  /**
+   * Decides whether a task qualifies for flex down under the given constraint.
+   *
+   * @param nodeTask   task being considered
+   * @param constraint constraint supplied with the flex down request; may be null
+   * @return true when there is no constraint, when the task carries an equal constraint,
+   *         or when the task's hostname / slave attributes satisfy the constraint
+   */
+  private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
+    if (constraint == null) {
+      return true;
+    }
+    // A task created with an equal constraint qualifies without evaluating the rule again.
+    if (constraint.equals(nodeTask.getConstraint())) {
+      return true;
+    }
+    switch (constraint.getType()) {
+      case LIKE:
+        LikeConstraint like = (LikeConstraint) constraint;
+        return like.isConstraintOnHostName()
+            ? like.matchesHostName(nodeTask.getHostname())
+            : like.matchesSlaveAttributes(nodeTask.getSlaveAttributes());
+      default:
+        // Types without a matching rule do not filter anything out.
+        return true;
+    }
+  }
+
   private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, 
List<String> registeredNMHosts) {
     // If a NM is flexed down it takes time for the RM to realize the NM is no 
longer up
     // We need to make sure we filter out nodes that have already been flexed 
down

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
index 164e7fa..e76a107 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
@@ -44,7 +44,7 @@ public class Rebalancer implements Runnable {
     public void run() {
         LOGGER.info("Active {}, Pending {}", 
schedulerState.getActiveTaskIds().size(), 
schedulerState.getPendingTaskIds().size());
         if (schedulerState.getActiveTaskIds().size() < 1 && 
schedulerState.getPendingTaskIds().size() < 1) {
-            myriadOperations.flexUpCluster(profileManager.get("small"), 1);
+            myriadOperations.flexUpCluster(profileManager.get("small"), 1, 
null);
         }
 //            RestAdapter restAdapter = new RestAdapter.Builder()
 //                    .setEndpoint("http://"; + host + ":" + port)

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
index 874b445..db3ad9c 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
@@ -15,33 +15,28 @@
  */
 package com.ebay.myriad.scheduler;
 
-import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import java.util.Set;
+import javax.inject.Inject;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Protos.TaskID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-import java.util.Set;
-
 /**
  * {@link TaskTerminator} is responsible for killing tasks.
  */
 public class TaskTerminator implements Runnable {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadDriverManager.class);
 
-    private MyriadConfiguration cfg;
     private SchedulerState schedulerState;
     private MyriadDriverManager driverManager;
 
     @Inject
-    public TaskTerminator(MyriadConfiguration cfg,
-                          SchedulerState schedulerState, MyriadDriverManager 
driverManager) {
-        this.cfg = cfg;
+    public TaskTerminator(SchedulerState schedulerState, MyriadDriverManager 
driverManager) {
         this.schedulerState = schedulerState;
         this.driverManager = driverManager;
     }
@@ -64,9 +59,13 @@ public class TaskTerminator implements Runnable {
         }
 
         for (TaskID taskIdToKill : killableTasks) {
-            Status status = this.driverManager.kill(taskIdToKill);
-            this.schedulerState.removeTask(taskIdToKill);
-            Preconditions.checkState(status == Status.DRIVER_RUNNING);
+            if 
(this.schedulerState.getPendingTaskIds().contains(taskIdToKill)) {
+              this.schedulerState.removeTask(taskIdToKill);
+            } else {
+              Status status = this.driverManager.kill(taskIdToKill);
+              this.schedulerState.removeTask(taskIdToKill);
+              Preconditions.checkState(status == Status.DRIVER_RUNNING);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java
new file mode 100644
index 0000000..c59219f
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java
@@ -0,0 +1,17 @@
+package com.ebay.myriad.scheduler.constraints;
+
+/**
+ * A rule attached to a flex up / flex down request that restricts which
+ * nodes the request applies to.
+ */
+public interface Constraint {
+
+  /**
+   * Kinds of constraint. {@code NULL} stands in for "no constraint" when a task is
+   * serialized. ByteBufferSupport writes and reads the ordinal of these constants,
+   * so their order must not change.
+   */
+  enum Type {
+    NULL,
+    LIKE
+  }
+
+  /**
+   * @return the kind of this constraint
+   */
+  Type getType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java
new file mode 100644
index 0000000..8cfd128
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java
@@ -0,0 +1,18 @@
+package com.ebay.myriad.scheduler.constraints;
+
+/**
+ * Builds {@link Constraint} instances from their textual form.
+ */
+public class ConstraintFactory {
+
+  /**
+   * Parses a constraint written as {@code <lhs> LIKE <regex>}.
+   *
+   * @param constraintStr constraint text; may be null
+   * @return a LikeConstraint when the text splits into exactly two parts around
+   *         {@code " LIKE "}; null otherwise, including for null input
+   */
+  public static Constraint createConstraint(String constraintStr) {
+    if (constraintStr == null) {
+      return null;
+    }
+    String[] parts = constraintStr.split(" LIKE ");
+    if (parts.length != 2) {
+      return null;
+    }
+    String lhs = parts[0];
+    String rhsRegex = parts[1];
+    return new LikeConstraint(lhs, rhsRegex);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
new file mode 100644
index 0000000..480ecfc
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java
@@ -0,0 +1,94 @@
+package com.ebay.myriad.scheduler.constraints;
+
+import com.google.gson.Gson;
+import java.util.Collection;
+import java.util.regex.Pattern;
+import org.apache.mesos.Protos.Attribute;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Constraint for LIKE operator.
+ * Format: <mesos_slave_attribute|hostname> LIKE <regex_value>
+ *
+ * <p>The left-hand side is either the literal {@code hostname} (compared ignoring case)
+ * or the name of a Mesos slave attribute. The right-hand side is a Java regular
+ * expression that has to match the whole candidate value.
+ *
+ * <p>{@link #toString()} renders the instance as JSON through Gson; only the
+ * {@code lhs} and {@code rhsRegex} fields take part in that form.
+ */
+public class LikeConstraint implements Constraint {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LikeConstraint.class);
+
+  private static final String HOSTNAME = "hostname";
+
+  // One shared instance instead of a new Gson per toString() call.
+  private static final Gson GSON = new Gson();
+
+  private String lhs;
+  private String rhsRegex;
+
+  // Compiled form of rhsRegex, built on first use. It is transient so that it stays out
+  // of the JSON form, and lazy because an instance rebuilt from JSON may not have passed
+  // through the constructor. Volatile so a compiled pattern is published safely.
+  private transient volatile Pattern rhsPattern;
+
+  /**
+   * @param lhs      {@code hostname} or the name of a Mesos slave attribute
+   * @param rhsRegex regular expression the value has to match in full
+   */
+  public LikeConstraint(String lhs, String rhsRegex) {
+    this.lhs = lhs;
+    this.rhsRegex = rhsRegex;
+  }
+
+  /**
+   * @return true when the left-hand side is {@code hostname} (ignoring case);
+   *         false otherwise, including when the left-hand side is null
+   */
+  public boolean isConstraintOnHostName() {
+    return HOSTNAME.equalsIgnoreCase(lhs);
+  }
+
+  /**
+   * @param hostname host name to test; may be null
+   * @return true only for a hostname constraint whose regex matches the whole,
+   *         non-null host name
+   */
+  public boolean matchesHostName(String hostname) {
+    return isConstraintOnHostName() && hostname != null && matchesRegex(hostname);
+  }
+
+  /**
+   * Tests the constraint against a node's Mesos attributes. The first attribute whose
+   * name equals the left-hand side (ignoring case) decides the result.
+   *
+   * @param attributes attributes of the node; may be null
+   * @return true when that attribute is of type TEXT or SCALAR and its value matches the
+   *         regex; false for hostname constraints, null input, no attribute of that name,
+   *         or an attribute of any other type
+   */
+  public boolean matchesSlaveAttributes(Collection<Attribute> attributes) {
+    if (isConstraintOnHostName() || attributes == null) {
+      return false;
+    }
+    for (Attribute attr : attributes) {
+      if (!attr.getName().equalsIgnoreCase(lhs)) {
+        continue;
+      }
+      switch (attr.getType()) {
+        case TEXT:
+          return matchesRegex(attr.getText().getValue());
+
+        case SCALAR:
+          // The scalar is compared through its String.valueOf rendering.
+          return matchesRegex(String.valueOf(attr.getScalar().getValue()));
+
+        default:
+          LOGGER.warn("LIKE constraint currently doesn't support Mesos slave attributes " +
+              "of type {}. Attribute Name: {}", attr.getType(), attr.getName());
+          return false;
+      }
+    }
+    return false;
+  }
+
+  // Full-match test against rhsRegex; the regex is compiled once and then reused.
+  private boolean matchesRegex(String value) {
+    Pattern pattern = rhsPattern;
+    if (pattern == null) {
+      pattern = Pattern.compile(rhsRegex);
+      rhsPattern = pattern;
+    }
+    return pattern.matcher(value).matches();
+  }
+
+  @Override
+  public Type getType() {
+    return Type.LIKE;
+  }
+
+  /**
+   * @return the JSON form of this constraint as produced by Gson
+   */
+  @Override
+  public String toString() {
+    return GSON.toJson(this);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LikeConstraint)) {
+      return false;
+    }
+
+    LikeConstraint that = (LikeConstraint) o;
+
+    if (lhs != null ? !lhs.equals(that.lhs) : that.lhs != null) {
+      return false;
+    }
+    if (rhsRegex != null ? !rhsRegex.equals(that.rhsRegex) : that.rhsRegex != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = lhs != null ? lhs.hashCode() : 0;
+    result = 31 * result + (rhsRegex != null ? rhsRegex.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 59ea547..1ce647f 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -20,6 +20,8 @@ import com.ebay.myriad.scheduler.NMProfile;
 import com.ebay.myriad.scheduler.SchedulerUtils;
 import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.TaskUtils;
+import com.ebay.myriad.scheduler.constraints.Constraint;
+import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import com.ebay.myriad.scheduler.event.ResourceOffersEvent;
 import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
 import com.ebay.myriad.state.NodeTask;
@@ -78,7 +80,10 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     // a notification for "framework registration". This is a simple defensive 
code
     // to not process any offers unless Myriad receives a "framework 
registered" notification.
     if (schedulerState.getFrameworkID() == null) {
-      LOGGER.warn("Received {} offers, but not processing them since Framework 
ID is not yet set", offers.size());
+      LOGGER.warn("Received {} offers, but declining them since Framework ID 
is not yet set", offers.size());
+      for (Offer offer : offers) {
+        driver.declineOffer(offer.getId());
+      }
       return;
     }
     LOGGER.info("Received offers {}", offers.size());
@@ -87,14 +92,19 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     try {
       for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) {
         Offer offer = iterator.next();
+        NodeTask nodeTask = schedulerState.getNodeTask(offer.getSlaveId());
+        if (nodeTask != null) {
+          nodeTask.setSlaveAttributes(offer.getAttributesList());
+        }
         Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
         if (CollectionUtils.isNotEmpty(pendingTasks)) {
           for (Protos.TaskID pendingTaskId : pendingTasks) {
             NodeTask taskToLaunch = schedulerState
                 .getTask(pendingTaskId);
             NMProfile profile = taskToLaunch.getProfile();
+            Constraint constraint = taskToLaunch.getConstraint();
 
-            if (matches(offer, profile)
+            if (matches(offer, profile, constraint)
                 && SchedulerUtils.isUniqueHostname(offer,
                 schedulerState.getActiveTasks())) {
               TaskInfo task = taskFactory.createTask(offer, 
schedulerState.getFrameworkID(), pendingTaskId,
@@ -143,7 +153,12 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     }
   }
 
-  private boolean matches(Offer offer, NMProfile profile) {
+  private boolean matches(Offer offer, NMProfile profile, Constraint 
constraint) {
+
+    if (!meetsConstraint(offer, constraint)) {
+      return false;
+    }
+
     Map<String, Object> results = new HashMap<String, Object>(5);
 
     for (Resource resource : offer.getResourcesList()) {
@@ -165,6 +180,23 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     return checkAggregates(offer, profile, ports, cpus, mem);
   }
 
+  /**
+   * Checks an offer against the constraint of the task that would be launched on it.
+   *
+   * @param offer      resource offer under consideration
+   * @param constraint constraint of the pending task; may be null
+   * @return true when no constraint is set or when the offer's hostname / attributes
+   *         satisfy it
+   */
+  private boolean meetsConstraint(Offer offer, Constraint constraint) {
+    if (constraint == null) {
+      return true;
+    }
+    switch (constraint.getType()) {
+      case LIKE:
+        LikeConstraint like = (LikeConstraint) constraint;
+        if (like.isConstraintOnHostName()) {
+          return like.matchesHostName(offer.getHostname());
+        }
+        return like.matchesSlaveAttributes(offer.getAttributesList());
+      default:
+        // Types without a matching rule accept every offer.
+        return true;
+    }
+  }
+
   private void checkResource(boolean fail, String resource) {
     if (fail) {
       LOGGER.info("No " + resource + " resources present");

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
index 8191eed..943112f 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
@@ -16,8 +16,11 @@
 package com.ebay.myriad.state;
 
 import com.ebay.myriad.scheduler.NMProfile;
+import com.ebay.myriad.scheduler.constraints.Constraint;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
 import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Attribute;
 
 /**
  * Represents a task to be launched by the executor
@@ -37,9 +40,13 @@ public class NodeTask {
      */
     private Protos.ExecutorInfo executorInfo;
 
-    public NodeTask(NMProfile profile) {
+    private Constraint constraint;
+    private List<Attribute> slaveAttributes;
+
+    public NodeTask(NMProfile profile, Constraint constraint) {
         this.profile = profile;
         this.hostname = "";
+        this.constraint = constraint;
     }
 
     public Protos.SlaveID getSlaveId() {
@@ -58,6 +65,10 @@ public class NodeTask {
         this.profile = profile;
     }
 
+    /**
+     * @return the constraint this task was created with; null when none was given
+     */
+    public Constraint getConstraint() {
+        return this.constraint;
+    }
+
     public String getHostname() {
         return this.hostname;
     }
@@ -81,4 +92,12 @@ public class NodeTask {
     public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
         this.executorInfo = executorInfo;
     }
+
+    /**
+     * Records the Mesos attributes of the slave associated with this task.
+     *
+     * @param slaveAttributes attribute list to store; kept by reference, not copied
+     */
+    public void setSlaveAttributes(List<Attribute> slaveAttributes) {
+        this.slaveAttributes = slaveAttributes;
+    }
+
+    /**
+     * @return the attributes last stored with setSlaveAttributes; null until then
+     */
+    public List<Attribute> getSlaveAttributes() {
+        return this.slaveAttributes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
index 3d8d57e..f95e861 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -18,6 +18,9 @@
 
 package com.ebay.myriad.state.utils;
 
+import com.ebay.myriad.scheduler.constraints.Constraint;
+import com.ebay.myriad.scheduler.constraints.Constraint.Type;
+import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -102,6 +105,18 @@ public class ByteBufferSupport {
     byte[] profile = toBytes(nt.getProfile().toString());
     int size = profile.length + INT_SIZE;
 
+    Constraint constraint = nt.getConstraint();
+    Constraint.Type type = constraint == null ? Type.NULL : 
constraint.getType();
+    size += INT_SIZE;
+
+    byte[] constraintBytes = ZERO_BYTES;
+    if (constraint != null) {
+      constraintBytes = toBytes(constraint.toString());
+      size += constraintBytes.length + INT_SIZE;
+    } else {
+      size += INT_SIZE;
+    }
+
     byte[] hostname = toBytes(nt.getHostname());
     size += hostname.length + INT_SIZE;
 
@@ -126,6 +141,8 @@ public class ByteBufferSupport {
     // Allocate and populate the buffer.
     ByteBuffer bb = createBuffer(size);
     putBytes(bb, profile);
+    bb.putInt(type.ordinal());
+    putBytes(bb, constraintBytes);
     putBytes(bb, hostname);
     putBytes(bb, getSlaveBytes(nt));
     putBytes(bb, getTaskBytes(nt));
@@ -173,7 +190,7 @@ public class ByteBufferSupport {
   public static NodeTask toNodeTask(ByteBuffer bb) {
     NodeTask nt = null;
     if (bb != null && bb.array().length > 0) {
-      nt = new NodeTask(getProfile(bb));
+      nt = new NodeTask(getProfile(bb), getConstraint(bb));
       nt.setHostname(toString(bb));
       nt.setSlaveId(toSlaveId(bb));
       nt.setTaskStatus(toTaskStatus(bb));
@@ -260,6 +277,22 @@ public class ByteBufferSupport {
     }
   }
 
+  /**
+   * Reads a constraint from the buffer: first the ordinal of its Constraint.Type, then
+   * its serialized string, in the order they are written when a NodeTask is serialized.
+   * The string is read for every type, including NULL, before the type is examined.
+   *
+   * @param bb buffer positioned at the constraint type
+   * @return the LikeConstraint parsed from the string when the type is LIKE and the
+   *         string is not empty; null in every other case
+   */
+  public static Constraint getConstraint(ByteBuffer bb) {
+    int typeOrdinal = bb.getInt();
+    Constraint.Type type = Constraint.Type.values()[typeOrdinal];
+    String serialized = toString(bb);
+
+    boolean isLike = type == Constraint.Type.LIKE;
+    if (isLike && !StringUtils.isEmpty(serialized)) {
+      return gson.fromJson(serialized, LikeConstraint.class);
+    }
+    return null;
+  }
+
   public static Protos.SlaveID toSlaveId(ByteBuffer bb) {
     int size = bb.getInt();
     if (size > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
 
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
index bce37ad..305021a 100644
--- 
a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
+++ 
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy
@@ -39,7 +39,7 @@ class SchedulerUtilsSpec extends Specification {
 
 
     NodeTask createNodeTask(String hostname) {
-        def node = new NodeTask(new NMProfile("", 1, 1))
+        def node = new NodeTask(new NMProfile("", 1, 1), null)
         node.hostname = hostname
         node
     }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
 
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
new file mode 100644
index 0000000..f2972a7
--- /dev/null
+++ 
b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy
@@ -0,0 +1,63 @@
+package com.ebay.myriad.scheduler.constraints
+
+import com.google.common.collect.Lists
+import org.apache.mesos.Protos
+import spock.lang.Specification
+
+import static org.apache.mesos.Protos.Value.Text
+import static org.apache.mesos.Protos.Value.Type.TEXT
+
+/**
+ *
+ * Test for LikeConstraint: checks host name matching and Mesos slave
+ * attribute matching, each driven by a data table.
+ *
+ */
+class LikeConstraintSpec extends Specification {
+
+  // Each row passes one host name to matchesHostName() and states the expected result.
+  // Note that the dots in the regex are unescaped, so they match any character.
+  def "is matching host name"() {
+    given:
+    def constraint = new LikeConstraint("hostname", "host-[0-9]*.example.com")
+
+    expect:
+    returnValue == constraint.matchesHostName(inputHostName)
+
+    where:
+    inputHostName         | returnValue
+    null                  | false
+    ""                    | false
+    "blah-blue"           | false
+    "host-12.example.com" | true
+    "host-1.example.com"  | true
+    "host-2.example.com"  | true
+  }
+
+  // Each row passes one attribute list to matchesSlaveAttributes(). The "Distributed FS"
+  // row carries the matching value under a different attribute name, so it must not match.
+  def "is matching dfs attribute"() {
+    given:
+    def constraint = new LikeConstraint("dfs", "true")
+
+    expect:
+    returnValue == constraint.matchesSlaveAttributes(attributes)
+
+    where:
+    attributes                                                     | 
returnValue
+    null                                                           | false
+    Lists.newArrayList()                                           | false
+    Lists.newArrayList(getTextAttribute("dfs", ""))                | false
+    Lists.newArrayList(getTextAttribute("dfs", "false"))           | false
+    Lists.newArrayList(getTextAttribute("Distributed FS", "true")) | false
+    Lists.newArrayList(getTextAttribute("dfs", "true"))            | true
+    Lists.newArrayList(getTextAttribute("dfs", "true"),
+        getTextAttribute("random", "random value"))                | true
+  }
+
+  // Builds a Mesos attribute of type TEXT with the given name and text value.
+  private static Protos.Attribute getTextAttribute(String name, String value) {
+    Protos.Attribute.newBuilder()
+        .setName(name)
+        .setType(TEXT)
+        .setText(Text.newBuilder()
+        .setValue(value))
+        .build()
+  }
+
+
+}

Reply via email to