http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
 
b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
index a6ceade..3fa40e6 100644
--- 
a/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
+++ 
b/storm-server/src/main/java/org/apache/storm/pacemaker/codec/ThriftNettyServerCodec.java
@@ -1,22 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.pacemaker.codec;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.messaging.netty.ISaslServer;
 import org.apache.storm.messaging.netty.IServer;
@@ -24,9 +22,6 @@ import 
org.apache.storm.messaging.netty.KerberosSaslServerHandler;
 import org.apache.storm.messaging.netty.SaslStormServerHandler;
 import org.apache.storm.messaging.netty.StormServerHandler;
 import org.apache.storm.security.auth.AuthUtils;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Map;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
@@ -37,20 +32,14 @@ public class ThriftNettyServerCodec {
 
     public static final String SASL_HANDLER = "sasl-handler";
     public static final String KERBEROS_HANDLER = "kerberos-handler";
-    
-    public enum AuthMethod {
-        DIGEST,
-        KERBEROS,
-        NONE
-    };
-    
+    private static final Logger LOG = LoggerFactory
+        .getLogger(ThriftNettyServerCodec.class);
+
+    ;
+    private final int thriftMessageMaxSize;
     private IServer server;
     private AuthMethod authMethod;
     private Map<String, Object> topoConf;
-    private final int thriftMessageMaxSize;
-    
-    private static final Logger LOG = LoggerFactory
-        .getLogger(ThriftNettyServerCodec.class);
 
     public ThriftNettyServerCodec(IServer server, Map<String, Object> topoConf,
                                   AuthMethod authMethod, int 
thriftMessageMaxSizeBytes) {
@@ -67,30 +56,26 @@ public class ThriftNettyServerCodec {
                 ChannelPipeline pipeline = Channels.pipeline();
                 pipeline.addLast("encoder", new ThriftEncoder());
                 pipeline.addLast("decoder", new 
ThriftDecoder(thriftMessageMaxSize));
-                if(authMethod == AuthMethod.DIGEST) {
+                if (authMethod == AuthMethod.DIGEST) {
                     try {
                         LOG.debug("Adding SaslStormServerHandler to pacemaker 
server pipeline.");
-                        pipeline.addLast(SASL_HANDLER, new 
SaslStormServerHandler((ISaslServer)server));
-                    }
-                    catch (IOException e) {
+                        pipeline.addLast(SASL_HANDLER, new 
SaslStormServerHandler((ISaslServer) server));
+                    } catch (IOException e) {
                         throw new RuntimeException(e);
                     }
-                }
-                else if(authMethod == AuthMethod.KERBEROS) {
+                } else if (authMethod == AuthMethod.KERBEROS) {
                     try {
                         LOG.debug("Adding KerberosSaslServerHandler to 
pacemaker server pipeline.");
                         ArrayList<String> authorizedUsers = new ArrayList(1);
-                        
authorizedUsers.add((String)topoConf.get(DaemonConfig.NIMBUS_DAEMON_USER));
-                        pipeline.addLast(KERBEROS_HANDLER, new 
KerberosSaslServerHandler((ISaslServer)server,
+                        authorizedUsers.add((String) 
topoConf.get(DaemonConfig.NIMBUS_DAEMON_USER));
+                        pipeline.addLast(KERBEROS_HANDLER, new 
KerberosSaslServerHandler((ISaslServer) server,
                                                                                
          topoConf,
                                                                                
          AuthUtils.LOGIN_CONTEXT_PACEMAKER_SERVER,
                                                                                
          authorizedUsers));
-                    }
-                    catch (IOException e) {
+                    } catch (IOException e) {
                         throw new RuntimeException(e);
                     }
-                }
-                else if(authMethod == AuthMethod.NONE) {
+                } else if (authMethod == AuthMethod.NONE) {
                     LOG.debug("Not authenticating any clients. AuthMethod is 
NONE");
                 }
 
@@ -99,4 +84,10 @@ public class ThriftNettyServerCodec {
             }
         };
     }
+
+    public enum AuthMethod {
+        DIGEST,
+        KERBEROS,
+        NONE
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index c5a5ee5..bc08579 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.daemon.nimbus.TopologyResources;
@@ -49,40 +48,33 @@ import org.slf4j.LoggerFactory;
 
 public class Cluster implements ISchedulingState {
     private static final Logger LOG = LoggerFactory.getLogger(Cluster.class);
-    private SchedulerAssignmentImpl assignment;
-
     /**
      * key: supervisor id, value: supervisor details.
      */
     private final Map<String, SupervisorDetails> supervisors = new HashMap<>();
-
     /**
      * key: rack, value: nodes in that rack.
      */
     private final Map<String, List<String>> networkTopography = new 
HashMap<>();
-
     /**
      * key: topologyId, value: topology's current assignments.
      */
     private final Map<String, SchedulerAssignmentImpl> assignments = new 
HashMap<>();
-
     /**
      * key topologyId, Value: scheduler's status.
      */
     private final Map<String, String> status = new HashMap<>();
-
     /**
      * A map from hostname to supervisor ids.
      */
     private final Map<String, List<String>> hostToId = new HashMap<>();
-
     private final Map<String, Object> conf;
-
-    private Set<String> blackListedHosts = new HashSet<>();
-    private INimbus inimbus;
     private final Topologies topologies;
     private final Map<String, Map<WorkerSlot, NormalizedResourceRequest>> 
nodeToScheduledResourcesCache;
     private final Map<String, Set<WorkerSlot>> nodeToUsedSlotsCache;
+    private SchedulerAssignmentImpl assignment;
+    private Set<String> blackListedHosts = new HashSet<>();
+    private INimbus inimbus;
 
     public Cluster(
         INimbus nimbus,
@@ -111,7 +103,7 @@ public class Cluster implements ISchedulingState {
     /**
      * Testing Constructor that takes an existing cluster and replaces the 
topologies in it.
      *
-     * @param src the original cluster
+     * @param src        the original cluster
      * @param topologies the new topolgoies to use
      */
     @VisibleForTesting
@@ -195,8 +187,61 @@ public class Cluster implements ISchedulingState {
     }
 
     /**
-     * Check if the given topology is allowed for modification right now. If 
not throw an
-     * IllegalArgumentException else go on.
+     * Get heap memory usage for a worker's main process and logwriter process.
+     *
+     * @param topConf - the topology config
+     * @return the assigned memory (in MB)
+     */
+    public static double getAssignedMemoryForSlot(final Map<String, Object> 
topConf) {
+        double totalWorkerMemory = 0.0;
+        final Integer topologyWorkerDefaultMemoryAllocation = 768;
+
+        List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
+            Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf);
+        List<String> workerGcChildopts = ConfigUtils.getValueAsList(
+            Config.WORKER_GC_CHILDOPTS, topConf);
+        Double memGcChildopts = null;
+        memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
+            topologyWorkerGcChildopts, null);
+        if (memGcChildopts == null) {
+            memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
+                workerGcChildopts, null);
+        }
+
+        List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList(
+            Config.TOPOLOGY_WORKER_CHILDOPTS, topConf);
+        Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
+            topologyWorkerChildopts, null);
+
+        List<String> workerChildopts = ConfigUtils.getValueAsList(
+            Config.WORKER_CHILDOPTS, topConf);
+        Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
+            workerChildopts, null);
+
+        if (memGcChildopts != null) {
+            totalWorkerMemory += memGcChildopts;
+        } else if (memTopologyWorkerChildopts != null) {
+            totalWorkerMemory += memTopologyWorkerChildopts;
+        } else if (memWorkerChildopts != null) {
+            totalWorkerMemory += memWorkerChildopts;
+        } else {
+            Object workerHeapMemoryMb = topConf.get(
+                Config.WORKER_HEAP_MEMORY_MB);
+            totalWorkerMemory += ObjectReader.getInt(
+                workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation);
+        }
+
+        List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList(
+            Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf);
+        if (topoWorkerLwChildopts != null) {
+            totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(
+                topoWorkerLwChildopts, 0.0);
+        }
+        return totalWorkerMemory;
+    }
+
+    /**
+     * Check if the given topology is allowed for modification right now. If 
not throw an IllegalArgumentException else go on.
      *
      * @param topologyId the id of the topology to check
      */
@@ -204,6 +249,16 @@ public class Cluster implements ISchedulingState {
         //NOOP
     }
 
+    @Override
+    public Topologies getTopologies() {
+        return topologies;
+    }
+
+    @Override
+    public Set<String> getBlacklistedHosts() {
+        return blackListedHosts;
+    }
+
     /**
      * Set the list of hosts that are blacklisted.
      *
@@ -218,16 +273,6 @@ public class Cluster implements ISchedulingState {
         blackListedHosts.addAll(hosts);
     }
 
-    @Override
-    public Topologies getTopologies() {
-        return topologies;
-    }
-
-    @Override
-    public Set<String> getBlacklistedHosts() {
-        return blackListedHosts;
-    }
-
     public void blacklistHost(String host) {
         blackListedHosts.add(host);
     }
@@ -304,7 +349,8 @@ public class Cluster implements ISchedulingState {
 
     @Override
     public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
-        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), (x) -> 
new HashSet<>()).stream().map(WorkerSlot::getPort).collect(Collectors.toSet());
+        return nodeToUsedSlotsCache.computeIfAbsent(supervisor.getId(), (x) -> 
new HashSet<>()).stream().map(WorkerSlot::getPort)
+                                   .collect(Collectors.toSet());
     }
 
     @Override
@@ -424,12 +470,12 @@ public class Cluster implements ISchedulingState {
             totalResources.addOnHeap(shared.get_off_heap_worker());
 
             addResource(
-                    sharedTotalResources,
-                    Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 
shared.get_off_heap_worker()
+                sharedTotalResources,
+                Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 
shared.get_off_heap_worker()
             );
             addResource(
-                    sharedTotalResources,
-                    Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 
shared.get_on_heap()
+                sharedTotalResources,
+                Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 
shared.get_on_heap()
             );
         }
         sharedTotalResources = 
NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(sharedTotalResources);
@@ -441,12 +487,12 @@ public class Cluster implements ISchedulingState {
         ret.set_mem_off_heap(totalResources.getOffHeapMemoryMb());
         ret.set_mem_on_heap(totalResources.getOnHeapMemoryMb());
         ret.set_shared_mem_off_heap(
-                sharedTotalResources.getOrDefault(
-                        Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
+            sharedTotalResources.getOrDefault(
+                Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, 0.0)
         );
         ret.set_shared_mem_on_heap(
-                sharedTotalResources.getOrDefault(
-                        Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0)
+            sharedTotalResources.getOrDefault(
+                Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME, 0.0)
         );
         return ret;
     }
@@ -493,22 +539,22 @@ public class Cluster implements ISchedulingState {
         if (memoryAdded > memoryAvailable) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > 
{}",
-                    td.getName(),
-                    exec,
-                    ws,
-                    memoryAdded,
-                    memoryAvailable);
+                          td.getName(),
+                          exec,
+                          ws,
+                          memoryAdded,
+                          memoryAvailable);
             }
             return false;
         }
         if (afterOnHeap > maxHeap) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Could not schedule {}:{} on {} HEAP would be too 
large {} > {}",
-                    td.getName(),
-                    exec,
-                    ws,
-                    afterOnHeap,
-                    maxHeap);
+                          td.getName(),
+                          exec,
+                          ws,
+                          afterOnHeap,
+                          maxHeap);
             }
             return false;
         }
@@ -531,9 +577,9 @@ public class Cluster implements ISchedulingState {
         if (td == null) {
             throw new IllegalArgumentException(
                 "Trying to schedule for topo "
-                    + topologyId
-                    + " but that is not a known topology "
-                    + topologies.getAllIds());
+                + topologyId
+                + " but that is not a known topology "
+                + topologies.getAllIds());
         }
         WorkerResources resources = calculateWorkerResources(td, executors);
         SchedulerAssignmentImpl assignment = assignments.get(topologyId);
@@ -545,14 +591,14 @@ public class Cluster implements ISchedulingState {
                 if (assignment.isExecutorAssigned(executor)) {
                     throw new RuntimeException(
                         "Attempting to assign executor: "
-                            + executor
-                            + " of topology: "
-                            + topologyId
-                            + " to workerslot: "
-                            + slot
-                            + ". The executor is already assigned to 
workerslot: "
-                            + assignment.getExecutorToSlot().get(executor)
-                            + ". The executor must unassigned before it can be 
assigned to another slot!");
+                        + executor
+                        + " of topology: "
+                        + topologyId
+                        + " to workerslot: "
+                        + slot
+                        + ". The executor is already assigned to workerslot: "
+                        + assignment.getExecutorToSlot().get(executor)
+                        + ". The executor must unassigned before it can be 
assigned to another slot!");
                 }
             }
         }
@@ -587,7 +633,7 @@ public class Cluster implements ISchedulingState {
     /**
      * Calculate the amount of shared off heap memory on a given nodes with 
the given assignment.
      *
-     * @param nodeId the id of the node
+     * @param nodeId     the id of the node
      * @param assignment the current assignment
      * @return the amount of shared off heap memory for that node in MB
      */
@@ -759,59 +805,6 @@ public class Cluster implements ISchedulingState {
     }
 
     /**
-     * Get heap memory usage for a worker's main process and logwriter process.
-     * @param topConf - the topology config
-     * @return the assigned memory (in MB)
-     */
-    public static double getAssignedMemoryForSlot(final Map<String, Object> 
topConf) {
-        double totalWorkerMemory = 0.0;
-        final Integer topologyWorkerDefaultMemoryAllocation = 768;
-
-        List<String> topologyWorkerGcChildopts = ConfigUtils.getValueAsList(
-            Config.TOPOLOGY_WORKER_GC_CHILDOPTS, topConf);
-        List<String> workerGcChildopts = ConfigUtils.getValueAsList(
-            Config.WORKER_GC_CHILDOPTS, topConf);
-        Double memGcChildopts = null;
-        memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
-            topologyWorkerGcChildopts, null);
-        if (memGcChildopts == null) {
-            memGcChildopts = Utils.parseJvmHeapMemByChildOpts(
-                workerGcChildopts, null);
-        }
-
-        List<String> topologyWorkerChildopts = ConfigUtils.getValueAsList(
-            Config.TOPOLOGY_WORKER_CHILDOPTS, topConf);
-        Double memTopologyWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
-            topologyWorkerChildopts, null);
-
-        List<String> workerChildopts = ConfigUtils.getValueAsList(
-            Config.WORKER_CHILDOPTS, topConf);
-        Double memWorkerChildopts = Utils.parseJvmHeapMemByChildOpts(
-            workerChildopts, null);
-
-        if (memGcChildopts != null) {
-            totalWorkerMemory += memGcChildopts;
-        } else if (memTopologyWorkerChildopts != null) {
-            totalWorkerMemory += memTopologyWorkerChildopts;
-        } else if (memWorkerChildopts != null) {
-            totalWorkerMemory += memWorkerChildopts;
-        } else {
-            Object workerHeapMemoryMb = topConf.get(
-                Config.WORKER_HEAP_MEMORY_MB);
-            totalWorkerMemory += ObjectReader.getInt(
-                workerHeapMemoryMb, topologyWorkerDefaultMemoryAllocation);
-        }
-
-        List<String> topoWorkerLwChildopts = ConfigUtils.getValueAsList(
-            Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, topConf);
-        if (topoWorkerLwChildopts != null) {
-            totalWorkerMemory += Utils.parseJvmHeapMemByChildOpts(
-                topoWorkerLwChildopts, 0.0);
-        }
-        return totalWorkerMemory;
-    }
-
-    /**
      * set scheduler status for a topology.
      */
     public void setStatus(TopologyDetails td, String statusMessage) {
@@ -837,10 +830,6 @@ public class Cluster implements ISchedulingState {
         return status;
     }
 
-    public String getStatus(String topoId) {
-        return status.get(topoId);
-    }
-
     /**
      * set scheduler status map.
      */
@@ -858,6 +847,10 @@ public class Cluster implements ISchedulingState {
         this.status.putAll(statusMap);
     }
 
+    public String getStatus(String topoId) {
+        return status.get(topoId);
+    }
+
     @Override
     public Map<String, TopologyResources> getTopologyResourcesMap() {
         Map<String, TopologyResources> ret = new HashMap<>(assignments.size());
@@ -925,7 +918,7 @@ public class Cluster implements ISchedulingState {
 
 
     /**
-     *  This medhod updates ScheduledResources and UsedSlots cache for given 
workerSlot
+     * This medhod updates ScheduledResources and UsedSlots cache for given 
workerSlot
      *
      * @param workerSlot
      * @param workerResources

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
index 6a0de72..c4a28c3 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
@@ -21,7 +21,6 @@ package org.apache.storm.scheduler;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.storm.generated.ComponentType;
 
 public class Component {
@@ -33,9 +32,10 @@ public class Component {
 
     /**
      * Create a new component.
-     * @param type the type of component this is
+     *
+     * @param type   the type of component this is
      * @param compId the id of the component
-     * @param execs the executors for this component.
+     * @param execs  the executors for this component.
      */
     public Component(ComponentType type, String compId, List<ExecutorDetails> 
execs) {
         this.type = type;
@@ -76,13 +76,13 @@ public class Component {
     @Override
     public String toString() {
         return "{id: "
-            + getId()
-            + " Parents: "
-            + getParents()
-            + " Children: "
-            + getChildren()
-            + " Execs: "
-            + getExecs()
-            + "}";
+               + getId()
+               + " Parents: "
+               + getParents()
+               + " Children: "
+               + getChildren()
+               + " Execs: "
+               + getExecs()
+               + "}";
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 82630ca..84498a3 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
 import java.util.HashMap;
@@ -23,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
 import org.apache.storm.utils.Utils;
 
 public class DefaultScheduler implements IScheduler {
@@ -44,7 +44,7 @@ public class DefaultScheduler implements IScheduler {
             }
 
             for (WorkerSlot slot : slots) {
-                existingSlots.remove(slot);                
+                existingSlots.remove(slot);
             }
 
             return existingSlots.keySet();
@@ -74,7 +74,8 @@ public class DefaultScheduler implements IScheduler {
             List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
             Set<ExecutorDetails> allExecutors = topology.getExecutors();
 
-            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = 
EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
+            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
+                EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, 
topology.getId());
             Set<ExecutorDetails> aliveExecutors = new 
HashSet<ExecutorDetails>();
             for (List<ExecutorDetails> list : aliveAssigned.values()) {
                 aliveExecutors.addAll(list);
@@ -85,7 +86,7 @@ public class DefaultScheduler implements IScheduler {
 
             Set<WorkerSlot> badSlots = null;
             if (totalSlotsToUse > aliveAssigned.size() || 
!allExecutors.equals(aliveExecutors)) {
-                badSlots = badSlots(aliveAssigned, allExecutors.size(), 
totalSlotsToUse);                
+                badSlots = badSlots(aliveAssigned, allExecutors.size(), 
totalSlotsToUse);
             }
             if (badSlots != null) {
                 cluster.freeSlots(badSlots);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index 9cbce18..f4ac258 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -20,7 +20,6 @@ package org.apache.storm.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -30,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -116,7 +114,7 @@ public class EvenScheduler implements IScheduler {
 
         //allow requesting slots number bigger than available slots
         int toIndex = (totalSlotsToUse - aliveAssigned.size())
-            > sortedList.size() ? sortedList.size() : (totalSlotsToUse - 
aliveAssigned.size());
+                      > sortedList.size() ? sortedList.size() : 
(totalSlotsToUse - aliveAssigned.size());
         List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);
 
         Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
index bbbbf3f..855cc96 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
@@ -40,8 +40,8 @@ public class ExecutorDetails {
         if (other == null || !(other instanceof ExecutorDetails)) {
             return false;
         }
-        
-        ExecutorDetails executor = (ExecutorDetails)other;
+
+        ExecutorDetails executor = (ExecutorDetails) other;
         return (this.startTask == executor.startTask) && (this.endTask == 
executor.endTask);
     }
 
@@ -49,7 +49,7 @@ public class ExecutorDetails {
     public int hashCode() {
         return this.startTask + 13 * this.endTask;
     }
-    
+
     @Override
     public String toString() {
         return "[" + this.startTask + ", " + this.endTask + "]";

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
index db2fed9..4e26708 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/INimbus.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler;
@@ -42,6 +36,6 @@ public interface INimbus {
      * map from node id to supervisor details.
      */
     String getHostName(Map<String, SupervisorDetails> existingSupervisors, 
String nodeId);
-    
-    IScheduler getForcedScheduler(); 
+
+    IScheduler getForcedScheduler();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
index 5b79f11..18e9afd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler;
@@ -21,9 +15,9 @@ package org.apache.storm.scheduler;
 import java.util.Map;
 
 public interface IScheduler {
-    
+
     void prepare(Map<String, Object> conf);
-    
+
     /**
      * Set assignments for the topologies which needs scheduling. The new 
assignments is available 
      * through `cluster.getAssignments()`

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
index 187a03c..b0d94b5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java
@@ -27,17 +27,21 @@ import org.apache.storm.generated.WorkerResources;
 import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
 
-/** An interface that provides access to the current scheduling state. */
+/**
+ * An interface that provides access to the current scheduling state.
+ */
 public interface ISchedulingState {
 
     /**
      * Get all of the topologies.
+     *
      * @return all of the topologies that are a part of the cluster.
      */
     Topologies getTopologies();
 
     /**
      * Get all of the topologies that need scheduling.
+     *
      * @return all of the topologies that are not fully scheduled.
      */
     List<TopologyDetails> needsSchedulingTopologies();
@@ -48,16 +52,16 @@ public interface ISchedulingState {
      * <p>A topology needs scheduling if one of the following conditions holds:
      *
      * <ul>
-     *   <li>Although the topology is assigned slots, but is squeezed. i.e. 
the topology is assigned
-     *       less slots than desired.
-     *   <li>There are unassigned executors in this topology
+     * <li>The topology is assigned slots but is squeezed, i.e. the 
topology is assigned
+     * fewer slots than desired.
+     * <li>There are unassigned executors in this topology
      * </ul>
      */
     boolean needsScheduling(TopologyDetails topology);
 
     /**
-     * Like {@link #needsScheduling(TopologyDetails)} but does not take into 
account the number of
-     * workers requested. This is because the number of workers is ignored in 
RAS
+     * Like {@link #needsScheduling(TopologyDetails)} but does not take into 
account the number of workers requested. This is because the
+     * number of workers is ignored in RAS
      *
      * @param topology the topology to check
      * @return true if the topology needs scheduling else false.
@@ -66,6 +70,7 @@ public interface ISchedulingState {
 
     /**
      * Get all of the hosts that are blacklisted.
+     *
      * @return all of the hosts that are blacklisted
      */
     Set<String> getBlacklistedHosts();
@@ -104,6 +109,7 @@ public interface ISchedulingState {
 
     /**
      * Get the executor to component name map for executors that need to be 
scheduled.
+     *
      * @param topology the topology this is for
      * @return a executor -> component-id map which needs scheduling in this 
topology.
      */
@@ -111,16 +117,21 @@ public interface ISchedulingState {
 
     /**
      * Get the component name to executor list for executors that need to be 
scheduled.
+     *
      * @param topology the topology this is for
      * @return a component-id -> executors map which needs scheduling in this 
topology.
      */
     Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(
         TopologyDetails topology);
 
-    /** Get all the used ports of this supervisor. */
+    /**
+     * Get all the used ports of this supervisor.
+     */
     Set<Integer> getUsedPorts(SupervisorDetails supervisor);
 
-    /** Return the available ports of this supervisor. */
+    /**
+     * Return the available ports of this supervisor.
+     */
     Set<Integer> getAvailablePorts(SupervisorDetails supervisor);
 
     /**
@@ -131,10 +142,14 @@ public interface ISchedulingState {
      */
     Set<Integer> getAssignablePorts(SupervisorDetails supervisor);
 
-    /** Return all the available slots on this supervisor. */
+    /**
+     * Return all the available slots on this supervisor.
+     */
     List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor);
 
-    /** Get all the available worker slots in the cluster. */
+    /**
+     * Get all the available worker slots in the cluster.
+     */
     List<WorkerSlot> getAvailableSlots();
 
     /**
@@ -145,14 +160,19 @@ public interface ISchedulingState {
      */
     List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor);
 
-    /** Get all non-blacklisted slots in the cluster. */
+    /**
+     * Get all non-blacklisted slots in the cluster.
+     */
     List<WorkerSlot> getAssignableSlots();
 
-    /** Get all currently occupied slots. */
+    /**
+     * Get all currently occupied slots.
+     */
     Collection<WorkerSlot> getUsedSlots();
 
     /**
      * Check if a slot is occupied or not.
+     *
      * @param slot the slot be to checked.
      * @return true if the specified slot is occupied.
      */
@@ -160,20 +180,20 @@ public interface ISchedulingState {
 
     /**
      * Get the number of workers assigned to a topology.
+     *
      * @param topology the topology this is for
      * @return the number of workers assigned to this topology.
      */
     int getAssignedNumWorkers(TopologyDetails topology);
 
     /**
-     * Would scheduling exec on ws fit? With a heap <= maxHeap total memory 
added <= memoryAvailable
-     * and cpu added <= cpuAvailable.
+     * Would scheduling exec on ws fit? With a heap <= maxHeap total memory 
added <= memoryAvailable and cpu added <= cpuAvailable.
      *
-     * @param ws the slot to put it in
-     * @param exec the executor to investigate
-     * @param td the topology detains for this executor
+     * @param ws                 the slot to put it in
+     * @param exec               the executor to investigate
+     * @param td                 the topology details for this executor
      * @param resourcesAvailable all the available resources
-     * @param maxHeap the maximum heap size for ws
+     * @param maxHeap            the maximum heap size for ws
      * @return true it fits else false
      */
     boolean wouldFit(
@@ -183,13 +203,19 @@ public interface ISchedulingState {
         NormalizedResourceOffer resourcesAvailable,
         double maxHeap);
 
-    /** get the current assignment for the topology. */
+    /**
+     * get the current assignment for the topology.
+     */
     SchedulerAssignment getAssignmentById(String topologyId);
 
-    /** get slots used by a topology. */
+    /**
+     * get slots used by a topology.
+     */
     Collection<WorkerSlot> getUsedSlotsByTopologyId(String topologyId);
 
-    /** Get a specific supervisor with the <code>nodeId</code>. */
+    /**
+     * Get a specific supervisor with the <code>nodeId</code>.
+     */
     SupervisorDetails getSupervisorById(String nodeId);
 
     /**
@@ -200,45 +226,55 @@ public interface ISchedulingState {
      */
     List<SupervisorDetails> getSupervisorsByHost(String host);
 
-    /** Get all the assignments. */
+    /**
+     * Get all the assignments.
+     */
     Map<String, SchedulerAssignment> getAssignments();
 
-    /** Get all the supervisors. */
+    /**
+     * Get all the supervisors.
+     */
     Map<String, SupervisorDetails> getSupervisors();
 
-    /** Get all scheduled resources for node. **/
+    /**
+     * Get all scheduled resources for node.
+     **/
     NormalizedResourceRequest getAllScheduledResourcesForNode(String nodeId);
 
-    /** Get the total amount of CPU resources in cluster. */
+    /**
+     * Get the total amount of CPU resources in cluster.
+     */
     double getClusterTotalCpuResource();
 
-    /** Get the total amount of memory resources in cluster. */
+    /**
+     * Get the total amount of memory resources in cluster.
+     */
     double getClusterTotalMemoryResource();
 
-    /** Get the network topography (rackId -> nodes in the rack). */
+    /**
+     * Get the network topography (rackId -> nodes in the rack).
+     */
     Map<String, List<String>> getNetworkTopography();
 
-    /** Get all topology scheduler statuses. */
+    /**
+     * Get all topology scheduler statuses.
+     */
     Map<String, String> getStatusMap();
 
     /**
-     * Get the amount of resources used by topologies. Used for displaying 
resource information on the
-     * UI.
+     * Get the amount of resources used by topologies. Used for displaying 
resource information on the UI.
      *
-     * @return a map that contains multiple topologies and the resources the 
topology requested and
-     *     assigned. Key: topology id Value: an array that describes the 
resources the topology
-     *     requested and assigned in the following format: 
{requestedMemOnHeap, requestedMemOffHeap,
-     *     requestedCpu, assignedMemOnHeap, assignedMemOffHeap, assignedCpu}
+     * @return a map that contains multiple topologies and the resources the 
topology requested and assigned. Key: topology id Value: an
+     *     array that describes the resources the topology requested and 
assigned in the following format: {requestedMemOnHeap,
+     *     requestedMemOffHeap, requestedCpu, assignedMemOnHeap, 
assignedMemOffHeap, assignedCpu}
      */
     Map<String, TopologyResources> getTopologyResourcesMap();
 
     /**
-     * Get the amount of used and free resources on a supervisor. Used for 
displaying resource
-     * information on the UI
+     * Get the amount of used and free resources on a supervisor. Used for 
displaying resource information on the UI
      *
-     * @return a map where the key is the supervisor id and the value is a map 
that represents
-     *     resource usage for a supervisor in the following format: {totalMem, 
totalCpu, usedMem,
-     *     usedCpu}
+     * @return a map where the key is the supervisor id and the value is a map 
that represents resource usage for a supervisor in the
+     *     following format: {totalMem, totalCpu, usedMem, usedCpu}
      */
     Map<String, SupervisorResources> getSupervisorsResourcesMap();
 
@@ -273,6 +309,8 @@ public interface ISchedulingState {
      */
     double getScheduledCpuForNode(String nodeId);
 
-    /** Get the nimbus configuration. */
+    /**
+     * Get the nimbus configuration.
+     */
     Map<String, Object> getConf();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
index 4ecf28e..08d1229 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISupervisor.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler;
@@ -25,6 +19,7 @@ public interface ISupervisor {
     void prepare(Map<String, Object> topoConf, String schedulerLocalDir);
 
     // for mesos, this is {hostname}-{topologyid}
+
     /**
      * The id used for writing metadata into ZK.
      */
@@ -39,7 +34,7 @@ public interface ISupervisor {
     String getAssignmentId();
 
     Object getMetadata();
-    
+
     boolean confirmAssigned(int port);
 
     // calls this before actually killing the worker locally...

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
index 3201a40..03c0c6a 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.scheduler;
 
 import java.util.ArrayList;
@@ -28,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.commons.lang.Validate;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.utils.Utils;
@@ -63,7 +57,8 @@ public class IsolationScheduler implements IScheduler {
 
     // get host -> all assignable worker slots for non-blacklisted machines 
(assigned or not assigned)
     // will then have a list of machines that need to be assigned (machine -> 
[topology, list of list of executors])
-    // match each spec to a machine (who has the right number of workers), 
free everything else on that machine and assign those slots (do one topology at 
a time)
+    // match each spec to a machine (who has the right number of workers), 
free everything else on that machine and assign those slots
+    // (do one topology at a time)
     // blacklist all machines who had production slots defined
     // log isolated topologies who weren't able to get enough slots / machines
     // run default scheduler on isolated topologies that didn't have enough 
slots + non-isolated topologies on remaining machines
@@ -85,9 +80,9 @@ public class IsolationScheduler implements IScheduler {
             int numWorkers = assignments.size();
 
             if (isoIds.contains(topologyId)
-                    && checkAssignmentTopology(assignments, topologyId)
-                    && distribution.containsKey(numWorkers)
-                    && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
+                && checkAssignmentTopology(assignments, topologyId)
+                && distribution.containsKey(numWorkers)
+                && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
                 decrementDistribution(distribution, numWorkers);
                 for (AssignmentInfo ass : assignments) {
                     workerSpecs.remove(ass.getExecutors());
@@ -127,9 +122,9 @@ public class IsolationScheduler implements IScheduler {
         List<String> failedTopologyIds = 
extractFailedTopologyIds(topologyWorkerSpecs);
         if (failedTopologyIds.size() > 0) {
             LOG.warn("Unable to isolate topologies " + failedTopologyIds
-                    + ". No machine had enough worker slots to run the 
remaining workers for these topologies. "
-                    + "Clearing all other resources and will wait for enough 
resources for "
-                    + "isolated topologies before allocating any other 
resources.");
+                     + ". No machine had enough worker slots to run the 
remaining workers for these topologies. "
+                     + "Clearing all other resources and will wait for enough 
resources for "
+                     + "isolated topologies before allocating any other 
resources.");
             // clear workers off all hosts that are not blacklisted
             Map<String, Set<WorkerSlot>> usedSlots = hostToUsedSlots(cluster);
             Set<Map.Entry<String, Set<WorkerSlot>>> entries = 
usedSlots.entrySet();
@@ -176,7 +171,7 @@ public class IsolationScheduler implements IScheduler {
 
     private List<String> extractFailedTopologyIds(Map<String, 
Set<Set<ExecutorDetails>>> isoTopologyWorkerSpecs) {
         List<String> failedTopologyIds = new ArrayList<String>();
-        for (Map.Entry<String, Set<Set<ExecutorDetails>>> topoWorkerSpecsEntry 
: isoTopologyWorkerSpecs.entrySet()){
+        for (Map.Entry<String, Set<Set<ExecutorDetails>>> topoWorkerSpecsEntry 
: isoTopologyWorkerSpecs.entrySet()) {
             Set<Set<ExecutorDetails>> workerSpecs = 
topoWorkerSpecsEntry.getValue();
             if (workerSpecs != null && !workerSpecs.isEmpty()) {
                 failedTopologyIds.add(topoWorkerSpecsEntry.getKey());
@@ -195,7 +190,7 @@ public class IsolationScheduler implements IScheduler {
     }
 
     private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) 
{
-        Collection<SchedulerAssignment> assignmentValues =  
cluster.getAssignments().values();
+        Collection<SchedulerAssignment> assignmentValues = 
cluster.getAssignments().values();
         Map<String, List<AssignmentInfo>> hostAssignments = new 
HashMap<String, List<AssignmentInfo>>();
 
         for (SchedulerAssignment sa : assignmentValues) {
@@ -237,7 +232,7 @@ public class IsolationScheduler implements IScheduler {
                 bucketExecutors.put(bucketIndex, executors);
             }
             executors.add(executor);
-            bucketIndex = (bucketIndex+1) % numWorkers;
+            bucketIndex = (bucketIndex + 1) % numWorkers;
         }
 
         return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
@@ -325,11 +320,11 @@ public class IsolationScheduler implements IScheduler {
             sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), 
entry.getValue()));
         }
         Collections.sort(sortHostAssignSlots, new 
Comparator<HostAssignableSlots>() {
-                    @Override
-                    public int compare(HostAssignableSlots o1, 
HostAssignableSlots o2) {
-                        return o2.getWorkerSlots().size() - 
o1.getWorkerSlots().size();
-                    }
-                });
+            @Override
+            public int compare(HostAssignableSlots o1, HostAssignableSlots o2) 
{
+                return o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
+            }
+        });
         Collections.shuffle(sortHostAssignSlots);
 
         return new LinkedList<HostAssignableSlots>(sortHostAssignSlots);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
index c16a67f..347c95f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignment.java
@@ -21,13 +21,12 @@ package org.apache.storm.scheduler;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
-
-import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.WorkerResources;
 
 public interface SchedulerAssignment {
     /**
      * Is this slot part of this assignment or not.
+     *
      * @param slot the slot to check.
      * @return true if the slot is occupied by this assignment else false.
      */
@@ -35,50 +34,57 @@ public interface SchedulerAssignment {
 
     /**
      * Is the executor assigned or not.
-     * 
+     *
      * @param executor the executor to check it if is assigned.
      * @return true if it is assigned else false
      */
     public boolean isExecutorAssigned(ExecutorDetails executor);
-    
+
     /**
      * Return the ID of the topology.
+     *
      * @return the topology-id this assignment is for.
      */
     public String getTopologyId();
 
     /**
      * Get the map of executor to WorkerSlot.
+     *
      * @return the executor -> slot map.
      */
     public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot();
 
     /**
      * Get the set of all executors.
-     * @return  the executors covered by this assignments
+     *
+     * @return the executors covered by this assignment
      */
     public Set<ExecutorDetails> getExecutors();
 
     /**
      * Get the set of all slots that are a part of this.
+     *
      * @return the set of all slots.
      */
     public Set<WorkerSlot> getSlots();
 
     /**
      * Get the mapping of slot to executors on that slot.
+     *
      * @return the slot to the executors assigned to that slot.
      */
     public Map<WorkerSlot, Collection<ExecutorDetails>> getSlotToExecutors();
-    
+
     /**
      * Get the slot to resource mapping.
+     *
      * @return The slot to resource mapping
      */
     public Map<WorkerSlot, WorkerResources> getScheduledResources();
-    
+
     /**
      * Get the total shared off heap memory mapping.
+     *
      * @return host to total shared off heap memory mapping.
      */
     public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
index f92c3d1..dcb2ebc 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/SchedulerAssignmentImpl.java
@@ -26,7 +26,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.generated.WorkerResources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,14 +49,15 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
 
     /**
      * Create a new assignment.
-     * @param topologyId the id of the topology the assignment is for.
-     * @param executorToSlot the executor to slot mapping for the assignment.  
Can be null and set through other methods later.
-     * @param resources the resources for the current assignments.  Can be 
null and set through other methods later.
+     *
+     * @param topologyId                 the id of the topology the assignment 
is for.
+     * @param executorToSlot             the executor to slot mapping for the 
assignment.  Can be null and set through other methods later.
+     * @param resources                  the resources for the current 
assignments.  Can be null and set through other methods later.
      * @param nodeIdToTotalSharedOffHeap the shared memory for this assignment 
can be null and set through other methods later.
      */
     public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, 
WorkerSlot> executorToSlot,
-            Map<WorkerSlot, WorkerResources> resources, Map<String, Double> 
nodeIdToTotalSharedOffHeap) {
-        this.topologyId = topologyId;       
+                                   Map<WorkerSlot, WorkerResources> resources, 
Map<String, Double> nodeIdToTotalSharedOffHeap) {
+        this.topologyId = topologyId;
         if (executorToSlot != null) {
             if (executorToSlot.entrySet().stream().anyMatch((entry) -> 
entry.getKey() == null || entry.getValue() == null)) {
                 throw new RuntimeException("Cannot create a scheduling with a 
null in it " + executorToSlot);
@@ -83,8 +83,8 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
     }
 
     public SchedulerAssignmentImpl(SchedulerAssignment assignment) {
-        this(assignment.getTopologyId(), assignment.getExecutorToSlot(), 
-                assignment.getScheduledResources(), 
assignment.getNodeIdToTotalSharedOffHeapMemory());
+        this(assignment.getTopologyId(), assignment.getExecutorToSlot(),
+             assignment.getScheduledResources(), 
assignment.getNodeIdToTotalSharedOffHeapMemory());
     }
 
     @Override
@@ -94,6 +94,7 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
 
     /**
      * Like the equals command, but ignores the resources.
+     *
      * @param other the object to check for equality against.
      * @return true if they are equal, ignoring resources, else false.
      */
@@ -105,11 +106,11 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
             return false;
         }
         SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
-        
+
         return topologyId.equals(o.topologyId)
-            && executorToSlot.equals(o.executorToSlot);
+               && executorToSlot.equals(o.executorToSlot);
     }
-    
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -118,7 +119,7 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
         result = prime * result + ((executorToSlot == null) ? 0 : 
executorToSlot.hashCode());
         return result;
     }
-    
+
     @Override
     public boolean equals(Object other) {
         if (!equalsIgnoreResources(other)) {
@@ -127,13 +128,13 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
         SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
 
         return resources.equals(o.resources)
-            && nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
+               && 
nodeIdToTotalSharedOffHeap.equals(o.nodeIdToTotalSharedOffHeap);
     }
-    
+
     @Override
     public Set<WorkerSlot> getSlots() {
         return new HashSet<>(executorToSlot.values());
-    }    
+    }
 
     @Deprecated
     public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) 
{
@@ -180,7 +181,7 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
 
         String node = slot.getNodeId();
         boolean isFound = false;
-        for (WorkerSlot ws: executorToSlot.values()) {
+        for (WorkerSlot ws : executorToSlot.values()) {
             if (node.equals(ws.getNodeId())) {
                 isFound = true;
                 break;
@@ -243,7 +244,7 @@ public class SchedulerAssignmentImpl implements 
SchedulerAssignment {
     public void setTotalSharedOffHeapMemory(String node, double value) {
         nodeIdToTotalSharedOffHeap.put(node, value);
     }
-    
+
     @Override
     public Map<String, Double> getNodeIdToTotalSharedOffHeapMemory() {
         return nodeIdToTotalSharedOffHeap;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
index ee8ef00..ff020a4 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/SingleTopologyCluster.java
@@ -27,7 +27,7 @@ public class SingleTopologyCluster extends Cluster {
     /**
      * Create a new cluster that only allows modifications to a single 
topology.
      *
-     * @param other the current cluster to base this off of
+     * @param other      the current cluster to base this off of
      * @param topologyId the topology that is allowed to be modified.
      */
     public SingleTopologyCluster(Cluster other, String topologyId) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 00b8ea0..4ac1057 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler;
@@ -22,7 +16,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.storm.Constants;
 import 
org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,13 +38,13 @@ public class SupervisorDetails {
      */
     private final Object schedulerMeta;
     /**
-     * all the ports of the supervisor.
-     */
-    private Set<Integer> allPorts;
-    /**
      * Map containing a manifest of resources for the node the supervisor 
resides.
      */
     private final NormalizedResourceOffer totalResources;
+    /**
+     * all the ports of the supervisor.
+     */
+    private Set<Integer> allPorts;
 
     /**
      * Create the details of a new supervisor.
@@ -105,20 +98,11 @@ public class SupervisorDetails {
                              Collection<? extends Number> allPorts, 
Map<String, Double> totalResources) {
         this(id, serverPort, host, null, schedulerMeta, allPorts, 
totalResources);
     }
-    
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + 
" META: " + meta
-                + " SCHED_META: " + schedulerMeta + " PORTS: " + allPorts;
-    }
-
-    private void setAllPorts(Collection<? extends Number> allPorts) {
-        this.allPorts = new HashSet<>();
-        if (allPorts != null) {
-            for (Number n: allPorts) {
-                this.allPorts.add(n.intValue());
-            }
-        }
+               + " SCHED_META: " + schedulerMeta + " PORTS: " + allPorts;
     }
 
     public String getId() {
@@ -136,11 +120,20 @@ public class SupervisorDetails {
     public Object getMeta() {
         return meta;
     }
-    
+
     public Set<Integer> getAllPorts() {
         return allPorts;
     }
 
+    private void setAllPorts(Collection<? extends Number> allPorts) {
+        this.allPorts = new HashSet<>();
+        if (allPorts != null) {
+            for (Number n : allPorts) {
+                this.allPorts.add(n.intValue());
+            }
+        }
+    }
+
     public Object getSchedulerMeta() {
         return this.schedulerMeta;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java
index ac2a506..8ee686f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorResources.java
@@ -31,8 +31,8 @@ public class SupervisorResources {
      *
      * @param totalMem the total mem on the supervisor
      * @param totalCpu the total CPU on the supervisor
-     * @param usedMem the used mem on the supervisor
-     * @param usedCpu the used CPU on the supervisor
+     * @param usedMem  the used mem on the supervisor
+     * @param usedCpu  the used CPU on the supervisor
      */
     public SupervisorResources(double totalMem, double totalCpu, double 
usedMem, double usedCpu) {
         this.totalMem = totalMem;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
index 16d230a..b222916 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Topologies.java
@@ -30,19 +30,9 @@ public class Topologies implements Iterable<TopologyDetails> 
{
     Map<String, String> nameToId;
     Map<String, Map<String, Component>> allComponents;
 
-    private static Map<String, TopologyDetails> mkMap(TopologyDetails[] 
details) {
-        Map<String, TopologyDetails> ret = new HashMap<>();
-        for (TopologyDetails td : details) {
-            if (ret.put(td.getId(), td) != null) {
-                throw new IllegalArgumentException(
-                    "Cannot have multiple topologies with the id " + 
td.getId());
-            }
-        }
-        return ret;
-    }
-
     /**
      * Create a new Topologies from a list of TopologyDetails.
+     *
      * @param details the list of details to use.
      * @throws IllegalArgumentException if duplicate topology ids are found.
      */
@@ -52,6 +42,7 @@ public class Topologies implements Iterable<TopologyDetails> {
 
     /**
      * Create a new Topologies from a map of id to topology
+     *
      * @param topologies a map of topology id to topology details.
      */
     public Topologies(Map<String, TopologyDetails> topologies) {
@@ -74,12 +65,24 @@ public class Topologies implements 
Iterable<TopologyDetails> {
         this(src.topologies);
     }
 
+    private static Map<String, TopologyDetails> mkMap(TopologyDetails[] 
details) {
+        Map<String, TopologyDetails> ret = new HashMap<>();
+        for (TopologyDetails td : details) {
+            if (ret.put(td.getId(), td) != null) {
+                throw new IllegalArgumentException(
+                    "Cannot have multiple topologies with the id " + 
td.getId());
+            }
+        }
+        return ret;
+    }
+
     public Collection<String> getAllIds() {
         return topologies.keySet();
     }
 
     /**
      * Get a topology given an ID
+     *
      * @param topologyId the id of the topology to get
      * @return the topology or null if it is not found.
      */
@@ -88,8 +91,8 @@ public class Topologies implements Iterable<TopologyDetails> {
     }
 
     /**
-     * Get a topology given a topology name. Nimbus prevents multiple 
topologies
-     * from having the same name, so this assumes it is true.
+     * Get a topology given a topology name. Nimbus prevents multiple 
topologies from having the same name, so this assumes it is true.
+     *
      * @param topologyName the name of the topology to look for
      * @return the a topology with the given name.
      */
@@ -109,6 +112,7 @@ public class Topologies implements 
Iterable<TopologyDetails> {
 
     /**
      * Get all topologies submitted/owned by a given user.
+     *
      * @param user the name of the user
      * @return all of the topologies submitted by this user.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index 41e7edf..c43f94a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -43,31 +43,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TopologyDetails {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TopologyDetails.class);
     private final String topologyId;
     private final Map<String, Object> topologyConf;
     private final StormTopology topology;
     private final Map<ExecutorDetails, String> executorToComponent;
     private final int numWorkers;
+    //when topology was launched
+    private final int launchTime;
+    private final String owner;
+    private final String topoName;
     //<ExecutorDetails - Task, Map<String - Type of resource, Map<String - 
type of that resource, Double - amount>>>
     private Map<ExecutorDetails, NormalizedResourceRequest> resourceList;
     //Max heap size for a worker used by topology
     private Double topologyWorkerMaxHeapSize;
     //topology priority
     private Integer topologyPriority;
-    //when topology was launched
-    private final int launchTime;
-    private final String owner;
-    private final String topoName;
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TopologyDetails.class);
 
     public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology, int numWorkers, String owner) {
-        this(topologyId, topologyConf, topology,  numWorkers,  null, 0, owner);
+        this(topologyId, topologyConf, topology, numWorkers, null, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology,
                            int numWorkers, Map<ExecutorDetails, String> 
executorToComponents, String owner) {
-        this(topologyId, topologyConf, topology,  numWorkers,  
executorToComponents, 0, owner);
+        this(topologyId, topologyConf, topology, numWorkers, 
executorToComponents, 0, owner);
     }
 
     public TopologyDetails(String topologyId, Map<String, Object> 
topologyConf, StormTopology topology, int numWorkers,
@@ -168,8 +167,8 @@ public class TopologyDetails {
                     getExecutorToComponent().get(exec),
                     exec,
                     
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
-                        resourceList.get(exec)
-                    );
+                    resourceList.get(exec)
+                );
                 addDefaultResforExec(exec);
             }
         }
@@ -194,9 +193,8 @@ public class TopologyDetails {
     }
 
     /**
-     * Returns a representation of the non-system components of the topology 
graph Each Component
-     * object in the returning map is populated with the list of its parents, 
children and execs
-     * assigned to that component.
+     * Returns a representation of the non-system components of the topology 
graph Each Component object in the returning map is populated
+     * with the list of its parents, children and execs assigned to that 
component.
      *
      * @return a map of components
      */
@@ -248,6 +246,7 @@ public class TopologyDetails {
 
     /**
      * Gets the on heap memory requirement for a certain task within a 
topology.
+     *
      * @param exec the executor the inquiry is concerning.
      * @return Double the amount of on heap memory requirement for this exec 
in topology topoId.
      */
@@ -255,13 +254,15 @@ public class TopologyDetails {
         Double ret = null;
         if (hasExecInTopo(exec)) {
             ret = resourceList
-                    .get(exec).getOnHeapMemoryMb();;
+                .get(exec).getOnHeapMemoryMb();
+            ;
         }
         return ret;
     }
 
     /**
      * Gets the off heap memory requirement for a certain task within a 
topology.
+     *
      * @param exec the executor the inquiry is concerning.
      * @return Double the amount of off heap memory requirement for this exec 
in topology topoId.
      */
@@ -269,32 +270,33 @@ public class TopologyDetails {
         Double ret = null;
         if (hasExecInTopo(exec)) {
             ret = resourceList
-                    .get(exec).getOffHeapMemoryMb();
+                .get(exec).getOffHeapMemoryMb();
         }
         return ret;
     }
 
     /**
      * Gets the total memory requirement for a task.
+     *
      * @param exec the executor the inquiry is concerning.
      * @return Double the total memory requirement for this exec in topology 
topoId.
      */
     public Double getTotalMemReqTask(ExecutorDetails exec) {
         if (hasExecInTopo(exec)) {
             return getOffHeapMemoryRequirement(exec)
-                    + getOnHeapMemoryRequirement(exec);
+                   + getOnHeapMemoryRequirement(exec);
         }
         return null;
     }
 
     /**
      * Gets the total memory resource list for a set of tasks that is part of 
a topology.
+     *
      * @param executors all executors for a topology
-     * @return Map<ExecutorDetails, Double> ,
-     * a map of the total memory requirement for all tasks in topology topoId.
+     * @return Map<ExecutorDetails   ,       Double> , a map of the total 
memory requirement for all tasks in topology topoId.
      */
     public Set<SharedMemory> getSharedMemoryRequests(
-            Collection<ExecutorDetails> executors
+        Collection<ExecutorDetails> executors
     ) {
         Set<String> components = new HashSet<>();
         for (ExecutorDetails exec : executors) {
@@ -324,6 +326,7 @@ public class TopologyDetails {
 
     /**
      * Get the total resource requirement for an executor.
+     *
      * @param exec the executor to get the resources for.
      * @return Double the total about of cpu requirement for executor
      */
@@ -336,20 +339,20 @@ public class TopologyDetails {
 
     /**
      * Get the total CPU requirement for executor.
+     *
      * @param exec
-     * @return Map<String, Double> generic resource mapping requirement for 
the executor
+     * @return Map<String   ,       Double> generic resource mapping 
requirement for the executor
      */
     public Double getTotalCpuReqTask(ExecutorDetails exec) {
         if (hasExecInTopo(exec)) {
             return resourceList
-                    .get(exec).getTotalCpu();
+                .get(exec).getTotalCpu();
         }
         return null;
     }
 
     /**
-     * Note: The public API relevant to resource aware scheduling is unstable 
as of May 2015.
-     *       We reserve the right to change them.
+     * Note: The public API relevant to resource aware scheduling is unstable 
as of May 2015. We reserve the right to change them.
      *
      * @return the total on-heap memory requested for this topology
      */
@@ -379,8 +382,7 @@ public class TopologyDetails {
     }
 
     /**
-     * Note: The public API relevant to resource aware scheduling is unstable 
as of May 2015.
-     *       We reserve the right to change them.
+     * Note: The public API relevant to resource aware scheduling is unstable 
as of May 2015. We reserve the right to change them.
      *
      * @return the total off-heap memory requested for this topology
      */
@@ -410,8 +412,7 @@ public class TopologyDetails {
     }
 
     /**
-     * Note: The public API relevant to resource aware scheduling is unstable 
as of May 2015.
-     *       We reserve the right to change them.
+     * Note: The public API relevant to resource aware scheduling is unstable 
as of May 2015. We reserve the right to change them.
      *
      * @return the total cpu requested for this topology
      */
@@ -428,6 +429,7 @@ public class TopologyDetails {
 
     /**
      * get the resources requirements for a executor.
+     *
      * @param exec
      * @return a map containing the resource requirements for this exec
      */
@@ -440,6 +442,7 @@ public class TopologyDetails {
 
     /**
      * Checks if a executor is part of this topology.
+     *
      * @return Boolean whether or not a certain ExecutorDetail is included in 
the resourceList.
      */
     public boolean hasExecInTopo(ExecutorDetails exec) {
@@ -472,28 +475,30 @@ public class TopologyDetails {
         if (component.equals(Acker.ACKER_COMPONENT_ID)) {
             if 
(topologyConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)) {
                 
resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
-                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)));
+                                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB)));
             }
             if 
(topologyConf.containsKey(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)) {
                 
resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
-                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)));
+                                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB)));
             }
             if 
(topologyConf.containsKey(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)) {
                 
resourceListForExec.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
-                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)));
+                                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT)));
             }
         } else if 
(component.startsWith(Constants.METRICS_COMPONENT_ID_PREFIX)) {
             if 
(topologyConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB))
 {
                 
resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
-                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB)));
+                                        ObjectReader
+                                            
.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB)));
             }
             if 
(topologyConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB))
 {
                 
resourceListForExec.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
-                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB)));
+                                        ObjectReader
+                                            
.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB)));
             }
             if 
(topologyConf.containsKey(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)) {
                 
resourceListForExec.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
-                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)));
+                                        
ObjectReader.getDouble(topologyConf.get(Config.TOPOLOGY_METRICS_CONSUMER_CPU_PCORE_PERCENT)));
             }
         }
     }
@@ -514,6 +519,7 @@ public class TopologyDetails {
 
     /**
      * Get the max heap size for a worker used by this topology.
+     *
      * @return the worker max heap size
      */
     public Double getTopologyWorkerMaxHeapSize() {
@@ -551,17 +557,17 @@ public class TopologyDetails {
     @Override
     public String toString() {
         return "Name: "
-            + getName()
-            + " id: "
-            + getId()
-            + " Priority: "
-            + getTopologyPriority()
-            + " Uptime: "
-            + getUpTime()
-            + " CPU: "
-            + getTotalRequestedCpu()
-            + " Memory: "
-            + (getTotalRequestedMemOffHeap() + getTotalRequestedMemOnHeap());
+               + getName()
+               + " id: "
+               + getId()
+               + " Priority: "
+               + getTopologyPriority()
+               + " Uptime: "
+               + getUpTime()
+               + " CPU: "
+               + getTotalRequestedCpu()
+               + " Memory: "
+               + (getTotalRequestedMemOffHeap() + 
getTotalRequestedMemOnHeap());
     }
 
     @Override

Reply via email to