http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index c401474..d65c125 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -1,33 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.blacklist;
 
 import com.google.common.collect.EvictingQueue;
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.Cluster;
@@ -46,29 +38,23 @@ import org.slf4j.LoggerFactory;
 
 
 public class BlacklistScheduler implements IScheduler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
-
     public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(BlacklistScheduler.class);
     private final IScheduler underlyingScheduler;
-    private Map<String, Object> conf;
-
     protected int toleranceTime;
     protected int toleranceCount;
     protected int resumeTime;
     protected IReporter reporter;
     protected IBlacklistStrategy blacklistStrategy;
-
     protected int nimbusMonitorFreqSecs;
-
     protected Map<String, Set<Integer>> cachedSupervisors;
-
     //key is supervisor key ,value is supervisor ports
     protected EvictingQueue<HashMap<String, Set<Integer>>> 
badSupervisorsToleranceSlidingWindow;
     protected int windowSize;
     protected Set<String> blacklistHost;
+    private Map<String, Object> conf;
 
     public BlacklistScheduler(IScheduler underlyingScheduler) {
         this.underlyingScheduler = underlyingScheduler;
@@ -81,18 +67,18 @@ public class BlacklistScheduler implements IScheduler {
         this.conf = conf;
 
         toleranceTime = 
ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
-                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
+                                            
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
         toleranceCount = 
ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
-                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+                                             
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
         resumeTime = 
ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
-                DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+                                         
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
 
         String reporterClassName = 
ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
-                LogReporter.class.getName());
+                                                          
LogReporter.class.getName());
         reporter = (IReporter) initializeInstance(reporterClassName, 
"blacklist reporter");
 
         String strategyClassName = 
ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
-                DefaultBlacklistStrategy.class.getName());
+                                                          
DefaultBlacklistStrategy.class.getName());
         blacklistStrategy = (IBlacklistStrategy) 
initializeInstance(strategyClassName, "blacklist strategy");
 
         nimbusMonitorFreqSecs = 
ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
@@ -177,7 +163,7 @@ public class BlacklistScheduler implements IScheduler {
 
     private Set<String> getBlacklistHosts(Cluster cluster, Topologies 
topologies) {
         Set<String> blacklistSet = blacklistStrategy.getBlacklist(new 
ArrayList<>(badSupervisorsToleranceSlidingWindow),
-                cluster, topologies);
+                                                                  cluster, 
topologies);
         Set<String> blacklistHostSet = new HashSet<>();
         for (String supervisor : blacklistSet) {
             String host = cluster.getHost(supervisor);
@@ -235,7 +221,7 @@ public class BlacklistScheduler implements IScheduler {
                     cachedSupervisors.put(supervisorKey, slots);
                 }
                 LOG.info("Worker slot {} was never back to normal during 
tolerance period, probably dead. Will be removed from cache.",
-                        workerSlot);
+                         workerSlot);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
index 153829c..b1bb933 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/IReporter.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.blacklist.reporters;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
index 3255c9d..4d3eded 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/reporters/LogReporter.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.blacklist.reporters;
@@ -21,7 +15,6 @@ package org.apache.storm.scheduler.blacklist.reporters;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +29,6 @@ public class LogReporter implements IReporter {
     @Override
     public void reportBlacklist(String supervisor, List<Map<String, 
Set<Integer>>> toleranceBuffer) {
         LOG.warn("add supervisor {}  to blacklist. The bad slot history of 
supervisors is : {}",
-                supervisor, toleranceBuffer);
+                 supervisor, toleranceBuffer);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index f6ca6ad..3332b2d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.blacklist.strategies;
@@ -24,7 +18,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.SupervisorDetails;
@@ -39,11 +32,9 @@ import org.slf4j.LoggerFactory;
 
 public class DefaultBlacklistStrategy implements IBlacklistStrategy {
 
-    private static Logger LOG = 
LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
-
     public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
     public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
-
+    private static Logger LOG = 
LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
     private IReporter reporter;
 
     private int toleranceCount;
@@ -55,11 +46,11 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
     @Override
     public void prepare(Map<String, Object> conf) {
         toleranceCount = 
ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
-                DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
+                                             
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
         resumeTime = 
ObjectReader.getInt(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), 
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
 
         String reporterClassName = 
ObjectReader.getString(conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
-                LogReporter.class.getName());
+                                                          
LogReporter.class.getName());
         reporter = (IReporter) initializeInstance(reporterClassName, 
"blacklist reporter");
 
         nimbusMonitorFreqSecs = 
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
@@ -133,7 +124,7 @@ public class DefaultBlacklistStrategy implements 
IBlacklistStrategy {
 
             if (shortage > 0) {
                 LOG.info("total needed num of workers :{}, available num of 
slots not in blacklist :{}, num blacklist :{}, " +
-                        "will release some blacklist.", totalNeedNumWorkers, 
availableSlotsNotInBlacklistCount, blacklist.size());
+                         "will release some blacklist.", totalNeedNumWorkers, 
availableSlotsNotInBlacklistCount, blacklist.size());
 
                 //release earliest blacklist
                 Set<String> readyToRemove = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index 8543659..295dd82 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.blacklist.strategies;
@@ -21,7 +15,6 @@ package org.apache.storm.scheduler.blacklist.strategies;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.Topologies;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java
index b690764..1b6ac91 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/DefaultPool.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.multitenant;
@@ -24,196 +18,199 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A pool of machines that anyone can use, but topologies are not isolated
  */
 public class DefaultPool extends NodePool {
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class);
-  private Set<Node> _nodes = new HashSet<>();
-  private HashMap<String, TopologyDetails> _tds = new HashMap<>();
-  
-  @Override
-  public void addTopology(TopologyDetails td) {
-    String topId = td.getId();
-    LOG.debug("Adding in Topology {}", topId);
-    _tds.put(topId, td);
-    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    if (assignment != null) {
-      for (WorkerSlot ws: assignment.getSlots()) {
-        Node n = _nodeIdToNode.get(ws.getNodeId());
-        _nodes.add(n);
-      }
-    }
-  }
-
-  @Override
-  public boolean canAdd(TopologyDetails td) {
-    return true;
-  }
-
-  @Override
-  public Collection<Node> takeNodes(int nodesNeeded) {
-    HashSet<Node> ret = new HashSet<>();
-    LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
-    Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
-    for (Node n: sortedNodes) {
-      if (nodesNeeded <= ret.size()) {
-        break;
-      }
-      if (n.isAlive()) {
-        n.freeAllSlots(_cluster);
-        _nodes.remove(n);
-        ret.add(n);
-      }
-    }
-    return ret;
-  }
-  
-  @Override
-  public int nodesAvailable() {
-    int total = 0;
-    for (Node n: _nodes) {
-      if (n.isAlive()) total++;
-    }
-    return total;
-  }
-  
-  @Override
-  public int slotsAvailable() {
-    return Node.countTotalSlotsAlive(_nodes);
-  }
-
-  @Override
-  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
-    int nodesFound = 0;
-    int slotsFound = 0;
-    LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
-    Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
-    for (Node n: sortedNodes) {
-      if (slotsNeeded <= 0) {
-        break;
-      }
-      if (n.isAlive()) {
-        nodesFound++;
-        int totalSlotsFree = n.totalSlots();
-        slotsFound += totalSlotsFree;
-        slotsNeeded -= totalSlotsFree;
-      }
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultPool.class);
+    private Set<Node> _nodes = new HashSet<>();
+    private HashMap<String, TopologyDetails> _tds = new HashMap<>();
+
+    @Override
+    public void addTopology(TopologyDetails td) {
+        String topId = td.getId();
+        LOG.debug("Adding in Topology {}", topId);
+        _tds.put(topId, td);
+        SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+        if (assignment != null) {
+            for (WorkerSlot ws : assignment.getSlots()) {
+                Node n = _nodeIdToNode.get(ws.getNodeId());
+                _nodes.add(n);
+            }
+        }
     }
-    return new NodeAndSlotCounts(nodesFound, slotsFound);
-  }
-  
-  @Override
-  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<>();
-    LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
-    Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
-    for (Node n: sortedNodes) {
-      if (slotsNeeded <= 0) {
-        break;
-      }
-      if (n.isAlive()) {
-        n.freeAllSlots(_cluster);
-        _nodes.remove(n);
-        ret.add(n);
-        slotsNeeded -= n.totalSlotsFree();
-      }
+
+    @Override
+    public boolean canAdd(TopologyDetails td) {
+        return true;
     }
-    return ret;
-  }
-
-  @Override
-  public void scheduleAsNeeded(NodePool... lesserPools) {
-    for (TopologyDetails td : _tds.values()) {
-      String topId = td.getId();
-      if (_cluster.needsScheduling(td)) {
-        LOG.debug("Scheduling topology {}",topId);
-        int totalTasks = td.getExecutors().size();
-        int origRequest = td.getNumWorkers();
-        int slotsRequested = Math.min(totalTasks, origRequest);
-        int slotsUsed = Node.countSlotsUsed(topId, _nodes);
-        int slotsFree = Node.countFreeSlotsAlive(_nodes);
-        //Check to see if we have enough slots before trying to get them
-        int slotsAvailable = 0;
-        if (slotsRequested > slotsFree) {
-          slotsAvailable = NodePool.slotsAvailable(lesserPools);
-        }
-        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + 
slotsAvailable);
-        int executorsNotRunning = _cluster.getUnassignedExecutors(td).size();
-        LOG.debug("Slots... requested {} used {} free {} available {} to be 
used {}, executors not running {}",
-                slotsRequested, slotsUsed, slotsFree, slotsAvailable, 
slotsToUse, executorsNotRunning);
-        if (slotsToUse <= 0) {
-          if (executorsNotRunning > 0) {
-            _cluster.setStatus(topId,"Not fully scheduled (No free slots in 
default pool) "+executorsNotRunning+" executors not scheduled");
-          } else {
-            if (slotsUsed < slotsRequested) {
-              _cluster.setStatus(topId,"Running with fewer slots than 
requested ("+slotsUsed+"/"+origRequest+")");
-            } else { //slotsUsed < origRequest
-              _cluster.setStatus(topId,"Fully Scheduled (requested 
"+origRequest+" slots, but could only use "+slotsUsed+")");
+
+    @Override
+    public Collection<Node> takeNodes(int nodesNeeded) {
+        HashSet<Node> ret = new HashSet<>();
+        LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
+        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+        for (Node n : sortedNodes) {
+            if (nodesNeeded <= ret.size()) {
+                break;
+            }
+            if (n.isAlive()) {
+                n.freeAllSlots(_cluster);
+                _nodes.remove(n);
+                ret.add(n);
             }
-          }
-          continue;
         }
+        return ret;
+    }
 
-        int slotsNeeded = slotsToUse - slotsFree;
-        if (slotsNeeded > 0) {
-          _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools));
+    @Override
+    public int nodesAvailable() {
+        int total = 0;
+        for (Node n : _nodes) {
+            if (n.isAlive()) total++;
         }
+        return total;
+    }
+
+    @Override
+    public int slotsAvailable() {
+        return Node.countTotalSlotsAlive(_nodes);
+    }
 
-        if (executorsNotRunning <= 0) {
-          //There are free slots that we can take advantage of now.
-          for (Node n: _nodes) {
-            n.freeTopology(topId, _cluster); 
-          }
-          slotsFree = Node.countFreeSlotsAlive(_nodes);
-          slotsToUse = Math.min(slotsRequested, slotsFree);
+    @Override
+    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
+        int nodesFound = 0;
+        int slotsFound = 0;
+        LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
+        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+        for (Node n : sortedNodes) {
+            if (slotsNeeded <= 0) {
+                break;
+            }
+            if (n.isAlive()) {
+                nodesFound++;
+                int totalSlotsFree = n.totalSlots();
+                slotsFound += totalSlotsFree;
+                slotsNeeded -= totalSlotsFree;
+            }
         }
-        
-        RoundRobinSlotScheduler slotSched = 
-          new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
-        
-        LinkedList<Node> nodes = new LinkedList<>(_nodes);
-        while (true) {
-          Node n;
-          do {
-            if (nodes.isEmpty()) {
-              throw new IllegalStateException("This should not happen, we" +
-              " messed up and did not get enough slots");
+        return new NodeAndSlotCounts(nodesFound, slotsFound);
+    }
+
+    @Override
+    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+        HashSet<Node> ret = new HashSet<>();
+        LinkedList<Node> sortedNodes = new LinkedList<>(_nodes);
+        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+        for (Node n : sortedNodes) {
+            if (slotsNeeded <= 0) {
+                break;
             }
-            n = nodes.peekFirst();
-            if (n.totalSlotsFree() == 0) {
-              nodes.remove();
-              n = null;
+            if (n.isAlive()) {
+                n.freeAllSlots(_cluster);
+                _nodes.remove(n);
+                ret.add(n);
+                slotsNeeded -= n.totalSlotsFree();
             }
-          } while (n == null);
-          if (!slotSched.assignSlotTo(n)) {
-            break;
-          }
         }
-        int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes);
-        if (afterSchedSlotsUsed < slotsRequested) {
-          _cluster.setStatus(topId,"Running with fewer slots than requested 
("+afterSchedSlotsUsed+"/"+origRequest+")");
-        } else if (afterSchedSlotsUsed < origRequest) {
-          _cluster.setStatus(topId,"Fully Scheduled (requested "+origRequest+" 
slots, but could only use "+afterSchedSlotsUsed+")");
-        } else {
-          _cluster.setStatus(topId,"Fully Scheduled");
+        return ret;
+    }
+
+    @Override
+    public void scheduleAsNeeded(NodePool... lesserPools) {
+        for (TopologyDetails td : _tds.values()) {
+            String topId = td.getId();
+            if (_cluster.needsScheduling(td)) {
+                LOG.debug("Scheduling topology {}", topId);
+                int totalTasks = td.getExecutors().size();
+                int origRequest = td.getNumWorkers();
+                int slotsRequested = Math.min(totalTasks, origRequest);
+                int slotsUsed = Node.countSlotsUsed(topId, _nodes);
+                int slotsFree = Node.countFreeSlotsAlive(_nodes);
+                //Check to see if we have enough slots before trying to get 
them
+                int slotsAvailable = 0;
+                if (slotsRequested > slotsFree) {
+                    slotsAvailable = NodePool.slotsAvailable(lesserPools);
+                }
+                int slotsToUse = Math.min(slotsRequested - slotsUsed, 
slotsFree + slotsAvailable);
+                int executorsNotRunning = 
_cluster.getUnassignedExecutors(td).size();
+                LOG.debug("Slots... requested {} used {} free {} available {} 
to be used {}, executors not running {}",
+                          slotsRequested, slotsUsed, slotsFree, 
slotsAvailable, slotsToUse, executorsNotRunning);
+                if (slotsToUse <= 0) {
+                    if (executorsNotRunning > 0) {
+                        _cluster.setStatus(topId, "Not fully scheduled (No 
free slots in default pool) " + executorsNotRunning +
+                                                  " executors not scheduled");
+                    } else {
+                        if (slotsUsed < slotsRequested) {
+                            _cluster.setStatus(topId, "Running with fewer 
slots than requested (" + slotsUsed + "/" + origRequest + ")");
+                        } else { //slotsUsed < origRequest
+                            _cluster.setStatus(topId,
+                                               "Fully Scheduled (requested " + 
origRequest + " slots, but could only use " + slotsUsed +
+                                               ")");
+                        }
+                    }
+                    continue;
+                }
+
+                int slotsNeeded = slotsToUse - slotsFree;
+                if (slotsNeeded > 0) {
+                    _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, 
lesserPools));
+                }
+
+                if (executorsNotRunning <= 0) {
+                    //There are free slots that we can take advantage of now.
+                    for (Node n : _nodes) {
+                        n.freeTopology(topId, _cluster);
+                    }
+                    slotsFree = Node.countFreeSlotsAlive(_nodes);
+                    slotsToUse = Math.min(slotsRequested, slotsFree);
+                }
+
+                RoundRobinSlotScheduler slotSched =
+                    new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
+
+                LinkedList<Node> nodes = new LinkedList<>(_nodes);
+                while (true) {
+                    Node n;
+                    do {
+                        if (nodes.isEmpty()) {
+                            throw new IllegalStateException("This should not 
happen, we" +
+                                                            " messed up and 
did not get enough slots");
+                        }
+                        n = nodes.peekFirst();
+                        if (n.totalSlotsFree() == 0) {
+                            nodes.remove();
+                            n = null;
+                        }
+                    } while (n == null);
+                    if (!slotSched.assignSlotTo(n)) {
+                        break;
+                    }
+                }
+                int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes);
+                if (afterSchedSlotsUsed < slotsRequested) {
+                    _cluster.setStatus(topId, "Running with fewer slots than 
requested (" + afterSchedSlotsUsed + "/" + origRequest + ")");
+                } else if (afterSchedSlotsUsed < origRequest) {
+                    _cluster.setStatus(topId,
+                                       "Fully Scheduled (requested " + 
origRequest + " slots, but could only use " + afterSchedSlotsUsed +
+                                       ")");
+                } else {
+                    _cluster.setStatus(topId, "Fully Scheduled");
+                }
+            } else {
+                _cluster.setStatus(topId, "Fully Scheduled");
+            }
         }
-      } else {
-        _cluster.setStatus(topId,"Fully Scheduled");
-      }
     }
-  }
-  
-  @Override
-  public String toString() {
-    return "DefaultPool  " + _nodes.size() + " nodes " + _tds.size() + " 
topologies";
-  }
+
+    @Override
+    public String toString() {
+        return "DefaultPool  " + _nodes.size() + " nodes " + _tds.size() + " 
topologies";
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java
index 88cec6b..e868223 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/FreePool.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.multitenant;
@@ -23,103 +17,101 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.TopologyDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * All of the machines that currently have nothing assigned to them
  */
 public class FreePool extends NodePool {
-  private static final Logger LOG = LoggerFactory.getLogger(FreePool.class);
-  private Set<Node> _nodes = new HashSet<>();
-  private int _totalSlots = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(FreePool.class);
+    private Set<Node> _nodes = new HashSet<>();
+    private int _totalSlots = 0;
 
-  @Override
-  public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
-    super.init(cluster, nodeIdToNode);
-    for (Node n: nodeIdToNode.values()) {
-      if(n.isTotallyFree() && n.isAlive()) {
-        _nodes.add(n);
-        _totalSlots += n.totalSlotsFree();
-      }
+    @Override
+    public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
+        super.init(cluster, nodeIdToNode);
+        for (Node n : nodeIdToNode.values()) {
+            if (n.isTotallyFree() && n.isAlive()) {
+                _nodes.add(n);
+                _totalSlots += n.totalSlotsFree();
+            }
+        }
+        LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots);
     }
-    LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots);
-  }
-  
-  @Override
-  public void addTopology(TopologyDetails td) {
-    throw new IllegalArgumentException("The free pool cannot run any 
topologies");
-  }
 
-  @Override
-  public boolean canAdd(TopologyDetails td) {
-    // The free pool never has anything running
-    return false;
-  }
-  
-  @Override
-  public Collection<Node> takeNodes(int nodesNeeded) {
-    HashSet<Node> ret = new HashSet<>();
-    Iterator<Node> it = _nodes.iterator();
-    while (it.hasNext() && nodesNeeded > ret.size()) {
-      Node n = it.next();
-      ret.add(n);
-      _totalSlots -= n.totalSlotsFree();
-      it.remove();
+    @Override
+    public void addTopology(TopologyDetails td) {
+        throw new IllegalArgumentException("The free pool cannot run any 
topologies");
     }
-    return ret;
-  }
-  
-  @Override
-  public int nodesAvailable() {
-    return _nodes.size();
-  }
 
-  @Override
-  public int slotsAvailable() {
-    return _totalSlots;
-  }
+    @Override
+    public boolean canAdd(TopologyDetails td) {
+        // The free pool never has anything running
+        return false;
+    }
 
-  @Override
-  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<>();
-    Iterator<Node> it = _nodes.iterator();
-    while (it.hasNext() && slotsNeeded > 0) {
-      Node n = it.next();
-      ret.add(n);
-      _totalSlots -= n.totalSlotsFree();
-      slotsNeeded -= n.totalSlotsFree();
-      it.remove();
+    @Override
+    public Collection<Node> takeNodes(int nodesNeeded) {
+        HashSet<Node> ret = new HashSet<>();
+        Iterator<Node> it = _nodes.iterator();
+        while (it.hasNext() && nodesNeeded > ret.size()) {
+            Node n = it.next();
+            ret.add(n);
+            _totalSlots -= n.totalSlotsFree();
+            it.remove();
+        }
+        return ret;
     }
-    return ret;
-  }
-  
-  @Override
-  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
-    int slotsFound = 0;
-    int nodesFound = 0;
-    Iterator<Node> it = _nodes.iterator();
-    while (it.hasNext() && slotsNeeded > 0) {
-      Node n = it.next();
-      nodesFound++;
-      int totalSlots = n.totalSlots();
-      slotsFound += totalSlots;
-      slotsNeeded -= totalSlots;
+
+    @Override
+    public int nodesAvailable() {
+        return _nodes.size();
     }
-    return new NodeAndSlotCounts(nodesFound, slotsFound);
-  }
 
-  @Override
-  public void scheduleAsNeeded(NodePool... lesserPools) {
-    //No topologies running so NOOP
-  }
-  
-  @Override
-  public String toString() {
-    return "FreePool of "+_nodes.size()+" nodes with "+_totalSlots+" slots";
-  }
+    @Override
+    public int slotsAvailable() {
+        return _totalSlots;
+    }
+
+    @Override
+    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+        HashSet<Node> ret = new HashSet<>();
+        Iterator<Node> it = _nodes.iterator();
+        while (it.hasNext() && slotsNeeded > 0) {
+            Node n = it.next();
+            ret.add(n);
+            _totalSlots -= n.totalSlotsFree();
+            slotsNeeded -= n.totalSlotsFree();
+            it.remove();
+        }
+        return ret;
+    }
+
+    @Override
+    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
+        int slotsFound = 0;
+        int nodesFound = 0;
+        Iterator<Node> it = _nodes.iterator();
+        while (it.hasNext() && slotsNeeded > 0) {
+            Node n = it.next();
+            nodesFound++;
+            int totalSlots = n.totalSlots();
+            slotsFound += totalSlots;
+            slotsNeeded -= totalSlots;
+        }
+        return new NodeAndSlotCounts(nodesFound, slotsFound);
+    }
+
+    @Override
+    public void scheduleAsNeeded(NodePool... lesserPools) {
+        //No topologies running so NOOP
+    }
+
+    @Override
+    public String toString() {
+        return "FreePool of " + _nodes.size() + " nodes with " + _totalSlots + 
" slots";
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java
index f16e4fa..19ce712 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/IsolatedPool.java
@@ -1,371 +1,362 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.multitenant;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.Config;
 import org.apache.storm.scheduler.SchedulerAssignment;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A pool of machines that can be used to run isolated topologies
  */
 public class IsolatedPool extends NodePool {
-  private static final Logger LOG = 
LoggerFactory.getLogger(IsolatedPool.class);
-  private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<>();
-  private HashMap<String, TopologyDetails> _tds = new HashMap<>();
-  private HashSet<String> _isolated = new HashSet<>();
-  private int _maxNodes;
-  private int _usedNodes;
+    private static final Logger LOG = 
LoggerFactory.getLogger(IsolatedPool.class);
+    private Map<String, Set<Node>> _topologyIdToNodes = new HashMap<>();
+    private HashMap<String, TopologyDetails> _tds = new HashMap<>();
+    private HashSet<String> _isolated = new HashSet<>();
+    private int _maxNodes;
+    private int _usedNodes;
 
-  public IsolatedPool(int maxNodes) {
-    _maxNodes = maxNodes;
-    _usedNodes = 0;
-  }
-
-  @Override
-  public void addTopology(TopologyDetails td) {
-    String topId = td.getId();
-    LOG.debug("Adding in Topology {}", topId);
-    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    Set<Node> assignedNodes = new HashSet<>();
-    if (assignment != null) {
-      for (WorkerSlot ws: assignment.getSlots()) {
-        Node n = _nodeIdToNode.get(ws.getNodeId());
-        assignedNodes.add(n);
-      }
-    }
-    _usedNodes += assignedNodes.size();
-    _topologyIdToNodes.put(topId, assignedNodes);
-    _tds.put(topId, td);
-    if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
-      _isolated.add(topId);
+    public IsolatedPool(int maxNodes) {
+        _maxNodes = maxNodes;
+        _usedNodes = 0;
     }
-  }
 
-  @Override
-  public boolean canAdd(TopologyDetails td) {
-    //Only add topologies that are not sharing nodes with other topologies
-    String topId = td.getId();
-    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    if (assignment != null) {
-      for (WorkerSlot ws: assignment.getSlots()) {
-        Node n = _nodeIdToNode.get(ws.getNodeId());
-        if (n.getRunningTopologies().size() > 1) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-  
-  @Override
-  public void scheduleAsNeeded(NodePool ... lesserPools) {
-    for (String topId : _topologyIdToNodes.keySet()) {
-      TopologyDetails td = _tds.get(topId);
-      Set<Node> allNodes = _topologyIdToNodes.get(topId);
-      Number nodesRequested = (Number) 
td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
-      Integer effectiveNodesRequested = null;
-      if (nodesRequested != null) {
-        effectiveNodesRequested = Math.min(td.getExecutors().size(),
-                +nodesRequested.intValue());
-      }
-      if (_cluster.needsScheduling(td) ||
-              (effectiveNodesRequested != null &&
-                      allNodes.size() != effectiveNodesRequested)) {
-        LOG.debug("Scheduling topology {}", topId);
-        int slotsToUse = 0;
-        if (effectiveNodesRequested == null) {
-          slotsToUse = getNodesForNotIsolatedTop(td, allNodes, lesserPools);
-        } else {
-          slotsToUse = getNodesForIsolatedTop(td, allNodes, lesserPools,
-                  effectiveNodesRequested);
+    @Override
+    public void addTopology(TopologyDetails td) {
+        String topId = td.getId();
+        LOG.debug("Adding in Topology {}", topId);
+        SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+        Set<Node> assignedNodes = new HashSet<>();
+        if (assignment != null) {
+            for (WorkerSlot ws : assignment.getSlots()) {
+                Node n = _nodeIdToNode.get(ws.getNodeId());
+                assignedNodes.add(n);
+            }
         }
-        //No slots to schedule for some reason, so skip it.
-        if (slotsToUse <= 0) {
-          continue;
+        _usedNodes += assignedNodes.size();
+        _topologyIdToNodes.put(topId, assignedNodes);
+        _tds.put(topId, td);
+        if (td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES) != null) {
+            _isolated.add(topId);
         }
-        
-        RoundRobinSlotScheduler slotSched = 
-          new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
+    }
 
-        LOG.debug("Nodes sorted by free space {}", allNodes);
-        while (true) {
-          Node n = findBestNode(allNodes);
-          if (n == null) {
-            LOG.error("No nodes to use to assign topology {}", td.getName());
-            break;
-          }
-          if (!slotSched.assignSlotTo(n)) {
-            break;
-          }
+    @Override
+    public boolean canAdd(TopologyDetails td) {
+        //Only add topologies that are not sharing nodes with other topologies
+        String topId = td.getId();
+        SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+        if (assignment != null) {
+            for (WorkerSlot ws : assignment.getSlots()) {
+                Node n = _nodeIdToNode.get(ws.getNodeId());
+                if (n.getRunningTopologies().size() > 1) {
+                    return false;
+                }
+            }
         }
-      }
-      Set<Node> found = _topologyIdToNodes.get(topId);
-      int nc = found == null ? 0 : found.size();
-      _cluster.setStatus(topId,"Scheduled Isolated on "+nc+" Nodes");
+        return true;
     }
-  }
 
-  private Node findBestNode(Collection<Node> nodes) {
-    Node ret = null;
-    for(Node node : nodes) {
-      if(ret == null ) {
-        if(node.totalSlotsFree() > 0) {
-          ret = node;
-        }
-      } else {
-        if (node.totalSlotsFree() > 0) {
-          if (node.totalSlotsUsed() < ret.totalSlotsUsed()) {
-            ret = node;
-          } else if (node.totalSlotsUsed() == ret.totalSlotsUsed()) {
-            if(node.totalSlotsFree() > ret.totalSlotsFree()) {
-              ret = node;
+    @Override
+    public void scheduleAsNeeded(NodePool... lesserPools) {
+        for (String topId : _topologyIdToNodes.keySet()) {
+            TopologyDetails td = _tds.get(topId);
+            Set<Node> allNodes = _topologyIdToNodes.get(topId);
+            Number nodesRequested = (Number) 
td.getConf().get(Config.TOPOLOGY_ISOLATED_MACHINES);
+            Integer effectiveNodesRequested = null;
+            if (nodesRequested != null) {
+                effectiveNodesRequested = Math.min(td.getExecutors().size(),
+                                                   +nodesRequested.intValue());
+            }
+            if (_cluster.needsScheduling(td) ||
+                (effectiveNodesRequested != null &&
+                 allNodes.size() != effectiveNodesRequested)) {
+                LOG.debug("Scheduling topology {}", topId);
+                int slotsToUse = 0;
+                if (effectiveNodesRequested == null) {
+                    slotsToUse = getNodesForNotIsolatedTop(td, allNodes, 
lesserPools);
+                } else {
+                    slotsToUse = getNodesForIsolatedTop(td, allNodes, 
lesserPools,
+                                                        
effectiveNodesRequested);
+                }
+                //No slots to schedule for some reason, so skip it.
+                if (slotsToUse <= 0) {
+                    continue;
+                }
+
+                RoundRobinSlotScheduler slotSched =
+                    new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
+
+                LOG.debug("Nodes sorted by free space {}", allNodes);
+                while (true) {
+                    Node n = findBestNode(allNodes);
+                    if (n == null) {
+                        LOG.error("No nodes to use to assign topology {}", 
td.getName());
+                        break;
+                    }
+                    if (!slotSched.assignSlotTo(n)) {
+                        break;
+                    }
+                }
             }
-          }
+            Set<Node> found = _topologyIdToNodes.get(topId);
+            int nc = found == null ? 0 : found.size();
+            _cluster.setStatus(topId, "Scheduled Isolated on " + nc + " 
Nodes");
         }
-      }
     }
-    return ret;
-  }
-  
-  /**
-   * Get the nodes needed to schedule an isolated topology.
-   * @param td the topology to be scheduled
-   * @param allNodes the nodes already scheduled for this topology.
-   * This will be updated to include new nodes if needed. 
-   * @param lesserPools node pools we can steal nodes from
-   * @return the number of additional slots that should be used for scheduling.
-   */
-  private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes,
-      NodePool[] lesserPools, int nodesRequested) {
-    String topId = td.getId();
-    LOG.debug("Topology {} is isolated", topId);
-    int nodesFromUsAvailable = nodesAvailable();
-    int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
 
-    int nodesUsed = _topologyIdToNodes.get(topId).size();
-    int nodesNeeded = nodesRequested - nodesUsed;
-    LOG.debug("Nodes... requested {} used {} available from us {} " +
-        "avail from other {} needed {}", nodesRequested,
-            nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
-            nodesNeeded);
-    if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
-      _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would 
be exceeded. "
-        + ((nodesNeeded - nodesFromUsAvailable) - (_maxNodes - _usedNodes)) 
-        + " more nodes needed to run topology.");
-      return 0;
+    private Node findBestNode(Collection<Node> nodes) {
+        Node ret = null;
+        for (Node node : nodes) {
+            if (ret == null) {
+                if (node.totalSlotsFree() > 0) {
+                    ret = node;
+                }
+            } else {
+                if (node.totalSlotsFree() > 0) {
+                    if (node.totalSlotsUsed() < ret.totalSlotsUsed()) {
+                        ret = node;
+                    } else if (node.totalSlotsUsed() == ret.totalSlotsUsed()) {
+                        if (node.totalSlotsFree() > ret.totalSlotsFree()) {
+                            ret = node;
+                        }
+                    }
+                }
+            }
+        }
+        return ret;
     }
 
-    //In order to avoid going over _maxNodes I may need to steal from
-    // myself even though other pools have free nodes. so figure out how
-    // much each group should provide
-    int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes, 
-        nodesFromOthersAvailable), nodesNeeded);
-    int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers; 
-    LOG.debug("Nodes... needed from us {} needed from others {}", 
-        nodesNeededFromUs, nodesNeededFromOthers);
+    /**
+     * Get the nodes needed to schedule an isolated topology.
+     * @param td the topology to be scheduled
+     * @param allNodes the nodes already scheduled for this topology.
+     * This will be updated to include new nodes if needed.
+     * @param lesserPools node pools we can steal nodes from
+     * @return the number of additional slots that should be used for scheduling.
+     */
+    private int getNodesForIsolatedTop(TopologyDetails td, Set<Node> allNodes,
+                                       NodePool[] lesserPools, int 
nodesRequested) {
+        String topId = td.getId();
+        LOG.debug("Topology {} is isolated", topId);
+        int nodesFromUsAvailable = nodesAvailable();
+        int nodesFromOthersAvailable = NodePool.nodesAvailable(lesserPools);
 
-    if (nodesNeededFromUs > nodesFromUsAvailable) {
-      _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule 
Topology");
-      return 0;
-    }
+        int nodesUsed = _topologyIdToNodes.get(topId).size();
+        int nodesNeeded = nodesRequested - nodesUsed;
+        LOG.debug("Nodes... requested {} used {} available from us {} " +
+                  "avail from other {} needed {}", nodesRequested,
+                  nodesUsed, nodesFromUsAvailable, nodesFromOthersAvailable,
+                  nodesNeeded);
+        if ((nodesNeeded - nodesFromUsAvailable) > (_maxNodes - _usedNodes)) {
+            _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this 
user would be exceeded. "
+                                      + ((nodesNeeded - nodesFromUsAvailable) 
- (_maxNodes - _usedNodes))
+                                      + " more nodes needed to run topology.");
+            return 0;
+        }
 
-    //Get the nodes
-    Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, 
lesserPools);
-    _usedNodes += found.size();
-    allNodes.addAll(found);
-    Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
-    _usedNodes += foundMore.size();
-    allNodes.addAll(foundMore);
+        //In order to avoid going over _maxNodes I may need to steal from
+        // myself even though other pools have free nodes. so figure out how
+        // much each group should provide
+        int nodesNeededFromOthers = Math.min(Math.min(_maxNodes - _usedNodes,
+                                                      
nodesFromOthersAvailable), nodesNeeded);
+        int nodesNeededFromUs = nodesNeeded - nodesNeededFromOthers;
+        LOG.debug("Nodes... needed from us {} needed from others {}",
+                  nodesNeededFromUs, nodesNeededFromOthers);
 
-    int totalTasks = td.getExecutors().size();
-    int origRequest = td.getNumWorkers();
-    int slotsRequested = Math.min(totalTasks, origRequest);
-    int slotsUsed = Node.countSlotsUsed(allNodes);
-    int slotsFree = Node.countFreeSlotsAlive(allNodes);
-    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
-    if (slotsToUse <= 0) {
-        // if # of workers requested is more than we currently have
-        if (origRequest > slotsUsed) {
-            _cluster.setStatus(topId, "Running with fewer slots than requested 
" + slotsUsed + "/" +
-                               origRequest + " on " + allNodes.size() + " 
node(s) with " + (slotsUsed + slotsFree) + " total slots");
-        } else {
-            // if # of workers requested is less than we took
-            // then we know some workers we track died, since we have more 
workers than we are supposed to have 
-            _cluster.setStatus(topId, "Node has partially crashed, if this 
situation persists rebalance the topology.");
+        if (nodesNeededFromUs > nodesFromUsAvailable) {
+            _cluster.setStatus(topId, "Not Enough Nodes Available to Schedule 
Topology");
+            return 0;
         }
+
+        //Get the nodes
+        Collection<Node> found = NodePool.takeNodes(nodesNeededFromOthers, 
lesserPools);
+        _usedNodes += found.size();
+        allNodes.addAll(found);
+        Collection<Node> foundMore = takeNodes(nodesNeededFromUs);
+        _usedNodes += foundMore.size();
+        allNodes.addAll(foundMore);
+
+        int totalTasks = td.getExecutors().size();
+        int origRequest = td.getNumWorkers();
+        int slotsRequested = Math.min(totalTasks, origRequest);
+        int slotsUsed = Node.countSlotsUsed(allNodes);
+        int slotsFree = Node.countFreeSlotsAlive(allNodes);
+        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree);
+        if (slotsToUse <= 0) {
+            // if # of workers requested is more than we currently have
+            if (origRequest > slotsUsed) {
+                _cluster.setStatus(topId, "Running with fewer slots than 
requested " + slotsUsed + "/" +
+                                          origRequest + " on " + 
allNodes.size() + " node(s) with " + (slotsUsed + slotsFree) +
+                                          " total slots");
+            } else {
+                // if # of workers requested is less than we took
+                // then we know some workers we track died, since we have more workers than we are supposed to have
+                _cluster.setStatus(topId, "Node has partially crashed, if this 
situation persists rebalance the topology.");
+            }
+        }
+        return slotsToUse;
     }
-    return slotsToUse;
-  }
-  
-  /**
-   * Get the nodes needed to schedule a non-isolated topology.
-   * @param td the topology to be scheduled
-   * @param allNodes the nodes already scheduled for this topology.
-   * This will be updated to include new nodes if needed. 
-   * @param lesserPools node pools we can steal nodes from
-   * @return the number of additional slots that should be used for scheduling.
-   */
-  private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes,
-      NodePool[] lesserPools) {
-    String topId = td.getId();
-    LOG.debug("Topology {} is not isolated",topId);
-    int totalTasks = td.getExecutors().size();
-    int origRequest = td.getNumWorkers();
-    int slotsRequested = Math.min(totalTasks, origRequest);
-    int slotsUsed = Node.countSlotsUsed(topId, allNodes);
-    int slotsFree = Node.countFreeSlotsAlive(allNodes);
-    //Check to see if we have enough slots before trying to get them
-    int slotsAvailable = 0;
-    if (slotsRequested > slotsFree) {
-      slotsAvailable = NodePool.slotsAvailable(lesserPools);
-    }
-    int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
-    LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
-            slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse);
-    if (slotsToUse <= 0) {
-      _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
-      return 0;
-    }
-    int slotsNeeded = slotsToUse - slotsFree;
-    int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
-    LOG.debug("Nodes... new {} used {} max {}",
-            numNewNodes, _usedNodes, _maxNodes);
-    if ((numNewNodes + _usedNodes) > _maxNodes) {
-      _cluster.setStatus(topId,"Max Nodes("+_maxNodes+") for this user would be exceeded. " +
-      (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
-      return 0;
+
+    /**
+     * Get the nodes needed to schedule a non-isolated topology.
+     * @param td the topology to be scheduled
+     * @param allNodes the nodes already scheduled for this topology.
+     * This will be updated to include new nodes if needed.
+     * @param lesserPools node pools we can steal nodes from
+     * @return the number of additional slots that should be used for scheduling.
+     */
+    private int getNodesForNotIsolatedTop(TopologyDetails td, Set<Node> allNodes,
+                                          NodePool[] lesserPools) {
+        String topId = td.getId();
+        LOG.debug("Topology {} is not isolated", topId);
+        int totalTasks = td.getExecutors().size();
+        int origRequest = td.getNumWorkers();
+        int slotsRequested = Math.min(totalTasks, origRequest);
+        int slotsUsed = Node.countSlotsUsed(topId, allNodes);
+        int slotsFree = Node.countFreeSlotsAlive(allNodes);
+        //Check to see if we have enough slots before trying to get them
+        int slotsAvailable = 0;
+        if (slotsRequested > slotsFree) {
+            slotsAvailable = NodePool.slotsAvailable(lesserPools);
+        }
+        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable);
+        LOG.debug("Slots... requested {} used {} free {} available {} to be used {}",
+                  slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse);
+        if (slotsToUse <= 0) {
+            _cluster.setStatus(topId, "Not Enough Slots Available to Schedule Topology");
+            return 0;
+        }
+        int slotsNeeded = slotsToUse - slotsFree;
+        int numNewNodes = NodePool.getNodeCountIfSlotsWereTaken(slotsNeeded, lesserPools);
+        LOG.debug("Nodes... new {} used {} max {}",
+                  numNewNodes, _usedNodes, _maxNodes);
+        if ((numNewNodes + _usedNodes) > _maxNodes) {
+            _cluster.setStatus(topId, "Max Nodes(" + _maxNodes + ") for this user would be exceeded. " +
+                                      (numNewNodes - (_maxNodes - _usedNodes)) + " more nodes needed to run topology.");
+            return 0;
+        }
+
+        Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
+        _usedNodes += found.size();
+        allNodes.addAll(found);
+        return slotsToUse;
     }
-    
-    Collection<Node> found = NodePool.takeNodesBySlot(slotsNeeded, lesserPools);
-    _usedNodes += found.size();
-    allNodes.addAll(found);
-    return slotsToUse;
-  }
 
-  @Override
-  public Collection<Node> takeNodes(int nodesNeeded) {
-    LOG.debug("Taking {} from {}", nodesNeeded, this);
-    HashSet<Node> ret = new HashSet<>();
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        Iterator<Node> it = entry.getValue().iterator();
-        while (it.hasNext()) {
-          if (nodesNeeded <= 0) {
-            return ret;
-          }
-          Node n = it.next();
-          it.remove();
-          n.freeAllSlots(_cluster);
-          ret.add(n);
-          nodesNeeded--;
-          _usedNodes--;
+    @Override
+    public Collection<Node> takeNodes(int nodesNeeded) {
+        LOG.debug("Taking {} from {}", nodesNeeded, this);
+        HashSet<Node> ret = new HashSet<>();
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                Iterator<Node> it = entry.getValue().iterator();
+                while (it.hasNext()) {
+                    if (nodesNeeded <= 0) {
+                        return ret;
+                    }
+                    Node n = it.next();
+                    it.remove();
+                    n.freeAllSlots(_cluster);
+                    ret.add(n);
+                    nodesNeeded--;
+                    _usedNodes--;
+                }
+            }
         }
-      }
+        return ret;
     }
-    return ret;
-  }
-  
-  @Override
-  public int nodesAvailable() {
-    int total = 0;
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        total += entry.getValue().size();
-      }
+
+    @Override
+    public int nodesAvailable() {
+        int total = 0;
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                total += entry.getValue().size();
+            }
+        }
+        return total;
     }
-    return total;
-  }
-  
-  @Override
-  public int slotsAvailable() {
-    int total = 0;
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        total += Node.countTotalSlotsAlive(entry.getValue());
-      }
+
+    @Override
+    public int slotsAvailable() {
+        int total = 0;
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                total += Node.countTotalSlotsAlive(entry.getValue());
+            }
+        }
+        return total;
     }
-    return total;
-  }
 
-  @Override
-  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<>();
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        Iterator<Node> it = entry.getValue().iterator();
-        while (it.hasNext()) {
-          Node n = it.next();
-          if (n.isAlive()) {
-            it.remove();
-            _usedNodes--;
-            n.freeAllSlots(_cluster);
-            ret.add(n);
-            slotsNeeded -= n.totalSlots();
-            if (slotsNeeded <= 0) {
-              return ret;
+    @Override
+    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+        HashSet<Node> ret = new HashSet<>();
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                Iterator<Node> it = entry.getValue().iterator();
+                while (it.hasNext()) {
+                    Node n = it.next();
+                    if (n.isAlive()) {
+                        it.remove();
+                        _usedNodes--;
+                        n.freeAllSlots(_cluster);
+                        ret.add(n);
+                        slotsNeeded -= n.totalSlots();
+                        if (slotsNeeded <= 0) {
+                            return ret;
+                        }
+                    }
+                }
             }
-          }
         }
-      }
+        return ret;
     }
-    return ret;
-  }
-  
-  @Override
-  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
-    int nodesFound = 0;
-    int slotsFound = 0;
-    for (Entry<String, Set<Node>> entry: _topologyIdToNodes.entrySet()) {
-      if (!_isolated.contains(entry.getKey())) {
-        for (Node n : entry.getValue()) {
-          if (n.isAlive()) {
-            nodesFound++;
-            int totalSlotsFree = n.totalSlots();
-            slotsFound += totalSlotsFree;
-            slotsNeeded -= totalSlotsFree;
-            if (slotsNeeded <= 0) {
-              return new NodeAndSlotCounts(nodesFound, slotsFound);
+
+    @Override
+    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) {
+        int nodesFound = 0;
+        int slotsFound = 0;
+        for (Entry<String, Set<Node>> entry : _topologyIdToNodes.entrySet()) {
+            if (!_isolated.contains(entry.getKey())) {
+                for (Node n : entry.getValue()) {
+                    if (n.isAlive()) {
+                        nodesFound++;
+                        int totalSlotsFree = n.totalSlots();
+                        slotsFound += totalSlotsFree;
+                        slotsNeeded -= totalSlotsFree;
+                        if (slotsNeeded <= 0) {
+                            return new NodeAndSlotCounts(nodesFound, slotsFound);
+                        }
+                    }
+                }
             }
-          }
         }
-      }
+        return new NodeAndSlotCounts(nodesFound, slotsFound);
+    }
+
+    @Override
+    public String toString() {
+        return "IsolatedPool... ";
     }
-    return new NodeAndSlotCounts(nodesFound, slotsFound);
-  }
-  
-  @Override
-  public String toString() {
-    return "IsolatedPool... ";
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 65c657e..24c008e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.multitenant;
@@ -32,94 +26,94 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MultitenantScheduler implements IScheduler {
-  private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
-  private Map<String, Object> conf;
-  protected IConfigLoader configLoader;
-  
-  @Override
-  public void prepare(Map<String, Object> conf) {
-    this.conf = conf;
-    configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
-
-  }
-
-  /**
-   * Load from configLoaders first; if no config available, read from multitenant-scheduler.yaml;
-   * if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used.
-   * @return User pool configs.
-   */
-  private Map<String, Number> getUserConf() {
-    Map<String, Number> ret;
-
-    // Try the loader plugin, if configured
-    if (configLoader != null) {
-      ret = (Map<String, Number>) configLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
-      if (ret != null) {
-        return ret;
-      } else {
-        LOG.warn("Config loader returned null. Will try to read from multitenant-scheduler.yaml");
-      }
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
+    protected IConfigLoader configLoader;
+    private Map<String, Object> conf;
 
-    // If that fails, fall back on the multitenant-scheduler.yaml file
-    Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
-    ret = (Map<String, Number>)fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
-    if (ret != null) {
-      return ret;
-    } else {
-        LOG.warn("Reading from multitenant-scheduler.yaml returned null. This could because the file is not available. "
-                + "Will load configs from storm configuration");
-    }
+    @Override
+    public void prepare(Map<String, Object> conf) {
+        this.conf = conf;
+        configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
 
-    // If that fails, use config
-    ret = (Map<String, Number>) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
-    if (ret == null) {
-      return new HashMap<>();
-    } else {
-      return ret;
     }
-  }
-
-  @Override
-  public Map config() {
-    return getUserConf();
-  }
- 
-  @Override
-  public void schedule(Topologies topologies, Cluster cluster) {
-    LOG.debug("Rerunning scheduling...");
-    Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
-    
-    Map<String, Number> userConf = getUserConf();
-    
-    Map<String, IsolatedPool> userPools = new HashMap<>();
-    for (Map.Entry<String, Number> entry : userConf.entrySet()) {
-      userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
-    }
-    DefaultPool defaultPool = new DefaultPool();
-    FreePool freePool = new FreePool();
-    
-    freePool.init(cluster, nodeIdToNode);
-    for (IsolatedPool pool : userPools.values()) {
-      pool.init(cluster, nodeIdToNode);
+
+    /**
+     * Load from configLoaders first; if no config available, read from multitenant-scheduler.yaml;
+     * if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used.
+     * @return User pool configs.
+     */
+    private Map<String, Number> getUserConf() {
+        Map<String, Number> ret;
+
+        // Try the loader plugin, if configured
+        if (configLoader != null) {
+            ret = (Map<String, Number>) configLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            if (ret != null) {
+                return ret;
+            } else {
+                LOG.warn("Config loader returned null. Will try to read from multitenant-scheduler.yaml");
+            }
+        }
+
+        // If that fails, fall back on the multitenant-scheduler.yaml file
+        Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", false);
+        ret = (Map<String, Number>) fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        if (ret != null) {
+            return ret;
+        } else {
+            LOG.warn("Reading from multitenant-scheduler.yaml returned null. This could because the file is not available. "
+                     + "Will load configs from storm configuration");
+        }
+
+        // If that fails, use config
+        ret = (Map<String, Number>) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        if (ret == null) {
+            return new HashMap<>();
+        } else {
+            return ret;
+        }
     }
-    defaultPool.init(cluster, nodeIdToNode);
-    
-    for (TopologyDetails td: topologies.getTopologies()) {
-      String user = td.getTopologySubmitter();
-      LOG.debug("Found top {} run by user {}",td.getId(), user);
-      NodePool pool = userPools.get(user);
-      if (pool == null || !pool.canAdd(td)) {
-        pool = defaultPool;
-      }
-      pool.addTopology(td);
+
+    @Override
+    public Map config() {
+        return getUserConf();
     }
-    
-    //Now schedule all of the topologies that need to be scheduled
-    for (IsolatedPool pool : userPools.values()) {
-      pool.scheduleAsNeeded(freePool, defaultPool);
+
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        LOG.debug("Rerunning scheduling...");
+        Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
+
+        Map<String, Number> userConf = getUserConf();
+
+        Map<String, IsolatedPool> userPools = new HashMap<>();
+        for (Map.Entry<String, Number> entry : userConf.entrySet()) {
+            userPools.put(entry.getKey(), new IsolatedPool(entry.getValue().intValue()));
+        }
+        DefaultPool defaultPool = new DefaultPool();
+        FreePool freePool = new FreePool();
+
+        freePool.init(cluster, nodeIdToNode);
+        for (IsolatedPool pool : userPools.values()) {
+            pool.init(cluster, nodeIdToNode);
+        }
+        defaultPool.init(cluster, nodeIdToNode);
+
+        for (TopologyDetails td : topologies.getTopologies()) {
+            String user = td.getTopologySubmitter();
+            LOG.debug("Found top {} run by user {}", td.getId(), user);
+            NodePool pool = userPools.get(user);
+            if (pool == null || !pool.canAdd(td)) {
+                pool = defaultPool;
+            }
+            pool.addTopology(td);
+        }
+
+        //Now schedule all of the topologies that need to be scheduled
+        for (IsolatedPool pool : userPools.values()) {
+            pool.scheduleAsNeeded(freePool, defaultPool);
+        }
+        defaultPool.scheduleAsNeeded(freePool);
+        LOG.debug("Scheduling done...");
     }
-    defaultPool.scheduleAsNeeded(freePool);
-    LOG.debug("Scheduling done...");
-  }
 }

Reply via email to