Flexdown NMs in "pending" and "staging" states before flexing down NMs in 
"active" state.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/72f2f3f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/72f2f3f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/72f2f3f9

Branch: refs/heads/master
Commit: 72f2f3f93612823d0e9658513d51946124559e2b
Parents: 0acbdbd
Author: Santosh Marella <smare...@maprtech.com>
Authored: Mon Sep 14 13:29:57 2015 -0700
Committer: Santosh Marella <mare...@gmail.com>
Committed: Thu Oct 15 12:56:45 2015 -0700

----------------------------------------------------------------------
 .../ebay/myriad/scheduler/MyriadOperations.java | 72 ++++++++++----------
 1 file changed, 35 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/72f2f3f9/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index ed2adbd..84ec723 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -55,56 +55,54 @@ public class MyriadOperations {
     }
 
     public void flexDownCluster(NMProfile profile, int 
numInstancesToScaleDown) {
-        Set<NodeTask> activeTasksForProfile = 
Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile));
-        List<String> nodesToScaleDown = 
nodeScaleDownPolicy.getNodesToScaleDown();
-        filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown);
+        // Flex down Pending tasks, if any
+        int numPendingTasksScaledDown = 0;
+          Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds());
 
-        // TODO(Santosh): Make this more efficient by using a Map<HostName, 
NodeTask> in scheduler state
-        int numActiveTasksScaledDown = 0;
-        for (int i = 0; i < numInstancesToScaleDown; i++) {
-            for (NodeTask nodeTask : activeTasksForProfile) {
-                if (nodesToScaleDown.size() > i && 
nodesToScaleDown.get(i).equals(nodeTask.getHostname())) {
-                    
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
-                    numActiveTasksScaledDown++;
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("Marked NodeTask {} on host {} for kill.",
-                                nodeTask.getTaskStatus().getTaskId(), 
nodeTask.getHostname());
-                    }
-                }
+          for (Protos.TaskID taskId : pendingTasks) {
+            if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
+              this.schedulerState.makeTaskKillable(taskId);
+              numPendingTasksScaledDown++;
+              if (numPendingTasksScaledDown == numInstancesToScaleDown) {
+                break;
+              }
             }
-        }
+          }
 
         // Flex down Staging tasks, if any
         int numStagingTasksScaledDown = 0;
-        if (numActiveTasksScaledDown < numInstancesToScaleDown) {
-            Set<Protos.TaskID> stagingTasks = 
Sets.newHashSet(this.schedulerState.getStagingTaskIds());
+        if (numPendingTasksScaledDown < numInstancesToScaleDown) {
+          Set<Protos.TaskID> stagingTasks = 
Sets.newHashSet(this.schedulerState.getStagingTaskIds());
 
-            for (Protos.TaskID taskId : stagingTasks) {
-                if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
-                  this.schedulerState.makeTaskKillable(taskId);
-                  numStagingTasksScaledDown++;
-                  if (numStagingTasksScaledDown + numActiveTasksScaledDown == 
numInstancesToScaleDown) {
-                    break;
-                  }
-                }
+          for (Protos.TaskID taskId : stagingTasks) {
+            if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
+              this.schedulerState.makeTaskKillable(taskId);
+              numStagingTasksScaledDown++;
+              if (numStagingTasksScaledDown + numPendingTasksScaledDown == 
numInstancesToScaleDown) {
+                break;
+              }
             }
+          }
         }
 
-        // Flex down Pending tasks, if any
-        int numPendingTasksScaledDown = 0;
-        if (numStagingTasksScaledDown + numActiveTasksScaledDown < 
numInstancesToScaleDown) {
-          Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds());
+        int numActiveTasksScaledDown = 0;
+        if (numPendingTasksScaledDown + numStagingTasksScaledDown < 
numInstancesToScaleDown) {
+          Set<NodeTask> activeTasksForProfile = 
Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile));
+          List<String> nodesToScaleDown = 
nodeScaleDownPolicy.getNodesToScaleDown();
+          filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown);
 
-          for (Protos.TaskID taskId : pendingTasks) {
-              if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
-                this.schedulerState.makeTaskKillable(taskId);
-                numPendingTasksScaledDown++;
-                if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown
-                    == numInstancesToScaleDown) {
-                  break;
+          for (int i = 0; i < numInstancesToScaleDown - 
(numPendingTasksScaledDown + numStagingTasksScaledDown); i++) {
+            for (NodeTask nodeTask : activeTasksForProfile) {
+              if (nodesToScaleDown.size() > i && 
nodesToScaleDown.get(i).equals(nodeTask.getHostname())) {
+                
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
+                numActiveTasksScaledDown++;
+                if (LOGGER.isDebugEnabled()) {
+                  LOGGER.debug("Marked NodeTask {} on host {} for kill.",
+                      nodeTask.getTaskStatus().getTaskId(), 
nodeTask.getHostname());
                 }
               }
             }
+          }
         }
 
         if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown == 0) {

Reply via email to