http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java
index 389faba..89fe462 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/Node.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.multitenant;
@@ -23,318 +17,313 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.util.Set;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents a single node in the cluster.
  */
 public class Node {
-  private static final Logger LOG = LoggerFactory.getLogger(Node.class);
-  private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<>();
-  private Set<WorkerSlot> _freeSlots = new HashSet<>();
-  private final String _nodeId;
-  private boolean _isAlive;
-  
-  public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) {
-    _nodeId = nodeId;
-    _isAlive = isAlive;
-    if (_isAlive && allPorts != null) {
-      for (int port: allPorts) {
-        _freeSlots.add(new WorkerSlot(_nodeId, port));
-      }
-    }
-  }
+    /**
+     * Used to sort a list of nodes so the node with the most free slots comes
+     * first.
+     */
+    public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new 
Comparator<Node>() {
+        @Override
+        public int compare(Node o1, Node o2) {
+            return o1.totalSlotsUsed() - o2.totalSlotsUsed();
+        }
+    };
+    private static final Logger LOG = LoggerFactory.getLogger(Node.class);
+    private final String _nodeId;
+    private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<>();
+    private Set<WorkerSlot> _freeSlots = new HashSet<>();
+    private boolean _isAlive;
 
-  public String getId() {
-    return _nodeId;
-  }
-  
-  public boolean isAlive() {
-    return _isAlive;
-  }
-  
-  /**
-   * @return a collection of the topology ids currently running on this node
-   */
-  public Collection<String> getRunningTopologies() {
-    return _topIdToUsedSlots.keySet();
-  }
-  
-  public boolean isTotallyFree() {
-    return _topIdToUsedSlots.isEmpty();
-  }
-  
-  public int totalSlotsFree() {
-    return _freeSlots.size();
-  }
-  
-  public int totalSlotsUsed() {
-    int total = 0;
-    for (Set<WorkerSlot> slots: _topIdToUsedSlots.values()) {
-      total += slots.size();
-    }
-    return total;
-  }
-  
-  public int totalSlots() {
-    return totalSlotsFree() + totalSlotsUsed();
-  }
-  
-  public int totalSlotsUsed(String topId) {
-    int total = 0;
-    Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
-    if (slots != null) {
-      total = slots.size();
+    public Node(String nodeId, Set<Integer> allPorts, boolean isAlive) {
+        _nodeId = nodeId;
+        _isAlive = isAlive;
+        if (_isAlive && allPorts != null) {
+            for (int port : allPorts) {
+                _freeSlots.add(new WorkerSlot(_nodeId, port));
+            }
+        }
     }
-    return total;
-  }
 
-  private void validateSlot(WorkerSlot ws) {
-    if (!_nodeId.equals(ws.getNodeId())) {
-      throw new IllegalArgumentException(
-          "Trying to add a slot to the wrong node " + ws + 
-          " is not a part of " + _nodeId);
+    public static int countSlotsUsed(String topId, Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            total += n.totalSlotsUsed(topId);
+        }
+        return total;
     }
-  }
- 
-  private void addOrphanedSlot(WorkerSlot ws) {
-    if (_isAlive) {
-      throw new IllegalArgumentException("Orphaned Slots " +
-        "only are allowed on dead nodes.");
+
+    public static int countSlotsUsed(Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            total += n.totalSlotsUsed();
+        }
+        return total;
     }
-    validateSlot(ws);
-    if (_freeSlots.contains(ws)) {
-      return;
+
+    public static int countFreeSlotsAlive(Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlotsFree();
+            }
+        }
+        return total;
     }
-    for (Set<WorkerSlot> used: _topIdToUsedSlots.values()) {
-      if (used.contains(ws)) {
-        return;
-      }
+
+    public static int countTotalSlotsAlive(Collection<Node> nodes) {
+        int total = 0;
+        for (Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlots();
+            }
+        }
+        return total;
     }
-    _freeSlots.add(ws);
-  }
- 
-  boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
-    validateSlot(ws);
-    if (!_freeSlots.remove(ws)) {
-      for (Entry<String, Set<WorkerSlot>> topologySetEntry : 
_topIdToUsedSlots.entrySet()) {
-        if (topologySetEntry.getValue().contains(ws)) {
-          if (dontThrow) {
-            LOG.warn("Worker slot [" + ws + "] can't be assigned to " + topId +
-                    ". Its already assigned to " + topologySetEntry.getKey() + 
".");
-            return true;
-          }
-          throw new IllegalStateException("Worker slot [" + ws + "] can't be 
assigned to "
-                  + topId + ". Its already assigned to " + 
topologySetEntry.getKey() + ".");
+
+    public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
+        Map<String, Node> nodeIdToNode = new HashMap<>();
+        for (SupervisorDetails sup : cluster.getSupervisors().values()) {
+            //Node ID and supervisor ID are the same.
+            String id = sup.getId();
+            boolean isAlive = !cluster.isBlackListed(id);
+            LOG.debug("Found a {} Node {} {}",
+                      isAlive ? "living" : "dead", id, sup.getAllPorts());
+            nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
         }
-      }
-      LOG.warn("Adding Worker slot [" + ws + "] that was not reported in the 
supervisor heartbeats," +
-              " but the worker is already running for topology " + topId + 
".");
+
+        for (Entry<String, SchedulerAssignment> entry : 
cluster.getAssignments().entrySet()) {
+            String topId = entry.getValue().getTopologyId();
+            for (WorkerSlot ws : entry.getValue().getSlots()) {
+                String id = ws.getNodeId();
+                Node node = nodeIdToNode.get(id);
+                if (node == null) {
+                    LOG.debug("Found an assigned slot on a dead supervisor 
{}", ws);
+                    node = new Node(id, null, false);
+                    nodeIdToNode.put(id, node);
+                }
+                if (!node.isAlive()) {
+                    //The supervisor on the node down so add an orphaned slot 
to hold the unsupervised worker
+                    node.addOrphanedSlot(ws);
+                }
+                if (node.assignInternal(ws, topId, true)) {
+                    LOG.warn("Bad scheduling state for topology [" + topId + 
"], the slot " +
+                             ws + " assigned to multiple workers, un-assigning 
everything...");
+                    node.free(ws, cluster, true);
+                }
+            }
+        }
+
+        return nodeIdToNode;
     }
-    Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
-    if (usedSlots == null) {
-      usedSlots = new HashSet<>();
-      _topIdToUsedSlots.put(topId, usedSlots);
+
+    public String getId() {
+        return _nodeId;
     }
-    usedSlots.add(ws);
-    return false;
-  }
-  
-  /**
-   * Free all slots on this node.  This will update the Cluster too.
-   * @param cluster the cluster to be updated
-   */
-  public void freeAllSlots(Cluster cluster) {
-    if (!_isAlive) {
-      LOG.warn("Freeing all slots on a dead node {} ",_nodeId);
-    } 
-    for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-      cluster.freeSlots(entry.getValue());
-      if (_isAlive) {
-        _freeSlots.addAll(entry.getValue());
-      }
+
+    public boolean isAlive() {
+        return _isAlive;
     }
-    _topIdToUsedSlots = new HashMap<>();
-  }
-  
-  /**
-   * Frees a single slot in this node
-   * @param ws the slot to free
-   * @param cluster the cluster to update
-   */
-  public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
-    if (_freeSlots.contains(ws)) return;
-    boolean wasFound = false;
-    for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
-      Set<WorkerSlot> slots = entry.getValue();
-      if (slots.remove(ws)) {
-        cluster.freeSlot(ws);
-        if (_isAlive) {
-          _freeSlots.add(ws);
-        }
-        wasFound = true;
-      }
+
+    /**
+     * @return a collection of the topology ids currently running on this node
+     */
+    public Collection<String> getRunningTopologies() {
+        return _topIdToUsedSlots.keySet();
     }
-    if(!wasFound)
-    {
-      if(forceFree)
-      {
-        LOG.info("Forcefully freeing the " + ws);
-        cluster.freeSlot(ws);
-        _freeSlots.add(ws);
-      } else {
-        throw new IllegalArgumentException("Tried to free a slot that was not" 
+
-              " part of this node " + _nodeId);
-      }
+
+    public boolean isTotallyFree() {
+        return _topIdToUsedSlots.isEmpty();
     }
-  }
-   
-  /**
-   * Frees all the slots for a topology.
-   * @param topId the topology to free slots for
-   * @param cluster the cluster to update
-   */
-  public void freeTopology(String topId, Cluster cluster) {
-    Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
-    if (slots == null || slots.isEmpty()) return;
-    for (WorkerSlot ws : slots) {
-      cluster.freeSlot(ws);
-      if (_isAlive) {
-        _freeSlots.add(ws);
-      }
+
+    public int totalSlotsFree() {
+        return _freeSlots.size();
     }
-    _topIdToUsedSlots.remove(topId);
-  }
- 
-  /**
-   * Assign a free slot on the node to the following topology and executors.
-   * This will update the cluster too.
-   * @param topId the topology to assign a free slot to.
-   * @param executors the executors to run in that slot.
-   * @param cluster the cluster to be updated
-   */
-  public void assign(String topId, Collection<ExecutorDetails> executors, 
-      Cluster cluster) {
-    if (!_isAlive) {
-      throw new IllegalStateException("Trying to adding to a dead node " + 
_nodeId);
+
+    public int totalSlotsUsed() {
+        int total = 0;
+        for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
+            total += slots.size();
+        }
+        return total;
+    }
+
+    public int totalSlots() {
+        return totalSlotsFree() + totalSlotsUsed();
     }
-    if (_freeSlots.isEmpty()) {
-      throw new IllegalStateException("Trying to assign to a full node " + 
_nodeId);
+
+    public int totalSlotsUsed(String topId) {
+        int total = 0;
+        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+        if (slots != null) {
+            total = slots.size();
+        }
+        return total;
     }
-    if (executors.size() == 0) {
-      LOG.warn("Trying to assign nothing from " + topId + " to " + _nodeId + " 
(Ignored)");
-    } else {
-      WorkerSlot slot = _freeSlots.iterator().next();
-      cluster.assign(slot, topId, executors);
-      assignInternal(slot, topId, false);
+
+    private void validateSlot(WorkerSlot ws) {
+        if (!_nodeId.equals(ws.getNodeId())) {
+            throw new IllegalArgumentException(
+                "Trying to add a slot to the wrong node " + ws +
+                " is not a part of " + _nodeId);
+        }
     }
-  }
-  
-  @Override
-  public boolean equals(Object other) {
-      return other instanceof Node && _nodeId.equals(((Node) other)._nodeId);
-  }
-  
-  @Override
-  public int hashCode() {
-    return _nodeId.hashCode();
-  }
-  
-  @Override
-  public String toString() {
-    return "Node: " + _nodeId;
-  }
 
-  public static int countSlotsUsed(String topId, Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      total += n.totalSlotsUsed(topId);
+    private void addOrphanedSlot(WorkerSlot ws) {
+        if (_isAlive) {
+            throw new IllegalArgumentException("Orphaned Slots " +
+                                               "only are allowed on dead 
nodes.");
+        }
+        validateSlot(ws);
+        if (_freeSlots.contains(ws)) {
+            return;
+        }
+        for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
+            if (used.contains(ws)) {
+                return;
+            }
+        }
+        _freeSlots.add(ws);
     }
-    return total;
-  }
-  
-  public static int countSlotsUsed(Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      total += n.totalSlotsUsed();
+
+    boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
+        validateSlot(ws);
+        if (!_freeSlots.remove(ws)) {
+            for (Entry<String, Set<WorkerSlot>> topologySetEntry : 
_topIdToUsedSlots.entrySet()) {
+                if (topologySetEntry.getValue().contains(ws)) {
+                    if (dontThrow) {
+                        LOG.warn("Worker slot [" + ws + "] can't be assigned 
to " + topId +
+                                 ". Its already assigned to " + 
topologySetEntry.getKey() + ".");
+                        return true;
+                    }
+                    throw new IllegalStateException("Worker slot [" + ws + "] 
can't be assigned to "
+                                                    + topId + ". Its already 
assigned to " + topologySetEntry.getKey() + ".");
+                }
+            }
+            LOG.warn("Adding Worker slot [" + ws + "] that was not reported in 
the supervisor heartbeats," +
+                     " but the worker is already running for topology " + 
topId + ".");
+        }
+        Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
+        if (usedSlots == null) {
+            usedSlots = new HashSet<>();
+            _topIdToUsedSlots.put(topId, usedSlots);
+        }
+        usedSlots.add(ws);
+        return false;
     }
-    return total;
-  }
-  
-  public static int countFreeSlotsAlive(Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      if (n.isAlive()) {
-        total += n.totalSlotsFree();
-      }
+
+    /**
+     * Free all slots on this node.  This will update the Cluster too.
+     * @param cluster the cluster to be updated
+     */
+    public void freeAllSlots(Cluster cluster) {
+        if (!_isAlive) {
+            LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
+        }
+        for (Entry<String, Set<WorkerSlot>> entry : 
_topIdToUsedSlots.entrySet()) {
+            cluster.freeSlots(entry.getValue());
+            if (_isAlive) {
+                _freeSlots.addAll(entry.getValue());
+            }
+        }
+        _topIdToUsedSlots = new HashMap<>();
     }
-    return total;
-  }
-  
-  public static int countTotalSlotsAlive(Collection<Node> nodes) {
-    int total = 0;
-    for (Node n: nodes) {
-      if (n.isAlive()) {
-        total += n.totalSlots();
-      }
+
+    /**
+     * Frees a single slot in this node
+     * @param ws the slot to free
+     * @param cluster the cluster to update
+     */
+    public void free(WorkerSlot ws, Cluster cluster, boolean forceFree) {
+        if (_freeSlots.contains(ws)) return;
+        boolean wasFound = false;
+        for (Entry<String, Set<WorkerSlot>> entry : 
_topIdToUsedSlots.entrySet()) {
+            Set<WorkerSlot> slots = entry.getValue();
+            if (slots.remove(ws)) {
+                cluster.freeSlot(ws);
+                if (_isAlive) {
+                    _freeSlots.add(ws);
+                }
+                wasFound = true;
+            }
+        }
+        if (!wasFound) {
+            if (forceFree) {
+                LOG.info("Forcefully freeing the " + ws);
+                cluster.freeSlot(ws);
+                _freeSlots.add(ws);
+            } else {
+                throw new IllegalArgumentException("Tried to free a slot that 
was not" +
+                                                   " part of this node " + 
_nodeId);
+            }
+        }
     }
-    return total;
-  }
-  
-  public static Map<String, Node> getAllNodesFrom(Cluster cluster) {
-    Map<String, Node> nodeIdToNode = new HashMap<>();
-    for (SupervisorDetails sup : cluster.getSupervisors().values()) {
-      //Node ID and supervisor ID are the same.
-      String id = sup.getId();
-      boolean isAlive = !cluster.isBlackListed(id);
-      LOG.debug("Found a {} Node {} {}",
-              isAlive? "living":"dead", id, sup.getAllPorts());
-      nodeIdToNode.put(id, new Node(id, sup.getAllPorts(), isAlive));
+
+    /**
+     * Frees all the slots for a topology.
+     * @param topId the topology to free slots for
+     * @param cluster the cluster to update
+     */
+    public void freeTopology(String topId, Cluster cluster) {
+        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
+        if (slots == null || slots.isEmpty()) return;
+        for (WorkerSlot ws : slots) {
+            cluster.freeSlot(ws);
+            if (_isAlive) {
+                _freeSlots.add(ws);
+            }
+        }
+        _topIdToUsedSlots.remove(topId);
     }
-    
-    for (Entry<String, SchedulerAssignment> entry : 
cluster.getAssignments().entrySet()) {
-      String topId = entry.getValue().getTopologyId();
-      for (WorkerSlot ws: entry.getValue().getSlots()) {
-        String id = ws.getNodeId();
-        Node node = nodeIdToNode.get(id);
-        if (node == null) {
-          LOG.debug("Found an assigned slot on a dead supervisor {}", ws);
-          node = new Node(id, null, false);
-          nodeIdToNode.put(id, node);
+
+    /**
+     * Assign a free slot on the node to the following topology and executors.
+     * This will update the cluster too.
+     * @param topId the topology to assign a free slot to.
+     * @param executors the executors to run in that slot.
+     * @param cluster the cluster to be updated
+     */
+    public void assign(String topId, Collection<ExecutorDetails> executors,
+                       Cluster cluster) {
+        if (!_isAlive) {
+            throw new IllegalStateException("Trying to adding to a dead node " 
+ _nodeId);
         }
-        if (!node.isAlive()) {
-          //The supervisor on the node down so add an orphaned slot to hold 
the unsupervised worker 
-          node.addOrphanedSlot(ws);
+        if (_freeSlots.isEmpty()) {
+            throw new IllegalStateException("Trying to assign to a full node " 
+ _nodeId);
         }
-        if (node.assignInternal(ws, topId, true)) {
-          LOG.warn("Bad scheduling state for topology [" + topId+ "], the slot 
" +
-                  ws + " assigned to multiple workers, un-assigning 
everything...");
-          node.free(ws, cluster, true);
+        if (executors.size() == 0) {
+            LOG.warn("Trying to assign nothing from " + topId + " to " + 
_nodeId + " (Ignored)");
+        } else {
+            WorkerSlot slot = _freeSlots.iterator().next();
+            cluster.assign(slot, topId, executors);
+            assignInternal(slot, topId, false);
         }
-      }
     }
-    
-    return nodeIdToNode;
-  }
-  
-  /**
-   * Used to sort a list of nodes so the node with the most free slots comes
-   * first.
-   */
-  public static final Comparator<Node> FREE_NODE_COMPARATOR_DEC = new 
Comparator<Node>() {
+
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof Node && _nodeId.equals(((Node) other)._nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return _nodeId.hashCode();
+    }
+
     @Override
-    public int compare(Node o1, Node o2) {
-      return o1.totalSlotsUsed() - o2.totalSlotsUsed();
+    public String toString() {
+        return "Node: " + _nodeId;
     }
-  };
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java
index e609028..21ffbf5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/NodePool.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.multitenant;
@@ -27,270 +21,268 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A pool of nodes that can be used to run topologies.
  */
 public abstract class NodePool {
-  protected Cluster _cluster;
-  protected Map<String, Node> _nodeIdToNode;
-  
-  public static class NodeAndSlotCounts {
-    public final int _nodes;
-    public final int _slots;
-    
-    public NodeAndSlotCounts(int nodes, int slots) {
-      _nodes = nodes;
-      _slots = slots;
+    private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);
+    protected Cluster _cluster;
+    protected Map<String, Node> _nodeIdToNode;
+
+    public static int slotsAvailable(NodePool[] pools) {
+        int slotsAvailable = 0;
+        for (NodePool pool : pools) {
+            slotsAvailable += pool.slotsAvailable();
+        }
+        return slotsAvailable;
     }
-  }
 
-  /**
-   * Place executors into slots in a round robin way, taking into account
-   * component spreading among different hosts.
-   */
-  public static class RoundRobinSlotScheduler {
-    private Map<String,Set<String>> _nodeToComps;
-    private HashMap<String, List<ExecutorDetails>> _spreadToSchedule;
-    private LinkedList<Set<ExecutorDetails>> _slots;
-    private Set<ExecutorDetails> _lastSlot;
-    private Cluster _cluster;
-    private String _topId;
-    
-    /**
-     * Create a new scheduler for a given topology
-     * @param td the topology to schedule
-     * @param slotsToUse the number of slots to use for the executors left to 
-     * schedule.
-     * @param cluster the cluster to schedule this on. 
-     */
-    public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, 
-        Cluster cluster) {
-      _topId = td.getId();
-      _cluster = cluster;
-      
-      Map<ExecutorDetails, String> execToComp = td.getExecutorToComponent();
-      SchedulerAssignment assignment = _cluster.getAssignmentById(_topId);
-      _nodeToComps = new HashMap<>();
+    public static int nodesAvailable(NodePool[] pools) {
+        int nodesAvailable = 0;
+        for (NodePool pool : pools) {
+            nodesAvailable += pool.nodesAvailable();
+        }
+        return nodesAvailable;
+    }
 
-      if (assignment != null) {
-        Map<ExecutorDetails, WorkerSlot> execToSlot = 
assignment.getExecutorToSlot();
-        
-        for (Entry<ExecutorDetails, WorkerSlot> entry: execToSlot.entrySet()) {
-          String nodeId = entry.getValue().getNodeId();
-          Set<String> comps = _nodeToComps.get(nodeId);
-          if (comps == null) {
-            comps = new HashSet<>();
-            _nodeToComps.put(nodeId, comps);
-          }
-          comps.add(execToComp.get(entry.getKey()));
+    public static Collection<Node> takeNodesBySlot(int slotsNeeded, NodePool[] 
pools) {
+        LOG.debug("Trying to grab {} free slots from {}", slotsNeeded, pools);
+        HashSet<Node> ret = new HashSet<>();
+        for (NodePool pool : pools) {
+            Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
+            ret.addAll(got);
+            slotsNeeded -= Node.countFreeSlotsAlive(got);
+            LOG.debug("Got {} nodes so far need {} more slots", ret.size(), 
slotsNeeded);
+            if (slotsNeeded <= 0) {
+                break;
+            }
         }
-      }
-      
-      _spreadToSchedule = new HashMap<>();
-      List<String> spreadComps = 
(List<String>)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
-      if (spreadComps != null) {
-        for (String comp: spreadComps) {
-          _spreadToSchedule.put(comp, new ArrayList<ExecutorDetails>());
+        return ret;
+    }
+
+    public static Collection<Node> takeNodes(int nodesNeeded, NodePool[] 
pools) {
+        LOG.debug("Trying to grab {} free nodes from {}", nodesNeeded, pools);
+        HashSet<Node> ret = new HashSet<>();
+        for (NodePool pool : pools) {
+            Collection<Node> got = pool.takeNodes(nodesNeeded);
+            ret.addAll(got);
+            nodesNeeded -= got.size();
+            LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), 
nodesNeeded);
+            if (nodesNeeded <= 0) {
+                break;
+            }
         }
-      }
-      
-      _slots = new LinkedList<>();
-      for (int i = 0; i < slotsToUse; i++) {
-        _slots.add(new HashSet<ExecutorDetails>());
-      }
+        return ret;
+    }
 
-      int at = 0;
-      for (Entry<String, List<ExecutorDetails>> entry: 
_cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
-        LOG.debug("Scheduling for {}", entry.getKey());
-        if (_spreadToSchedule.containsKey(entry.getKey())) {
-          LOG.debug("Saving {} for spread...",entry.getKey());
-          _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
-        } else {
-          for (ExecutorDetails ed: entry.getValue()) {
-            LOG.debug("Assigning {} {} to slot {}", entry.getKey(), ed, at);
-            _slots.get(at).add(ed);
-            at++;
-            if (at >= _slots.size()) {
-              at = 0;
+    public static int getNodeCountIfSlotsWereTaken(int slots, NodePool[] 
pools) {
+        LOG.debug("How many nodes to get {} slots from {}", slots, pools);
+        int total = 0;
+        for (NodePool pool : pools) {
+            NodeAndSlotCounts ns = 
pool.getNodeAndSlotCountIfSlotsWereTaken(slots);
+            total += ns._nodes;
+            slots -= ns._slots;
+            LOG.debug("Found {} nodes so far {} more slots needed", total, 
slots);
+            if (slots <= 0) {
+                break;
             }
-          }
         }
-      }
-      _lastSlot = _slots.get(_slots.size() - 1);
+        return total;
     }
-    
+
     /**
-     * Assign a slot to the given node.
-     * @param n the node to assign a slot to.
-     * @return true if there are more slots to assign else false.
+     * Initialize the pool.
+     * @param cluster the cluster
+     * @param nodeIdToNode the mapping of node id to nodes
      */
-    public boolean assignSlotTo(Node n) {
-      if (_slots.isEmpty()) {
-        return false;
-      }
-      Set<ExecutorDetails> slot = _slots.pop();
-      if (slot == _lastSlot) {
-        //The last slot fill it up
-        for (Entry<String, List<ExecutorDetails>> entry: 
_spreadToSchedule.entrySet()) {
-          if (entry.getValue().size() > 0) {
-            slot.addAll(entry.getValue());
-          }
-        }
-      } else {
-        String nodeId = n.getId();
-        Set<String> nodeComps = _nodeToComps.get(nodeId);
-        if (nodeComps == null) {
-          nodeComps = new HashSet<>();
-          _nodeToComps.put(nodeId, nodeComps);
+    public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
+        _cluster = cluster;
+        _nodeIdToNode = nodeIdToNode;
+    }
+
+    /**
+     * Add a topology to the pool
+     * @param td the topology to add.
+     */
+    public abstract void addTopology(TopologyDetails td);
+
+    /**
+     * Check if this topology can be added to this pool
+     * @param td the topology
+     * @return true if it can else false
+     */
+    public abstract boolean canAdd(TopologyDetails td);
+
+    /**
+     * @return the number of nodes that are available to be taken
+     */
+    public abstract int slotsAvailable();
+
+    /**
+     * Take nodes from this pool that can fulfill possibly up to the
+     * slotsNeeded
+     * @param slotsNeeded the number of slots that are needed.
+     * @return a Collection of nodes with the removed nodes in it.
+     * This may be empty, but should not be null.
+     */
+    public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);
+
+    /**
+     * Get the number of nodes and slots this would provide to get the slots 
needed
+     * @param slots the number of slots needed
+     * @return the number of nodes and slots that would be returned.
+     */
+    public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slots);
+
+    /**
+     * @return the number of nodes that are available to be taken
+     */
+    public abstract int nodesAvailable();
+
+    /**
+     * Take up to nodesNeeded from this pool
+     * @param nodesNeeded the number of nodes that are needed.
+     * @return a Collection of nodes with the removed nodes in it.
+     * This may be empty, but should not be null.
+     */
+    public abstract Collection<Node> takeNodes(int nodesNeeded);
+
+    /**
+     * Reschedule any topologies as needed.
+     * @param lesserPools pools that may be used to steal nodes from.
+     */
+    public abstract void scheduleAsNeeded(NodePool... lesserPools);
+
+    public static class NodeAndSlotCounts {
+        public final int _nodes;
+        public final int _slots;
+
+        public NodeAndSlotCounts(int nodes, int slots) {
+            _nodes = nodes;
+            _slots = slots;
         }
-        for (Entry<String, List<ExecutorDetails>> entry: 
_spreadToSchedule.entrySet()) {
-          if (entry.getValue().size() > 0) {
-            String comp = entry.getKey();
-            if (!nodeComps.contains(comp)) {
-              nodeComps.add(comp);
-              slot.add(entry.getValue().remove(0));
+    }
+
+    /**
+     * Place executors into slots in a round robin way, taking into account
+     * component spreading among different hosts.
+     */
+    public static class RoundRobinSlotScheduler {
+        private Map<String, Set<String>> _nodeToComps;
+        private HashMap<String, List<ExecutorDetails>> _spreadToSchedule;
+        private LinkedList<Set<ExecutorDetails>> _slots;
+        private Set<ExecutorDetails> _lastSlot;
+        private Cluster _cluster;
+        private String _topId;
+
+        /**
+         * Create a new scheduler for a given topology
+         * @param td the topology to schedule
+         * @param slotsToUse the number of slots to use for the executors left 
to
+         * schedule.
+         * @param cluster the cluster to schedule this on.
+         */
+        public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse,
+                                       Cluster cluster) {
+            _topId = td.getId();
+            _cluster = cluster;
+
+            Map<ExecutorDetails, String> execToComp = 
td.getExecutorToComponent();
+            SchedulerAssignment assignment = 
_cluster.getAssignmentById(_topId);
+            _nodeToComps = new HashMap<>();
+
+            if (assignment != null) {
+                Map<ExecutorDetails, WorkerSlot> execToSlot = 
assignment.getExecutorToSlot();
+
+                for (Entry<ExecutorDetails, WorkerSlot> entry : 
execToSlot.entrySet()) {
+                    String nodeId = entry.getValue().getNodeId();
+                    Set<String> comps = _nodeToComps.get(nodeId);
+                    if (comps == null) {
+                        comps = new HashSet<>();
+                        _nodeToComps.put(nodeId, comps);
+                    }
+                    comps.add(execToComp.get(entry.getKey()));
+                }
+            }
+
+            _spreadToSchedule = new HashMap<>();
+            List<String> spreadComps = (List<String>) 
td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
+            if (spreadComps != null) {
+                for (String comp : spreadComps) {
+                    _spreadToSchedule.put(comp, new 
ArrayList<ExecutorDetails>());
+                }
+            }
+
+            _slots = new LinkedList<>();
+            for (int i = 0; i < slotsToUse; i++) {
+                _slots.add(new HashSet<ExecutorDetails>());
+            }
+
+            int at = 0;
+            for (Entry<String, List<ExecutorDetails>> entry : 
_cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) {
+                LOG.debug("Scheduling for {}", entry.getKey());
+                if (_spreadToSchedule.containsKey(entry.getKey())) {
+                    LOG.debug("Saving {} for spread...", entry.getKey());
+                    
_spreadToSchedule.get(entry.getKey()).addAll(entry.getValue());
+                } else {
+                    for (ExecutorDetails ed : entry.getValue()) {
+                        LOG.debug("Assigning {} {} to slot {}", 
entry.getKey(), ed, at);
+                        _slots.get(at).add(ed);
+                        at++;
+                        if (at >= _slots.size()) {
+                            at = 0;
+                        }
+                    }
+                }
             }
-          }
+            _lastSlot = _slots.get(_slots.size() - 1);
         }
-      }
-      n.assign(_topId, slot, _cluster);
-      return !_slots.isEmpty();
-    }
-  }
-  
-  private static final Logger LOG = LoggerFactory.getLogger(NodePool.class);
-  /**
-   * Initialize the pool.
-   * @param cluster the cluster
-   * @param nodeIdToNode the mapping of node id to nodes
-   */
-  public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
-    _cluster = cluster;
-    _nodeIdToNode = nodeIdToNode;
-  }
-  
-  /**
-   * Add a topology to the pool
-   * @param td the topology to add.
-   */
-  public abstract void addTopology(TopologyDetails td);
-  
-  /**
-   * Check if this topology can be added to this pool
-   * @param td the topology
-   * @return true if it can else false
-   */
-  public abstract boolean canAdd(TopologyDetails td);
-  
-  /**
-   * @return the number of nodes that are available to be taken
-   */
-  public abstract int slotsAvailable();
-  
-  /**
-   * Take nodes from this pool that can fulfill possibly up to the
-   * slotsNeeded
-   * @param slotsNeeded the number of slots that are needed.
-   * @return a Collection of nodes with the removed nodes in it.  
-   * This may be empty, but should not be null.
-   */
-  public abstract Collection<Node> takeNodesBySlots(int slotsNeeded);
 
-  /**
-   * Get the number of nodes and slots this would provide to get the slots 
needed
-   * @param slots the number of slots needed
-   * @return the number of nodes and slots that would be returned.
-   */
-  public abstract NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slots);
-  
-  /**
-   * @return the number of nodes that are available to be taken
-   */
-  public abstract int nodesAvailable();
-  
-  /**
-   * Take up to nodesNeeded from this pool
-   * @param nodesNeeded the number of nodes that are needed.
-   * @return a Collection of nodes with the removed nodes in it.  
-   * This may be empty, but should not be null.
-   */
-  public abstract Collection<Node> takeNodes(int nodesNeeded);
-  
-  /**
-   * Reschedule any topologies as needed.
-   * @param lesserPools pools that may be used to steal nodes from.
-   */
-  public abstract void scheduleAsNeeded(NodePool ... lesserPools);
-  
-  public static int slotsAvailable(NodePool[] pools) {
-    int slotsAvailable = 0;
-    for (NodePool pool: pools) {
-      slotsAvailable += pool.slotsAvailable();
-    }
-    return slotsAvailable;
-  }
-  
-  public static int nodesAvailable(NodePool[] pools) {
-    int nodesAvailable = 0;
-    for (NodePool pool: pools) {
-      nodesAvailable += pool.nodesAvailable();
-    }
-    return nodesAvailable;
-  }
-  
-  public static Collection<Node> takeNodesBySlot(int slotsNeeded,NodePool[] 
pools) {
-    LOG.debug("Trying to grab {} free slots from {}",slotsNeeded, pools);
-    HashSet<Node> ret = new HashSet<>();
-    for (NodePool pool: pools) {
-      Collection<Node> got = pool.takeNodesBySlots(slotsNeeded);
-      ret.addAll(got);
-      slotsNeeded -= Node.countFreeSlotsAlive(got);
-      LOG.debug("Got {} nodes so far need {} more 
slots",ret.size(),slotsNeeded);
-      if (slotsNeeded <= 0) {
-        break;
-      }
-    }
-    return ret;
-  }
-  
-  public static Collection<Node> takeNodes(int nodesNeeded,NodePool[] pools) {
-    LOG.debug("Trying to grab {} free nodes from {}",nodesNeeded, pools);
-    HashSet<Node> ret = new HashSet<>();
-    for (NodePool pool: pools) {
-      Collection<Node> got = pool.takeNodes(nodesNeeded);
-      ret.addAll(got);
-      nodesNeeded -= got.size();
-      LOG.debug("Got {} nodes so far need {} more nodes", ret.size(), 
nodesNeeded);
-      if (nodesNeeded <= 0) {
-        break;
-      }
+        /**
+         * Assign a slot to the given node.
+         * @param n the node to assign a slot to.
+         * @return true if there are more slots to assign else false.
+         */
+        public boolean assignSlotTo(Node n) {
+            if (_slots.isEmpty()) {
+                return false;
+            }
+            Set<ExecutorDetails> slot = _slots.pop();
+            if (slot == _lastSlot) {
+                //The last slot fill it up
+                for (Entry<String, List<ExecutorDetails>> entry : 
_spreadToSchedule.entrySet()) {
+                    if (entry.getValue().size() > 0) {
+                        slot.addAll(entry.getValue());
+                    }
+                }
+            } else {
+                String nodeId = n.getId();
+                Set<String> nodeComps = _nodeToComps.get(nodeId);
+                if (nodeComps == null) {
+                    nodeComps = new HashSet<>();
+                    _nodeToComps.put(nodeId, nodeComps);
+                }
+                for (Entry<String, List<ExecutorDetails>> entry : 
_spreadToSchedule.entrySet()) {
+                    if (entry.getValue().size() > 0) {
+                        String comp = entry.getKey();
+                        if (!nodeComps.contains(comp)) {
+                            nodeComps.add(comp);
+                            slot.add(entry.getValue().remove(0));
+                        }
+                    }
+                }
+            }
+            n.assign(_topId, slot, _cluster);
+            return !_slots.isEmpty();
+        }
     }
-    return ret;
-  }
-
-  public static int getNodeCountIfSlotsWereTaken(int slots,NodePool[] pools) {
-    LOG.debug("How many nodes to get {} slots from {}",slots, pools);
-    int total = 0;
-    for (NodePool pool: pools) {
-      NodeAndSlotCounts ns = pool.getNodeAndSlotCountIfSlotsWereTaken(slots);
-      total += ns._nodes;
-      slots -= ns._slots;
-      LOG.debug("Found {} nodes so far {} more slots needed", total, slots);
-      if (slots <= 0) {
-        break;
-      }
-    }    
-    return total;
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 84a95a7..4f43c0a 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -34,24 +34,23 @@ import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Represents a single node in the cluster. */
+/**
+ * Represents a single node in the cluster.
+ */
 public class RAS_Node {
     private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
-
+    private final String nodeId;
+    private final Cluster cluster;
+    private final Set<WorkerSlot> originallyFreeSlots;
     //A map consisting of all workers on the node.
     //The key of the map is the worker id and the value is the corresponding 
workerslot object
     private Map<String, WorkerSlot> slots = new HashMap<>();
-
     // A map describing which topologies are using which slots on this node.  
The format of the map is the following:
     // {TopologyId -> {WorkerId -> {Executors}}}
     private Map<String, Map<String, Collection<ExecutorDetails>>> 
topIdToUsedSlots = new HashMap<>();
-
-    private final String nodeId;
     private String hostname;
     private boolean isAlive;
     private SupervisorDetails sup;
-    private final Cluster cluster;
-    private final Set<WorkerSlot> originallyFreeSlots;
 
     public RAS_Node(
         String nodeId,
@@ -99,6 +98,26 @@ public class RAS_Node {
         }
     }
 
+    public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
+        int total = 0;
+        for (RAS_Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlotsFree();
+            }
+        }
+        return total;
+    }
+
+    public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
+        int total = 0;
+        for (RAS_Node n : nodes) {
+            if (n.isAlive()) {
+                total += n.totalSlots();
+            }
+        }
+        return total;
+    }
+
     public String getId() {
         return nodeId;
     }
@@ -167,7 +186,9 @@ public class RAS_Node {
         return isAlive;
     }
 
-    /** Get a collection of the topology ids currently running on this node. */
+    /**
+     * Get a collection of the topology ids currently running on this node.
+     */
     public Collection<String> getRunningTopologies() {
         return topIdToUsedSlots.keySet();
     }
@@ -192,7 +213,9 @@ public class RAS_Node {
         return slots.size();
     }
 
-    /** Free all slots on this node. This will update the Cluster too. */
+    /**
+     * Free all slots on this node. This will update the Cluster too.
+     */
     public void freeAllSlots() {
         if (!isAlive) {
             LOG.warn("Freeing all slots on a dead node {} ", nodeId);
@@ -279,8 +302,8 @@ public class RAS_Node {
     /**
      * Assigns a worker to a node.
      *
-     * @param target the worker slot to assign the executors
-     * @param td the topology the executors are from
+     * @param target    the worker slot to assign the executors
+     * @param td        the topology the executors are from
      * @param executors executors to assign to the specified worker slot
      */
     public void assign(WorkerSlot target, TopologyDetails td, 
Collection<ExecutorDetails> executors) {
@@ -343,9 +366,9 @@ public class RAS_Node {
     /**
      * Would scheduling exec in ws fit with the current resource constraints.
      *
-     * @param ws the slot to possibly put exec in
+     * @param ws   the slot to possibly put exec in
      * @param exec the executor to possibly place in ws
-     * @param td the topology exec is a part of
+     * @param td   the topology exec is a part of
      * @return true if it would fit else false
      */
     public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, 
TopologyDetails td) {
@@ -353,13 +376,13 @@ public class RAS_Node {
             throw new IllegalStateException("Slot " + ws + " is not a part of 
this node " + nodeId);
         }
         return isAlive
-            && cluster.wouldFit(
+               && cluster.wouldFit(
             ws,
             exec,
             td,
             getTotalAvailableResources(),
             td.getTopologyWorkerMaxHeapSize()
-            );
+        );
     }
 
     @Override
@@ -378,40 +401,20 @@ public class RAS_Node {
     @Override
     public String toString() {
         return "{Node: "
-            + ((sup == null) ? "null (possibly down)" : sup.getHost())
-            + ", Avail [ Mem: "
-            + getAvailableMemoryResources()
-            + ", CPU: "
-            + getAvailableCpuResources()
-            + ", Slots: "
-            + this.getFreeSlots()
-            + "] Total [ Mem: "
-            + ((sup == null) ? "N/A" : this.getTotalMemoryResources())
-            + ", CPU: "
-            + ((sup == null) ? "N/A" : this.getTotalCpuResources())
-            + ", Slots: "
-            + this.slots.values()
-            + " ]}";
-    }
-
-    public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
-        int total = 0;
-        for (RAS_Node n : nodes) {
-            if (n.isAlive()) {
-                total += n.totalSlotsFree();
-            }
-        }
-        return total;
-    }
-
-    public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
-        int total = 0;
-        for (RAS_Node n : nodes) {
-            if (n.isAlive()) {
-                total += n.totalSlots();
-            }
-        }
-        return total;
+               + ((sup == null) ? "null (possibly down)" : sup.getHost())
+               + ", Avail [ Mem: "
+               + getAvailableMemoryResources()
+               + ", CPU: "
+               + getAvailableCpuResources()
+               + ", Slots: "
+               + this.getFreeSlots()
+               + "] Total [ Mem: "
+               + ((sup == null) ? "N/A" : this.getTotalMemoryResources())
+               + ", CPU: "
+               + ((sup == null) ? "N/A" : this.getTotalCpuResources())
+               + ", Slots: "
+               + this.slots.values()
+               + " ]}";
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
index 2bd2b86..584934b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Nodes.java
@@ -32,9 +32,8 @@ import org.slf4j.LoggerFactory;
 
 public class RAS_Nodes {
 
-    private Map<String, RAS_Node> nodeMap;
-
     private static final Logger LOG = LoggerFactory.getLogger(RAS_Nodes.class);
+    private Map<String, RAS_Node> nodeMap;
 
     public RAS_Nodes(Cluster cluster) {
         this.nodeMap = getAllNodesFrom(cluster);
@@ -116,13 +115,16 @@ public class RAS_Nodes {
         return nodeIdToNode;
     }
 
-    /** get node object from nodeId. */
+    /**
+     * get node object from nodeId.
+     */
     public RAS_Node getNodeById(String nodeId) {
         return this.nodeMap.get(nodeId);
     }
 
     /**
      * Free everything on the given slots.
+     *
      * @param workerSlots the slots to free
      */
     public void freeSlots(Collection<WorkerSlot> workerSlots) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 541e1a5..60f6f6c 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource;
@@ -52,11 +46,35 @@ public class ResourceAwareScheduler implements IScheduler {
     private IConfigLoader configLoader;
     private int maxSchedulingAttempts;
 
+    private static void markFailedTopology(User u, Cluster c, TopologyDetails 
td, String message) {
+        markFailedTopology(u, c, td, message, null);
+    }
+
+    private static void markFailedTopology(User u, Cluster c, TopologyDetails 
td, String message, Throwable t) {
+        c.setStatus(td, message);
+        String realMessage = td.getId() + " " + message;
+        if (t != null) {
+            LOG.error(realMessage, t);
+        } else {
+            LOG.error(realMessage);
+        }
+        u.markTopoUnsuccess(td);
+    }
+
+    private static double getCpuUsed(SchedulerAssignment assignment) {
+        return 
assignment.getScheduledResources().values().stream().mapToDouble((wr) -> 
wr.get_cpu()).sum();
+    }
+
+    private static double getMemoryUsed(SchedulerAssignment assignment) {
+        return assignment.getScheduledResources().values().stream()
+                         .mapToDouble((wr) -> wr.get_mem_on_heap() + 
wr.get_mem_off_heap()).sum();
+    }
+
     @Override
     public void prepare(Map<String, Object> conf) {
         this.conf = conf;
         schedulingPriorityStrategy = ReflectionUtils.newInstance(
-                (String) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
+            (String) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
         configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
         maxSchedulingAttempts = ObjectReader.getInt(
             
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS),
 5);
@@ -85,21 +103,6 @@ public class ResourceAwareScheduler implements IScheduler {
         }
     }
 
-    private static void markFailedTopology(User u, Cluster c, TopologyDetails 
td, String message) {
-        markFailedTopology(u, c, td, message, null);
-    }
-
-    private static void markFailedTopology(User u, Cluster c, TopologyDetails 
td, String message, Throwable t) {
-        c.setStatus(td, message);
-        String realMessage = td.getId() + " " + message;
-        if (t != null) {
-            LOG.error(realMessage, t);
-        } else {
-            LOG.error(realMessage);
-        }
-        u.markTopoUnsuccess(td);
-    }
-
     private void scheduleTopology(TopologyDetails td, Cluster cluster, final 
User topologySubmitter,
                                   List<TopologyDetails> orderedTopologies) {
         //A copy of cluster that we can modify, but does not get committed 
back to cluster unless scheduling succeeds
@@ -119,17 +122,17 @@ public class ResourceAwareScheduler implements IScheduler 
{
             rasStrategy.prepare(conf);
         } catch (DisallowedStrategyException e) {
             markFailedTopology(topologySubmitter, cluster, td,
-                "Unsuccessful in scheduling - " + e.getAttemptedClass()
-                    + " is not an allowed strategy. Please make sure your "
-                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
-                    + " config is one of the allowed strategies: "
-                    + e.getAllowedStrategies(), e);
+                               "Unsuccessful in scheduling - " + 
e.getAttemptedClass()
+                               + " is not an allowed strategy. Please make 
sure your "
+                               + Config.TOPOLOGY_SCHEDULER_STRATEGY
+                               + " config is one of the allowed strategies: "
+                               + e.getAllowedStrategies(), e);
             return;
         } catch (RuntimeException e) {
             markFailedTopology(topologySubmitter, cluster, td,
-                "Unsuccessful in scheduling - failed to create instance of 
topology strategy "
-                    + strategyConf
-                    + ". Please check logs for details", e);
+                               "Unsuccessful in scheduling - failed to create 
instance of topology strategy "
+                               + strategyConf
+                               + ". Please check logs for details", e);
             return;
         }
 
@@ -167,7 +170,7 @@ public class ResourceAwareScheduler implements IScheduler {
                                 Collection<WorkerSlot> workersToEvict = 
workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
 
                                 LOG.debug("Evicting Topology {} with workers: 
{} from user {}", topologyEvict.getName(), workersToEvict,
-                                    topologyEvict.getTopologySubmitter());
+                                          
topologyEvict.getTopologySubmitter());
                                 cpuNeeded -= getCpuUsed(evictAssignemnt);
                                 memoryNeeded -= getMemoryUsed(evictAssignemnt);
                                 evictedSomething = true;
@@ -205,22 +208,13 @@ public class ResourceAwareScheduler implements IScheduler 
{
                 }
             } catch (Exception ex) {
                 markFailedTopology(topologySubmitter, cluster, td,
-                    "Internal Error - Exception thrown when scheduling. Please 
check logs for details", ex);
+                                   "Internal Error - Exception thrown when 
scheduling. Please check logs for details", ex);
                 return;
             }
         }
         markFailedTopology(topologySubmitter, cluster, td, "Failed to schedule 
within " + maxSchedulingAttempts + " attempts");
     }
 
-    private static double getCpuUsed(SchedulerAssignment assignment) {
-        return 
assignment.getScheduledResources().values().stream().mapToDouble((wr) -> 
wr.get_cpu()).sum();
-    }
-
-    private static double getMemoryUsed(SchedulerAssignment assignment) {
-        return assignment.getScheduledResources().values().stream()
-            .mapToDouble((wr) -> wr.get_mem_on_heap() + 
wr.get_mem_off_heap()).sum();
-    }
-
     /**
      * Get User wrappers around cluster.
      *
@@ -288,7 +282,7 @@ public class ResourceAwareScheduler implements IScheduler {
             return convertToDouble(raw);
         } else {
             LOG.warn("Reading from user-resource-pools.yaml returned null. 
This could because the file is not available. "
-                    + "Will load configs from storm configuration");
+                     + "Will load configs from storm configuration");
         }
 
         // if no configs from user-resource-pools.yaml, get configs from conf

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
index 2f9ab4c..e0a8114 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceUtils.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource;
@@ -37,7 +31,7 @@ public class ResourceUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(ResourceUtils.class);
 
     public static NormalizedResourceRequest getBoltResources(StormTopology 
topology, Map<String, Object> topologyConf,
-                                                              String 
componentId) {
+                                                             String 
componentId) {
         if (topology.get_bolts() != null) {
             Bolt bolt = topology.get_bolts().get(componentId);
             return new NormalizedResourceRequest(bolt.get_common(), 
topologyConf);
@@ -46,7 +40,7 @@ public class ResourceUtils {
     }
 
     public static Map<String, NormalizedResourceRequest> 
getBoltsResources(StormTopology topology,
-                                                                     
Map<String, Object> topologyConf) {
+                                                                           
Map<String, Object> topologyConf) {
         Map<String, NormalizedResourceRequest> boltResources = new HashMap<>();
         if (topology.get_bolts() != null) {
             for (Map.Entry<String, Bolt> bolt : 
topology.get_bolts().entrySet()) {
@@ -70,7 +64,7 @@ public class ResourceUtils {
     }
 
     public static Map<String, NormalizedResourceRequest> 
getSpoutsResources(StormTopology topology,
-                                                                      
Map<String, Object> topologyConf) {
+                                                                            
Map<String, Object> topologyConf) {
         Map<String, NormalizedResourceRequest> spoutResources = new 
HashMap<>();
         if (topology.get_spouts() != null) {
             for (Map.Entry<String, SpoutSpec> spout : 
topology.get_spouts().entrySet()) {
@@ -128,7 +122,7 @@ public class ResourceUtils {
     }
 
     public static String getCorrespondingLegacyResourceName(String 
normalizedResourceName) {
-        for(Map.Entry<String, String> entry : 
NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet())
 {
+        for (Map.Entry<String, String> entry : 
NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().entrySet())
 {
             if (entry.getValue().equals(normalizedResourceName)) {
                 return entry.getKey();
             }
@@ -143,9 +137,9 @@ public class ResourceUtils {
             JSONObject jsonObject = (JSONObject) obj;
 
             Map<String, Double> componentResourceMap =
-                    (Map<String, Double>) jsonObject.getOrDefault(
-                            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new 
HashMap<String, Double>()
-                    );
+                (Map<String, Double>) jsonObject.getOrDefault(
+                    Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new 
HashMap<String, Double>()
+                );
 
             for (Map.Entry<String, Double> resourceUpdateEntry : 
resourceUpdates.entrySet()) {
                 if 
(NormalizedResources.RESOURCE_NAME_NORMALIZER.getResourceNameMapping().containsValue(resourceUpdateEntry.getKey()))
 {
@@ -159,7 +153,7 @@ public class ResourceUtils {
 
             return jsonObject.toJSONString();
         } catch (ParseException ex) {
-            throw new RuntimeException("Failed to parse component resources 
with json: " +  jsonConf);
+            throw new RuntimeException("Failed to parse component resources 
with json: " + jsonConf);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
index 72f083d..882c6be 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingResult.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource;
@@ -66,7 +60,7 @@ public class SchedulingResult {
     public String getErrorMessage() {
         return this.errorMessage;
     }
-    
+
     public boolean isSuccess() {
         return SchedulingStatus.isStatusSuccess(this.status);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
index 703f4b8..47c0541 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource;
@@ -28,7 +22,7 @@ public enum SchedulingStatus {
 
     public static EnumSet<SchedulingStatus> success = EnumSet.of(SUCCESS);
     public static EnumSet<SchedulingStatus> failure = 
EnumSet.of(FAIL_INVALID_TOPOLOGY, FAIL_NOT_ENOUGH_RESOURCES,
-        FAIL_OTHER);
+                                                                 FAIL_OTHER);
 
     public static boolean isStatusSuccess(SchedulingStatus status) {
         return success.contains(status);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
index 9083510..ee79d59 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
@@ -18,13 +18,11 @@
 
 package org.apache.storm.scheduler.resource;
 
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-
 import org.apache.storm.daemon.nimbus.TopologyResources;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ISchedulingState;
@@ -32,13 +30,11 @@ import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.TopologyDetails;
 
 public class User {
-    private String userId;
-
     //Topologies that were deemed to be invalid
     private final Set<TopologyDetails> unsuccess = new HashSet<>();
-
     private final double cpuGuarantee;
     private final double memoryGuarantee;
+    private String userId;
 
     public User(String userId) {
         this(userId, 0, 0);
@@ -206,8 +202,8 @@ public class User {
     }
 
     /**
-     * Comparator that sorts topologies by priority and then by submission 
time First sort by Topology
-     * Priority, if there is a tie for topology priority, topology uptime is 
used to sort.
+     * Comparator that sorts topologies by priority and then by submission 
time First sort by Topology Priority, if there is a tie for
+     * topology priority, topology uptime is used to sort.
      */
     static class PQsortByPriorityAndSubmittionTime implements 
Comparator<TopologyDetails> {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 417dca9..0c073a8 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -74,12 +74,13 @@ public class NormalizedResourceOffer implements 
NormalizedResourcesWithMemory {
         if (totalMemoryMb < 0.0) {
             normalizedResources.throwBecauseResourceBecameNegative(
                 Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, totalMemoryMb, 
other.getTotalMemoryMb());
-        };
+        }
+        ;
     }
 
     /**
      * @see 
NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
-     * double, double).
+     *     double, double).
      */
     public double calculateAveragePercentageUsedBy(NormalizedResourceOffer 
used) {
         return normalizedResources.calculateAveragePercentageUsedBy(
@@ -88,7 +89,7 @@ public class NormalizedResourceOffer implements 
NormalizedResourcesWithMemory {
 
     /**
      * @see 
NormalizedResources#calculateMinPercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
 double,
-     * double)
+     *     double)
      */
     public double calculateMinPercentageUsedBy(NormalizedResourceOffer used) {
         return 
normalizedResources.calculateMinPercentageUsedBy(used.getNormalizedResources(), 
getTotalMemoryMb(), used.getTotalMemoryMb());
@@ -96,7 +97,7 @@ public class NormalizedResourceOffer implements 
NormalizedResourcesWithMemory {
 
     /**
      * @see 
NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
 double,
-     * double).
+     *     double).
      */
     public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory 
other) {
         return normalizedResources.couldHoldIgnoringSharedMemory(

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 6627bb5..dc380e6 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -37,6 +37,28 @@ import org.slf4j.LoggerFactory;
 public class NormalizedResourceRequest implements 
NormalizedResourcesWithMemory {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NormalizedResourceRequest.class);
+    private final NormalizedResources normalizedResources;
+    private double onHeap;
+    private double offHeap;
+
+    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
+                                      Map<String, Double> defaultResources) {
+        Map<String, Double> normalizedResourceMap = 
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
+        
normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
+        onHeap = 
normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME,
 0.0);
+        offHeap = 
normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME,
 0.0);
+        normalizedResources = new NormalizedResources(normalizedResourceMap);
+    }
+    public NormalizedResourceRequest(ComponentCommon component, Map<String, 
Object> topoConf) {
+        this(parseResources(component.get_json_conf()), 
getDefaultResources(topoConf));
+    }
+    public NormalizedResourceRequest(Map<String, Object> topoConf) {
+        this((Map<String, ? extends Number>) null, 
getDefaultResources(topoConf));
+    }
+
+    public NormalizedResourceRequest() {
+        this((Map<String, ? extends Number>) null, null);
+    }
 
     private static void putIfMissing(Map<String, Double> dest, String destKey, 
Map<String, Object> src, String srcKey) {
         if (!dest.containsKey(destKey)) {
@@ -48,8 +70,9 @@ public class NormalizedResourceRequest implements 
NormalizedResourcesWithMemory
     }
 
     private static Map<String, Double> getDefaultResources(Map<String, Object> 
topoConf) {
-        Map<String, Double> ret = 
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, 
Number>) topoConf.getOrDefault(
-            Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
+        Map<String, Double> ret =
+            
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap((Map<String, 
Number>) topoConf.getOrDefault(
+                Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, new HashMap<>()));
         putIfMissing(ret, Constants.COMMON_CPU_RESOURCE_NAME, topoConf, 
Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
         putIfMissing(ret, Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 
topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
         putIfMissing(ret, Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 
topoConf, Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
@@ -78,7 +101,7 @@ public class NormalizedResourceRequest implements 
NormalizedResourcesWithMemory
                 }
                 if 
(jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
                     Double topoCpu = 
ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
-                        null);
+                                                            null);
                     
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
                 }
 
@@ -102,31 +125,6 @@ public class NormalizedResourceRequest implements 
NormalizedResourcesWithMemory
         return topologyResources;
     }
 
-    private final NormalizedResources normalizedResources;
-    private double onHeap;
-    private double offHeap;
-
-    private NormalizedResourceRequest(Map<String, ? extends Number> resources,
-        Map<String, Double> defaultResources) {
-        Map<String, Double> normalizedResourceMap = 
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(defaultResources);
-        
normalizedResourceMap.putAll(NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resources));
-        onHeap = 
normalizedResourceMap.getOrDefault(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME,
 0.0);
-        offHeap = 
normalizedResourceMap.getOrDefault(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME,
 0.0);
-        normalizedResources = new NormalizedResources(normalizedResourceMap);
-    }
-
-    public NormalizedResourceRequest(ComponentCommon component, Map<String, 
Object> topoConf) {
-        this(parseResources(component.get_json_conf()), 
getDefaultResources(topoConf));
-    }
-
-    public NormalizedResourceRequest(Map<String, Object> topoConf) {
-        this((Map<String, ? extends Number>) null, 
getDefaultResources(topoConf));
-    }
-
-    public NormalizedResourceRequest() {
-        this((Map<String, ? extends Number>) null, null);
-    }
-
     public Map<String, Double> toNormalizedMap() {
         Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
         ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index 76d5ce2..c41cd95 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -45,16 +45,6 @@ public class NormalizedResources {
     private double[] otherResources;
 
     /**
-     * This is for testing only. It allows a test to reset the static state 
relating to resource names. We reset the mapping because some
-     * algorithms sadly have different behavior if a resource exists or not.
-     */
-    @VisibleForTesting
-    public static void resetResourceNames() {
-        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
-        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
-    }
-
-    /**
      * Copy constructor.
      */
     public NormalizedResources(NormalizedResources other) {
@@ -74,6 +64,16 @@ public class NormalizedResources {
     }
 
     /**
+     * This is for testing only. It allows a test to reset the static state 
relating to resource names. We reset the mapping because some
+     * algorithms sadly have different behavior if a resource exists or not.
+     */
+    @VisibleForTesting
+    public static void resetResourceNames() {
+        RESOURCE_NAME_NORMALIZER = new ResourceNameNormalizer();
+        RESOURCE_MAP_ARRAY_BRIDGE = new ResourceMapArrayBridge();
+    }
+
+    /**
      * Get the total amount of cpu.
      *
      * @return the amount of cpu.
@@ -81,7 +81,7 @@ public class NormalizedResources {
     public double getTotalCpu() {
         return cpu;
     }
-    
+
     private void zeroPadOtherResourcesIfNecessary(int requiredLength) {
         if (requiredLength > otherResources.length) {
             double[] newResources = new double[requiredLength];
@@ -116,16 +116,18 @@ public class NormalizedResources {
 
     /**
      * Throw an IllegalArgumentException because a resource became negative 
during remove.
-     * @param resourceName The name of the resource that became negative
-     * @param currentValue The current value of the resource
+     *
+     * @param resourceName    The name of the resource that became negative
+     * @param currentValue    The current value of the resource
      * @param subtractedValue The value that was subtracted to make the 
resource negative
      */
     public void throwBecauseResourceBecameNegative(String resourceName, double 
currentValue, double subtractedValue) {
         throw new IllegalArgumentException(String.format("Resource amounts 
should never be negative."
-            + " Resource '%s' with current value '%f' became negative because 
'%f' was removed.",
-            resourceName, currentValue, subtractedValue));
+                                                         +
+                                                         " Resource '%s' with 
current value '%f' became negative because '%f' was removed.",
+                                                         resourceName, 
currentValue, subtractedValue));
     }
-    
+
     /**
      * Remove the other resources from this. This is the same as subtracting 
the resources in other from this.
      *
@@ -173,8 +175,8 @@ public class NormalizedResources {
      * A simple sanity check to see if all of the resources in this would be 
large enough to hold the resources in other ignoring memory. It
      * does not check memory because with shared memory it is beyond the scope 
of this.
      *
-     * @param other the resources that we want to check if they would fit in 
this.
-     * @param thisTotalMemoryMb The total memory in MB of this
+     * @param other              the resources that we want to check if they 
would fit in this.
+     * @param thisTotalMemoryMb  The total memory in MB of this
      * @param otherTotalMemoryMb The total memory in MB of other
      * @return true if it might fit, else false if it could not possibly fit.
      */
@@ -204,8 +206,8 @@ public class NormalizedResources {
 
     private void throwBecauseUsedIsNotSubsetOfTotal(NormalizedResources used, 
double totalMemoryMb, double usedMemoryMb) {
         throw new IllegalArgumentException(String.format("The used resources 
must be a subset of the total resources."
-            + " Used: '%s', Total: '%s', Used Mem: '%f', Total Mem: '%f'",
-            used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, 
totalMemoryMb));
+                                                         + " Used: '%s', 
Total: '%s', Used Mem: '%f', Total Mem: '%f'",
+                                                         
used.toNormalizedMap(), this.toNormalizedMap(), usedMemoryMb, totalMemoryMb));
     }
 
     /**
@@ -213,18 +215,19 @@ public class NormalizedResources {
      * subset of the total resources. If a resource in the total has a value 
of zero, it will be skipped in the calculation to avoid
      * division by 0. If all resources are skipped the result is defined to be 
100.0.
      *
-     * @param used the amount of resources used.
+     * @param used          the amount of resources used.
      * @param totalMemoryMb The total memory in MB
-     * @param usedMemoryMb The used memory in MB
+     * @param usedMemoryMb  The used memory in MB
      * @return the average percentage used 0.0 to 100.0.
+     *
      * @throws IllegalArgumentException if any resource in used has a greater 
value than the same resource in the total, or used has generic
-     *     resources that are not present in the total.
+     *                                  resources that are not present in the 
total.
      */
     public double calculateAveragePercentageUsedBy(NormalizedResources used, 
double totalMemoryMb, double usedMemoryMb) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Calculating avg percentage used by. Used Mem: {} Total 
Mem: {}"
-                + " Used Normalized Resources: {} Total Normalized Resources: 
{}", totalMemoryMb, usedMemoryMb,
-                toNormalizedMap(), used.toNormalizedMap());
+                      + " Used Normalized Resources: {} Total Normalized 
Resources: {}", totalMemoryMb, usedMemoryMb,
+                      toNormalizedMap(), used.toNormalizedMap());
         }
 
         int skippedResourceTypes = 0;
@@ -290,18 +293,19 @@ public class NormalizedResources {
      * subset of the total resources. If a resource in the total has a value 
of zero, it will be skipped in the calculation to avoid
      * division by 0. If all resources are skipped the result is defined to be 
100.0.
      *
-     * @param used the amount of resources used.
+     * @param used          the amount of resources used.
      * @param totalMemoryMb The total memory in MB
-     * @param usedMemoryMb The used memory in MB
+     * @param usedMemoryMb  The used memory in MB
      * @return the minimum percentage used 0.0 to 100.0.
+     *
      * @throws IllegalArgumentException if any resource in used has a greater 
value than the same resource in the total, or used has generic
-     *     resources that are not present in the total.
+     *                                  resources that are not present in the 
total.
      */
     public double calculateMinPercentageUsedBy(NormalizedResources used, 
double totalMemoryMb, double usedMemoryMb) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Calculating min percentage used by. Used Mem: {} Total 
Mem: {}"
-                + " Used Normalized Resources: {} Total Normalized Resources: 
{}", totalMemoryMb, usedMemoryMb,
-                toNormalizedMap(), used.toNormalizedMap());
+                      + " Used Normalized Resources: {} Total Normalized 
Resources: {}", totalMemoryMb, usedMemoryMb,
+                      toNormalizedMap(), used.toNormalizedMap());
         }
 
         double min = 1.0;
@@ -322,7 +326,7 @@ public class NormalizedResources {
         if (used.otherResources.length > otherResources.length) {
             throwBecauseUsedIsNotSubsetOfTotal(used, totalMemoryMb, 
usedMemoryMb);
         }
-        
+
         for (int i = 0; i < otherResources.length; i++) {
             if (otherResources[i] == 0.0) {
                 //Skip any resources where the total is 0, the percent used 
for this resource isn't meaningful.

Reply via email to