Support profile in flexdown
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/0acbdbdd Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/0acbdbdd Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/0acbdbdd Branch: refs/heads/master Commit: 0acbdbdd4aeecf500c43f14244def727d1be52de Parents: afa9ac9 Author: Santosh Marella <smare...@maprtech.com> Authored: Fri Sep 11 17:45:27 2015 -0700 Committer: Santosh Marella <mare...@gmail.com> Committed: Thu Oct 15 12:56:45 2015 -0700 ---------------------------------------------------------------------- .../src/main/java/com/ebay/myriad/Main.java | 3 +- .../com/ebay/myriad/api/ClustersResource.java | 58 ++++++---- .../api/model/FlexDownClusterRequest.java | 14 ++- .../ebay/myriad/scheduler/MyriadOperations.java | 109 +++++++++---------- .../com/ebay/myriad/scheduler/Rebalancer.java | 27 ++--- .../handlers/ResourceOffersEventHandler.java | 7 ++ .../com/ebay/myriad/state/SchedulerState.java | 17 ++- 7 files changed, 137 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java index 9871d58..cd8b90e 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java @@ -185,8 +185,9 @@ public class Main { private void startNMInstances(Injector injector) { Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); + NMProfileManager profileManager = injector.getInstance(NMProfileManager.class); for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { - myriadOperations.flexUpCluster(entry.getValue(), entry.getKey()); + myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java index f040a44..05ba7e3 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java @@ -60,25 +60,19 @@ public class ClustersResource { @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.APPLICATION_JSON) public Response flexUp(FlexUpClusterRequest request) { - Preconditions.checkNotNull(request, - "request object cannot be null or empty"); + Preconditions.checkNotNull(request, "request object cannot be null or empty"); Integer instances = request.getInstances(); String profile = request.getProfile(); LOGGER.info("Received flexup request. Profile: {}, Instances: {}", profile, instances); - // Validate profile request Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED); - if (!this.profileManager.exists(profile)) { - response.status(Response.Status.BAD_REQUEST) - .entity("Profile does not exist: '" + profile + "'"); - LOGGER.error("Provided profile does not exist: '" + profile + "'"); - } - validateInstances(instances, response); + boolean isValidRequest = validateProfile(profile, response); + isValidRequest = isValidRequest && validateInstances(instances, response); Response returnResponse = response.build(); if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { - this.myriadOperations.flexUpCluster(instances, profile); + this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances); } return returnResponse; @@ -90,37 +84,59 @@ public class ClustersResource { @Produces(MediaType.TEXT_PLAIN) @Consumes(MediaType.APPLICATION_JSON) public Response flexDown(FlexDownClusterRequest request) { - Preconditions.checkNotNull(request, - "request object cannot be null or empty"); + Preconditions.checkNotNull(request, "request object cannot be null or empty"); Integer instances = request.getInstances(); - LOGGER.info("Received flexdown request. Instances: {}", instances); + String profile = request.getProfile(); + LOGGER.info("Received flex down request. Profile: {}, Instances: {}", profile, instances); Response.ResponseBuilder response = Response.status(Response.Status.ACCEPTED); - validateInstances(instances, response); + boolean isValidRequest = validateProfile(profile, response); + isValidRequest = isValidRequest && validateInstances(instances, response); Integer numFlexedUp = this.getNumFlexedupNMs(); - if (numFlexedUp < instances) { - String message = String.format("Number of requested instances for flexdown is greater than the number " + - "of Node Managers flexed up. Requested: %d, Flexed Up: %d. Only %d Node Managers " + - "will be flexed down", instances, numFlexedUp, numFlexedUp); + if (isValidRequest && numFlexedUp < instances) { + String message = String.format("Number of requested instances for flexdown is greater than the number of " + + "Node Managers previously flexed up. Requested: %d, Previously flexed Up: %d. " + + "Only %d Node Managers will be flexed down", instances, numFlexedUp, numFlexedUp); response.entity(message); LOGGER.warn(message); } Response returnResponse = response.build(); if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { - this.myriadOperations.flexDownCluster(instances); + this.myriadOperations.flexDownCluster(profileManager.get(profile), instances); } return returnResponse; } - private void validateInstances(Integer instances, ResponseBuilder response) { - if (!(instances > 0)) { + private boolean validateProfile(String profile, ResponseBuilder response) { + if (profile == null || profile.isEmpty()) { + response.status(Response.Status.BAD_REQUEST).entity("'profile' is null or empty"); + LOGGER.error("'profile' is null or empty"); + return false; + } + if (!this.profileManager.exists(profile)) { + response.status(Response.Status.BAD_REQUEST) + .entity("Profile does not exist: '" + profile + "'"); + LOGGER.error("Provided profile does not exist: '" + profile + "'"); + return false; + } + return true; + } + + private boolean validateInstances(Integer instances, ResponseBuilder response) { + if (instances == null) { + response.status(Response.Status.BAD_REQUEST).entity("'instances' is null"); + LOGGER.error("'instances' is null"); + return false; + } else if (!(instances > 0)) { response.status(Response.Status.BAD_REQUEST) .entity("Invalid instance size: " + instances); LOGGER.error("Invalid instance size request " + instances); + return false; } + return true; } private Integer getNumFlexedupNMs() { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java index eaf4918..e801e65 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java @@ -24,13 +24,17 @@ import org.hibernate.validator.constraints.NotEmpty; public class FlexDownClusterRequest { @NotEmpty + public String profile; + + @NotEmpty public Integer instances; public FlexDownClusterRequest() { } - public FlexDownClusterRequest(Integer instances) { + public FlexDownClusterRequest(String profile, Integer instances) { this.instances = instances; + this.profile = profile; } public Integer getInstances() { @@ -41,6 +45,14 @@ public class FlexDownClusterRequest { this.instances = instances; } + public String getProfile() { + return profile; + } + + public void setProfile(String profile) { + this.profile = profile; + } + public String toString() { Gson gson = new Gson(); return gson.toJson(this); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index 4947647..ed2adbd 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -15,7 +15,6 @@ */ package com.ebay.myriad.scheduler; -import com.ebay.myriad.configuration.MyriadConfiguration; import com.ebay.myriad.policy.NodeScaleDownPolicy; import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; @@ -37,65 +36,36 @@ import java.util.Set; public class MyriadOperations { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class); private final SchedulerState schedulerState; - private MyriadConfiguration cfg; - private NMProfileManager profileManager; private NodeScaleDownPolicy nodeScaleDownPolicy; @Inject - public MyriadOperations(MyriadConfiguration cfg, - SchedulerState schedulerState, - NMProfileManager profileManager, + public MyriadOperations(SchedulerState schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy) { - this.cfg = cfg; - this.schedulerState = schedulerState; - this.profileManager = profileManager; - this.nodeScaleDownPolicy = nodeScaleDownPolicy; + this.schedulerState = schedulerState; + this.nodeScaleDownPolicy = nodeScaleDownPolicy; } - public void flexUpCluster(int instances, String profile) { + public void flexUpCluster(NMProfile profile, int instances) { Collection<NodeTask> nodes = new HashSet<>(); for (int i = 0; i < instances; i++) { - nodes.add(new NodeTask(profileManager.get(profile))); + nodes.add(new NodeTask(profile)); } - LOGGER.info("Adding {} instances to cluster", nodes.size()); this.schedulerState.addNodes(nodes); } - public void flexDownCluster(int numInstancesToScaleDown) { - LOGGER.info("About to flex down {} instances", numInstancesToScaleDown); - - int numScaledDown = 0; - Set<NodeTask> activeTasks = Sets.newHashSet(this.schedulerState.getActiveTasks()); + public void flexDownCluster(NMProfile profile, int numInstancesToScaleDown) { + Set<NodeTask> activeTasksForProfile = Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile)); List<String> nodesToScaleDown = nodeScaleDownPolicy.getNodesToScaleDown(); - if (activeTasks.size() > nodesToScaleDown.size()) { - LOGGER.info("Will skip flexing down {} Node Manager instances that were launched but " + - "have not yet registered with Resource Manager.", activeTasks.size() - nodesToScaleDown.size()); - } - - // If a NM is flexed down it takes time for the RM to realize the NM is no longer up - // We need to make sure we filter out nodes that have already been flexed down - // but have not disappeared from the RM's view of the cluster - for (Iterator<String> iterator = nodesToScaleDown.iterator(); iterator.hasNext();) { - String nodeToScaleDown = iterator.next(); - boolean nodePresentInMyriad = false; - for (NodeTask nodeTask : activeTasks) { - if (nodeTask.getHostname().equals(nodeToScaleDown)) { - nodePresentInMyriad = true; - break; - } - } - if (!nodePresentInMyriad) { - iterator.remove(); - } - } + filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown); // TODO(Santosh): Make this more efficient by using a Map<HostName, NodeTask> in scheduler state + int numActiveTasksScaledDown = 0; for (int i = 0; i < numInstancesToScaleDown; i++) { - for (NodeTask nodeTask : activeTasks) { + for (NodeTask nodeTask : activeTasksForProfile) { if (nodesToScaleDown.size() > i && nodesToScaleDown.get(i).equals(nodeTask.getHostname())) { this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId()); - numScaledDown++; + numActiveTasksScaledDown++; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Marked NodeTask {} on host {} for kill.", nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname()); @@ -103,37 +73,64 @@ public class MyriadOperations { } } } - int numActiveTasksScaledDown = numScaledDown; // Flex down Staging tasks, if any - if (numScaledDown < numInstancesToScaleDown) { + int numStagingTasksScaledDown = 0; + if (numActiveTasksScaledDown < numInstancesToScaleDown) { Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds()); for (Protos.TaskID taskId : stagingTasks) { - this.schedulerState.makeTaskKillable(taskId); - numScaledDown++; - if (numScaledDown == numInstancesToScaleDown) { + if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { + this.schedulerState.makeTaskKillable(taskId); + numStagingTasksScaledDown++; + if (numStagingTasksScaledDown + numActiveTasksScaledDown == numInstancesToScaleDown) { break; + } } } } - int numStagingTasksScaledDown = numScaledDown - numActiveTasksScaledDown; // Flex down Pending tasks, if any - if (numScaledDown < numInstancesToScaleDown) { - Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds()); + int numPendingTasksScaledDown = 0; + if (numStagingTasksScaledDown + numActiveTasksScaledDown < numInstancesToScaleDown) { + Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds()); - for (Protos.TaskID taskId : pendingTasks) { + for (Protos.TaskID taskId : pendingTasks) { + if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { this.schedulerState.makeTaskKillable(taskId); - numScaledDown++; - if (numScaledDown == numInstancesToScaleDown) { - break; + numPendingTasksScaledDown++; + if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown + == numInstancesToScaleDown) { + break; } + } } } - int numPendingTasksScaledDown = numScaledDown - numStagingTasksScaledDown; - LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances.", - numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown); + if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) { + LOGGER.info("No Node Managers with profile '{}' found for scaling down.", profile.getName()); + } else { + LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with '{}' profile.", + numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, profile.getName()); + } + } + + private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, List<String> registeredNMHosts) { + // If a NM is flexed down it takes time for the RM to realize the NM is no longer up + // We need to make sure we filter out nodes that have already been flexed down + // but have not disappeared from the RM's view of the cluster + for (Iterator<String> iterator = registeredNMHosts.iterator(); iterator.hasNext();) { + String nodeToScaleDown = iterator.next(); + boolean nodePresentInMyriad = false; + for (NodeTask nodeTask : activeTasksForProfile) { + if (nodeTask.getHostname().equals(nodeToScaleDown)) { + nodePresentInMyriad = true; + break; + } + } + if (!nodePresentInMyriad) { + iterator.remove(); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java index ba2bb1a..164e7fa 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java @@ -15,14 +15,11 @@ */ package com.ebay.myriad.scheduler; -import com.ebay.myriad.configuration.MyriadConfiguration; import com.ebay.myriad.state.SchedulerState; -import com.google.common.base.Preconditions; +import javax.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; - /** * {@link Rebalancer} is responsible for scaling registered YARN clusters as per * configured rules and policies. @@ -30,30 +27,24 @@ import javax.inject.Inject; public class Rebalancer implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class); - private MyriadConfiguration cfg; - private SchedulerState schedulerState; - private MyriadDriverManager driverManager; - private MyriadOperations myriadOperations; + private final SchedulerState schedulerState; + private final MyriadOperations myriadOperations; + private final NMProfileManager profileManager; @Inject - public Rebalancer(MyriadConfiguration cfg, SchedulerState schedulerState, - MyriadDriverManager driverManager, MyriadOperations myriadOperations) { - Preconditions.checkArgument(cfg != null); - Preconditions.checkArgument(schedulerState != null); - Preconditions.checkArgument(driverManager != null); - Preconditions.checkArgument(myriadOperations != null); - - this.cfg = cfg; + public Rebalancer(SchedulerState schedulerState, + MyriadOperations myriadOperations, + NMProfileManager profileManager) { this.schedulerState = schedulerState; - this.driverManager = driverManager; this.myriadOperations = myriadOperations; + this.profileManager = profileManager; } @Override public void run() { LOGGER.info("Active {}, Pending {}", schedulerState.getActiveTaskIds().size(), schedulerState.getPendingTaskIds().size()); if (schedulerState.getActiveTaskIds().size() < 1 && schedulerState.getPendingTaskIds().size() < 1) { - myriadOperations.flexUpCluster(1, "small"); + myriadOperations.flexUpCluster(profileManager.get("small"), 1); } // RestAdapter restAdapter = new RestAdapter.Builder() // .setEndpoint("http://" + host + ":" + port) http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 07241d2..59ea547 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -74,6 +74,13 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv SchedulerDriver driver = event.getDriver(); List<Offer> offers = event.getOffers(); + // Sometimes, we see that mesos sends resource offers before Myriad receives + // a notification for "framework registration". This is a simple defensive code + // to not process any offers unless Myriad receives a "framework registered" notification. + if (schedulerState.getFrameworkID() == null) { + LOGGER.warn("Received {} offers, but not processing them since Framework ID is not yet set", offers.size()); + return; + } LOGGER.info("Received offers {}", offers.size()); LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); driverOperationLock.lock(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index 00cf8c4..e428a1d 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -15,6 +15,7 @@ */ package com.ebay.myriad.state; +import com.ebay.myriad.scheduler.NMProfile; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -191,7 +192,21 @@ public class SchedulerState { return Collections.unmodifiableCollection(activeNodeTasks); } - // TODO (sdaingade) Clone NodeTask + public synchronized Collection<NodeTask> getActiveTasksForProfile(NMProfile profile) { + List<NodeTask> activeNodeTasks = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(activeTasks) + && CollectionUtils.isNotEmpty(tasks.values())) { + for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { + NodeTask nodeTask = entry.getValue(); + if (activeTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(profile.getName())) { + activeNodeTasks.add(nodeTask); + } + } + } + return Collections.unmodifiableCollection(activeNodeTasks); + } + + // TODO (sdaingade) Clone NodeTask public synchronized NodeTask getNodeTask(SlaveID slaveId) { for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { if (entry.getValue().getSlaveId() != null &&