Flex down NMs in "pending" and "staging" states before flexing down NMs in "active" state.
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/72f2f3f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/72f2f3f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/72f2f3f9 Branch: refs/heads/master Commit: 72f2f3f93612823d0e9658513d51946124559e2b Parents: 0acbdbd Author: Santosh Marella <smare...@maprtech.com> Authored: Mon Sep 14 13:29:57 2015 -0700 Committer: Santosh Marella <mare...@gmail.com> Committed: Thu Oct 15 12:56:45 2015 -0700 ---------------------------------------------------------------------- .../ebay/myriad/scheduler/MyriadOperations.java | 72 ++++++++++---------- 1 file changed, 35 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/72f2f3f9/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index ed2adbd..84ec723 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -55,56 +55,54 @@ public class MyriadOperations { } public void flexDownCluster(NMProfile profile, int numInstancesToScaleDown) { - Set<NodeTask> activeTasksForProfile = Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile)); - List<String> nodesToScaleDown = nodeScaleDownPolicy.getNodesToScaleDown(); - filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown); + // Flex down Pending tasks, if any + int numPendingTasksScaledDown = 0; + Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds()); - // 
TODO(Santosh): Make this more efficient by using a Map<HostName, NodeTask> in scheduler state - int numActiveTasksScaledDown = 0; - for (int i = 0; i < numInstancesToScaleDown; i++) { - for (NodeTask nodeTask : activeTasksForProfile) { - if (nodesToScaleDown.size() > i && nodesToScaleDown.get(i).equals(nodeTask.getHostname())) { - this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId()); - numActiveTasksScaledDown++; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Marked NodeTask {} on host {} for kill.", - nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname()); - } - } + for (Protos.TaskID taskId : pendingTasks) { + if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { + this.schedulerState.makeTaskKillable(taskId); + numPendingTasksScaledDown++; + if (numPendingTasksScaledDown == numInstancesToScaleDown) { + break; + } } - } + } // Flex down Staging tasks, if any int numStagingTasksScaledDown = 0; - if (numActiveTasksScaledDown < numInstancesToScaleDown) { - Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds()); + if (numPendingTasksScaledDown < numInstancesToScaleDown) { + Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds()); - for (Protos.TaskID taskId : stagingTasks) { - if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { - this.schedulerState.makeTaskKillable(taskId); - numStagingTasksScaledDown++; - if (numStagingTasksScaledDown + numActiveTasksScaledDown == numInstancesToScaleDown) { - break; - } - } + for (Protos.TaskID taskId : stagingTasks) { + if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { + this.schedulerState.makeTaskKillable(taskId); + numStagingTasksScaledDown++; + if (numStagingTasksScaledDown + numPendingTasksScaledDown == numInstancesToScaleDown) { + break; + } } + } } - // Flex down Pending tasks, if any - int numPendingTasksScaledDown = 0; 
- if (numStagingTasksScaledDown + numActiveTasksScaledDown < numInstancesToScaleDown) { - Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds()); + int numActiveTasksScaledDown = 0; + if (numPendingTasksScaledDown + numStagingTasksScaledDown < numInstancesToScaleDown) { + Set<NodeTask> activeTasksForProfile = Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile)); + List<String> nodesToScaleDown = nodeScaleDownPolicy.getNodesToScaleDown(); + filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown); - for (Protos.TaskID taskId : pendingTasks) { - if (schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName())) { - this.schedulerState.makeTaskKillable(taskId); - numPendingTasksScaledDown++; - if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown - == numInstancesToScaleDown) { - break; + for (int i = 0; i < numInstancesToScaleDown - (numPendingTasksScaledDown + numStagingTasksScaledDown); i++) { + for (NodeTask nodeTask : activeTasksForProfile) { + if (nodesToScaleDown.size() > i && nodesToScaleDown.get(i).equals(nodeTask.getHostname())) { + this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId()); + numActiveTasksScaledDown++; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Marked NodeTask {} on host {} for kill.", + nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname()); } } } + } } if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) {