Support profile in flexdown

Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/0acbdbdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/0acbdbdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/0acbdbdd

Branch: refs/heads/master
Commit: 0acbdbdd4aeecf500c43f14244def727d1be52de
Parents: afa9ac9
Author: Santosh Marella <smare...@maprtech.com>
Authored: Fri Sep 11 17:45:27 2015 -0700
Committer: Santosh Marella <mare...@gmail.com>
Committed: Thu Oct 15 12:56:45 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/com/ebay/myriad/Main.java     |   3 +-
 .../com/ebay/myriad/api/ClustersResource.java   |  58 ++++++----
 .../api/model/FlexDownClusterRequest.java       |  14 ++-
 .../ebay/myriad/scheduler/MyriadOperations.java | 109 +++++++++----------
 .../com/ebay/myriad/scheduler/Rebalancer.java   |  27 ++---
 .../handlers/ResourceOffersEventHandler.java    |   7 ++
 .../com/ebay/myriad/state/SchedulerState.java   |  17 ++-
 7 files changed, 137 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
index 9871d58..cd8b90e 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
@@ -185,8 +185,9 @@ public class Main {
     private void startNMInstances(Injector injector) {
       Map<String, Integer> nmInstances = 
injector.getInstance(MyriadConfiguration.class).getNmInstances();
       MyriadOperations myriadOperations = 
injector.getInstance(MyriadOperations.class);
+      NMProfileManager profileManager = 
injector.getInstance(NMProfileManager.class);
       for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) {
-        myriadOperations.flexUpCluster(entry.getValue(), entry.getKey());
+        myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), 
entry.getValue());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
index f040a44..05ba7e3 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/api/ClustersResource.java
@@ -60,25 +60,19 @@ public class ClustersResource {
     @Produces(MediaType.TEXT_PLAIN)
     @Consumes(MediaType.APPLICATION_JSON)
     public Response flexUp(FlexUpClusterRequest request) {
-        Preconditions.checkNotNull(request,
-                "request object cannot be null or empty");
+        Preconditions.checkNotNull(request, "request object cannot be null or 
empty");
 
         Integer instances = request.getInstances();
         String profile = request.getProfile();
         LOGGER.info("Received flexup request. Profile: {}, Instances: {}", 
profile, instances);
 
-        // Validate profile request
         Response.ResponseBuilder response = 
Response.status(Response.Status.ACCEPTED);
-        if (!this.profileManager.exists(profile)) {
-            response.status(Response.Status.BAD_REQUEST)
-                    .entity("Profile does not exist: '" + profile + "'");
-            LOGGER.error("Provided profile does not exist: '" + profile + "'");
-        }
-        validateInstances(instances, response);
+        boolean isValidRequest = validateProfile(profile, response);
+        isValidRequest = isValidRequest && validateInstances(instances, 
response);
 
         Response returnResponse = response.build();
         if (returnResponse.getStatus() == 
Response.Status.ACCEPTED.getStatusCode()) {
-            this.myriadOperations.flexUpCluster(instances, profile);
+            
this.myriadOperations.flexUpCluster(this.profileManager.get(profile), 
instances);
         }
 
         return returnResponse;
@@ -90,37 +84,59 @@ public class ClustersResource {
     @Produces(MediaType.TEXT_PLAIN)
     @Consumes(MediaType.APPLICATION_JSON)
     public Response flexDown(FlexDownClusterRequest request) {
-        Preconditions.checkNotNull(request,
-                "request object cannot be null or empty");
+        Preconditions.checkNotNull(request, "request object cannot be null or 
empty");
 
         Integer instances = request.getInstances();
-        LOGGER.info("Received flexdown request. Instances: {}", instances);
+        String profile = request.getProfile();
+        LOGGER.info("Received flex down request. Profile: {}, Instances: {}", 
profile, instances);
 
         Response.ResponseBuilder response = 
Response.status(Response.Status.ACCEPTED);
-        validateInstances(instances, response);
+        boolean isValidRequest = validateProfile(profile, response);
+        isValidRequest = isValidRequest && validateInstances(instances, 
response);
 
         Integer numFlexedUp = this.getNumFlexedupNMs();
-        if (numFlexedUp < instances)  {
-            String message = String.format("Number of requested instances for 
flexdown is greater than the number " +
-                "of Node Managers flexed up. Requested: %d, Flexed Up: %d. 
Only %d Node Managers " +
-                "will be flexed down", instances, numFlexedUp, numFlexedUp);
+        if (isValidRequest && numFlexedUp < instances)  {
+            String message = String.format("Number of requested instances for 
flexdown is greater than the number of " +
+                "Node Managers previously flexed up. Requested: %d, Previously 
flexed Up: %d. " +
+                "Only %d Node Managers will be flexed down", instances, 
numFlexedUp, numFlexedUp);
             response.entity(message);
             LOGGER.warn(message);
         }
 
         Response returnResponse = response.build();
         if (returnResponse.getStatus() == 
Response.Status.ACCEPTED.getStatusCode()) {
-            this.myriadOperations.flexDownCluster(instances);
+            this.myriadOperations.flexDownCluster(profileManager.get(profile), 
instances);
         }
         return returnResponse;
     }
 
-    private void validateInstances(Integer instances, ResponseBuilder 
response) {
-      if (!(instances > 0)) {
+    private boolean validateProfile(String profile, ResponseBuilder response) {
+      if (profile == null || profile.isEmpty()) {
+        response.status(Response.Status.BAD_REQUEST).entity("'profile' is null 
or empty");
+        LOGGER.error("'profile' is null or empty");
+        return false;
+      }
+      if (!this.profileManager.exists(profile)) {
+        response.status(Response.Status.BAD_REQUEST)
+            .entity("Profile does not exist: '" + profile + "'");
+        LOGGER.error("Provided profile does not exist: '" + profile + "'");
+        return false;
+      }
+      return true;
+    }
+
+    private boolean validateInstances(Integer instances, ResponseBuilder 
response) {
+      if (instances == null) {
+        response.status(Response.Status.BAD_REQUEST).entity("'instances' is 
null");
+        LOGGER.error("'instances' is null");
+        return false;
+      } else if (!(instances > 0)) {
           response.status(Response.Status.BAD_REQUEST)
                   .entity("Invalid instance size: " + instances);
           LOGGER.error("Invalid instance size request " + instances);
+        return false;
       }
+      return true;
     }
 
     private Integer getNumFlexedupNMs() {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java
index eaf4918..e801e65 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/api/model/FlexDownClusterRequest.java
@@ -24,13 +24,17 @@ import org.hibernate.validator.constraints.NotEmpty;
 public class FlexDownClusterRequest {
 
     @NotEmpty
+    public String profile;
+
+    @NotEmpty
     public Integer instances;
 
     public FlexDownClusterRequest() {
     }
 
-    public FlexDownClusterRequest(Integer instances) {
+    public FlexDownClusterRequest(String profile, Integer instances) {
         this.instances = instances;
+        this.profile = profile;
     }
 
     public Integer getInstances() {
@@ -41,6 +45,14 @@ public class FlexDownClusterRequest {
         this.instances = instances;
     }
 
+    public String getProfile() {
+      return profile;
+    }
+
+    public void setProfile(String profile) {
+      this.profile = profile;
+    }
+
     public String toString() {
         Gson gson = new Gson();
         return gson.toJson(this);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index 4947647..ed2adbd 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -15,7 +15,6 @@
  */
 package com.ebay.myriad.scheduler;
 
-import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.policy.NodeScaleDownPolicy;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
@@ -37,65 +36,36 @@ import java.util.Set;
 public class MyriadOperations {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadOperations.class);
     private final SchedulerState schedulerState;
-    private MyriadConfiguration cfg;
-    private NMProfileManager profileManager;
     private NodeScaleDownPolicy nodeScaleDownPolicy;
 
     @Inject
-    public MyriadOperations(MyriadConfiguration cfg,
-                            SchedulerState schedulerState,
-                            NMProfileManager profileManager,
+    public MyriadOperations(SchedulerState schedulerState,
                             NodeScaleDownPolicy nodeScaleDownPolicy) {
-        this.cfg = cfg;
-        this.schedulerState = schedulerState;
-        this.profileManager = profileManager;
-        this.nodeScaleDownPolicy = nodeScaleDownPolicy;
+      this.schedulerState = schedulerState;
+      this.nodeScaleDownPolicy = nodeScaleDownPolicy;
     }
 
-    public void flexUpCluster(int instances, String profile) {
+    public void flexUpCluster(NMProfile profile, int instances) {
         Collection<NodeTask> nodes = new HashSet<>();
         for (int i = 0; i < instances; i++) {
-            nodes.add(new NodeTask(profileManager.get(profile)));
+            nodes.add(new NodeTask(profile));
         }
 
-        LOGGER.info("Adding {} instances to cluster", nodes.size());
         this.schedulerState.addNodes(nodes);
     }
 
-    public void flexDownCluster(int numInstancesToScaleDown) {
-        LOGGER.info("About to flex down {} instances", 
numInstancesToScaleDown);
-
-        int numScaledDown = 0;
-        Set<NodeTask> activeTasks = 
Sets.newHashSet(this.schedulerState.getActiveTasks());
+    public void flexDownCluster(NMProfile profile, int 
numInstancesToScaleDown) {
+        Set<NodeTask> activeTasksForProfile = 
Sets.newHashSet(this.schedulerState.getActiveTasksForProfile(profile));
         List<String> nodesToScaleDown = 
nodeScaleDownPolicy.getNodesToScaleDown();
-        if (activeTasks.size() > nodesToScaleDown.size()) {
-            LOGGER.info("Will skip flexing down {} Node Manager instances that 
were launched but " +
-                    "have not yet registered with Resource Manager.", 
activeTasks.size() - nodesToScaleDown.size());
-        }
-        
-        // If a NM is flexed down it takes time for the RM to realize the NM is no longer up
-        // We need to make sure we filter out nodes that have already been flexed down
-        // but have not disappeared from the RM's view of the cluster
-        for (Iterator<String> iterator = nodesToScaleDown.iterator(); 
iterator.hasNext();) {
-            String nodeToScaleDown = iterator.next();
-            boolean nodePresentInMyriad = false;
-            for (NodeTask nodeTask : activeTasks) {
-                if (nodeTask.getHostname().equals(nodeToScaleDown)) {
-                    nodePresentInMyriad = true;    
-                    break;
-                }
-            }
-            if (!nodePresentInMyriad) {
-                iterator.remove();
-            }
-        }
+        filterUnregisteredNMs(activeTasksForProfile, nodesToScaleDown);
 
         // TODO(Santosh): Make this more efficient by using a Map<HostName, NodeTask> in scheduler state
+        int numActiveTasksScaledDown = 0;
         for (int i = 0; i < numInstancesToScaleDown; i++) {
-            for (NodeTask nodeTask : activeTasks) {
+            for (NodeTask nodeTask : activeTasksForProfile) {
                 if (nodesToScaleDown.size() > i && 
nodesToScaleDown.get(i).equals(nodeTask.getHostname())) {
                     
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
-                    numScaledDown++;
+                    numActiveTasksScaledDown++;
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug("Marked NodeTask {} on host {} for kill.",
                                 nodeTask.getTaskStatus().getTaskId(), 
nodeTask.getHostname());
@@ -103,37 +73,64 @@ public class MyriadOperations {
                 }
             }
         }
-        int numActiveTasksScaledDown = numScaledDown;
 
         // Flex down Staging tasks, if any
-        if (numScaledDown < numInstancesToScaleDown) {
+        int numStagingTasksScaledDown = 0;
+        if (numActiveTasksScaledDown < numInstancesToScaleDown) {
             Set<Protos.TaskID> stagingTasks = 
Sets.newHashSet(this.schedulerState.getStagingTaskIds());
 
             for (Protos.TaskID taskId : stagingTasks) {
-                this.schedulerState.makeTaskKillable(taskId);
-                numScaledDown++;
-                if (numScaledDown == numInstancesToScaleDown) {
+                if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
+                  this.schedulerState.makeTaskKillable(taskId);
+                  numStagingTasksScaledDown++;
+                  if (numStagingTasksScaledDown + numActiveTasksScaledDown == 
numInstancesToScaleDown) {
                     break;
+                  }
                 }
             }
         }
-        int numStagingTasksScaledDown = numScaledDown - 
numActiveTasksScaledDown;
 
         // Flex down Pending tasks, if any
-        if (numScaledDown < numInstancesToScaleDown) {
-            Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds());
+        int numPendingTasksScaledDown = 0;
+        if (numStagingTasksScaledDown + numActiveTasksScaledDown < 
numInstancesToScaleDown) {
+          Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds());
 
-            for (Protos.TaskID taskId : pendingTasks) {
+          for (Protos.TaskID taskId : pendingTasks) {
+              if 
(schedulerState.getTask(taskId).getProfile().getName().equals(profile.getName()))
 {
                 this.schedulerState.makeTaskKillable(taskId);
-                numScaledDown++;
-                if (numScaledDown == numInstancesToScaleDown) {
-                    break;
+                numPendingTasksScaledDown++;
+                if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown
+                    == numInstancesToScaleDown) {
+                  break;
                 }
+              }
             }
         }
-        int numPendingTasksScaledDown = numScaledDown - 
numStagingTasksScaledDown;
 
-        LOGGER.info("Flexed down {} of {} instances including {} staging 
instances, and {} pending instances.",
-                numScaledDown, numInstancesToScaleDown, 
numStagingTasksScaledDown, numPendingTasksScaledDown);
+        if (numActiveTasksScaledDown + numStagingTasksScaledDown + 
numPendingTasksScaledDown == 0) {
+          LOGGER.info("No Node Managers with profile '{}' found for scaling 
down.", profile.getName());
+        } else {
+          LOGGER.info("Flexed down {} active, {} staging  and {} pending Node 
Managers with '{}' profile.",
+              numActiveTasksScaledDown, numStagingTasksScaledDown, 
numPendingTasksScaledDown, profile.getName());
+        }
+    }
+
+  private void filterUnregisteredNMs(Set<NodeTask> activeTasksForProfile, 
List<String> registeredNMHosts) {
+    // If a NM is flexed down it takes time for the RM to realize the NM is no longer up
+    // We need to make sure we filter out nodes that have already been flexed down
+    // but have not disappeared from the RM's view of the cluster
+    for (Iterator<String> iterator = registeredNMHosts.iterator(); 
iterator.hasNext();) {
+        String nodeToScaleDown = iterator.next();
+        boolean nodePresentInMyriad = false;
+        for (NodeTask nodeTask : activeTasksForProfile) {
+            if (nodeTask.getHostname().equals(nodeToScaleDown)) {
+                nodePresentInMyriad = true;
+                break;
+            }
+        }
+        if (!nodePresentInMyriad) {
+            iterator.remove();
+        }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
index ba2bb1a..164e7fa 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/Rebalancer.java
@@ -15,14 +15,11 @@
  */
 package com.ebay.myriad.scheduler;
 
-import com.ebay.myriad.configuration.MyriadConfiguration;
 import com.ebay.myriad.state.SchedulerState;
-import com.google.common.base.Preconditions;
+import javax.inject.Inject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-
 /**
  * {@link Rebalancer} is responsible for scaling registered YARN clusters as per
  * configured rules and policies.
@@ -30,30 +27,24 @@ import javax.inject.Inject;
 public class Rebalancer implements Runnable {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(Rebalancer.class);
 
-    private MyriadConfiguration cfg;
-    private SchedulerState schedulerState;
-    private MyriadDriverManager driverManager;
-    private MyriadOperations myriadOperations;
+    private final SchedulerState schedulerState;
+    private final MyriadOperations myriadOperations;
+    private final NMProfileManager profileManager;
 
     @Inject
-    public Rebalancer(MyriadConfiguration cfg, SchedulerState schedulerState,
-                      MyriadDriverManager driverManager, MyriadOperations 
myriadOperations) {
-        Preconditions.checkArgument(cfg != null);
-        Preconditions.checkArgument(schedulerState != null);
-        Preconditions.checkArgument(driverManager != null);
-        Preconditions.checkArgument(myriadOperations != null);
-
-        this.cfg = cfg;
+    public Rebalancer(SchedulerState schedulerState,
+                      MyriadOperations myriadOperations,
+                      NMProfileManager profileManager) {
         this.schedulerState = schedulerState;
-        this.driverManager = driverManager;
         this.myriadOperations = myriadOperations;
+        this.profileManager = profileManager;
     }
 
     @Override
     public void run() {
         LOGGER.info("Active {}, Pending {}", 
schedulerState.getActiveTaskIds().size(), 
schedulerState.getPendingTaskIds().size());
         if (schedulerState.getActiveTaskIds().size() < 1 && 
schedulerState.getPendingTaskIds().size() < 1) {
-            myriadOperations.flexUpCluster(1, "small");
+            myriadOperations.flexUpCluster(profileManager.get("small"), 1);
         }
 //            RestAdapter restAdapter = new RestAdapter.Builder()
 //                    .setEndpoint("http://" + host + ":" + port)

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 07241d2..59ea547 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -74,6 +74,13 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
     SchedulerDriver driver = event.getDriver();
     List<Offer> offers = event.getOffers();
 
+    // Sometimes, we see that mesos sends resource offers before Myriad receives
+    // a notification for "framework registration". This is a simple defensive code
+    // to not process any offers unless Myriad receives a "framework registered" notification.
+    if (schedulerState.getFrameworkID() == null) {
+      LOGGER.warn("Received {} offers, but not processing them since Framework 
ID is not yet set", offers.size());
+      return;
+    }
     LOGGER.info("Received offers {}", offers.size());
     LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
     driverOperationLock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0acbdbdd/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index 00cf8c4..e428a1d 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -15,6 +15,7 @@
  */
 package com.ebay.myriad.state;
 
+import com.ebay.myriad.scheduler.NMProfile;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -191,7 +192,21 @@ public class SchedulerState {
         return Collections.unmodifiableCollection(activeNodeTasks);
     }
 
-    // TODO (sdaingade) Clone NodeTask
+    public synchronized Collection<NodeTask> 
getActiveTasksForProfile(NMProfile profile) {
+      List<NodeTask> activeNodeTasks = new ArrayList<>();
+      if (CollectionUtils.isNotEmpty(activeTasks)
+          && CollectionUtils.isNotEmpty(tasks.values())) {
+        for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+          NodeTask nodeTask = entry.getValue();
+          if (activeTasks.contains(entry.getKey()) && 
nodeTask.getProfile().getName().equals(profile.getName())) {
+            activeNodeTasks.add(nodeTask);
+          }
+        }
+      }
+      return Collections.unmodifiableCollection(activeNodeTasks);
+    }
+
+  // TODO (sdaingade) Clone NodeTask
     public synchronized NodeTask getNodeTask(SlaveID slaveId) {
         for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
             if (entry.getValue().getSlaveId() != null &&

Reply via email to