http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
index fc9182d..57ee348 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceNameNormalizer.java
@@ -54,11 +54,11 @@ public class ResourceNameNormalizer {
             return new HashMap<>();
         }
         return new HashMap<>(resourceMap.entrySet().stream()
-            .collect(Collectors.toMap(
-                //Map the key if needed
-                (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
-                //Map the value
-                (e) -> e.getValue().doubleValue())));
+                                        .collect(Collectors.toMap(
+                                            //Map the key if needed
+                                            (e) -> resourceNameMapping.getOrDefault(e.getKey(), e.getKey()),
+                                            //Map the value
+                                            (e) -> e.getValue().doubleValue())));
     }
 
     public Map<String, String> getResourceNameMapping() {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 8b401aa..2e1ca79 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource.strategies.priority;
@@ -33,12 +27,44 @@ import org.slf4j.LoggerFactory;
 public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulingPriorityStrategy.class);
 
+    protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new SimulatedUser(u, cluster);
+    }
+
+    @Override
+    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
+        double cpuAvail = cluster.getClusterTotalCpuResource();
+        double memAvail = cluster.getClusterTotalMemoryResource();
+
+        List<TopologyDetails> allUserTopologies = new ArrayList<>();
+        List<SimulatedUser> users = new ArrayList<>();
+        for (User u : userMap.values()) {
+            users.add(getSimulatedUserFor(u, cluster));
+        }
+        while (!users.isEmpty()) {
+            Collections.sort(users, new SimulatedUserComparator(cpuAvail, memAvail));
+            SimulatedUser u = users.get(0);
+            TopologyDetails td = u.getNextHighest();
+            if (td == null) {
+                users.remove(0);
+            } else {
+                double score = u.getScore(cpuAvail, memAvail);
+                td = u.simScheduleNextHighest();
+                LOG.info("SIM Scheduling {} with score of {}", td.getId(), score);
+                cpuAvail -= td.getTotalRequestedCpu();
+                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
+                allUserTopologies.add(td);
+            }
+        }
+        return allUserTopologies;
+    }
+
     protected static class SimulatedUser {
+        public final double guaranteedCpu;
+        public final double guaranteedMemory;
         protected final LinkedList<TopologyDetails> tds = new LinkedList<>();
         private double assignedCpu = 0.0;
         private double assignedMemory = 0.0;
-        public final double guaranteedCpu;
-        public final double guaranteedMemory;
 
         public SimulatedUser(User other, ISchedulingState cluster) {
             tds.addAll(cluster.getTopologies().getTopologiesOwnedBy(other.getId()));
@@ -82,8 +108,8 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
             }
             double wouldBeCpu = assignedCpu + td.getTotalRequestedCpu();
             double wouldBeMem = assignedMemory + td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
-            double cpuScore = (wouldBeCpu - guaranteedCpu)/availableCpu;
-            double memScore = (wouldBeMem - guaranteedMemory)/availableMemory;
+            double cpuScore = (wouldBeCpu - guaranteedCpu) / availableCpu;
+            double memScore = (wouldBeMem - guaranteedMemory) / availableMemory;
             return Math.max(cpuScore, memScore);
         }
 
@@ -94,38 +120,6 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
 
     }
 
-    protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
-        return new SimulatedUser(u, cluster);
-    }
-
-    @Override
-    public List<TopologyDetails> getOrderedTopologies(ISchedulingState cluster, Map<String, User> userMap) {
-        double cpuAvail = cluster.getClusterTotalCpuResource();
-        double memAvail = cluster.getClusterTotalMemoryResource();
-
-        List<TopologyDetails> allUserTopologies = new ArrayList<>();
-        List<SimulatedUser> users = new ArrayList<>();
-        for (User u : userMap.values()) {
-            users.add(getSimulatedUserFor(u, cluster));
-        }
-        while (!users.isEmpty()) {
-            Collections.sort(users, new SimulatedUserComparator(cpuAvail, memAvail));
-            SimulatedUser u = users.get(0);
-            TopologyDetails td = u.getNextHighest();
-            if (td == null) {
-                users.remove(0);
-            } else {
-                double score = u.getScore(cpuAvail, memAvail);
-                td = u.simScheduleNextHighest();
-                LOG.info("SIM Scheduling {} with score of {}", td.getId(), score);
-                cpuAvail -= td.getTotalRequestedCpu();
-                memAvail -= (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap());
-                allUserTopologies.add(td);
-            }
-        }
-        return allUserTopologies;
-    }
-
     private static class SimulatedUserComparator implements Comparator<SimulatedUser> {
 
         private final double cpuAvail;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java
index 1c76b8d..98b3f4e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource.strategies.priority;
@@ -29,6 +23,11 @@ import org.slf4j.LoggerFactory;
 public class FIFOSchedulingPriorityStrategy extends DefaultSchedulingPriorityStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(FIFOSchedulingPriorityStrategy.class);
 
+    @Override
+    protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
+        return new FIFOSimulatedUser(u, cluster);
+    }
+
     protected static class FIFOSimulatedUser extends SimulatedUser {
 
         public FIFOSimulatedUser(User other, ISchedulingState cluster) {
@@ -54,11 +53,6 @@ public class FIFOSchedulingPriorityStrategy extends DefaultSchedulingPriorityStr
         }
     }
 
-    @Override
-    protected SimulatedUser getSimulatedUserFor(User u, ISchedulingState cluster) {
-        return new FIFOSimulatedUser(u, cluster);
-    }
-
     /**
      * Comparator that sorts topologies by priority and then by submission time
      * First sort by Topology Priority, if there is a tie for topology priority, topology uptime is used to sort

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index 77ce0d9..8bb50af 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -1,26 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource.strategies.priority;
 
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.scheduler.ISchedulingState;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.resource.User;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index bd4d3ce..a474ccc 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -20,12 +20,10 @@ package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -47,8 +45,8 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseResourceAwareStrategy implements IStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
     protected Cluster cluster;
-    private Map<String, List<String>> networkTopography;
     protected RAS_Nodes nodes;
+    private Map<String, List<String>> networkTopography;
 
     @VisibleForTesting
     void prepare(Cluster cluster) {
@@ -66,42 +64,43 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
     /**
      * Schedule executor exec from topology td.
      *
-     * @param exec the executor to schedule
-     * @param td the topology executor exec is a part of
+     * @param exec           the executor to schedule
+     * @param td             the topology executor exec is a part of
      * @param scheduledTasks executors that have been scheduled
      */
     protected void scheduleExecutor(
-            ExecutorDetails exec, TopologyDetails td, 
Collection<ExecutorDetails> scheduledTasks, List<ObjectResources> sortedNodes) {
+        ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> 
scheduledTasks, List<ObjectResources> sortedNodes) {
         WorkerSlot targetSlot = findWorkerForExec(exec, td, sortedNodes);
         if (targetSlot != null) {
             RAS_Node targetNode = idToNode(targetSlot.getNodeId());
             targetNode.assignSingleExecutor(targetSlot, exec, td);
             scheduledTasks.add(exec);
             LOG.debug(
-                    "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] 
total [ mem: {} cpu: {} ] on "
-                            + "slot: {} on Rack: {}",
-                    exec,
-                    targetNode.getHostname(),
-                    targetNode.getAvailableMemoryResources(),
-                    targetNode.getAvailableCpuResources(),
-                    targetNode.getTotalMemoryResources(),
-                    targetNode.getTotalCpuResources(),
-                    targetSlot,
-                    nodeToRack(targetNode));
+                "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total 
[ mem: {} cpu: {} ] on "
+                + "slot: {} on Rack: {}",
+                exec,
+                targetNode.getHostname(),
+                targetNode.getAvailableMemoryResources(),
+                targetNode.getAvailableCpuResources(),
+                targetNode.getTotalMemoryResources(),
+                targetNode.getTotalCpuResources(),
+                targetSlot,
+                nodeToRack(targetNode));
         } else {
             LOG.error("Not Enough Resources to schedule Task {}", exec);
         }
     }
 
     protected abstract TreeSet<ObjectResources> sortObjectResources(
-            final AllResources allResources, ExecutorDetails exec, 
TopologyDetails topologyDetails,
-            final ExistingScheduleFunc existingScheduleFunc
+        final AllResources allResources, ExecutorDetails exec, TopologyDetails 
topologyDetails,
+        final ExistingScheduleFunc existingScheduleFunc
     );
+
     /**
      * Find a worker to schedule executor exec on.
      *
      * @param exec the executor to schedule
-     * @param td the topology that the executor is a part of
+     * @param td   the topology that the executor is a part of
      * @return a worker to assign exec on. Returns null if a worker cannot be 
successfully found in cluster
      */
     protected WorkerSlot findWorkerForExec(ExecutorDetails exec, 
TopologyDetails td, List<ObjectResources> sortedNodes) {
@@ -117,98 +116,24 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
     }
 
     /**
-     * interface for calculating the number of existing executors scheduled on 
a object (rack or
-     * node).
-     */
-    protected interface ExistingScheduleFunc {
-        int getNumExistingSchedule(String objectId);
-    }
-
-    /**
-     * a class to contain individual object resources as well as cumulative 
stats.
-     */
-    static class AllResources {
-        List<ObjectResources> objectResources = new LinkedList<>();
-        NormalizedResourceOffer availableResourcesOverall = new 
NormalizedResourceOffer();
-        NormalizedResourceOffer totalResourcesOverall = new 
NormalizedResourceOffer();
-        String identifier;
-
-        public AllResources(String identifier) {
-            this.identifier = identifier;
-        }
-
-        public AllResources(AllResources other) {
-            this (null,
-                new NormalizedResourceOffer(other.availableResourcesOverall),
-                new NormalizedResourceOffer(other.totalResourcesOverall),
-                other.identifier);
-            List<ObjectResources> objectResourcesList = new ArrayList<>();
-            for (ObjectResources objectResource : other.objectResources) {
-                objectResourcesList.add(new ObjectResources(objectResource));
-            }
-            this.objectResources = objectResourcesList;
-        }
-
-        public AllResources(List<ObjectResources> objectResources, 
NormalizedResourceOffer availableResourcesOverall,
-                            NormalizedResourceOffer totalResourcesOverall, 
String identifier) {
-            this.objectResources = objectResources;
-            this.availableResourcesOverall = availableResourcesOverall;
-            this.totalResourcesOverall = totalResourcesOverall;
-            this.identifier = identifier;
-        }
-    }
-
-    /**
-     * class to keep track of resources on a rack or node.
-     */
-    static class ObjectResources {
-        public final String id;
-        public NormalizedResourceOffer availableResources = new 
NormalizedResourceOffer();
-        public NormalizedResourceOffer totalResources = new 
NormalizedResourceOffer();
-        public double effectiveResources = 0.0;
-
-        public ObjectResources(String id) {
-            this.id = id;
-        }
-
-        public ObjectResources(ObjectResources other) {
-            this(other.id, other.availableResources, other.totalResources, 
other.effectiveResources);
-        }
-
-        public ObjectResources(String id, NormalizedResourceOffer 
availableResources, NormalizedResourceOffer totalResources,
-                               double effectiveResources) {
-            this.id = id;
-            this.availableResources = availableResources;
-            this.totalResources = totalResources;
-            this.effectiveResources = effectiveResources;
-        }
-
-        @Override
-        public String toString() {
-            return this.id;
-        }
-    }
-
-    /**
      * Nodes are sorted by two criteria.
      *
      * <p>1) the number executors of the topology that needs to be scheduled 
is already on the node in
-     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a
-     * topology on the same node as the existing executors of the topology.
+     * descending order. The reasoning to sort based on criterion 1 is so we 
schedule the rest of a topology on the same node as the
+     * existing executors of the topology.
      *
      * <p>2) the subordinate/subservient resource availability percentage of a 
node in descending
-     * order We calculate the resource availability percentage by dividing the 
resource availability
-     * that have exhausted or little of one of the resources mentioned above 
will be ranked after
-     * on the node by the resource availability of the entire rack By doing 
this calculation, nodes
-     * nodes that have more balanced resource availability. So we will be less 
likely to pick a node
-     * that have a lot of one resource but a low amount of another.
+     * order We calculate the resource availability percentage by dividing the 
resource availability that have exhausted or little of one of
+     * the resources mentioned above will be ranked after on the node by the 
resource availability of the entire rack By doing this
+     * calculation, nodes nodes that have more balanced resource availability. 
So we will be less likely to pick a node that have a lot of
+     * one resource but a low amount of another.
      *
      * @param availNodes a list of all the nodes we want to sort
-     * @param rackId the rack id availNodes are a part of
+     * @param rackId     the rack id availNodes are a part of
      * @return a sorted list of nodes.
      */
     protected TreeSet<ObjectResources> sortNodes(
-            List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails 
topologyDetails, String rackId) {
+        List<RAS_Node> availNodes, ExecutorDetails exec, TopologyDetails 
topologyDetails, String rackId) {
         AllResources allResources = new AllResources("RACK");
         List<ObjectResources> nodes = allResources.objectResources;
 
@@ -264,7 +189,7 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
         for (ObjectResources rack : sortedRacks) {
             final String rackId = rack.id;
             TreeSet<ObjectResources> sortedNodes = sortNodes(
-                    getAvailableNodesFromRack(rackId), exec, td, rackId);
+                getAvailableNodesFromRack(rackId), exec, td, rackId);
             totallySortedNodes.addAll(sortedNodes);
         }
         //Now do some post processing to add make some nodes preferred over 
others.
@@ -304,14 +229,14 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
      * Racks are sorted by two criteria.
      *
      * <p>1) the number executors of the topology that needs to be scheduled 
is already on the rack in descending order.
-     * The reasoning to sort based on criterion 1 is so we schedule the rest 
of a topology on the same rack as the
-     * existing executors of the topology.
+     * The reasoning to sort based on criterion 1 is so we schedule the rest 
of a topology on the same rack as the existing executors of the
+     * topology.
      *
      * <p>2) the subordinate/subservient resource availability percentage of a 
rack in descending order We calculate
-     * the resource availability percentage by dividing the resource 
availability on the rack by the resource
-     * availability of the  entire cluster By doing this calculation, racks 
that have exhausted or little of one of
-     * the resources mentioned above will be ranked after racks that have more 
balanced resource availability. So we
-     * will be less likely to pick a rack that have a lot of one resource but 
a low amount of another.
+     * the resource availability percentage by dividing the resource 
availability on the rack by the resource availability of the  entire
+     * cluster By doing this calculation, racks that have exhausted or little 
of one of the resources mentioned above will be ranked after
+     * racks that have more balanced resource availability. So we will be less 
likely to pick a rack that have a lot of one resource but a
+     * low amount of another.
      *
      * @return a sorted list of racks
      */
@@ -369,7 +294,6 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
             });
     }
 
-
     /**
      * Get the rack on which a node is a part of.
      *
@@ -437,7 +361,7 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
     /**
      * Sort a component's neighbors by the number of connections it needs to 
make with this component.
      *
-     * @param thisComp the component that we need to sort its neighbors
+     * @param thisComp     the component that we need to sort its neighbors
      * @param componentMap all the components to sort
      * @return a sorted set of components
      */
@@ -460,15 +384,14 @@ public abstract class BaseResourceAwareStrategy 
implements IStrategy {
     }
 
     /**
-     * Order executors based on how many in and out connections it will 
potentially need to make, in descending order.
-     * First order components by the number of in and out connections it will 
have.  Then iterate through the sorted list of components.
-     * For each component sort the neighbors of that component by how many 
connections it will have to make with that component.
-     * Add an executor from this component and then from each neighboring 
component in sorted order.
-     * Do this until there is nothing left to schedule.
+     * Order executors based on how many in and out connections it will 
potentially need to make, in descending order. First order
+     * components by the number of in and out connections it will have.  Then 
iterate through the sorted list of components. For each
+     * component sort the neighbors of that component by how many connections 
it will have to make with that component. Add an executor from
+     * this component and then from each neighboring component in sorted 
order. Do this until there is nothing left to schedule.
      *
-     * @param td The topology the executors belong to
-     * @param unassignedExecutors a collection of unassigned executors that 
need to be unassigned. Should only try to
-     *     assign executors from this list
+     * @param td                  The topology the executors belong to
+     * @param unassignedExecutors a collection of unassigned executors that 
need to be unassigned. Should only try to assign executors from
+     *                            this list
      * @return a list of executors in sorted order
      */
     protected List<ExecutorDetails> orderExecutors(
@@ -596,4 +519,76 @@ public abstract class BaseResourceAwareStrategy implements 
IStrategy {
         }
         return ret;
     }
+
+    /**
+     * interface for calculating the number of existing executors scheduled on 
a object (rack or node).
+     */
+    protected interface ExistingScheduleFunc {
+        int getNumExistingSchedule(String objectId);
+    }
+
+    /**
+     * a class to contain individual object resources as well as cumulative 
stats.
+     */
+    static class AllResources {
+        List<ObjectResources> objectResources = new LinkedList<>();
+        NormalizedResourceOffer availableResourcesOverall = new 
NormalizedResourceOffer();
+        NormalizedResourceOffer totalResourcesOverall = new 
NormalizedResourceOffer();
+        String identifier;
+
+        public AllResources(String identifier) {
+            this.identifier = identifier;
+        }
+
+        public AllResources(AllResources other) {
+            this(null,
+                 new NormalizedResourceOffer(other.availableResourcesOverall),
+                 new NormalizedResourceOffer(other.totalResourcesOverall),
+                 other.identifier);
+            List<ObjectResources> objectResourcesList = new ArrayList<>();
+            for (ObjectResources objectResource : other.objectResources) {
+                objectResourcesList.add(new ObjectResources(objectResource));
+            }
+            this.objectResources = objectResourcesList;
+        }
+
+        public AllResources(List<ObjectResources> objectResources, 
NormalizedResourceOffer availableResourcesOverall,
+                            NormalizedResourceOffer totalResourcesOverall, 
String identifier) {
+            this.objectResources = objectResources;
+            this.availableResourcesOverall = availableResourcesOverall;
+            this.totalResourcesOverall = totalResourcesOverall;
+            this.identifier = identifier;
+        }
+    }
+
+    /**
+     * class to keep track of resources on a rack or node.
+     */
+    static class ObjectResources {
+        public final String id;
+        public NormalizedResourceOffer availableResources = new 
NormalizedResourceOffer();
+        public NormalizedResourceOffer totalResources = new 
NormalizedResourceOffer();
+        public double effectiveResources = 0.0;
+
+        public ObjectResources(String id) {
+            this.id = id;
+        }
+
+        public ObjectResources(ObjectResources other) {
+            this(other.id, other.availableResources, other.totalResources, 
other.effectiveResources);
+        }
+
+        public ObjectResources(String id, NormalizedResourceOffer 
availableResources, NormalizedResourceOffer totalResources,
+                               double effectiveResources) {
+            this.id = id;
+            this.availableResources = availableResources;
+            this.totalResources = totalResources;
+            this.effectiveResources = effectiveResources;
+        }
+
+        @Override
+        public String toString() {
+            return this.id;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index f790050..12f89ca 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
@@ -47,155 +41,201 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
+    //hard coded max number of states to search
+    public static final int MAX_STATE_SEARCH = 100_000;
+    public static final int DEFAULT_STATE_SEARCH = 10_000;
     private static final Logger LOG = 
LoggerFactory.getLogger(ConstraintSolverStrategy.class);
+    private Map<String, RAS_Node> nodes;
+    private Map<ExecutorDetails, String> execToComp;
+    private Map<String, Set<ExecutorDetails>> compToExecs;
+    private List<String> favoredNodes;
+    private List<String> unFavoredNodes;
 
-    protected static class SolverResult {
-        private final int statesSearched;
-        private final boolean success;
-        private final long timeTakenMillis;
-        private final int backtracked;
+    //constraints and spreads
+    private Map<String, Map<String, Integer>> constraintMatrix;
+    private HashSet<String> spreadComps = new HashSet<>();
 
-        public SolverResult(SearcherState state, boolean success) {
-            this.statesSearched = state.getStatesSearched();
-            this.success = success;
-            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
-            backtracked = state.numBacktrack;
+    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails 
topo, Set<String> comps) {
+        Map<String, Map<String, Integer>> matrix = new HashMap<>();
+        for (String comp : comps) {
+            matrix.put(comp, new HashMap<>());
+            for (String comp2 : comps) {
+                matrix.get(comp).put(comp2, 0);
+            }
         }
-
-        public SchedulingResult asSchedulingResult() {
-            if (success) {
-                return SchedulingResult.success("Fully Scheduled by 
ConstraintSolverStrategy (" + statesSearched
-                    + " states traversed in " + timeTakenMillis + "ms, 
backtracked " + backtracked + " times)");
+        List<List<String>> constraints = (List<List<String>>) 
topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS);
+        if (constraints != null) {
+            for (List<String> constraintPair : constraints) {
+                String comp1 = constraintPair.get(0);
+                String comp2 = constraintPair.get(1);
+                if (!matrix.containsKey(comp1)) {
+                    LOG.warn("Comp: {} declared in constraints is not valid!", 
comp1);
+                    continue;
+                }
+                if (!matrix.containsKey(comp2)) {
+                    LOG.warn("Comp: {} declared in constraints is not valid!", 
comp2);
+                    continue;
+                }
+                matrix.get(comp1).put(comp2, 1);
+                matrix.get(comp2).put(comp1, 1);
             }
-            return 
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
-                "Cannot find scheduling that satisfies all constraints (" + 
statesSearched
-                    + " states traversed in " + timeTakenMillis + "ms, 
backtracked " + backtracked + " times)");
         }
+        return matrix;
     }
 
-    protected static class SearcherState {
-        // Metrics
-        // How many states searched so far.
-        private int statesSearched = 0;
-        // Number of times we had to backtrack.
-        private int numBacktrack = 0;
-        final long startTimeMillis;
-        private final long maxEndTimeMs;
-
-        // Current state
-        // The current executor we are trying to schedule
-        private int execIndex = 0;
-        // A map of the worker to the components in the worker to be able to 
enforce constraints.
-        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
-        private final boolean[] okToRemoveFromWorker;
-        // for the currently tested assignment a Map of the node to the 
components on it to be able to enforce constraints
-        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
-        private final boolean[] okToRemoveFromNode;
-
-        // Static State
-        // The list of all executors (preferably sorted to make assignments 
simpler).
-        private final List<ExecutorDetails> execs;
-        //The maximum number of state to search before stopping.
-        private final int maxStatesSearched;
-        //The topology we are scheduling
-        private final TopologyDetails td;
+    /**
+     * Determines if a scheduling is valid and all constraints are satisfied.
+     */
+    @VisibleForTesting
+    public static boolean validateSolution(Cluster cluster, TopologyDetails 
td) {
+        return checkSpreadSchedulingValid(cluster, td)
+               && checkConstraintsSatisfied(cluster, td)
+               && checkResourcesCorrect(cluster, td);
+    }
 
-        private SearcherState(Map<WorkerSlot, Set<String>> 
workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
-                              int maxStatesSearched, long maxTimeMs, 
List<ExecutorDetails> execs, TopologyDetails td) {
-            assert !execs.isEmpty();
-            assert execs != null;
+    /**
+     * Check if constraints are satisfied.
+     */
+    private static boolean checkConstraintsSatisfied(Cluster cluster, 
TopologyDetails topo) {
+        LOG.info("Checking constraints...");
+        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+        Map<ExecutorDetails, String> execToComp = 
topo.getExecutorToComponent();
+        //get topology constraints
+        Map<String, Map<String, Integer>> constraintMatrix = 
getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
 
-            this.workerCompAssignment = workerCompAssignment;
-            this.nodeCompAssignment = nodeCompAssignment;
-            this.maxStatesSearched = maxStatesSearched;
-            this.execs = execs;
-            okToRemoveFromWorker = new boolean[execs.size()];
-            okToRemoveFromNode = new boolean[execs.size()];
-            this.td = td;
-            startTimeMillis = Time.currentTimeMillis();
-            if (maxTimeMs <= 0) {
-                maxEndTimeMs = Long.MAX_VALUE;
-            } else {
-                maxEndTimeMs = startTimeMillis + maxTimeMs;
+        Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>();
+        result.forEach((exec, worker) -> {
+            String comp = execToComp.get(exec);
+            workerCompMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).add(comp);
+        });
+        for (Map.Entry<WorkerSlot, Set<String>> entry : 
workerCompMap.entrySet()) {
+            Set<String> comps = entry.getValue();
+            for (String comp1 : comps) {
+                for (String comp2 : comps) {
+                    if (!comp1.equals(comp2) && 
constraintMatrix.get(comp1).get(comp2) != 0) {
+                        LOG.error("Incorrect Scheduling: worker exclusion for 
Component {} and {} not satisfied on WorkerSlot: {}",
+                                  comp1, comp2, entry.getKey());
+                        return false;
+                    }
+                }
             }
         }
+        return true;
+    }
 
-        public void incStatesSearched() {
-            statesSearched++;
-            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
-                LOG.debug("States Searched: {}", statesSearched);
-                LOG.debug("backtrack: {}", numBacktrack);
+    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
+        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
+        for (RAS_Node node : RAS_Nodes.getAllNodesFrom(cluster).values()) {
+            for (WorkerSlot s : node.getUsedSlots()) {
+                workerToNodes.put(s, node);
             }
         }
+        return workerToNodes;
+    }
 
-        public int getStatesSearched() {
-            return statesSearched;
-        }
+    private static boolean checkSpreadSchedulingValid(Cluster cluster, 
TopologyDetails topo) {
+        LOG.info("Checking for a valid scheduling...");
+        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+        Map<ExecutorDetails, String> execToComp = 
topo.getExecutorToComponent();
+        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new 
HashMap<>();
+        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
+        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
+        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
+        boolean ret = true;
 
-        public boolean areSearchLimitsExceeded() {
-            return statesSearched > maxStatesSearched || 
Time.currentTimeMillis() > maxEndTimeMs;
-        }
+        HashSet<String> spreadComps = getSpreadComps(topo);
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) 
{
+            ExecutorDetails exec = entry.getKey();
+            WorkerSlot worker = entry.getValue();
+            RAS_Node node = workerToNodes.get(worker);
 
-        public SearcherState nextExecutor() {
-            execIndex++;
-            if (execIndex >= execs.size()) {
-                throw new IllegalStateException("Internal Error: exceeded the 
exec limit " + execIndex + " >= " + execs.size());
+            if (workerExecMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).contains(exec)) {
+                LOG.error("Incorrect Scheduling: Found duplicate in 
scheduling");
+                return false;
             }
-            return this;
+            workerExecMap.get(worker).add(exec);
+            String comp = execToComp.get(exec);
+            workerCompMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).add(comp);
+            if (spreadComps.contains(comp)) {
+                if (nodeCompMap.computeIfAbsent(node, (k) -> new 
HashSet<>()).contains(comp)) {
+                    LOG.error("Incorrect Scheduling: Spread for Component: {} 
{} on node {} not satisfied {}",
+                              comp, exec, node.getId(), nodeCompMap.get(node));
+                    ret = false;
+                }
+            }
+            nodeCompMap.computeIfAbsent(node, (k) -> new 
HashSet<>()).add(comp);
         }
+        return ret;
+    }
 
-        public boolean areAllExecsScheduled() {
-            return execIndex == execs.size() - 1;
+    /**
+     * Check if resource constraints satisfied.
+     */
+    private static boolean checkResourcesCorrect(Cluster cluster, 
TopologyDetails topo) {
+        LOG.info("Checking Resources...");
+        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new 
HashMap<>();
+        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
+        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
+        //merge with existing assignments
+        if (cluster.getAssignmentById(topo.getId()) != null
+            && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != 
null) {
+            
mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
         }
+        mergedExecToWorker.putAll(result);
 
-        public ExecutorDetails currentExec() {
-            return execs.get(execIndex);
-        }
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
mergedExecToWorker.entrySet()) {
+            ExecutorDetails exec = entry.getKey();
+            WorkerSlot worker = entry.getValue();
+            RAS_Node node = nodes.get(worker.getNodeId());
 
-        public void tryToSchedule(Map<ExecutorDetails, String> execToComp, 
RAS_Node node, WorkerSlot workerSlot) {
-            ExecutorDetails exec = currentExec();
-            String comp = execToComp.get(exec);
-            LOG.trace("Trying assignment of {} {} to {}", exec, comp, 
workerSlot);
-            //It is possible that this component is already scheduled on this 
node or worker.  If so when we backtrack we cannot remove it
-            okToRemoveFromWorker[execIndex] = 
workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new 
HashSet<>()).add(comp);
-            okToRemoveFromNode[execIndex] = 
nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
-            node.assignSingleExecutor(workerSlot, exec, td);
+            if (node.getAvailableMemoryResources() < 0.0 && 
node.getAvailableCpuResources() < 0.0) {
+                LOG.error("Incorrect Scheduling: found node with negative 
available resources");
+                return false;
+            }
+            nodeToExecs.computeIfAbsent(node, (k) -> new 
HashSet<>()).add(exec);
         }
 
-        public void backtrack(Map<ExecutorDetails, String> execToComp, 
RAS_Node node, WorkerSlot workerSlot) {
-            execIndex--;
-            if (execIndex < 0) {
-                throw new IllegalStateException("Internal Error: exec index 
became negative");
+        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : 
nodeToExecs.entrySet()) {
+            RAS_Node node = entry.getKey();
+            Collection<ExecutorDetails> execs = entry.getValue();
+            double cpuUsed = 0.0;
+            double memoryUsed = 0.0;
+            for (ExecutorDetails exec : execs) {
+                cpuUsed += topo.getTotalCpuReqTask(exec);
+                memoryUsed += topo.getTotalMemReqTask(exec);
             }
-            numBacktrack++;
-            ExecutorDetails exec = currentExec();
-            String comp = execToComp.get(exec);
-            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
-            if (okToRemoveFromWorker[execIndex]) {
-                workerCompAssignment.get(workerSlot).remove(comp);
-                okToRemoveFromWorker[execIndex] = false;
+            if (node.getAvailableCpuResources() != 
(node.getTotalCpuResources() - cpuUsed)) {
+                LOG.error("Incorrect Scheduling: node {} has consumed 
incorrect amount of cpu. Expected: {}"
+                          + " Actual: {} Executors scheduled on node: {}",
+                          node.getId(), (node.getTotalCpuResources() - 
cpuUsed), node.getAvailableCpuResources(), execs);
+                return false;
             }
-            if (okToRemoveFromNode[execIndex]) {
-                nodeCompAssignment.get(node).remove(comp);
-                okToRemoveFromNode[execIndex] = false;
+            if (node.getAvailableMemoryResources() != 
(node.getTotalMemoryResources() - memoryUsed)) {
+                LOG.error("Incorrect Scheduling: node {} has consumed 
incorrect amount of memory. Expected: {}"
+                          + " Actual: {} Executors scheduled on node: {}",
+                          node.getId(), (node.getTotalMemoryResources() - 
memoryUsed), node.getAvailableMemoryResources(), execs);
+                return false;
             }
-            node.freeSingleExecutor(exec, td);
         }
+        return true;
     }
 
-    private Map<String, RAS_Node> nodes;
-    private Map<ExecutorDetails, String> execToComp;
-    private Map<String, Set<ExecutorDetails>> compToExecs;
-    private List<String> favoredNodes;
-    private List<String> unFavoredNodes;
-
-    //constraints and spreads
-    private Map<String, Map<String, Integer>> constraintMatrix;
-    private HashSet<String> spreadComps = new HashSet<>();
-
-    //hard coded max number of states to search
-    public static final int MAX_STATE_SEARCH = 100_000;
-    public static final int DEFAULT_STATE_SEARCH = 10_000;
+    private static HashSet<String> getSpreadComps(TopologyDetails topo) {
+        HashSet<String> retSet = new HashSet<>();
+        List<String> spread = (List<String>) 
topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+        if (spread != null) {
+            Set<String> comps = topo.getComponents().keySet();
+            for (String comp : spread) {
+                if (comps.contains(comp)) {
+                    retSet.add(comp);
+                } else {
+                    LOG.warn("Comp {} declared for spread not valid", comp);
+                }
+            }
+        }
+        return retSet;
+    }
 
     @Override
     public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
@@ -206,7 +246,8 @@ public class ConstraintSolverStrategy extends 
BaseResourceAwareStrategy {
         Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
         //set max number of states to search
         final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
-            
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH),
 DEFAULT_STATE_SEARCH));
+                                            
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH),
+                                                                
DEFAULT_STATE_SEARCH));
 
         final long maxTimeMs =
             
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS),
 -1).intValue() * 1000L;
@@ -228,8 +269,8 @@ public class ConstraintSolverStrategy extends 
BaseResourceAwareStrategy {
         //get a sorted list of unassigned executors based on number of 
constraints
         Set<ExecutorDetails> unassignedExecutors = new 
HashSet<>(cluster.getUnassignedExecutors(td));
         List<ExecutorDetails> sortedExecs = getSortedExecs(spreadComps, 
constraintMatrix, compToExecs).stream()
-            .filter(unassignedExecutors::contains)
-            .collect(Collectors.toList());
+                                                                               
                       .filter(unassignedExecutors::contains)
+                                                                               
                       .collect(Collectors.toList());
 
         //populate with existing assignments
         SchedulerAssignment existingAssignment = 
cluster.getAssignmentById(td.getId());
@@ -258,18 +299,18 @@ public class ConstraintSolverStrategy extends 
BaseResourceAwareStrategy {
             int numExecs = compToExecs.get(comp).size();
             if (numExecs > nodes.size()) {
                 LOG.error("Unsatisfiable constraint: Component: {} marked as 
spread has {} executors which is larger "
-                    + "than number of nodes: {}", comp, numExecs, 
nodes.size());
+                          + "than number of nodes: {}", comp, numExecs, 
nodes.size());
                 return false;
             }
         }
         if (execToComp.size() >= MAX_STATE_SEARCH) {
             LOG.error("Number of executors is greater than the maximum number 
of states allowed to be searched.  "
-                + "# of executors: {} Max states to search: {}", 
execToComp.size(), MAX_STATE_SEARCH);
+                      + "# of executors: {} Max states to search: {}", 
execToComp.size(), MAX_STATE_SEARCH);
             return false;
         }
         return true;
     }
-    
+
     @Override
     protected TreeSet<ObjectResources> sortObjectResources(
         final AllResources allResources, ExecutorDetails exec, TopologyDetails 
topologyDetails,
@@ -289,233 +330,67 @@ public class ConstraintSolverStrategy extends 
BaseResourceAwareStrategy {
         ExecutorDetails exec = state.currentExec();
         List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, 
favoredNodes, unFavoredNodes);
 
-        for (ObjectResources nodeResources: sortedNodes) {
+        for (ObjectResources nodeResources : sortedNodes) {
             RAS_Node node = nodes.get(nodeResources.id);
             for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) {
                 if (isExecAssignmentToWorkerValid(workerSlot, state)) {
-                    state.tryToSchedule(execToComp, node, workerSlot);
-
-                    if (state.areAllExecsScheduled()) {
-                        //Everything is scheduled correctly, so no need to 
search any more.
-                        return new SolverResult(state, true);
-                    }
-
-                    SolverResult results = 
backtrackSearch(state.nextExecutor());
-                    if (results.success) {
-                        //We found a good result we are done.
-                        return results;
-                    }
-
-                    if (state.areSearchLimitsExceeded()) {
-                        //No need to search more it is not going to help.
-                        return new SolverResult(state, false);
-                    }
-
-                    //backtracking (If we ever get here there really isn't a 
lot of hope that we will find a scheduling)
-                    state.backtrack(execToComp, node, workerSlot);
-                }
-            }
-        }
-        //Tried all of the slots and none of them worked.
-        return new SolverResult(state, false);
-    }
-
-    /**
-     * Check if any constraints are violated if exec is scheduled on worker.
-     * @return true if scheduling exec on worker does not violate any 
constraints, returns false if it does
-     */
-    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, 
SearcherState state) {
-        final ExecutorDetails exec = state.currentExec();
-        //check resources
-        RAS_Node node = nodes.get(worker.getNodeId());
-        if (!node.wouldFit(worker, exec, state.td)) {
-            LOG.trace("{} would not fit in resources available on {}", exec, 
worker);
-            return false;
-        }
-
-        //check if exec can be on worker based on user defined component 
exclusions
-        String execComp = execToComp.get(exec);
-        Set<String> components = state.workerCompAssignment.get(worker);
-        if (components != null) {
-            Map<String, Integer> subMatrix = constraintMatrix.get(execComp);
-            for (String comp : components) {
-                if (subMatrix.get(comp) != 0) {
-                    LOG.trace("{} found {} constraint violation {} on {}", 
exec, execComp, comp, worker);
-                    return false;
-                }
-            }
-        }
-
-        //check if exec satisfy spread
-        if (spreadComps.contains(execComp)) {
-            if (state.nodeCompAssignment.computeIfAbsent(node, (k) -> new 
HashSet<>()).contains(execComp)) {
-                LOG.trace("{} Found spread violation {} on node {}", exec, 
execComp, node.getId());
-                return false;
-            }
-        }
-        return true;
-    }
-
-    static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails 
topo, Set<String> comps) {
-        Map<String, Map<String, Integer>> matrix = new HashMap<>();
-        for (String comp : comps) {
-            matrix.put(comp, new HashMap<>());
-            for (String comp2 : comps) {
-                matrix.get(comp).put(comp2, 0);
-            }
-        }
-        List<List<String>> constraints = (List<List<String>>) 
topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS);
-        if (constraints != null) {
-            for (List<String> constraintPair : constraints) {
-                String comp1 = constraintPair.get(0);
-                String comp2 = constraintPair.get(1);
-                if (!matrix.containsKey(comp1)) {
-                    LOG.warn("Comp: {} declared in constraints is not valid!", 
comp1);
-                    continue;
-                }
-                if (!matrix.containsKey(comp2)) {
-                    LOG.warn("Comp: {} declared in constraints is not valid!", 
comp2);
-                    continue;
-                }
-                matrix.get(comp1).put(comp2, 1);
-                matrix.get(comp2).put(comp1, 1);
-            }
-        }
-        return matrix;
-    }
-
-    /**
-     * Determines if a scheduling is valid and all constraints are satisfied.
-     */
-    @VisibleForTesting
-    public static boolean validateSolution(Cluster cluster, TopologyDetails 
td) {
-        return checkSpreadSchedulingValid(cluster, td)
-            && checkConstraintsSatisfied(cluster, td)
-            && checkResourcesCorrect(cluster, td);
-    }
-
-    /**
-     * Check if constraints are satisfied.
-     */
-    private static boolean checkConstraintsSatisfied(Cluster cluster, 
TopologyDetails topo) {
-        LOG.info("Checking constraints...");
-        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
-        Map<ExecutorDetails, String> execToComp = 
topo.getExecutorToComponent();
-        //get topology constraints
-        Map<String, Map<String, Integer>> constraintMatrix = 
getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));
-
-        Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>();
-        result.forEach((exec, worker) -> {
-            String comp = execToComp.get(exec);
-            workerCompMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).add(comp);
-        });
-        for (Map.Entry<WorkerSlot, Set<String>> entry : 
workerCompMap.entrySet()) {
-            Set<String> comps = entry.getValue();
-            for (String comp1 : comps) {
-                for (String comp2: comps) {
-                    if (!comp1.equals(comp2) && 
constraintMatrix.get(comp1).get(comp2) != 0) {
-                        LOG.error("Incorrect Scheduling: worker exclusion for 
Component {} and {} not satisfied on WorkerSlot: {}",
-                            comp1, comp2, entry.getKey());
-                        return false;
-                    }
-                }
-            }
-        }
-        return true;
-    }
+                    state.tryToSchedule(execToComp, node, workerSlot);
 
-    private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) {
-        Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>();
-        for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) {
-            for (WorkerSlot s : node.getUsedSlots()) {
-                workerToNodes.put(s, node);
-            }
-        }
-        return workerToNodes;
-    }
+                    if (state.areAllExecsScheduled()) {
+                        //Everything is scheduled correctly, so no need to 
search any more.
+                        return new SolverResult(state, true);
+                    }
 
-    private static boolean checkSpreadSchedulingValid(Cluster cluster, 
TopologyDetails topo) {
-        LOG.info("Checking for a valid scheduling...");
-        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
-        Map<ExecutorDetails, String> execToComp = 
topo.getExecutorToComponent();
-        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new 
HashMap<>();
-        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
-        Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>();
-        Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster);
-        boolean ret = true;
+                    SolverResult results = 
backtrackSearch(state.nextExecutor());
+                    if (results.success) {
+                        //We found a good result we are done.
+                        return results;
+                    }
 
-        HashSet<String> spreadComps = getSpreadComps(topo);
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) 
{
-            ExecutorDetails exec = entry.getKey();
-            WorkerSlot worker = entry.getValue();
-            RAS_Node node = workerToNodes.get(worker);
+                    if (state.areSearchLimitsExceeded()) {
+                        //No need to search more it is not going to help.
+                        return new SolverResult(state, false);
+                    }
 
-            if (workerExecMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).contains(exec)) {
-                LOG.error("Incorrect Scheduling: Found duplicate in 
scheduling");
-                return false;
-            }
-            workerExecMap.get(worker).add(exec);
-            String comp = execToComp.get(exec);
-            workerCompMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).add(comp);
-            if (spreadComps.contains(comp)) {
-                if (nodeCompMap.computeIfAbsent(node, (k) -> new 
HashSet<>()).contains(comp)) {
-                    LOG.error("Incorrect Scheduling: Spread for Component: {} 
{} on node {} not satisfied {}",
-                        comp, exec, node.getId(), nodeCompMap.get(node));
-                    ret = false;
+                    //backtracking (If we ever get here there really isn't a 
lot of hope that we will find a scheduling)
+                    state.backtrack(execToComp, node, workerSlot);
                 }
             }
-            nodeCompMap.computeIfAbsent(node, (k) -> new 
HashSet<>()).add(comp);
         }
-        return ret;
+        //Tried all of the slots and none of them worked.
+        return new SolverResult(state, false);
     }
 
     /**
-     * Check if resource constraints satisfied.
+     * Check if any constraints are violated if exec is scheduled on worker.
+     * @return true if scheduling exec on worker does not violate any 
constraints, returns false if it does
      */
-    private static boolean checkResourcesCorrect(Cluster cluster, 
TopologyDetails topo) {
-        LOG.info("Checking Resources...");
-        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
-        Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new 
HashMap<>();
-        Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>();
-        Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster);
-        //merge with existing assignments
-        if (cluster.getAssignmentById(topo.getId()) != null
-                && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() 
!= null) {
-            
mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot());
+    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, 
SearcherState state) {
+        final ExecutorDetails exec = state.currentExec();
+        //check resources
+        RAS_Node node = nodes.get(worker.getNodeId());
+        if (!node.wouldFit(worker, exec, state.td)) {
+            LOG.trace("{} would not fit in resources available on {}", exec, 
worker);
+            return false;
         }
-        mergedExecToWorker.putAll(result);
-
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
mergedExecToWorker.entrySet()) {
-            ExecutorDetails exec = entry.getKey();
-            WorkerSlot worker = entry.getValue();
-            RAS_Node node = nodes.get(worker.getNodeId());
 
-            if (node.getAvailableMemoryResources() < 0.0 && 
node.getAvailableCpuResources() < 0.0) {
-                LOG.error("Incorrect Scheduling: found node with negative 
available resources");
-                return false;
+        //check if exec can be on worker based on user defined component 
exclusions
+        String execComp = execToComp.get(exec);
+        Set<String> components = state.workerCompAssignment.get(worker);
+        if (components != null) {
+            Map<String, Integer> subMatrix = constraintMatrix.get(execComp);
+            for (String comp : components) {
+                if (subMatrix.get(comp) != 0) {
+                    LOG.trace("{} found {} constraint violation {} on {}", 
exec, execComp, comp, worker);
+                    return false;
+                }
             }
-            nodeToExecs.computeIfAbsent(node, (k) -> new 
HashSet<>()).add(exec);
         }
 
-        for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : 
nodeToExecs.entrySet()) {
-            RAS_Node node = entry.getKey();
-            Collection<ExecutorDetails> execs = entry.getValue();
-            double cpuUsed = 0.0;
-            double memoryUsed = 0.0;
-            for (ExecutorDetails exec : execs) {
-                cpuUsed += topo.getTotalCpuReqTask(exec);
-                memoryUsed += topo.getTotalMemReqTask(exec);
-            }
-            if (node.getAvailableCpuResources() != 
(node.getTotalCpuResources() - cpuUsed)) {
-                LOG.error("Incorrect Scheduling: node {} has consumed 
incorrect amount of cpu. Expected: {}"
-                        + " Actual: {} Executors scheduled on node: {}",
-                        node.getId(), (node.getTotalCpuResources() - cpuUsed), 
node.getAvailableCpuResources(), execs);
-                return false;
-            }
-            if (node.getAvailableMemoryResources() != 
(node.getTotalMemoryResources() - memoryUsed)) {
-                LOG.error("Incorrect Scheduling: node {} has consumed 
incorrect amount of memory. Expected: {}"
-                        + " Actual: {} Executors scheduled on node: {}",
-                        node.getId(), (node.getTotalMemoryResources() - 
memoryUsed), node.getAvailableMemoryResources(), execs);
+        //check if exec satisfy spread
+        if (spreadComps.contains(execComp)) {
+            if (state.nodeCompAssignment.computeIfAbsent(node, (k) -> new 
HashSet<>()).contains(execComp)) {
+                LOG.trace("{} Found spread violation {} on node {}", exec, 
execComp, node.getId());
                 return false;
             }
         }
@@ -551,22 +426,6 @@ public class ConstraintSolverStrategy extends 
BaseResourceAwareStrategy {
         return retList;
     }
 
-    private static HashSet<String> getSpreadComps(TopologyDetails topo) {
-        HashSet<String> retSet = new HashSet<>();
-        List<String> spread = (List<String>) 
topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
-        if (spread != null) {
-            Set<String> comps = topo.getComponents().keySet();
-            for (String comp : spread) {
-                if (comps.contains(comp)) {
-                    retSet.add(comp);
-                } else {
-                    LOG.warn("Comp {} declared for spread not valid", comp);
-                }
-            }
-        }
-        return retSet;
-    }
-
     /**
      * Used to sort a Map by the values.
      */
@@ -584,4 +443,136 @@ public class ConstraintSolverStrategy extends 
BaseResourceAwareStrategy {
         sortedByValues.putAll(map);
         return sortedByValues;
     }
+
+    protected static class SolverResult {
+        private final int statesSearched;
+        private final boolean success;
+        private final long timeTakenMillis;
+        private final int backtracked;
+
+        public SolverResult(SearcherState state, boolean success) {
+            this.statesSearched = state.getStatesSearched();
+            this.success = success;
+            timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
+            backtracked = state.numBacktrack;
+        }
+
+        public SchedulingResult asSchedulingResult() {
+            if (success) {
+                return SchedulingResult.success("Fully Scheduled by 
ConstraintSolverStrategy (" + statesSearched
+                                                + " states traversed in " + 
timeTakenMillis + "ms, backtracked " + backtracked + " times)");
+            }
+            return 
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+                                            "Cannot find scheduling that 
satisfies all constraints (" + statesSearched
+                                            + " states traversed in " + 
timeTakenMillis + "ms, backtracked " + backtracked + " times)");
+        }
+    }
+
+    protected static class SearcherState {
+        final long startTimeMillis;
+        private final long maxEndTimeMs;
+        // A map of the worker to the components in the worker to be able to 
enforce constraints.
+        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
+        private final boolean[] okToRemoveFromWorker;
+        // for the currently tested assignment a Map of the node to the 
components on it to be able to enforce constraints
+        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
+        private final boolean[] okToRemoveFromNode;
+        // Static State
+        // The list of all executors (preferably sorted to make assignments 
simpler).
+        private final List<ExecutorDetails> execs;
+        //The maximum number of state to search before stopping.
+        private final int maxStatesSearched;
+        //The topology we are scheduling
+        private final TopologyDetails td;
+        // Metrics
+        // How many states searched so far.
+        private int statesSearched = 0;
+        // Number of times we had to backtrack.
+        private int numBacktrack = 0;
+        // Current state
+        // The current executor we are trying to schedule
+        private int execIndex = 0;
+
+        private SearcherState(Map<WorkerSlot, Set<String>> 
workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
+                              int maxStatesSearched, long maxTimeMs, 
List<ExecutorDetails> execs, TopologyDetails td) {
+            assert !execs.isEmpty();
+            assert execs != null;
+
+            this.workerCompAssignment = workerCompAssignment;
+            this.nodeCompAssignment = nodeCompAssignment;
+            this.maxStatesSearched = maxStatesSearched;
+            this.execs = execs;
+            okToRemoveFromWorker = new boolean[execs.size()];
+            okToRemoveFromNode = new boolean[execs.size()];
+            this.td = td;
+            startTimeMillis = Time.currentTimeMillis();
+            if (maxTimeMs <= 0) {
+                maxEndTimeMs = Long.MAX_VALUE;
+            } else {
+                maxEndTimeMs = startTimeMillis + maxTimeMs;
+            }
+        }
+
+        public void incStatesSearched() {
+            statesSearched++;
+            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
+                LOG.debug("States Searched: {}", statesSearched);
+                LOG.debug("backtrack: {}", numBacktrack);
+            }
+        }
+
+        public int getStatesSearched() {
+            return statesSearched;
+        }
+
+        public boolean areSearchLimitsExceeded() {
+            return statesSearched > maxStatesSearched || 
Time.currentTimeMillis() > maxEndTimeMs;
+        }
+
+        public SearcherState nextExecutor() {
+            execIndex++;
+            if (execIndex >= execs.size()) {
+                throw new IllegalStateException("Internal Error: exceeded the 
exec limit " + execIndex + " >= " + execs.size());
+            }
+            return this;
+        }
+
+        public boolean areAllExecsScheduled() {
+            return execIndex == execs.size() - 1;
+        }
+
+        public ExecutorDetails currentExec() {
+            return execs.get(execIndex);
+        }
+
+        public void tryToSchedule(Map<ExecutorDetails, String> execToComp, 
RAS_Node node, WorkerSlot workerSlot) {
+            ExecutorDetails exec = currentExec();
+            String comp = execToComp.get(exec);
+            LOG.trace("Trying assignment of {} {} to {}", exec, comp, 
workerSlot);
+            //It is possible that this component is already scheduled on this 
node or worker.  If so when we backtrack we cannot remove it
+            okToRemoveFromWorker[execIndex] = 
workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new 
HashSet<>()).add(comp);
+            okToRemoveFromNode[execIndex] = 
nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
+            node.assignSingleExecutor(workerSlot, exec, td);
+        }
+
+        public void backtrack(Map<ExecutorDetails, String> execToComp, 
RAS_Node node, WorkerSlot workerSlot) {
+            execIndex--;
+            if (execIndex < 0) {
+                throw new IllegalStateException("Internal Error: exec index 
became negative");
+            }
+            numBacktrack++;
+            ExecutorDetails exec = currentExec();
+            String comp = execToComp.get(exec);
+            LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
+            if (okToRemoveFromWorker[execIndex]) {
+                workerCompAssignment.get(workerSlot).remove(comp);
+                okToRemoveFromWorker[execIndex] = false;
+            }
+            if (okToRemoveFromNode[execIndex]) {
+                nodeCompAssignment.get(node).remove(comp);
+                okToRemoveFromNode[execIndex] = false;
+            }
+            node.freeSingleExecutor(exec, td);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index efa3ecf..826c24d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -43,10 +43,10 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         if (nodes.getNodes().size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
             return SchedulingResult.failure(
-                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available 
nodes to schedule tasks on!");
+                SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available 
nodes to schedule tasks on!");
         }
         Collection<ExecutorDetails> unassignedExecutors =
-                new HashSet<>(this.cluster.getUnassignedExecutors(td));
+            new HashSet<>(this.cluster.getUnassignedExecutors(td));
         LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
         List<Component> spouts = this.getSpouts(td);
@@ -54,7 +54,7 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         if (spouts.size() == 0) {
             LOG.error("Cannot find a Spout!");
             return SchedulingResult.failure(
-                    SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a 
Spout!");
+                SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a 
Spout!");
         }
 
         //order executors to be scheduled
@@ -66,10 +66,10 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
 
         for (ExecutorDetails exec : orderedExecutors) {
             LOG.debug(
-                    "Attempting to schedule: {} of component {}[ REQ {} ]",
-                    exec,
-                    td.getExecutorToComponent().get(exec),
-                    td.getTaskResourceReqList(exec));
+                "Attempting to schedule: {} of component {}[ REQ {} ]",
+                exec,
+                td.getExecutorToComponent().get(exec),
+                td.getTaskResourceReqList(exec));
             scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
         }
 
@@ -85,12 +85,12 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         if (executorsNotScheduled.size() > 0) {
             LOG.error("Not all executors successfully scheduled: {}", 
executorsNotScheduled);
             result =
-                    SchedulingResult.failure(
-                            SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
-                            (td.getExecutors().size() - 
unassignedExecutors.size())
-                                    + "/"
-                                    + td.getExecutors().size()
-                                    + " executors scheduled");
+                SchedulingResult.failure(
+                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+                    (td.getExecutors().size() - unassignedExecutors.size())
+                    + "/"
+                    + td.getExecutors().size()
+                    + " executors scheduled");
         } else {
             LOG.debug("All resources successfully scheduled!");
             result = SchedulingResult.success("Fully Scheduled by " + 
this.getClass().getSimpleName());
@@ -99,64 +99,61 @@ public class DefaultResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
     }
 
     /**
-     * Sort objects by the following two criteria. 1) the number executors of 
the topology that needs
-     * to be scheduled is already on the object (node or rack) in descending 
order. The reasoning to
-     * sort based on criterion 1 is so we schedule the rest of a topology on 
the same object (node or
-     * rack) as the existing executors of the topology. 2) the 
subordinate/subservient resource
-     * availability percentage of a rack in descending order We calculate the 
resource availability
-     * percentage by dividing the resource availability of the object (node or 
rack) by the resource
-     * availability of the entire rack or cluster depending on if object 
references a node or a rack.
-     * By doing this calculation, objects (node or rack) that have exhausted 
or little of one of the
-     * resources mentioned above will be ranked after racks that have more 
balanced resource
-     * availability. So we will be less likely to pick a rack that have a lot 
of one resource but a
-     * low amount of another.
+     * Sort objects by the following two criteria. 1) the number executors of 
the topology that needs to be scheduled is already on the
+     * object (node or rack) in descending order. The reasoning to sort based 
on criterion 1 is so we schedule the rest of a topology on the
+     * same object (node or rack) as the existing executors of the topology. 
2) the subordinate/subservient resource availability percentage
+     * of a rack in descending order We calculate the resource availability 
percentage by dividing the resource availability of the object
+     * (node or rack) by the resource availability of the entire rack or 
cluster depending on if object references a node or a rack. By
+     * doing this calculation, objects (node or rack) that have exhausted or 
little of one of the resources mentioned above will be ranked
+     * after racks that have more balanced resource availability. So we will 
be less likely to pick a rack that has a lot of one resource
+     * but a low amount of another.
      *
-     * @param allResources contains all individual ObjectResources as well as 
cumulative stats
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
      * @return a sorted list of ObjectResources
      */
     @Override
     protected TreeSet<ObjectResources> sortObjectResources(
-            final AllResources allResources, ExecutorDetails exec, 
TopologyDetails topologyDetails,
-            final ExistingScheduleFunc existingScheduleFunc) {
-        
+        final AllResources allResources, ExecutorDetails exec, TopologyDetails 
topologyDetails,
+        final ExistingScheduleFunc existingScheduleFunc) {
+
         for (ObjectResources objectResources : allResources.objectResources) {
             objectResources.effectiveResources =
                 
allResources.availableResourcesOverall.calculateMinPercentageUsedBy(objectResources.availableResources);
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Effective resources for {} is {}, and 
numExistingSchedule is {}",
-                    objectResources.id, objectResources.effectiveResources,
-                    
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
+                          objectResources.id, 
objectResources.effectiveResources,
+                          
existingScheduleFunc.getNumExistingSchedule(objectResources.id));
             }
         }
 
         TreeSet<ObjectResources> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
+            new TreeSet<>((o1, o2) -> {
+                int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+                int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+                if (execsScheduled1 > execsScheduled2) {
+                    return -1;
+                } else if (execsScheduled1 < execsScheduled2) {
+                    return 1;
+                } else {
+                    if (o1.effectiveResources > o2.effectiveResources) {
                         return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
+                    } else if (o1.effectiveResources < o2.effectiveResources) {
                         return 1;
                     } else {
-                        if (o1.effectiveResources > o2.effectiveResources) {
+                        double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+                        double o2Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+
+                        if (o1Avg > o2Avg) {
                             return -1;
-                        } else if (o1.effectiveResources < 
o2.effectiveResources) {
+                        } else if (o1Avg < o2Avg) {
                             return 1;
                         } else {
-                            double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
-                            double o2Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
-                            if (o1Avg > o2Avg) {
-                                return -1;
-                            } else if (o1Avg < o2Avg) {
-                                return 1;
-                            } else {
-                                return o1.id.compareTo(o2.id);
-                            }
+                            return o1.id.compareTo(o2.id);
                         }
                     }
-                });
+                }
+            });
         sortedObjectResources.addAll(allResources.objectResources);
         LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
         return sortedObjectResources;

Reply via email to