Repository: storm
Updated Branches:
  refs/heads/master 3a5ecf50f -> b8c52026f


[STORM-2134] - improving the current scheduling strategy for RAS


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d4e94379
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d4e94379
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d4e94379

Branch: refs/heads/master
Commit: d4e943794458f8177a518b68eb2efe09d48d0c6d
Parents: ce7ba75
Author: Boyang Jerry Peng <jerryp...@yahoo-inc.com>
Authored: Thu Oct 6 13:45:55 2016 -0500
Committer: Boyang Jerry Peng <jerryp...@yahoo-inc.com>
Committed: Thu Oct 6 13:45:55 2016 -0500

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 storm-core/pom.xml                              |  16 +
 .../storm/scheduler/resource/ResourceUtils.java |  22 +
 .../DefaultResourceAwareStrategy.java           | 673 ++++++++++++-------
 .../resource/TestResourceAwareScheduler.java    |   1 -
 .../TestDefaultResourceAwareStrategy.java       |   8 +-
 6 files changed, 472 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d4e94379/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 28ba98e..59966e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
         <commons-io.version>2.5</commons-io.version>
         <commons-lang.version>2.5</commons-lang.version>
         <commons-exec.version>1.1</commons-exec.version>
+        <commons-collections.version>3.2.2</commons-collections.version>
         <commons-fileupload.version>1.3.2</commons-fileupload.version>
         <commons-codec.version>1.6</commons-codec.version>
         <commons-cli.version>1.3.1</commons-cli.version>
@@ -586,6 +587,11 @@
                 <version>${commons-exec.version}</version>
             </dependency>
             <dependency>
+                <groupId>commons-collections</groupId>
+                <artifactId>commons-collections</artifactId>
+                <version>${commons-collections.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>commons-lang</groupId>
                 <artifactId>commons-lang</artifactId>
                 <version>${commons-lang.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/d4e94379/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 1220746..f09126c 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -165,6 +165,10 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
             <scope>compile</scope>
@@ -509,6 +513,7 @@
                             <include>org.jgrapht:jgrapht-core</include>
                             <include>org.apache.commons:commons-exec</include>
                             
<include>org.apache.commons:commons-compress</include>
+                            
<include>commons-collections:commons-collections</include>
                             <include>org.apache.hadoop:hadoop-auth</include>
                             <include>commons-cli:commons-cli</include>
                             <include>commons-io:commons-io</include>
@@ -677,6 +682,10 @@
                           
<shadedPattern>org.apache.storm.shade.org.apache.commons.lang</shadedPattern>
                         </relocation>
                         <relocation>
+                            <pattern>org.apache.commons.collections</pattern>
+                            
<shadedPattern>org.apache.storm.shade.org.apache.commons.collections</shadedPattern>
+                        </relocation>
+                        <relocation>
                           <pattern>org.json.simple</pattern>
                           
<shadedPattern>org.apache.storm.shade.org.json.simple</shadedPattern>
                         </relocation>
@@ -797,6 +806,13 @@
                             </excludes>
                         </filter>
                         <filter>
+                            
<artifact>commons-collections:commons-collections</artifact>
+                            <excludes>
+                                <exclude>META-INF/LICENSE.txt</exclude>
+                                <exclude>META-INF/NOTICE.txt</exclude>
+                            </excludes>
+                        </filter>
+                        <filter>
                             <artifact>commons-io:commons-io</artifact>
                             <excludes>
                                 <exclude>META-INF/LICENSE.txt</exclude>

http://git-wip-us.apache.org/repos/asf/storm/blob/d4e94379/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java 
b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java
index 8700746..f3799af 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -181,4 +181,26 @@ public class ResourceUtils {
         }
         return str.toString();
     }
+
+    /**
+     * Calculate the sum of a collection of doubles
+     * @param list collection of doubles
+     * @return the sum of a collection of doubles
+     */
+    public static double sum(Collection<Double> list) {
+        double sum = 0.0;
+        for (Double elem : list) {
+            sum += elem;
+        }
+        return sum;
+    }
+
+    /**
+     * Calculate the average of a collection of doubles
+     * @param list a collection of doubles
+     * @return the average of a collection of doubles
+     */
+    public static double avg(Collection<Double> list) {
+        return sum(list) / list.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d4e94379/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
 
b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 8957dc0..df3300c 100644
--- 
a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ 
b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -20,25 +20,26 @@ package 
org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Queue;
-import java.util.TreeMap;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.storm.scheduler.resource.SchedulingResult;
 import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.apache.commons.collections.ListUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,13 +52,11 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
     private Cluster _cluster;
     private Topologies _topologies;
-    private RAS_Node refNode = null;
     private Map<String, List<String>> _clusterInfo;
     private RAS_Nodes _nodes;
 
-    private final double CPU_WEIGHT = 1.0;
-    private final double MEM_WEIGHT = 1.0;
-    private final double NETWORK_WEIGHT = 1.0;
+    private TreeSet<ObjectResources> _sortedRacks = null;
+    private Map<String, TreeSet<ObjectResources>> _rackIdToSortedNodes = new 
HashMap<String, TreeSet<ObjectResources>>();
 
     public void prepare (SchedulingState schedulingState) {
         _cluster = schedulingState.cluster;
@@ -67,29 +66,12 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
         LOG.debug(this.getClusterInfo());
     }
 
-    //the returned TreeMap keeps the Components sorted
-    private TreeMap<Integer, List<ExecutorDetails>> 
getPriorityToExecutorDetailsListMap(
-            Queue<Component> ordered__Component_list, 
Collection<ExecutorDetails> unassignedExecutors) {
-        TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
-        Integer rank = 0;
-        for (Component ras_comp : ordered__Component_list) {
-            retMap.put(rank, new ArrayList<ExecutorDetails>());
-            for(ExecutorDetails exec : ras_comp.execs) {
-                if(unassignedExecutors.contains(exec)) {
-                    retMap.get(rank).add(exec);
-                }
-            }
-            rank++;
-        }
-        return retMap;
-    }
-
     public SchedulingResult schedule(TopologyDetails td) {
         if (_nodes.getNodes().size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
             return 
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No 
available nodes to schedule tasks on!");
         }
-        Collection<ExecutorDetails> unassignedExecutors = 
_cluster.getUnassignedExecutors(td);
+        Collection<ExecutorDetails> unassignedExecutors = new 
HashSet<ExecutorDetails>(_cluster.getUnassignedExecutors(td));
         Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = 
new HashMap<>();
         LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
@@ -100,25 +82,16 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
             return 
SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a 
Spout!");
         }
 
-        Queue<Component> ordered__Component_list = bfs(td, spouts);
+        //order executors to be scheduled
+        List<ExecutorDetails> orderedExecutors = orderExecutors(td, 
unassignedExecutors);
 
-        Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = 
getPriorityToExecutorDetailsListMap(ordered__Component_list, 
unassignedExecutors);
         Collection<ExecutorDetails> executorsNotScheduled = new 
HashSet<>(unassignedExecutors);
-        Integer longestPriorityListSize = 
this.getLongestPriorityListSize(priorityToExecutorMap);
-        //Pick the first executor with priority one, then the 1st exec with 
priority 2, so on an so forth.
-        //Once we reach the last priority, we go back to priority 1 and 
schedule the second task with priority 1.
-        for (int i = 0; i < longestPriorityListSize; i++) {
-            for (Entry<Integer, List<ExecutorDetails>> entry : 
priorityToExecutorMap.entrySet()) {
-                Iterator<ExecutorDetails> it = entry.getValue().iterator();
-                if (it.hasNext()) {
-                    ExecutorDetails exec = it.next();
-                    LOG.debug("\n\nAttempting to schedule: {} of component {}[ 
REQ {} ] with rank {}",
-                            new Object[] { exec, 
td.getExecutorToComponent().get(exec),
-                                    td.getTaskResourceReqList(exec), 
entry.getKey() });
-                    scheduleExecutor(exec, td, schedulerAssignmentMap, 
scheduledTasks);
-                    it.remove();
-                }
-            }
+
+        for (ExecutorDetails exec : orderedExecutors) {
+            LOG.debug("\n\nAttempting to schedule: {} of component {}[ REQ {} 
]",
+                    exec, td.getExecutorToComponent().get(exec),
+                    td.getTaskResourceReqList(exec));
+            scheduleExecutor(exec, td, schedulerAssignmentMap, scheduledTasks);
         }
 
         executorsNotScheduled.removeAll(scheduledTasks);
@@ -146,6 +119,14 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
         return result;
     }
 
+    /**
+     * Schedule executor exec from topology td
+     *
+     * @param exec the executor to schedule
+     * @param td the topology executor exec is a part of
+     * @param schedulerAssignmentMap the assignments already calculated
+     * @param scheduledTasks executors that have been scheduled
+     */
     private void scheduleExecutor(ExecutorDetails exec, TopologyDetails td, 
Map<WorkerSlot,
             Collection<ExecutorDetails>> schedulerAssignmentMap, 
Collection<ExecutorDetails> scheduledTasks) {
         WorkerSlot targetSlot = this.findWorkerForExec(exec, td, 
schedulerAssignmentMap);
@@ -167,69 +148,59 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
         }
     }
 
+    /**
+     * Find a worker to schedule executor exec on
+     *
+     * @param exec the executor to schedule
+     * @param td the topology that the executor is a part of
+     * @param scheduleAssignmentMap already calculated assignments
+     * @return a worker to assign exec on.  Returns null if a worker cannot be 
successfully found in cluster
+     */
     private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails 
td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
         WorkerSlot ws = null;
-        // first scheduling
-        if (this.refNode == null) {
-            // iterate through an ordered list of all racks available to make 
sure we cannot schedule the first executor in any rack before we "give up"
-            // the list is ordered in decreasing order of effective resources. 
With the rack in the front of the list having the most effective resources.
-            for (RackResources rack : sortRacks(td.getId())) {
-                ws = this.getBestWorker(exec, td, rack.id, 
scheduleAssignmentMap);
-                if (ws != null) {
-                    LOG.debug("best rack: {}", rack.id);
-                    break;
-                }
-            }
-        } else {
-            ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
+
+        // iterate through an ordered list of all racks available to make sure 
we cannot schedule the first executor in any rack before we "give up"
+        // the list is ordered in decreasing order of effective resources. 
With the rack in the front of the list having the most effective resources.
+        if (_sortedRacks == null) {
+            _sortedRacks = sortRacks(td.getId(), scheduleAssignmentMap);
         }
-        if (ws != null) {
-            this.refNode = this.idToNode(ws.getNodeId());
+
+        for (ObjectResources rack : _sortedRacks) {
+            ws = this.getBestWorker(exec, td, rack.id, scheduleAssignmentMap);
+            if (ws != null) {
+                LOG.debug("best rack: {}", rack.id);
+                break;
+            }
         }
-        LOG.debug("reference node for the resource aware scheduler is: {}", 
this.refNode);
         return ws;
     }
 
-    private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, 
Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
-        return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
-    }
-
+    /**
+     * Get the best worker to assign executor exec on a rack
+     *
+     * @param exec the executor to schedule
+     * @param td the topology that the executor is a part of
+     * @param rackId the rack id of the rack to find a worker on
+     * @param scheduleAssignmentMap already calculated assignments
+     * @return a worker to assign executor exec to. Returns null if a worker 
cannot be successfully found on rack with rackId
+     */
     private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, 
String rackId, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
-        double taskMem = td.getTotalMemReqTask(exec);
-        double taskCPU = td.getTotalCpuReqTask(exec);
-        List<RAS_Node> nodes;
-        if(rackId != null) {
-            nodes = this.getAvailableNodesFromRack(rackId);
 
-        } else {
-            nodes = this.getAvailableNodes();
+        if (!_rackIdToSortedNodes.containsKey(rackId)) {
+            _rackIdToSortedNodes.put(rackId, 
sortNodes(this.getAvailableNodesFromRack(rackId), rackId, td.getId(), 
scheduleAssignmentMap));
         }
-        //First sort nodes by distance
-        TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
-        for (RAS_Node n : nodes) {
-            if(n.getFreeSlots().size()>0) {
-                if (n.getAvailableMemoryResources() >= taskMem
-                        && n.getAvailableCpuResources() >= taskCPU) {
-                    double a = Math.pow(((taskCPU - 
n.getAvailableCpuResources())/(n.getAvailableCpuResources() + 1))
-                            * this.CPU_WEIGHT, 2);
-                    double b = Math.pow(((taskMem - 
n.getAvailableMemoryResources())/(n.getAvailableMemoryResources() + 1))
-                            * this.MEM_WEIGHT, 2);
-                    double c = 0.0;
-                    if(this.refNode != null) {
-                        c = Math.pow(this.distToNode(this.refNode, n)
-                                * this.NETWORK_WEIGHT, 2);
+
+        TreeSet<ObjectResources> sortedNodes = 
_rackIdToSortedNodes.get(rackId);
+
+        double taskMem = td.getTotalMemReqTask(exec);
+        double taskCPU = td.getTotalCpuReqTask(exec);
+        for (ObjectResources nodeResources : sortedNodes) {
+            RAS_Node n = _nodes.getNodeById(nodeResources.id);
+            if (n.getAvailableCpuResources() >= taskCPU && 
n.getAvailableMemoryResources() >= taskMem && n.getFreeSlots().size() > 0) {
+                for (WorkerSlot ws : n.getFreeSlots()) {
+                    if (checkWorkerConstraints(exec, ws, td, 
scheduleAssignmentMap)) {
+                        return ws;
                     }
-                    double distance = Math.sqrt(a + b + c);
-                    nodeRankMap.put(distance, n);
-                }
-            }
-        }
-        //Then, pick worker from closest node that satisfy constraints
-        for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
-            RAS_Node n = entry.getValue();
-            for(WorkerSlot ws : n.getFreeSlots()) {
-                if(checkWorkerConstraints(exec, ws, td, 
scheduleAssignmentMap)) {
-                    return ws;
                 }
             }
         }
@@ -237,18 +208,40 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
     }
 
     /**
-     * class to keep track of resources on a rack
+     * interface for calculating the number of existing executors scheduled on 
a object (rack or node)
+     */
+    private interface ExistingScheduleFunc {
+        int getNumExistingSchedule(String objectId);
+    }
+
+    /**
+     * a class to contain individual object resources as well as cumulative 
stats
+     */
+    static class AllResources {
+        List<ObjectResources> objectResources = new 
LinkedList<ObjectResources>();
+        double availMemResourcesOverall = 0.0;
+        double totalMemResourcesOverall = 0.0;
+        double availCpuResourcesOverall = 0.0;
+        double totalCpuResourcesOverall = 0.0;
+        String identifier;
+
+        public AllResources(String identifier) {
+            this.identifier = identifier;
+        }
+    }
+
+    /**
+     * class to keep track of resources on a rack or node
      */
-    class RackResources {
+     static class ObjectResources {
         String id;
         double availMem = 0.0;
         double totalMem = 0.0;
         double availCpu = 0.0;
         double totalCpu = 0.0;
-        int freeSlots = 0;
-        int totalSlots = 0;
         double effectiveResources = 0.0;
-        public RackResources(String id) {
+
+        public ObjectResources(String id) {
             this.id = id;
         }
 
@@ -259,8 +252,87 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
     }
 
     /**
+     * Sorted Nodes
      *
-     * @param topoId
+     * @param availNodes            a list of all the nodes we want to sort
+     * @param rackId                the rack id availNodes are a part of
+     * @param topoId                the topology that we are trying to schedule
+     * @param scheduleAssignmentMap calculated assignments so far
+     * @return a sorted list of nodes
+     * <p>
+     * Nodes are sorted by two criteria. 1) the number of executors of the 
topology that needs to be scheduled that are already on the node, in descending order.
+     * The reasoning to sort based on criterion 1 is so we schedule the rest 
of a topology on the same node as the existing executors of the topology.
+     * 2) the subordinate/subservient resource availability percentage of a 
node in descending order
+     * We calculate the resource availability percentage by dividing the 
resource availability on the node by the resource availability of the entire 
rack
+     * By doing this calculation, nodes that have exhausted or little of one 
of the resources mentioned above will be ranked after nodes that have more 
balanced resource availability.
+     * So we will be less likely to pick a node that has a lot of one 
resource but a low amount of another.
+     */
+    private TreeSet<ObjectResources> sortNodes(List<RAS_Node> availNodes, 
String rackId, final String topoId, final Map<WorkerSlot, 
Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        AllResources allResources = new AllResources("RACK");
+        List<ObjectResources> nodes = allResources.objectResources;
+        final Map<String, String> nodeIdToRackId = new HashMap<String, 
String>();
+
+        for (RAS_Node ras_node : availNodes) {
+            String nodeId = ras_node.getId();
+            ObjectResources node = new ObjectResources(nodeId);
+
+            double availMem = ras_node.getAvailableMemoryResources();
+            double availCpu = ras_node.getAvailableCpuResources();
+            int freeSlots = ras_node.totalSlotsFree();
+            double totalMem = ras_node.getTotalMemoryResources();
+            double totalCpu = ras_node.getTotalCpuResources();
+            int totalSlots = ras_node.totalSlots();
+
+            node.availMem = availMem;
+            node.totalMem = totalMem;
+            node.availCpu = availCpu;
+            node.totalCpu = totalCpu;
+            nodes.add(node);
+
+            allResources.availMemResourcesOverall += availMem;
+            allResources.availCpuResourcesOverall += availCpu;
+
+            allResources.totalMemResourcesOverall += totalMem;
+            allResources.totalCpuResourcesOverall += totalCpu;
+        }
+
+
+        LOG.debug("Rack {}: Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM 
{} ]",
+                rackId, allResources.availCpuResourcesOverall, 
allResources.availMemResourcesOverall, allResources.totalCpuResourcesOverall, 
allResources.totalMemResourcesOverall);
+
+        return sortObjectResources(allResources, new ExistingScheduleFunc() {
+            @Override
+            public int getNumExistingSchedule(String objectId) {
+
+                //Get execs already assigned in rack
+                Collection<ExecutorDetails> execs = new 
LinkedList<ExecutorDetails>();
+                if (_cluster.getAssignmentById(topoId) != null) {
+                    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
_cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+                        WorkerSlot workerSlot = entry.getValue();
+                        ExecutorDetails exec = entry.getKey();
+                        if (workerSlot.getNodeId().equals(objectId)) {
+                            execs.add(exec);
+                        }
+                    }
+                }
+                // get execs already scheduled in the current scheduling
+                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry 
: scheduleAssignmentMap.entrySet()) {
+
+                    WorkerSlot workerSlot = entry.getKey();
+                    if (workerSlot.getNodeId().equals(objectId)) {
+                        execs.addAll(entry.getValue());
+                    }
+                }
+                return execs.size();
+            }
+        });
+    }
+
+    /**
+     * Sort racks
+     *
+     * @param topoId                topology id
+     * @param scheduleAssignmentMap calculated assignments so far
      * @return a sorted list of racks
     * Racks are sorted by two criteria. 1) the number of executors of the 
topology that needs to be scheduled that are already on the rack, in descending order.
     * The reasoning to sort based on criterion 1 is so we schedule the rest 
of a topology on the same rack as the existing executors of the topology.
@@ -269,117 +341,161 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
      * By doing this calculation, racks that have exhausted or little of one 
of the resources mentioned above will be ranked after racks that have more 
balanced resource availability.
      * So we will be less likely to pick a rack that have a lot of one 
resource but a low amount of another.
      */
-    TreeSet<RackResources> sortRacks(final String topoId) {
-        List<RackResources> racks = new LinkedList<RackResources>();
-        final Map<String, String> nodeIdToRackId = new HashMap<String, 
String>();
-        double availMemResourcesOverall = 0.0;
-        double totalMemResourcesOverall = 0.0;
-
-        double availCpuResourcesOverall = 0.0;
-        double totalCpuResourcesOverall = 0.0;
+    TreeSet<ObjectResources> sortRacks(final String topoId, final 
Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
+        AllResources allResources = new AllResources("Cluster");
+        List<ObjectResources> racks = allResources.objectResources;
 
-        int freeSlotsOverall = 0;
-        int totalSlotsOverall = 0;
+        final Map<String, String> nodeIdToRackId = new HashMap<String, 
String>();
 
-        for (Entry<String, List<String>> entry : _clusterInfo.entrySet()) {
+        for (Map.Entry<String, List<String>> entry : _clusterInfo.entrySet()) {
             String rackId = entry.getKey();
             List<String> nodeIds = entry.getValue();
-            RackResources rack = new RackResources(rackId);
+            ObjectResources rack = new ObjectResources(rackId);
             racks.add(rack);
             for (String nodeId : nodeIds) {
                 RAS_Node node = 
_nodes.getNodeById(this.NodeHostnameToId(nodeId));
                 double availMem = node.getAvailableMemoryResources();
                 double availCpu = node.getAvailableCpuResources();
-                double freeSlots = node.totalSlotsFree();
                 double totalMem = node.getTotalMemoryResources();
                 double totalCpu = node.getTotalCpuResources();
-                double totalSlots = node.totalSlots();
 
                 rack.availMem += availMem;
                 rack.totalMem += totalMem;
                 rack.availCpu += availCpu;
                 rack.totalCpu += totalCpu;
-                rack.freeSlots += freeSlots;
-                rack.totalSlots += totalSlots;
                 nodeIdToRackId.put(nodeId, rack.id);
 
-                availMemResourcesOverall += availMem;
-                availCpuResourcesOverall += availCpu;
-                freeSlotsOverall += freeSlots;
+                allResources.availMemResourcesOverall += availMem;
+                allResources.availCpuResourcesOverall += availCpu;
 
-                totalMemResourcesOverall += totalMem;
-                totalCpuResourcesOverall += totalCpu;
-                totalSlotsOverall += totalSlots;
+                allResources.totalMemResourcesOverall += totalMem;
+                allResources.totalCpuResourcesOverall += totalCpu;
             }
         }
+        LOG.debug("Cluster Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM 
{} ]",
+                allResources.availCpuResourcesOverall, 
allResources.availMemResourcesOverall, allResources.totalCpuResourcesOverall, 
allResources.totalMemResourcesOverall);
+
+        return sortObjectResources(allResources, new ExistingScheduleFunc() {
+            @Override
+            public int getNumExistingSchedule(String objectId) {
+
+                String rackId = objectId;
+                //Get execs already assigned in rack
+                Collection<ExecutorDetails> execs = new 
LinkedList<ExecutorDetails>();
+                if (_cluster.getAssignmentById(topoId) != null) {
+                    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
_cluster.getAssignmentById(topoId).getExecutorToSlot().entrySet()) {
+                        String nodeId = entry.getValue().getNodeId();
+                        String hostname = idToNode(nodeId).getHostname();
+                        ExecutorDetails exec = entry.getKey();
+                        if (nodeIdToRackId.get(hostname) != null && 
nodeIdToRackId.get(hostname).equals(rackId)) {
+                            execs.add(exec);
+                        }
+                    }
+                }
+                // get execs already scheduled in the current scheduling
+                for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry 
: scheduleAssignmentMap.entrySet()) {
+                    WorkerSlot workerSlot = entry.getKey();
+                    String nodeId = workerSlot.getNodeId();
+                    String hostname = idToNode(nodeId).getHostname();
+                    if (nodeIdToRackId.get(hostname).equals(rackId)) {
+                        execs.addAll(entry.getValue());
+                    }
+                }
+                return execs.size();
+            }
+        });
+    }
+
    /**
     * Sort objects by the following two criteria. 1) the number executors of the topology that needs to be scheduled is already on the object (node or rack) in descending order.
     * The reasoning to sort based on criterion 1 is so we schedule the rest of a topology on the same object (node or rack) as the existing executors of the topology.
     * 2) the subordinate/subservient resource availability percentage of a rack in descending order
     * We calculate the resource availability percentage by dividing the resource availability of the object (node or rack) by the resource availability of the entire rack or cluster depending on if object
     * references a node or a rack.
     * By doing this calculation, objects (node or rack) that have exhausted or little of one of the resources mentioned above will be ranked after racks that have more balanced resource availability.
     * So we will be less likely to pick a rack that have a lot of one resource but a low amount of another.
     *
     * @param allResources         contains all individual ObjectResources as well as cumulative stats
     * @param existingScheduleFunc a function to get existing executors already scheduled on this object
     * @return a sorted list of ObjectResources
     */
    private TreeSet<ObjectResources> sortObjectResources(final AllResources allResources, final ExistingScheduleFunc existingScheduleFunc) {

        // First pass: compute each object's "effective resources" = the scarcest of its
        // resource-availability percentages relative to the cluster-wide totals.
        for (ObjectResources objectResources : allResources.objectResources) {
            StringBuilder sb = new StringBuilder();
            // If the cluster has no CPU or no memory left overall, no percentage is meaningful.
            if (allResources.availCpuResourcesOverall <= 0.0  || allResources.availMemResourcesOverall <= 0.0) {
                objectResources.effectiveResources = 0.0;
            } else {
                List<Double> values = new LinkedList<Double>();

                //add cpu
                double cpuPercent = (objectResources.availCpu / allResources.availCpuResourcesOverall) * 100.0;
                values.add(cpuPercent);
                sb.append(String.format("CPU %f(%f%%) ", objectResources.availCpu, cpuPercent));

                //add memory
                double memoryPercent = (objectResources.availMem / allResources.availMemResourcesOverall) * 100.0;
                values.add(memoryPercent);
                sb.append(String.format("MEM %f(%f%%) ", objectResources.availMem, memoryPercent));

                // the minimum (most constrained) percentage is the effective availability
                objectResources.effectiveResources = Collections.min(values);
            }
            LOG.debug("{}: Avail [ {} ] Total [ CPU {} MEM {}] effective resources: {}",
                    objectResources.id, sb.toString(),
                    objectResources.totalCpu, objectResources.totalMem, objectResources.effectiveResources);
        }

        // Second pass: rank objects. Order of precedence:
        //   1) more executors of this topology already scheduled on the object (descending)
        //   2) higher effective (scarcest-resource) availability percentage (descending)
        //   3) higher average of the per-resource availability percentages (descending)
        //   4) object id (ascending) — a total order so the TreeSet never treats two
        //      distinct objects as duplicates
        TreeSet<ObjectResources> sortedObjectResources = new TreeSet<ObjectResources>(new Comparator<ObjectResources>() {
            @Override
            public int compare(ObjectResources o1, ObjectResources o2) {

                int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
                int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
                if (execsScheduled1 > execsScheduled2) {
                    return -1;
                } else if (execsScheduled1 < execsScheduled2) {
                    return 1;
                } else {
                    if (o1.effectiveResources > o2.effectiveResources) {
                        return -1;
                    } else if (o1.effectiveResources < o2.effectiveResources) {
                        return 1;
                    } else {
                        // tie on scarcest resource: compare average availability percentages
                        List<Double> o1_values = new LinkedList<Double>();
                        List<Double> o2_values = new LinkedList<Double>();
                        o1_values.add((o1.availCpu / allResources.availCpuResourcesOverall) * 100.0);
                        o2_values.add((o2.availCpu / allResources.availCpuResourcesOverall) * 100.0);

                        o1_values.add((o1.availMem / allResources.availMemResourcesOverall) * 100.0);
                        o2_values.add((o2.availMem / allResources.availMemResourcesOverall) * 100.0);

                        double o1_avg = ResourceUtils.avg(o1_values);
                        double o2_avg = ResourceUtils.avg(o2_values);

                        if (o1_avg > o2_avg) {
                            return -1;
                        } else if (o1_avg < o2_avg) {
                            return 1;
                        } else {
                            return o1.id.compareTo(o2.id);
                        }
                    }
                }
            }
        });
        sortedObjectResources.addAll(allResources.objectResources);
        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
        return sortedObjectResources;
    }
 
+    /**
+     * Get the rack that a node is a part of
+     *
+     * @param node the node to find out which rack it's on
+     * @return the rack id
+     */
     private String nodeToRack(RAS_Node node) {
-        for (Entry<String, List<String>> entry : _clusterInfo
+        for (Map.Entry<String, List<String>> entry : _clusterInfo
                 .entrySet()) {
             if (entry.getValue().contains(node.getHostname())) {
                 return entry.getKey();
@@ -389,14 +505,12 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
         return null;
     }
 
-    private List<RAS_Node> getAvailableNodes() {
-        LinkedList<RAS_Node> nodes = new LinkedList<>();
-        for (String rackId : _clusterInfo.keySet()) {
-            nodes.addAll(this.getAvailableNodesFromRack(rackId));
-        }
-        return nodes;
-    }
-
+    /**
+     * Get a list of nodes from a rack
+     *
+     * @param rackId the rack id of the rack to get nodes from
+     * @return a list of nodes
+     */
     private List<RAS_Node> getAvailableNodesFromRack(String rackId) {
         List<RAS_Node> retList = new ArrayList<>();
         for (String node_id : _clusterInfo.get(rackId)) {
@@ -406,59 +520,127 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
         return retList;
     }
 
-    private List<WorkerSlot> getAvailableWorkersFromRack(String rackId) {
-        List<RAS_Node> nodes = this.getAvailableNodesFromRack(rackId);
-        List<WorkerSlot> workers = new LinkedList<>();
-        for(RAS_Node node : nodes) {
-            workers.addAll(node.getFreeSlots());
-        }
-        return workers;
+    /**
+     * sort components by the number of in and out connections that need to be 
made
+     *
+     * @param componentMap The components that need to be sorted
+     * @return a sorted set of components
+     */
+    private Set<Component> sortComponents(final Map<String, Component> 
componentMap) {
+        Set<Component> sortedComponents = new TreeSet<Component>(new 
Comparator<Component>() {
+            @Override
+            public int compare(Component o1, Component o2) {
+                int connections1 = 0;
+                int connections2 = 0;
+
+                for (String childId : (List<String>) 
ListUtils.union(o1.children, o1.parents)) {
+                    connections1 += (componentMap.get(childId).execs.size() * 
o1.execs.size());
+                }
+
+                for (String childId : (List<String>) 
ListUtils.union(o2.children, o2.parents)) {
+                    connections2 += (componentMap.get(childId).execs.size() * 
o2.execs.size());
+                }
+
+                if (connections1 > connections1) {
+                    return -1;
+                } else if (connections1 < connections2) {
+                    return 1;
+                } else {
+                    return o1.id.compareTo(o2.id);
+                }
+            }
+        });
+        sortedComponents.addAll(componentMap.values());
+        return sortedComponents;
     }
 
-    private List<WorkerSlot> getAvailableWorker() {
-        List<WorkerSlot> workers = new LinkedList<>();
-        for (String rackId : _clusterInfo.keySet()) {
-            workers.addAll(this.getAvailableWorkersFromRack(rackId));
-        }
-        return workers;
+    /**
+     * Sort a component's neighbors by the number of connections it needs to 
make with this component
+     *
+     * @param thisComp     the component that we need to sort its neighbors
+     * @param componentMap all the components to sort
+     * @return a sorted set of components
+     */
+    private Set<Component> sortNeighbors(final Component thisComp, final 
Map<String, Component> componentMap) {
+        Set<Component> sortedComponents = new TreeSet<Component>(new 
Comparator<Component>() {
+            @Override
+            public int compare(Component o1, Component o2) {
+                int connections1 = o1.execs.size() * thisComp.execs.size();
+                int connections2 = o2.execs.size() * thisComp.execs.size();
+                if (connections1 > connections2) {
+                    return -1;
+                } else if (connections1 < connections2) {
+                    return 1;
+                } else {
+                    return o1.id.compareTo(o2.id);
+                }
+            }
+        });
+        sortedComponents.addAll(componentMap.values());
+        return sortedComponents;
     }
 
     /**
-     * Breadth first traversal of the topology DAG
-     * @param td
-     * @param spouts
-     * @return A partial ordering of components
+     * Order executors based on how many in and out connections it will 
potentially need to make.
+     * First order components by the number of in and out connections it will 
have.  Then iterate through the sorted list of components.
+     * For each component sort the neighbors of that component by how many 
connections it will have to make with that component.
+     * Add an executor from this component and then from each neighboring 
component in sorted order.  Do this until there is nothing left to schedule
+     *
+     * @param td                  The topology the executors belong to
+     * @param unassignedExecutors a collection of unassigned executors that 
need to be unassigned. Should only try to assign executors from this list
+     * @return a list of executors in sorted order
      */
-    private Queue<Component> bfs(TopologyDetails td, List<Component> spouts) {
-        // Since queue is a interface
-        Queue<Component> ordered__Component_list = new LinkedList<Component>();
-        HashSet<String> visited = new HashSet<>();
-
-        /* start from each spout that is not visited, each does a 
breadth-first traverse */
-        for (Component spout : spouts) {
-            if (!visited.contains(spout.id)) {
-                Queue<Component> queue = new LinkedList<>();
-                visited.add(spout.id);
-                queue.offer(spout);
-                while (!queue.isEmpty()) {
-                    Component comp = queue.poll();
-                    ordered__Component_list.add(comp);
-                    List<String> neighbors = new ArrayList<>();
-                    neighbors.addAll(comp.children);
-                    neighbors.addAll(comp.parents);
-                    for (String nbID : neighbors) {
-                        if (!visited.contains(nbID)) {
-                            Component child = td.getComponents().get(nbID);
-                            visited.add(nbID);
-                            queue.offer(child);
-                        }
-                    }
+    private List<ExecutorDetails> orderExecutors(TopologyDetails td, 
Collection<ExecutorDetails> unassignedExecutors) {
+        Map<String, Component> componentMap = td.getComponents();
+        List<ExecutorDetails> execsScheduled = new LinkedList<>();
+
+        Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new 
HashMap<>();
+        for (Component component : componentMap.values()) {
+            compToExecsToSchedule.put(component.id, new 
LinkedList<ExecutorDetails>());
+            for (ExecutorDetails exec : component.execs) {
+                if (unassignedExecutors.contains(exec)) {
+                    compToExecsToSchedule.get(component.id).add(exec);
                 }
             }
         }
-        return ordered__Component_list;
+
+        Set<Component> sortedComponents = sortComponents(componentMap);
+        sortedComponents.addAll(componentMap.values());
+
+        for (Component currComp : sortedComponents) {
+            Map<String, Component> neighbors = new HashMap<String, 
Component>();
+            for (String compId : (List<String>) 
ListUtils.union(currComp.children, currComp.parents)) {
+                neighbors.put(compId, componentMap.get(compId));
+            }
+            Set<Component> sortedNeighbors = sortNeighbors(currComp, 
neighbors);
+            Queue<ExecutorDetails> currCompExesToSched = 
compToExecsToSchedule.get(currComp.id);
+
+            boolean flag = false;
+            do {
+                flag = false;
+                if (!currCompExesToSched.isEmpty()) {
+                    execsScheduled.add(currCompExesToSched.poll());
+                    flag = true;
+                }
+
+                for (Component neighborComp : sortedNeighbors) {
+                    Queue<ExecutorDetails> neighborCompExesToSched = 
compToExecsToSchedule.get(neighborComp.id);
+                    if (!neighborCompExesToSched.isEmpty()) {
+                        execsScheduled.add(neighborCompExesToSched.poll());
+                        flag = true;
+                    }
+                }
+            } while (flag);
+        }
+        return execsScheduled;
     }
 
+    /**
+     * Get a list of all the spouts in the topology
+     *
+     * @param td topology to get spouts from
+     * @return a list of spouts
+     */
     private List<Component> getSpouts(TopologyDetails td) {
         List<Component> spouts = new ArrayList<>();
 
@@ -470,22 +652,12 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
         return spouts;
     }
 
-    private Integer getLongestPriorityListSize(Map<Integer, 
List<ExecutorDetails>> priorityToExecutorMap) {
-        Integer mostNum = 0;
-        for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
-            Integer numExecs = execs.size();
-            if (mostNum < numExecs) {
-                mostNum = numExecs;
-            }
-        }
-        return mostNum;
-    }
-
     /**
      * Get the remaining amount memory that can be assigned to a worker given 
the set worker max heap size
-     * @param ws
-     * @param td
-     * @param scheduleAssignmentMap
+     *
+     * @param ws                    the worker to get the remaining amount of 
memory that can be assigned to it
+     * @param td                    the topology that has executors running on 
the worker
+     * @param scheduleAssignmentMap the schedulings calculated so far
      * @return The remaining amount of memory
      */
     private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, 
TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
@@ -495,16 +667,17 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
 
     /**
      * Get the amount of memory already assigned to a worker
-     * @param ws
-     * @param td
-     * @param scheduleAssignmentMap
+     *
+     * @param ws                    the worker to get the amount of memory 
assigned to a worker
+     * @param td                    the topology that has executors running on 
the worker
+     * @param scheduleAssignmentMap the schedulings calculated so far
      * @return the amount of memory
      */
     private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails 
td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
         Double totalMem = 0.0;
         Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
-        if(execs != null) {
-            for(ExecutorDetails exec : execs) {
+        if (execs != null) {
+            for (ExecutorDetails exec : execs) {
                 totalMem += td.getTotalMemReqTask(exec);
             }
         }
@@ -514,15 +687,16 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
     /**
      * Checks whether we can schedule an Executor exec on the worker slot ws
      * Only considers memory currently.  May include CPU in the future
-     * @param exec
-     * @param ws
-     * @param td
-     * @param scheduleAssignmentMap
+     *
+     * @param exec                  the executor to check whether it can be 
assigned to worker ws
can be assigned to it
+     * @param td                    the topology that the exec is from
+     * @param scheduleAssignmentMap the schedulings calculated so far
      * @return a boolean: True denoting the exec can be scheduled on ws and 
false if it cannot
      */
     private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot 
ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> 
scheduleAssignmentMap) {
         boolean retVal = false;
-        if(this.getWorkerScheduledMemoryAvailable(ws, td, 
scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) {
+        if (this.getWorkerScheduledMemoryAvailable(ws, td, 
scheduleAssignmentMap) >= td.getTotalMemReqTask(exec)) {
             retVal = true;
         }
         return retVal;
@@ -530,14 +704,15 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
 
     /**
      * Get the amount of resources available and total for each node
+     *
      * @return a String with cluster resource info for debug
      */
     private String getClusterInfo() {
         String retVal = "Cluster info:\n";
-        for(Entry<String, List<String>> clusterEntry : 
_clusterInfo.entrySet()) {
+        for (Map.Entry<String, List<String>> clusterEntry : 
_clusterInfo.entrySet()) {
             String clusterId = clusterEntry.getKey();
             retVal += "Rack: " + clusterId + "\n";
-            for(String nodeHostname : clusterEntry.getValue()) {
+            for (String nodeHostname : clusterEntry.getValue()) {
                 RAS_Node node = 
this.idToNode(this.NodeHostnameToId(nodeHostname));
                 retVal += "-> Node: " + node.getHostname() + " " + 
node.getId() + "\n";
                 retVal += "--> Avail Resources: {Mem " + 
node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() 
+ " Slots: " + node.totalSlotsFree() + "}\n";
@@ -549,7 +724,8 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
 
     /**
      * hostname to Id
-     * @param hostname
+     *
+     * @param hostname the hostname to convert to node id
      * @return the id of a node
      */
     public String NodeHostnameToId(String hostname) {
@@ -567,12 +743,13 @@ public class DefaultResourceAwareStrategy implements 
IStrategy {
 
     /**
      * Find RAS_Node for specified node id
-     * @param id
+     *
+     * @param id the node/supervisor id to lookup
      * @return a RAS_Node object
      */
     public RAS_Node idToNode(String id) {
         RAS_Node ret = _nodes.getNodeById(id);
-        if(ret == null) {
+        if (ret == null) {
             LOG.error("Cannot find Node with Id: {}", id);
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/d4e94379/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index b72f2f0..054ecc2 100644
--- 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -19,7 +19,6 @@
 package org.apache.storm.scheduler.resource;
 
 import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;

http://git-wip-us.apache.org/repos/asf/storm/blob/d4e94379/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index 77f23aa..f533543 100644
--- 
a/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ 
b/storm-core/test/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -33,7 +33,7 @@ import org.apache.storm.scheduler.resource.RAS_Node;
 import org.apache.storm.scheduler.resource.RAS_Nodes;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.SchedulingResult;
-import 
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.RackResources;
+import 
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.ObjectResources;
 import org.apache.storm.scheduler.resource.SchedulingState;
 import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.User;
@@ -268,10 +268,10 @@ public class TestDefaultResourceAwareStrategy {
         DefaultResourceAwareStrategy rs = new DefaultResourceAwareStrategy();
 
         rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, 
topologies, config));
-        TreeSet<RackResources> sortedRacks= rs.sortRacks(topo1.getId());
+        TreeSet<ObjectResources> sortedRacks= rs.sortRacks(topo1.getId(), new 
HashMap<WorkerSlot, Collection<ExecutorDetails>>());
 
         Assert.assertEquals("# of racks sorted", 5, sortedRacks.size());
-        Iterator<RackResources> it = sortedRacks.iterator();
+        Iterator<ObjectResources> it = sortedRacks.iterator();
         // Ranked first since rack-0 has the most balanced set of resources
         Assert.assertEquals("rack-0 should be ordered first", "rack-0", 
it.next().id);
         // Ranked second since rack-1 has a balanced set of resources but less 
than rack-0
@@ -312,8 +312,6 @@ public class TestDefaultResourceAwareStrategy {
             cluster.assign(targetSlot, topo2.getId(), 
Arrays.asList(targetExec));
         }
 
-        topologies.getById(topo2.getId()).getTotalMemoryResourceList();
-
         rs = new DefaultResourceAwareStrategy();
         rs.prepare(new SchedulingState(new HashMap<String, User>(), cluster, 
topologies, config));
         // schedule topo2

Reply via email to