Implemented hostname constraint for flexup/flexdown
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/b3d9f0cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/b3d9f0cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/b3d9f0cc Branch: refs/heads/master Commit: b3d9f0ccb2cbffb88d64c38767be3fad23011b2e Parents: 10b9e6a Author: Santosh Marella <smare...@maprtech.com> Authored: Fri Sep 18 12:04:48 2015 -0700 Committer: Santosh Marella <mare...@gmail.com> Committed: Thu Oct 15 12:56:45 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/com/ebay/myriad/Main.java | 2 +- .../com/ebay/myriad/api/ClustersResource.java | 21 ++++- .../ebay/myriad/scheduler/MyriadOperations.java | 48 +++++++--- .../com/ebay/myriad/scheduler/Rebalancer.java | 2 +- .../ebay/myriad/scheduler/TaskTerminator.java | 21 +++-- .../scheduler/constraints/Constraint.java | 17 ++++ .../constraints/ConstraintFactory.java | 18 ++++ .../scheduler/constraints/LikeConstraint.java | 94 ++++++++++++++++++++ .../handlers/ResourceOffersEventHandler.java | 38 +++++++- .../java/com/ebay/myriad/state/NodeTask.java | 21 ++++- .../myriad/state/utils/ByteBufferSupport.java | 35 +++++++- .../myriad/scheduler/SchedulerUtilsSpec.groovy | 2 +- .../constraints/LikeConstraintSpec.groovy | 63 +++++++++++++ 13 files changed, 349 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java index cd8b90e..1bba999 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java @@ -187,7 +187,7 @@ public class Main { MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); NMProfileManager profileManager = injector.getInstance(NMProfileManager.class); for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { - myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue()); + myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java index 2b8fe0e..1cdb522 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java @@ -20,9 +20,12 @@ import com.ebay.myriad.api.model.FlexDownClusterRequest; import com.ebay.myriad.api.model.FlexUpClusterRequest; import com.ebay.myriad.scheduler.MyriadOperations; import com.ebay.myriad.scheduler.NMProfileManager; +import com.ebay.myriad.scheduler.constraints.ConstraintFactory; import com.ebay.myriad.state.SchedulerState; import com.google.common.base.Preconditions; import java.util.List; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.PUT; @@ -78,7 +81,9 @@ public class ClustersResource { Response returnResponse = response.build(); if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { - this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances); + String constraint = constraints != null && !constraints.isEmpty() ? constraints.get(0) : null; + this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances, + ConstraintFactory.createConstraint(constraint)); } return returnResponse; @@ -114,7 +119,9 @@ public class ClustersResource { Response returnResponse = response.build(); if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { - this.myriadOperations.flexDownCluster(profileManager.get(profile), instances); + String constraint = constraints != null && !constraints.isEmpty() ? constraints.get(0) : null; + this.myriadOperations.flexDownCluster(profileManager.get(profile), + ConstraintFactory.createConstraint(constraint), instances); } return returnResponse; } @@ -177,12 +184,20 @@ public class ClustersResource { String[] splits = constraint.split(" LIKE "); // "<key> LIKE <val_regex>" if (splits.length != 2) { - String message = String.format("Invalid regex for LIKE operator in constraint: %s. Format: %s", + String message = String.format("Invalid format for LIKE operator in constraint: %s. Format: %s", constraint, CONSTRAINT_FORMAT); response.status(Status.BAD_REQUEST).entity(message); LOGGER.error(message); return false; } + try { + Pattern.compile(splits[1]); + } catch (PatternSyntaxException e) { + String message = String.format("Invalid regex for LIKE operator in constraint: %s", constraint); + response.status(Status.BAD_REQUEST).entity(message); + LOGGER.error(message, e); + return false; + } return true; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index 84ec723..bb1a9fc 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -16,19 +16,20 @@ package com.ebay.myriad.scheduler; import com.ebay.myriad.policy.NodeScaleDownPolicy; +import com.ebay.myriad.scheduler.constraints.Constraint; +import com.ebay.myriad.scheduler.constraints.LikeConstraint; import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; import com.google.common.collect.Sets; import com.google.inject.Inject; -import org.apache.mesos.Protos; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Myriad scheduler operations @@ -45,22 +46,24 @@ public class MyriadOperations { this.nodeScaleDownPolicy = nodeScaleDownPolicy; } - public void flexUpCluster(NMProfile profile, int instances) { + public void flexUpCluster(NMProfile profile, int instances, Constraint constraint) { Collection<NodeTask> nodes = new HashSet<>(); for (int i = 0; i < instances; i++) { - nodes.add(new NodeTask(profile)); + nodes.add(new NodeTask(profile, constraint)); } this.schedulerState.addNodes(nodes); } - public void flexDownCluster(NMProfile profile, int numInstancesToScaleDown) { + public void flexDownCluster(NMProfile profile, Constraint constraint, int numInstancesToScaleDown) { // Flex down Pending tasks, if any int numPendingTasksScaledDown = 0; Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds()); for (Protos.TaskID taskId : pendingTasks) { - if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { + NodeTask nodeTask = schedulerState.getTask(taskId); + if (nodeTask.getProfile().getName().equals(profile.getName()) && + meetsConstraint(nodeTask, constraint)) { this.schedulerState.makeTaskKillable(taskId); numPendingTasksScaledDown++; if (numPendingTasksScaledDown == numInstancesToScaleDown) { @@ -75,7 +78,9 @@ public class MyriadOperations { Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds()); for (Protos.TaskID taskId : stagingTasks) { - if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { + NodeTask nodeTask = schedulerState.getTask(taskId); + if (nodeTask.getProfile().getName().equals(profile.getName()) && + meetsConstraint(nodeTask, constraint)) { this.schedulerState.makeTaskKillable(taskId); numStagingTasksScaledDown++; if (numStagingTasksScaledDown + numPendingTasksScaledDown == numInstancesToScaleDown) { @@ -93,7 +98,9 @@ public class MyriadOperations { for (int i = 0; i < numInstancesToScaleDown - (numPendingTasksScaledDown + numStagingTasksScaledDown); i++) { for (NodeTask nodeTask : activeTasksForProfile) { - if (nodesToScaleDown.size() > i && nodesToScaleDown.get(i).equals(nodeTask.getHostname())) { + if (nodesToScaleDown.size() > i && + nodesToScaleDown.get(i).equals(nodeTask.getHostname()) && + meetsConstraint(nodeTask, constraint)) { this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId()); numActiveTasksScaledDown++; if (LOGGER.isDebugEnabled()) { @@ -106,13 +113,32 @@ public class MyriadOperations { } if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) { - LOGGER.info("No Node Managers with profile '{}' found for scaling down.", profile.getName()); + LOGGER.info("No Node Managers with profile '{}' and constraint {} found for scaling down.", + profile.getName(), constraint.toString()); } else { LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with '{}' profile.", numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, profile.getName()); } } + private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) { + if (constraint != null) { + if (constraint.equals(nodeTask.getConstraint())) { + return true; + } + switch (constraint.getType()) { + case LIKE: + LikeConstraint likeConstraint = (LikeConstraint) constraint; + if (likeConstraint.isConstraintOnHostName()) { + return likeConstraint.matchesHostName(nodeTask.getHostname()); + } else { + return likeConstraint.matchesSlaveAttributes(nodeTask.getSlaveAttributes()); + } + } + } + return true; + } + private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, List<String> registeredNMHosts) { // If a NM is flexed down it takes time for the RM to realize the NM is no longer up // We need to make sure we filter out nodes that have already been flexed down http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java index 164e7fa..e76a107 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java @@ -44,7 +44,7 @@ public class Rebalancer implements Runnable { public void run() { LOGGER.info("Active {}, Pending {}", schedulerState.getActiveTaskIds().size(), schedulerState.getPendingTaskIds().size()); if (schedulerState.getActiveTaskIds().size() < 1 && schedulerState.getPendingTaskIds().size() < 1) { - myriadOperations.flexUpCluster(profileManager.get("small"), 1); + myriadOperations.flexUpCluster(profileManager.get("small"), 1, null); } // RestAdapter restAdapter = new RestAdapter.Builder() // .setEndpoint("http://" + host + ":" + port) http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java index 874b445..db3ad9c 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java @@ -15,33 +15,28 @@ */ package com.ebay.myriad.scheduler; -import com.ebay.myriad.configuration.MyriadConfiguration; import com.ebay.myriad.state.SchedulerState; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import java.util.Set; +import javax.inject.Inject; import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import java.util.Set; - /** * {@link TaskTerminator} is responsible for killing tasks. */ public class TaskTerminator implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriverManager.class); - private MyriadConfiguration cfg; private SchedulerState schedulerState; private MyriadDriverManager driverManager; @Inject - public TaskTerminator(MyriadConfiguration cfg, - SchedulerState schedulerState, MyriadDriverManager driverManager) { - this.cfg = cfg; + public TaskTerminator(SchedulerState schedulerState, MyriadDriverManager driverManager) { this.schedulerState = schedulerState; this.driverManager = driverManager; } @@ -64,9 +59,13 @@ public class TaskTerminator implements Runnable { } for (TaskID taskIdToKill : killableTasks) { - Status status = this.driverManager.kill(taskIdToKill); - this.schedulerState.removeTask(taskIdToKill); - Preconditions.checkState(status == Status.DRIVER_RUNNING); + if (this.schedulerState.getPendingTaskIds().contains(taskIdToKill)) { + this.schedulerState.removeTask(taskIdToKill); + } else { + Status status = this.driverManager.kill(taskIdToKill); + this.schedulerState.removeTask(taskIdToKill); + Preconditions.checkState(status == Status.DRIVER_RUNNING); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java new file mode 100644 index 0000000..c59219f --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/Constraint.java @@ -0,0 +1,17 @@ +package com.ebay.myriad.scheduler.constraints; + +/** + * Interface for Constraint. + */ +public interface Constraint { + /** + * Type of Constraint + */ + enum Type { + NULL, // to help with serialization + LIKE + } + + public Type getType(); + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java new file mode 100644 index 0000000..8cfd128 --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/ConstraintFactory.java @@ -0,0 +1,18 @@ +package com.ebay.myriad.scheduler.constraints; + +/** + * Factory to create constraints. + */ +public class ConstraintFactory { + + public static Constraint createConstraint(String constraintStr) { + if (constraintStr != null) { + String[] splits = constraintStr.split(" LIKE "); + if (splits.length == 2) { + return new LikeConstraint(splits[0], splits[1]); + } + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java new file mode 100644 index 0000000..480ecfc --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/constraints/LikeConstraint.java @@ -0,0 +1,94 @@ +package com.ebay.myriad.scheduler.constraints; + +import com.google.gson.Gson; +import java.util.Collection; +import java.util.regex.Pattern; +import org.apache.mesos.Protos.Attribute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Constraint for LIKE operator. + * Format: <mesos_slave_attribute|hostname> LIKE <regex_value> + */ +public class LikeConstraint implements Constraint { + private static final Logger LOGGER = LoggerFactory.getLogger(LikeConstraint.class); + + private String lhs; + private String rhsRegex; + + public LikeConstraint(String lhs, String rhsRegex) { + this.lhs = lhs; + this.rhsRegex = rhsRegex; + } + + public boolean isConstraintOnHostName() { + return lhs.equalsIgnoreCase("hostname"); + } + + public boolean matchesHostName(String hostname) { + return lhs.equalsIgnoreCase("hostname") && hostname != null && Pattern.matches(rhsRegex, hostname); + } + + public boolean matchesSlaveAttributes(Collection<Attribute> attributes) { + if (!lhs.equalsIgnoreCase("hostname") && attributes != null) { + for (Attribute attr : attributes) { + if (attr.getName().equalsIgnoreCase(lhs)) { + switch (attr.getType()) { + case TEXT: + return Pattern.matches(rhsRegex, attr.getText().getValue()); + + case SCALAR: + return Pattern.matches(rhsRegex, String.valueOf(attr.getScalar().getValue())); + + default: + LOGGER.warn("LIKE constraint currently doesn't support Mesos slave attributes " + + "of type {}. Attribute Name: {}", attr.getType(), attr.getName()); + return false; + + } + } + } + } + return false; + } + + @Override + public Type getType() { + return Type.LIKE; + } + + @Override + public String toString() { + Gson gson = new Gson(); + return gson.toJson(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LikeConstraint)) { + return false; + } + + LikeConstraint that = (LikeConstraint) o; + + if (lhs != null ? !lhs.equals(that.lhs) : that.lhs != null) { + return false; + } + if (rhsRegex != null ? !rhsRegex.equals(that.rhsRegex) : that.rhsRegex != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = lhs != null ? lhs.hashCode() : 0; + result = 31 * result + (rhsRegex != null ? rhsRegex.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 59ea547..1ce647f 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -20,6 +20,8 @@ import com.ebay.myriad.scheduler.NMProfile; import com.ebay.myriad.scheduler.SchedulerUtils; import com.ebay.myriad.scheduler.TaskFactory; import com.ebay.myriad.scheduler.TaskUtils; +import com.ebay.myriad.scheduler.constraints.Constraint; +import com.ebay.myriad.scheduler.constraints.LikeConstraint; import com.ebay.myriad.scheduler.event.ResourceOffersEvent; import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager; import com.ebay.myriad.state.NodeTask; @@ -78,7 +80,10 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv // a notification for "framework registration". This is a simple defensive code // to not process any offers unless Myriad receives a "framework registered" notification. if (schedulerState.getFrameworkID() == null) { - LOGGER.warn("Received {} offers, but not processing them since Framework ID is not yet set", offers.size()); + LOGGER.warn("Received {} offers, but declining them since Framework ID is not yet set", offers.size()); + for (Offer offer : offers) { + driver.declineOffer(offer.getId()); + } return; } LOGGER.info("Received offers {}", offers.size()); @@ -87,14 +92,19 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv try { for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) { Offer offer = iterator.next(); + NodeTask nodeTask = schedulerState.getNodeTask(offer.getSlaveId()); + if (nodeTask != null) { + nodeTask.setSlaveAttributes(offer.getAttributesList()); + } Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); if (CollectionUtils.isNotEmpty(pendingTasks)) { for (Protos.TaskID pendingTaskId : pendingTasks) { NodeTask taskToLaunch = schedulerState .getTask(pendingTaskId); NMProfile profile = taskToLaunch.getProfile(); + Constraint constraint = taskToLaunch.getConstraint(); - if (matches(offer, profile) + if (matches(offer, profile, constraint) && SchedulerUtils.isUniqueHostname(offer, schedulerState.getActiveTasks())) { TaskInfo task = taskFactory.createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, @@ -143,7 +153,12 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv } } - private boolean matches(Offer offer, NMProfile profile) { + private boolean matches(Offer offer, NMProfile profile, Constraint constraint) { + + if (!meetsConstraint(offer, constraint)) { + return false; + } + Map<String, Object> results = new HashMap<String, Object>(5); for (Resource resource : offer.getResourcesList()) { @@ -165,6 +180,23 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv return checkAggregates(offer, profile, ports, cpus, mem); } + private boolean meetsConstraint(Offer offer, Constraint constraint) { + if (constraint != null) { + switch (constraint.getType()) { + case LIKE: + { + LikeConstraint likeConstraint = (LikeConstraint) constraint; + if (likeConstraint.isConstraintOnHostName()) { + return likeConstraint.matchesHostName(offer.getHostname()); + } else { + return likeConstraint.matchesSlaveAttributes(offer.getAttributesList()); + } + } + } + } + return true; + } + private void checkResource(boolean fail, String resource) { if (fail) { LOGGER.info("No " + resource + " resources present"); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java index 8191eed..943112f 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java @@ -16,8 +16,11 @@ package com.ebay.myriad.state; import com.ebay.myriad.scheduler.NMProfile; +import com.ebay.myriad.scheduler.constraints.Constraint; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Attribute; /** * Represents a task to be launched by the executor @@ -37,9 +40,13 @@ public class NodeTask { */ private Protos.ExecutorInfo executorInfo; - public NodeTask(NMProfile profile) { + private Constraint constraint; + private List<Attribute> slaveAttributes; + + public NodeTask(NMProfile profile, Constraint constraint) { this.profile = profile; this.hostname = ""; + this.constraint = constraint; } public Protos.SlaveID getSlaveId() { @@ -58,6 +65,10 @@ public class NodeTask { this.profile = profile; } + public Constraint getConstraint() { + return constraint; + } + public String getHostname() { return this.hostname; } @@ -81,4 +92,12 @@ public class NodeTask { public void setExecutorInfo(Protos.ExecutorInfo executorInfo) { this.executorInfo = executorInfo; } + + public void setSlaveAttributes(List<Attribute> slaveAttributes) { + this.slaveAttributes = slaveAttributes; + } + + public List<Attribute> getSlaveAttributes() { + return slaveAttributes; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java index 3d8d57e..f95e861 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java @@ -18,6 +18,9 @@ package com.ebay.myriad.state.utils; +import com.ebay.myriad.scheduler.constraints.Constraint; +import com.ebay.myriad.scheduler.constraints.Constraint.Type; +import com.ebay.myriad.scheduler.constraints.LikeConstraint; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -102,6 +105,18 @@ public class ByteBufferSupport { byte[] profile = toBytes(nt.getProfile().toString()); int size = profile.length + INT_SIZE; + Constraint constraint = nt.getConstraint(); + Constraint.Type type = constraint == null ? Type.NULL : constraint.getType(); + size += INT_SIZE; + + byte[] constraintBytes = ZERO_BYTES; + if (constraint != null) { + constraintBytes = toBytes(constraint.toString()); + size += constraintBytes.length + INT_SIZE; + } else { + size += INT_SIZE; + } + byte[] hostname = toBytes(nt.getHostname()); size += hostname.length + INT_SIZE; @@ -126,6 +141,8 @@ public class ByteBufferSupport { // Allocate and populate the buffer. ByteBuffer bb = createBuffer(size); putBytes(bb, profile); + bb.putInt(type.ordinal()); + putBytes(bb, constraintBytes); putBytes(bb, hostname); putBytes(bb, getSlaveBytes(nt)); putBytes(bb, getTaskBytes(nt)); @@ -173,7 +190,7 @@ public class ByteBufferSupport { public static NodeTask toNodeTask(ByteBuffer bb) { NodeTask nt = null; if (bb != null && bb.array().length > 0) { - nt = new NodeTask(getProfile(bb)); + nt = new NodeTask(getProfile(bb), getConstraint(bb)); nt.setHostname(toString(bb)); nt.setSlaveId(toSlaveId(bb)); nt.setTaskStatus(toTaskStatus(bb)); @@ -260,6 +277,22 @@ public class ByteBufferSupport { } } + public static Constraint getConstraint(ByteBuffer bb) { + Constraint.Type type = Constraint.Type.values()[bb.getInt()]; + String p = toString(bb); + switch (type) { + case NULL: + return null; + + case LIKE: + + if (!StringUtils.isEmpty(p)) { + return gson.fromJson(p, LikeConstraint.class); + } + } + return null; + } + public static Protos.SlaveID toSlaveId(ByteBuffer bb) { int size = bb.getInt(); if (size > 0) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy index bce37ad..305021a 100644 --- a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy +++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/SchedulerUtilsSpec.groovy @@ -39,7 +39,7 @@ class SchedulerUtilsSpec extends Specification { NodeTask createNodeTask(String hostname) { - def node = new NodeTask(new NMProfile("", 1, 1)) + def node = new NodeTask(new NMProfile("", 1, 1), null) node.hostname = hostname node } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b3d9f0cc/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy new file mode 100644 index 0000000..f2972a7 --- /dev/null +++ b/myriad-scheduler/src/test/java/com/ebay/myriad/scheduler/constraints/LikeConstraintSpec.groovy @@ -0,0 +1,63 @@ +package com.ebay.myriad.scheduler.constraints + +import com.google.common.collect.Lists +import org.apache.mesos.Protos +import spock.lang.Specification + +import static org.apache.mesos.Protos.Value.Text +import static org.apache.mesos.Protos.Value.Type.TEXT + +/** + * + * Test for LikeConstraint + * + */ +class LikeConstraintSpec extends Specification { + + def "is matching host name"() { + given: + def constraint = new LikeConstraint("hostname", "host-[0-9]*.example.com") + + expect: + returnValue == constraint.matchesHostName(inputHostName) + + where: + inputHostName | returnValue + null | false + "" | false + "blah-blue" | false + "host-12.example.com" | true + "host-1.example.com" | true + "host-2.example.com" | true + } + + def "is matching dfs attribute"() { + given: + def constraint = new LikeConstraint("dfs", "true") + + expect: + returnValue == constraint.matchesSlaveAttributes(attributes) + + where: + attributes | returnValue + null | false + Lists.newArrayList() | false + Lists.newArrayList(getTextAttribute("dfs", "")) | false + Lists.newArrayList(getTextAttribute("dfs", "false")) | false + Lists.newArrayList(getTextAttribute("Distributed FS", "true")) | false + Lists.newArrayList(getTextAttribute("dfs", "true")) | true + Lists.newArrayList(getTextAttribute("dfs", "true"), + getTextAttribute("random", "random value")) | true + } + + private static Protos.Attribute getTextAttribute(String name, String value) { + Protos.Attribute.newBuilder() + .setName(name) + .setType(TEXT) + .setText(Text.newBuilder() + .setValue(value)) + .build() + } + + +}