http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java 
b/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
index c1c7c0a..840a45d 100755
--- 
a/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
+++ 
b/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
@@ -45,28 +45,26 @@ public class LoggingMetricsConsumer implements 
IMetricsConsumer {
     public static final Logger LOG = 
LoggerFactory.getLogger(LoggingMetricsConsumer.class);
 
     @Override
-    public void prepare(Map stormConf, Object registrationArgument, 
TopologyContext context, IErrorReporter errorReporter) { }
+    public void prepare(Map stormConf, Object registrationArgument, 
TopologyContext context, IErrorReporter errorReporter) {
+    }
 
     static private String padding = "                       ";
 
     @Override
     public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> 
dataPoints) {
         StringBuilder sb = new StringBuilder();
-        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
-            taskInfo.timestamp,
-            taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
-            taskInfo.srcTaskId,
-            taskInfo.srcComponentId);
+        String header =
+                String.format("%d\t%15s:%-4d\t%3d:%-11s\t", 
taskInfo.timestamp, taskInfo.srcWorkerHost, taskInfo.srcWorkerPort, 
taskInfo.srcTaskId,
+                        taskInfo.srcComponentId);
         sb.append(header);
         for (DataPoint p : dataPoints) {
             sb.delete(header.length(), sb.length());
-            sb.append(p.name)
-                
.append(padding).delete(header.length()+23,sb.length()).append("\t")
-                .append(p.value);
+            sb.append(p.name).append(padding).delete(header.length() + 23, 
sb.length()).append("\t").append(p.value);
             LOG.info(sb.toString());
         }
     }
 
     @Override
-    public void cleanup() { }
+    public void cleanup() {
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java 
b/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
index d8eb3bf..afbc7da 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
@@ -41,18 +41,18 @@ public class MetricsConsumerBolt implements IBolt {
     @Override
     public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
         try {
-            _metricsConsumer = 
(IMetricsConsumer)Class.forName(_consumerClassName).newInstance();
+            _metricsConsumer = (IMetricsConsumer) 
Class.forName(_consumerClassName).newInstance();
         } catch (Exception e) {
-            throw new RuntimeException("Could not instantiate a class listed 
in config under section " +
-                Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully 
qualified name " + _consumerClassName, e);
+            throw new RuntimeException("Could not instantiate a class listed 
in config under section " + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER
+                    + " with fully qualified name " + _consumerClassName, e);
         }
-        _metricsConsumer.prepare(stormConf, _registrationArgument, context, 
(IErrorReporter)collector);
+        _metricsConsumer.prepare(stormConf, _registrationArgument, context, 
(IErrorReporter) collector);
         _collector = collector;
     }
-    
+
     @Override
     public void execute(Tuple input) {
-        
_metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1));
+        _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo) 
input.getValue(0), (Collection) input.getValue(1));
         _collector.ack(input);
     }
 
@@ -60,5 +60,5 @@ public class MetricsConsumerBolt implements IBolt {
     public void cleanup() {
         _metricsConsumer.cleanup();
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java 
b/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java
index 492bc2d..43551ad 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java
@@ -35,7 +35,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
 // There is one task inside one executor for each worker of the topology.
 // TaskID is always -1, therefore you can only send-unanchored tuples to 
co-located SystemBolt.
 // This bolt was conceived to export worker stats via metrics api.
@@ -45,12 +44,14 @@ public class SystemBolt implements IBolt {
 
     private static class MemoryUsageMetric implements IMetric {
         IFn _getUsage;
+
         public MemoryUsageMetric(IFn getUsage) {
             _getUsage = getUsage;
         }
+
         @Override
         public Object getValueAndReset() {
-            MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
+            MemoryUsage memUsage = (MemoryUsage) _getUsage.invoke();
             HashMap m = new HashMap();
             m.put("maxBytes", memUsage.getMax());
             m.put("committedBytes", memUsage.getCommitted());
@@ -68,16 +69,18 @@ public class SystemBolt implements IBolt {
         GarbageCollectorMXBean _gcBean;
         Long _collectionCount;
         Long _collectionTime;
+
         public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
             _gcBean = gcBean;
         }
+
         @Override
         public Object getValueAndReset() {
             Long collectionCountP = _gcBean.getCollectionCount();
             Long collectionTimeP = _gcBean.getCollectionTime();
 
             Map ret = null;
-            if(_collectionCount!=null && _collectionTime!=null) {
+            if (_collectionCount != null && _collectionTime != null) {
                 ret = new HashMap();
                 ret.put("count", collectionCountP - _collectionCount);
                 ret.put("timeMs", collectionTimeP - _collectionTime);
@@ -91,7 +94,7 @@ public class SystemBolt implements IBolt {
 
     @Override
     public void prepare(final Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        if(_prepareWasCalled && 
!"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
+        if (_prepareWasCalled && 
!"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
             throw new RuntimeException("A single worker should have 1 
SystemBolt instance.");
         }
         _prepareWasCalled = true;
@@ -103,14 +106,14 @@ public class SystemBolt implements IBolt {
         context.registerMetric("uptimeSecs", new IMetric() {
             @Override
             public Object getValueAndReset() {
-                return jvmRT.getUptime()/1000.0;
+                return jvmRT.getUptime() / 1000.0;
             }
         }, bucketSize);
 
         context.registerMetric("startTimeSecs", new IMetric() {
             @Override
             public Object getValueAndReset() {
-                return jvmRT.getStartTime()/1000.0;
+                return jvmRT.getStartTime() / 1000.0;
             }
         }, bucketSize);
 
@@ -122,7 +125,8 @@ public class SystemBolt implements IBolt {
                 if (doEvent) {
                     doEvent = false;
                     return 1;
-                } else return 0;
+                } else
+                    return 0;
             }
         }, bucketSize);
 
@@ -139,7 +143,7 @@ public class SystemBolt implements IBolt {
             }
         }), bucketSize);
 
-        for(GarbageCollectorMXBean b : 
ManagementFactory.getGarbageCollectorMXBeans()) {
+        for (GarbageCollectorMXBean b : 
ManagementFactory.getGarbageCollectorMXBeans()) {
             context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), 
new GarbageCollectorMetric(b), bucketSize);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java
index 5764a25..a840851 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java
@@ -25,7 +25,7 @@ public class CombinedMetric implements IMetric {
         _combiner = combiner;
         _value = _combiner.identity();
     }
-    
+
     public void update(Object value) {
         _value = _combiner.combine(_value, value);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java
index dd048b8..82dc529 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java
@@ -24,7 +24,7 @@ public class CountMetric implements IMetric {
 
     public CountMetric() {
     }
-    
+
     public void incr() {
         _value++;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java
index 04b3156..4dcb0dd 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java
@@ -19,5 +19,6 @@ package backtype.storm.metric.api;
 
 public interface ICombiner<T> {
     public T identity();
+
     public T combine(T a, T b);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
index 14f1bf6..840bc6b 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
@@ -24,37 +24,47 @@ import java.util.Map;
 
 public interface IMetricsConsumer {
     public static class TaskInfo {
-        public TaskInfo() {}
+        public TaskInfo() {
+        }
+
         public TaskInfo(String srcWorkerHost, int srcWorkerPort, String 
srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
             this.srcWorkerHost = srcWorkerHost;
             this.srcWorkerPort = srcWorkerPort;
-            this.srcComponentId = srcComponentId; 
-            this.srcTaskId = srcTaskId; 
+            this.srcComponentId = srcComponentId;
+            this.srcTaskId = srcTaskId;
             this.timestamp = timestamp;
-            this.updateIntervalSecs = updateIntervalSecs; 
+            this.updateIntervalSecs = updateIntervalSecs;
         }
+
         public String srcWorkerHost;
         public int srcWorkerPort;
-        public String srcComponentId; 
-        public int srcTaskId; 
+        public String srcComponentId;
+        public int srcTaskId;
         public long timestamp;
-        public int updateIntervalSecs; 
+        public int updateIntervalSecs;
     }
+
     public static class DataPoint {
-        public DataPoint() {}
+        public DataPoint() {
+        }
+
         public DataPoint(String name, Object value) {
             this.name = name;
             this.value = value;
         }
+
         @Override
         public String toString() {
             return "[" + name + " = " + value + "]";
         }
-        public String name; 
+
+        public String name;
         public Object value;
     }
 
     void prepare(Map stormConf, Object registrationArgument, TopologyContext 
context, IErrorReporter errorReporter);
+
     void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+
     void cleanup();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java
index a58df3b..403fe89 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java
@@ -19,6 +19,8 @@ package backtype.storm.metric.api;
 
 public interface IReducer<T> {
     T init();
+
     T reduce(T accumulator, Object input);
+
     Object extractResult(T accumulator);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java
index e25e26d..4138cab 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java
@@ -31,23 +31,22 @@ public class MeanReducer implements 
IReducer<MeanReducerState> {
 
     public MeanReducerState reduce(MeanReducerState acc, Object input) {
         acc.count++;
-        if(input instanceof Double) {
-            acc.sum += (Double)input;
-        } else if(input instanceof Long) {
-            acc.sum += ((Long)input).doubleValue();
-        } else if(input instanceof Integer) {
-            acc.sum += ((Integer)input).doubleValue();
+        if (input instanceof Double) {
+            acc.sum += (Double) input;
+        } else if (input instanceof Long) {
+            acc.sum += ((Long) input).doubleValue();
+        } else if (input instanceof Integer) {
+            acc.sum += ((Integer) input).doubleValue();
         } else {
-            throw new RuntimeException(
-                "MeanReducer::reduce called with unsupported input type `" + 
input.getClass()
-                + "`. Supported types are Double, Long, Integer.");
+            throw new RuntimeException("MeanReducer::reduce called with 
unsupported input type `" + input.getClass()
+                    + "`. Supported types are Double, Long, Integer.");
         }
         return acc;
     }
 
     public Object extractResult(MeanReducerState acc) {
-        if(acc.count > 0) {
-            return new Double(acc.sum / (double)acc.count);
+        if (acc.count > 0) {
+            return new Double(acc.sum / (double) acc.count);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
index c420a16..eae7de3 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
@@ -26,10 +26,10 @@ public class MultiCountMetric implements IMetric {
 
     public MultiCountMetric() {
     }
-    
+
     public CountMetric scope(String key) {
         CountMetric val = _value.get(key);
-        if(val == null) {
+        if (val == null) {
             _value.put(key, val = new CountMetric());
         }
         return val;
@@ -37,7 +37,7 @@ public class MultiCountMetric implements IMetric {
 
     public Object getValueAndReset() {
         Map ret = new HashMap();
-        for(Map.Entry<String, CountMetric> e : _value.entrySet()) {
+        for (Map.Entry<String, CountMetric> e : _value.entrySet()) {
             ret.put(e.getKey(), e.getValue().getValueAndReset());
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
index 530b168..09b26a7 100755
--- 
a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
+++ 
b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
@@ -28,10 +28,10 @@ public class MultiReducedMetric implements IMetric {
     public MultiReducedMetric(IReducer reducer) {
         _reducer = reducer;
     }
-    
+
     public ReducedMetric scope(String key) {
         ReducedMetric val = _value.get(key);
-        if(val == null) {
+        if (val == null) {
             _value.put(key, val = new ReducedMetric(_reducer));
         }
         return val;
@@ -39,9 +39,9 @@ public class MultiReducedMetric implements IMetric {
 
     public Object getValueAndReset() {
         Map ret = new HashMap();
-        for(Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
+        for (Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
             Object val = e.getValue().getValueAndReset();
-            if(val != null) {
+            if (val != null) {
                 ret.put(e.getKey(), val);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
index def74c2..68d6a5c 100755
--- 
a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ 
b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
@@ -21,16 +21,13 @@ import backtype.storm.metric.api.CountMetric;
 
 public class CountShellMetric extends CountMetric implements IShellMetric {
     /***
-     * @param
-     *  params should be null or long
-     *  if value is null, it will call incr()
-     *  if value is long, it will call incrBy((long)params)
+     * @param value should be null or Long; if value is null, it will call 
incr(); if value is Long, it will call incrBy((Long) value)
      * */
     public void updateMetricFromRPC(Object value) {
         if (value == null) {
             incr();
         } else if (value instanceof Long) {
-            incrBy((Long)value);
+            incrBy((Long) value);
         } else {
             throw new RuntimeException("CountShellMetric updateMetricFromRPC 
params should be null or Long");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java 
b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
index d53baea..4b2d97a 100755
--- a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
+++ b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
@@ -21,11 +21,9 @@ import backtype.storm.metric.api.IMetric;
 
 public interface IShellMetric extends IMetric {
     /***
-     * @function
-     *     This interface is used by ShellBolt and ShellSpout through RPC call 
to update Metric 
-     * @param
-     *     value used to update metric, its's meaning change according 
implementation
-     *     Object can be any json support types: String, Long, Double, 
Boolean, Null, List, Map
+     * @function This interface is used by ShellBolt and ShellSpout through 
RPC calls to update a Metric
+     * @param value used to update the metric; its meaning changes according 
to the implementation. It can be any JSON-supported type: String, Long, 
Double, Boolean,
+     *            Null, List, Map
      * */
     public void updateMetricFromRPC(Object value);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java 
b/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java
index 446bdc4..d1eee4e 100755
--- a/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java
+++ b/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java
@@ -20,15 +20,12 @@ package backtype.storm.multilang;
 import java.util.List;
 
 /**
- * BoltMsg is an object that represents the data sent from a shell component to
- * a bolt process that implements a multi-language protocol. It is the union of
- * all data types that a bolt can receive from Storm.
- *
+ * BoltMsg is an object that represents the data sent from a shell component 
to a bolt process that implements a multi-language protocol. It is the union of 
all
+ * data types that a bolt can receive from Storm.
+ * 
  * <p>
- * BoltMsgs are objects sent to the ISerializer interface, for serialization
- * according to the wire protocol implemented by the serializer. The BoltMsg
- * class allows for a decoupling between the serialized representation of the
- * data and the data itself.
+ * BoltMsgs are objects sent to the ISerializer interface, for serialization 
according to the wire protocol implemented by the serializer. The BoltMsg class
+ * allows for a decoupling between the serialized representation of the data 
and the data itself.
  * </p>
  */
 public class BoltMsg {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java 
b/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java
index c9c7ad4..6729d89 100755
--- a/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java
@@ -27,55 +27,52 @@ import java.util.Map;
 import backtype.storm.task.TopologyContext;
 
 /**
- * The ISerializer interface describes the methods that an object should
- * implement to provide serialization and de-serialization capabilities to
- * non-JVM language components.
+ * The ISerializer interface describes the methods that an object should 
implement to provide serialization and de-serialization capabilities to non-JVM
+ * language components.
  */
 public interface ISerializer extends Serializable {
 
     /**
      * This method sets the input and output streams of the serializer
-     *
+     * 
      * @param processIn output stream to non-JVM component
      * @param processOut input stream from non-JVM component
      */
     void initialize(OutputStream processIn, InputStream processOut);
 
     /**
-     * This method transmits the Storm config to the non-JVM process and
-     * receives its pid.
-     *
+     * This method transmits the Storm config to the non-JVM process and 
receives its pid.
+     * 
      * @param conf storm configuration
      * @param context topology context
      * @return process pid
      */
-    Number connect(Map conf, TopologyContext context) throws IOException,
-            NoOutputException;
+    Number connect(Map conf, TopologyContext context) throws IOException, 
NoOutputException;
 
     /**
      * This method receives a shell message from the non-JVM process
-     *
+     * 
      * @return shell message
      */
     ShellMsg readShellMsg() throws IOException, NoOutputException;
 
     /**
      * This method sends a bolt message to a non-JVM bolt process
-     *
+     * 
      * @param msg bolt message
      */
     void writeBoltMsg(BoltMsg msg) throws IOException;
 
     /**
      * This method sends a spout message to a non-JVM spout process
-     *
+     * 
      * @param msg spout message
      */
     void writeSpoutMsg(SpoutMsg msg) throws IOException;
 
     /**
      * This method sends a list of task IDs to a non-JVM bolt process
-     *
+     * 
      * @param taskIds list of task IDs
      */
     void writeTaskIds(List<Integer> taskIds) throws IOException;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java 
b/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java
index 0e2e156..bce9a1a 100755
--- a/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java
@@ -52,8 +52,7 @@ public class JsonSerializer implements ISerializer {
         }
     }
 
-    public Number connect(Map conf, TopologyContext context)
-            throws IOException, NoOutputException {
+    public Number connect(Map conf, TopologyContext context) throws 
IOException, NoOutputException {
         JSONObject setupInfo = new JSONObject();
         setupInfo.put("pidDir", context.getPIDDir());
         setupInfo.put("conf", conf);
@@ -140,22 +139,22 @@ public class JsonSerializer implements ISerializer {
                 shellMsg.addAnchor((String) o);
             }
         }
-       
-        Object nameObj = msg.get("name"); 
+
+        Object nameObj = msg.get("name");
         String metricName = null;
         if (nameObj != null && nameObj instanceof String) {
             metricName = (String) nameObj;
         }
         shellMsg.setMetricName(metricName);
-        
+
         Object paramsObj = msg.get("params");
         shellMsg.setMetricParams(paramsObj);
 
         if (command.equals("log")) {
             Object logLevelObj = msg.get("level");
             if (logLevelObj != null && logLevelObj instanceof Long) {
-                long logLevel = (Long)logLevelObj;
-                shellMsg.setLogLevel((int)logLevel);
+                long logLevel = (Long) logLevelObj;
+                shellMsg.setLogLevel((int) logLevel);
             }
         }
 
@@ -183,8 +182,7 @@ public class JsonSerializer implements ISerializer {
                 if (line.length() == 0) {
                     errorMessage.append(" No output read.\n");
                 } else {
-                    errorMessage.append(" Currently read output: "
-                            + line.toString() + "\n");
+                    errorMessage.append(" Currently read output: " + 
line.toString() + "\n");
                 }
                 errorMessage.append("Serializer Exception:\n");
                 throw new NoOutputException(errorMessage.toString());

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java 
b/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java
index 1ce75d3..58b0a6e 100755
--- a/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java
+++ b/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java
@@ -18,8 +18,7 @@
 package backtype.storm.multilang;
 
 /**
- * A NoOutputException states that no data has been received from the connected
- * non-JVM process.
+ * A NoOutputException states that no data has been received from the 
connected non-JVM process.
  */
 public class NoOutputException extends Exception {
     public NoOutputException() {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java 
b/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java
index 9eafb1a..01434e0 100755
--- a/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java
+++ b/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java
@@ -21,15 +21,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * ShellMsg is an object that represents the data sent to a shell component 
from
- * a process that implements a multi-language protocol. It is the union of all
+ * ShellMsg is an object that represents the data sent to a shell component 
from a process that implements a multi-language protocol. It is the union of all
  * data types that a component can send to Storm.
- *
+ * 
  * <p>
- * ShellMsgs are objects received from the ISerializer interface, after the
- * serializer has deserialized the data from the underlying wire protocol. The
- * ShellMsg class allows for a decoupling between the serialized representation
- * of the data and the data itself.
+ * ShellMsgs are objects received from the ISerializer interface, after the 
serializer has deserialized the data from the underlying wire protocol. The 
ShellMsg
+ * class allows for a decoupling between the serialized representation of the 
data and the data itself.
  * </p>
  */
 public class ShellMsg {
@@ -42,22 +39,28 @@ public class ShellMsg {
     private List<Object> tuple;
     private boolean needTaskIds;
 
-    //metrics rpc 
+    // metrics rpc
     private String metricName;
     private Object metricParams;
 
-    //logLevel
+    // logLevel
     public enum ShellLogLevel {
         TRACE, DEBUG, INFO, WARN, ERROR;
 
         public static ShellLogLevel fromInt(int i) {
             switch (i) {
-                case 0: return TRACE;
-                case 1: return DEBUG;
-                case 2: return INFO;
-                case 3: return WARN;
-                case 4: return ERROR;
-                default: return INFO;
+            case 0:
+                return TRACE;
+            case 1:
+                return DEBUG;
+            case 2:
+                return INFO;
+            case 3:
+                return WARN;
+            case 4:
+                return ERROR;
+            default:
+                return INFO;
             }
         }
     }
@@ -168,18 +171,8 @@ public class ShellMsg {
 
     @Override
     public String toString() {
-        return "ShellMsg{" +
-                "command='" + command + '\'' +
-                ", id=" + id +
-                ", anchors=" + anchors +
-                ", stream='" + stream + '\'' +
-                ", task=" + task +
-                ", msg='" + msg + '\'' +
-                ", tuple=" + tuple +
-                ", needTaskIds=" + needTaskIds +
-                ", metricName='" + metricName + '\'' +
-                ", metricParams=" + metricParams +
-                ", logLevel=" + logLevel +
-                '}';
+        return "ShellMsg{" + "command='" + command + '\'' + ", id=" + id + ", anchors=" + anchors + ", stream='" + stream + '\'' + ", task=" + task + ", msg='"
+                + msg + '\'' + ", tuple=" + tuple + ", needTaskIds=" + needTaskIds + ", metricName='" + metricName + '\'' + ", metricParams=" + metricParams
+                + ", logLevel=" + logLevel + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java b/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java
index cb1b108..c08646c 100755
--- a/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java
+++ b/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java
@@ -18,15 +18,12 @@
 package backtype.storm.multilang;
 
 /**
- * SpoutMsg is an object that represents the data sent from a shell spout to a
- * process that implements a multi-language spout. The SpoutMsg is used to send
- * a "next", "ack" or "fail" message to a spout.
- *
+ * SpoutMsg is an object that represents the data sent from a shell spout to a process that implements a multi-language spout. The SpoutMsg is used to send a
+ * "next", "ack" or "fail" message to a spout.
+ * 
  * <p>
- * Spout messages are objects sent to the ISerializer interface, for
- * serialization according to the wire protocol implemented by the serializer.
- * The SpoutMsg class allows for a decoupling between the serialized
- * representation of the data and the data itself.
+ * Spout messages are objects sent to the ISerializer interface, for serialization according to the wire protocol implemented by the serializer. The SpoutMsg
+ * class allows for a decoupling between the serialized representation of the data and the data itself.
  * </p>
  */
 public class SpoutMsg {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java b/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java
index a687215..14c5723 100755
--- a/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java
+++ b/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java
@@ -23,9 +23,10 @@ import java.util.Map;
 
 public class DefaultTopologyValidator implements ITopologyValidator {
     @Override
-    public void prepare(Map StormConf){
+    public void prepare(Map StormConf) {
     }
+
     @Override
-    public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException {        
-    }    
+    public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java b/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java
index 99bd07b..36e2c18 100755
--- a/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java
+++ b/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java
@@ -23,6 +23,6 @@ import java.util.Map;
 
 public interface ITopologyValidator {
     void prepare(Map StormConf);
-    void validate(String topologyName, Map topologyConf, StormTopology topology)
-            throws InvalidTopologyException;
+
+    void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java b/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java
index 141b24b..6665d82 100755
--- a/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java
@@ -17,9 +17,8 @@
  */
 package backtype.storm.planner;
 
-
 public class CompoundSpout
-        //implements ISpout
+// implements ISpout
 {
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java b/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java
index 40a7f37..2bd56ee 100755
--- a/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java
+++ b/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java
@@ -17,9 +17,8 @@
  */
 package backtype.storm.planner;
 
-
 public class CompoundTask
-//        implements IBolt
+// implements IBolt
 {
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java b/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java
index 81c6209..4542aad 100755
--- a/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java
+++ b/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java
@@ -20,14 +20,13 @@ package backtype.storm.planner;
 import backtype.storm.task.IBolt;
 import java.io.Serializable;
 
-
 public class TaskBundle implements Serializable {
     public IBolt task;
     public int componentId;
-    
+
     public TaskBundle(IBolt task, int componentId) {
         this.task = task;
         this.componentId = componentId;
     }
-    
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java b/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java
index e0c7cc7..14d9ede 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java
@@ -30,25 +30,25 @@ public class Cluster {
     /**
      * key: supervisor id, value: supervisor details
      */
-    private Map<String, SupervisorDetails>   supervisors;
+    private Map<String, SupervisorDetails> supervisors;
     /**
      * key: topologyId, value: topology's current assignments.
      */
     private Map<String, SchedulerAssignmentImpl> assignments;
     /**
      * key topologyId, Value: scheduler's status.
-     */  
+     */
     private Map<String, String> status;
 
     /**
      * a map from hostname to supervisor id.
      */
-    private Map<String, List<String>>        hostToId;
-    
+    private Map<String, List<String>> hostToId;
+
     private Set<String> blackListedHosts = new HashSet<String>();
     private INimbus inimbus;
 
-    public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
+    public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments) {
         this.inimbus = nimbus;
         this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size());
         this.supervisors.putAll(supervisors);
@@ -65,35 +65,36 @@ public class Cluster {
             this.hostToId.get(host).add(nodeId);
         }
     }
-    
+
     public void setBlacklistedHosts(Set<String> hosts) {
         blackListedHosts = hosts;
     }
-    
+
     public Set<String> getBlacklistedHosts() {
         return blackListedHosts;
     }
-    
+
     public void blacklistHost(String host) {
         // this is so it plays well with setting blackListedHosts to an immutable list
-        if(blackListedHosts==null) blackListedHosts = new HashSet<String>();
-        if(!(blackListedHosts instanceof HashSet))
+        if (blackListedHosts == null)
+            blackListedHosts = new HashSet<String>();
+        if (!(blackListedHosts instanceof HashSet))
             blackListedHosts = new HashSet<String>(blackListedHosts);
         blackListedHosts.add(host);
     }
-    
+
     public boolean isBlackListed(String supervisorId) {
-        return blackListedHosts != null && blackListedHosts.contains(getHost(supervisorId));        
+        return blackListedHosts != null && blackListedHosts.contains(getHost(supervisorId));
     }
 
     public boolean isBlacklistedHost(String host) {
-        return blackListedHosts != null && blackListedHosts.contains(host);  
+        return blackListedHosts != null && blackListedHosts.contains(host);
     }
-    
+
     public String getHost(String supervisorId) {
         return inimbus.getHostName(supervisors, supervisorId);
     }
-    
+
     /**
      * Gets all the topologies which needs scheduling.
      * 
@@ -116,8 +117,8 @@ public class Cluster {
      * 
      * A topology needs scheduling if one of the following conditions holds:
      * <ul>
-     *   <li>Although the topology is assigned slots, but is squeezed. i.e. the topology is assigned less slots than desired.</li>
-     *   <li>There are unassigned executors in this topology</li>
+     * <li>Although the topology is assigned slots, but is squeezed. i.e. the topology is assigned less slots than desired.</li>
+     * <li>There are unassigned executors in this topology</li>
      * </ul>
      */
     public boolean needsScheduling(TopologyDetails topology) {
@@ -139,7 +140,7 @@ public class Cluster {
      */
     public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology) {
         Collection<ExecutorDetails> allExecutors = new HashSet(topology.getExecutors());
-        
+
         SchedulerAssignment assignment = this.assignments.get(topology.getId());
         if (assignment != null) {
             Collection<ExecutorDetails> assignedExecutors = assignment.getExecutors();
@@ -148,7 +149,7 @@ public class Cluster {
 
         return topology.selectExecutorToComponent(allExecutors);
     }
-    
+
     /**
      * Gets a component-id -> executors map which needs scheduling in this topology.
      * 
@@ -163,14 +164,13 @@ public class Cluster {
             if (!componentToExecutors.containsKey(component)) {
                 componentToExecutors.put(component, new ArrayList<ExecutorDetails>());
             }
-            
+
             componentToExecutors.get(component).add(executor);
         }
-        
+
         return componentToExecutors;
     }
 
-
     /**
      * Get all the used ports of this supervisor.
      * 
@@ -207,9 +207,10 @@ public class Cluster {
 
         return ret;
     }
-    
+
     public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) {
-        if(isBlackListed(supervisor.id)) return new HashSet();
+        if (isBlackListed(supervisor.id))
+            return new HashSet();
         return supervisor.allPorts;
     }
 
@@ -229,7 +230,7 @@ public class Cluster {
 
         return slots;
     }
-    
+
     public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
         Set<Integer> ports = this.getAssignablePorts(supervisor);
         List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
@@ -238,9 +239,9 @@ public class Cluster {
             slots.add(new WorkerSlot(supervisor.getId(), port));
         }
 
-        return slots;        
+        return slots;
     }
-    
+
     /**
      * get the unassigned executors of the topology.
      */
@@ -250,13 +251,13 @@ public class Cluster {
         }
 
         Collection<ExecutorDetails> ret = new HashSet(topology.getExecutors());
-        
+
         SchedulerAssignment assignment = 
this.getAssignmentById(topology.getId());
         if (assignment != null) {
             Set<ExecutorDetails> assignedExecutors = assignment.getExecutors();
             ret.removeAll(assignedExecutors);
         }
-        
+
         return ret;
     }
 
@@ -287,16 +288,16 @@ public class Cluster {
         if (this.isSlotOccupied(slot)) {
             throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
         }
-        
-        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId);
+
+        SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl) this.getAssignmentById(topologyId);
         if (assignment == null) {
             assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
             this.assignments.put(topologyId, assignment);
         } else {
             for (ExecutorDetails executor : executors) {
-                 if (assignment.isExecutorAssigned(executor)) {
-                 throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot.");
-                 }
+                if (assignment.isExecutorAssigned(executor)) {
+                    throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot.");
+                }
             }
         }
 
@@ -316,7 +317,7 @@ public class Cluster {
 
         return slots;
     }
-    
+
     public List<WorkerSlot> getAssignableSlots() {
         List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
         for (SupervisorDetails supervisor : this.supervisors.values()) {
@@ -339,14 +340,14 @@ public class Cluster {
             }
         }
     }
-    
+
     /**
      * free the slots.
      * 
      * @param slots
      */
     public void freeSlots(Collection<WorkerSlot> slots) {
-        if(slots!=null) {
+        if (slots != null) {
             for (WorkerSlot slot : slots) {
                 this.freeSlot(slot);
             }
@@ -365,10 +366,10 @@ public class Cluster {
                 return true;
             }
         }
-        
+
         return false;
     }
-    
+
     /**
      * get the current assignment for the topology.
      */
@@ -390,10 +391,10 @@ public class Cluster {
 
         return null;
     }
-    
+
     public Collection<WorkerSlot> getUsedSlots() {
         Set<WorkerSlot> ret = new HashSet();
-        for(SchedulerAssignmentImpl s: assignments.values()) {
+        for (SchedulerAssignmentImpl s : assignments.values()) {
             ret.addAll(s.getExecutorToSlot().values());
         }
         return ret;
@@ -423,11 +424,11 @@ public class Cluster {
      */
     public Map<String, SchedulerAssignment> getAssignments() {
         Map<String, SchedulerAssignment> ret = new HashMap<String, SchedulerAssignment>(this.assignments.size());
-        
+
         for (String topologyId : this.assignments.keySet()) {
             ret.put(topologyId, this.assignments.get(topologyId));
         }
-        
+
         return ret;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java b/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java
index bcf4aca..5934d33 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java
@@ -21,7 +21,7 @@ public class ExecutorDetails {
     int startTask;
     int endTask;
 
-    public ExecutorDetails(int startTask, int endTask){
+    public ExecutorDetails(int startTask, int endTask) {
         this.startTask = startTask;
         this.endTask = endTask;
     }
@@ -38,17 +38,17 @@ public class ExecutorDetails {
         if (other == null || !(other instanceof ExecutorDetails)) {
             return false;
         }
-        
-        ExecutorDetails executor = (ExecutorDetails)other;
+
+        ExecutorDetails executor = (ExecutorDetails) other;
         return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
     }
-    
+
     public int hashCode() {
         return this.startTask + 13 * this.endTask;
     }
-    
+
     @Override
     public String toString() {
-       return "[" + this.startTask + ", " + this.endTask + "]";
+        return "[" + this.startTask + ", " + this.endTask + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java b/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java
index a0fb417..b13beb3 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java
@@ -23,17 +23,19 @@ import java.util.Set;
 
 public interface INimbus {
     void prepare(Map stormConf, String schedulerLocalDir);
+
     /**
-     * Returns all slots that are available for the next round of scheduling. A slot is available for scheduling
-     * if it is free and can be assigned to, or if it is used and can be reassigned.
+     * Returns all slots that are available for the next round of scheduling. A slot is available for scheduling if it is free and can be assigned to, or if it
+     * is used and can be reassigned.
      */
-    Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments);
+    Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies,
+                                                          Set<String> topologiesMissingAssignments);
 
     // this is called after the assignment is changed in ZK
     void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId);
-    
+
     // map from node id to supervisor details
     String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId);
-    
-    IScheduler getForcedScheduler(); 
+
+    IScheduler getForcedScheduler();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java b/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java
index 5395882..93096f8 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java
@@ -19,22 +19,18 @@ package backtype.storm.scheduler;
 
 import java.util.Map;
 
-
 public interface IScheduler {
-    
+
     void prepare(Map conf);
-    
+
     /**
-     * Set assignments for the topologies which needs scheduling. The new assignments is available 
-     * through <code>cluster.getAssignments()</code>
-     *
-     *@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here 
-     *       only contain static information about topologies. Information like assignments, slots are all in
-     *       the <code>cluster</code>object.
-     *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user
-     *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current 
-     *       assignments for all the topologies etc. User can set the new assignment for topologies using
-     *       <code>cluster.setAssignmentById</code>
+     * Set assignments for the topologies which needs scheduling. The new assignments is available through <code>cluster.getAssignments()</code>
+     * 
+     * @param topologies all the topologies in the cluster, some of them need schedule. Topologies object here only contain static information about topologies.
+     *            Information like assignments, slots are all in the <code>cluster</code>object.
+     * @param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user need to develop a new scheduling logic. e.g.
+     *            supervisors information, available slots, current assignments for all the topologies etc. User can set the new assignment for topologies using
+     *            <code>cluster.setAssignmentById</code>
      */
     void schedule(Topologies topologies, Cluster cluster);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java b/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java
index 64e1595..d64f851 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java
@@ -20,26 +20,29 @@ package backtype.storm.scheduler;
 import java.util.Map;
 import java.util.Collection;
 
-
 public interface ISupervisor {
     void prepare(Map stormConf, String schedulerLocalDir);
+
     // for mesos, this is {hostname}-{topologyid}
     /**
      * The id used for writing metadata into ZK.
      */
     String getSupervisorId();
+
     /**
-     * The id used in assignments. This combined with confirmAssigned decides what
-     * this supervisor is responsible for. The combination of this and getSupervisorId
-     * allows Nimbus to assign to a single machine and have multiple supervisors
-     * on that machine execute the assignment. This is important for achieving resource isolation.
+     * The id used in assignments. This combined with confirmAssigned decides what this supervisor is responsible for. The combination of this and
+     * getSupervisorId allows Nimbus to assign to a single machine and have multiple supervisors on that machine execute the assignment. This is important for
+     * achieving resource isolation.
      */
     String getAssignmentId();
+
     Object getMetadata();
-    
+
     boolean confirmAssigned(int port);
+
     // calls this before actually killing the worker locally...
     // sends a "task finished" update
     void killedWorker(int port);
+
     void assigned(Collection<Integer> ports);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java
index 0212e48..7451dcc 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java
@@ -23,6 +23,7 @@ import java.util.Set;
 public interface SchedulerAssignment {
     /**
      * Does this slot occupied by this assignment?
+     * 
      * @param slot
      * @return
      */
@@ -35,24 +36,27 @@ public interface SchedulerAssignment {
      * @return
      */
     public boolean isExecutorAssigned(ExecutorDetails executor);
-    
+
     /**
      * get the topology-id this assignment is for.
+     * 
      * @return
      */
     public String getTopologyId();
 
     /**
      * get the executor -> slot map.
+     * 
      * @return
      */
     public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot();
 
     /**
      * Return the executors covered by this assignments
+     * 
      * @return
      */
     public Set<ExecutorDetails> getExecutors();
-    
+
     public Set<WorkerSlot> getSlots();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java
index 08af4b7..7a6947f 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java
@@ -35,7 +35,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
      * assignment detail, a mapping from executor to <code>WorkerSlot</code>
      */
     Map<ExecutorDetails, WorkerSlot> executorToSlot;
-    
+
     public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
         this.topologyId = topologyId;
         this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
@@ -47,10 +47,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     @Override
     public Set<WorkerSlot> getSlots() {
         return new HashSet(executorToSlot.values());
-    }    
-    
+    }
+
     /**
      * Assign the slot to executors.
+     * 
      * @param slot
      * @param executors
      */
@@ -59,9 +60,10 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
             this.executorToSlot.put(executor, slot);
         }
     }
-    
+
     /**
      * Release the slot occupied by this assignment.
+     * 
      * @param slot
      */
     public void unassignBySlot(WorkerSlot slot) {
@@ -72,7 +74,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
                 executors.add(executor);
             }
         }
-        
+
         // remove
         for (ExecutorDetails executor : executors) {
             this.executorToSlot.remove(executor);
@@ -81,6 +83,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
 
     /**
      * Does this slot occupied by this assignment?
+     * 
      * @param slot
      * @return
      */
@@ -91,7 +94,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
     public boolean isExecutorAssigned(ExecutorDetails executor) {
         return this.executorToSlot.containsKey(executor);
     }
-    
+
     public String getTopologyId() {
         return this.topologyId;
     }
@@ -102,6 +105,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
 
     /**
      * Return the executors covered by this assignments
+     * 
      * @return
      */
     public Set<ExecutorDetails> getExecutors() {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java b/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java
index 7497f26..2b8a400 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java
@@ -38,19 +38,19 @@ public class SupervisorDetails {
      */
     Set<Integer> allPorts;
 
-    public SupervisorDetails(String id, Object meta){
+    public SupervisorDetails(String id, Object meta) {
         this.id = id;
         this.meta = meta;
         allPorts = new HashSet();
     }
-    
-    public SupervisorDetails(String id, Object meta, Collection<Number> allPorts){
+
+    public SupervisorDetails(String id, Object meta, Collection<Number> allPorts) {
         this.id = id;
         this.meta = meta;
         setAllPorts(allPorts);
     }
 
-    public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<Number> allPorts){
+    public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<Number> allPorts) {
         this.id = id;
         this.host = host;
         this.schedulerMeta = schedulerMeta;
@@ -60,8 +60,8 @@ public class SupervisorDetails {
 
     private void setAllPorts(Collection<Number> allPorts) {
         this.allPorts = new HashSet<Integer>();
-        if(allPorts!=null) {
-            for(Number n: allPorts) {
+        if (allPorts != null) {
+            for (Number n : allPorts) {
                 this.allPorts.add(n.intValue());
             }
         }
@@ -78,7 +78,7 @@ public class SupervisorDetails {
     public Object getMeta() {
         return meta;
     }
-    
+
     public Set<Integer> getAllPorts() {
         return allPorts;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java b/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java
index 70af1b4..771fcf2 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java
@@ -24,33 +24,34 @@ import java.util.Map;
 public class Topologies {
     Map<String, TopologyDetails> topologies;
     Map<String, String> nameToId;
-    
+
     public Topologies(Map<String, TopologyDetails> topologies) {
-        if(topologies==null) topologies = new HashMap();
+        if (topologies == null)
+            topologies = new HashMap();
         this.topologies = new HashMap<String, TopologyDetails>(topologies.size());
         this.topologies.putAll(topologies);
         this.nameToId = new HashMap<String, String>(topologies.size());
-        
+
         for (String topologyId : topologies.keySet()) {
             TopologyDetails topology = topologies.get(topologyId);
             this.nameToId.put(topology.getName(), topologyId);
         }
     }
-    
+
     public TopologyDetails getById(String topologyId) {
         return this.topologies.get(topologyId);
     }
-    
+
     public TopologyDetails getByName(String topologyName) {
         String topologyId = this.nameToId.get(topologyName);
-        
+
         if (topologyId == null) {
             return null;
         } else {
             return this.getById(topologyId);
         }
     }
-    
+
     public Collection<TopologyDetails> getTopologies() {
         return this.topologies.values();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java b/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java
index 6daf4ed..84b3966 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java
@@ -24,21 +24,20 @@ import java.util.Map;
 import backtype.storm.Config;
 import backtype.storm.generated.StormTopology;
 
-
 public class TopologyDetails {
     String topologyId;
     Map topologyConf;
     StormTopology topology;
     Map<ExecutorDetails, String> executorToComponent;
     int numWorkers;
- 
+
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology 
topology, int numWorkers) {
         this.topologyId = topologyId;
         this.topologyConf = topologyConf;
         this.topology = topology;
         this.numWorkers = numWorkers;
     }
-    
+
     public TopologyDetails(String topologyId, Map topologyConf, StormTopology 
topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
         this(topologyId, topologyConf, topology, numWorkers);
         this.executorToComponent = new HashMap<ExecutorDetails, String>(0);
@@ -46,23 +45,23 @@ public class TopologyDetails {
             this.executorToComponent.putAll(executorToComponents);
         }
     }
-    
+
     public String getId() {
         return topologyId;
     }
-    
+
     public String getName() {
-        return (String)this.topologyConf.get(Config.TOPOLOGY_NAME);
+        return (String) this.topologyConf.get(Config.TOPOLOGY_NAME);
     }
-    
+
     public Map getConf() {
         return topologyConf;
     }
-    
+
     public int getNumWorkers() {
         return numWorkers;
     }
-    
+
     public StormTopology getTopology() {
         return topology;
     }
@@ -79,10 +78,10 @@ public class TopologyDetails {
                 ret.put(executor, compId);
             }
         }
-        
+
         return ret;
     }
-    
+
     public Collection<ExecutorDetails> getExecutors() {
         return this.executorToComponent.keySet();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java 
b/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java
index 8331ad8..baeb233 100755
--- a/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java
+++ b/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java
@@ -20,36 +20,36 @@ package backtype.storm.scheduler;
 import java.io.Serializable;
 
 public class WorkerSlot implements Comparable<WorkerSlot>, Serializable {
-    
+
     private static final long serialVersionUID = -4451854497340313268L;
     String nodeId;
     int port;
-    
+
     public WorkerSlot(String nodeId, Number port) {
         this.nodeId = nodeId;
         this.port = port.intValue();
     }
-    
+
     public WorkerSlot() {
-        
+
     }
-    
+
     public String getNodeId() {
         return nodeId;
     }
-    
+
     public int getPort() {
         return port;
     }
-    
+
     public void setNodeId(String nodeId) {
         this.nodeId = nodeId;
     }
-    
+
     public void setPort(int port) {
         this.port = port;
     }
-    
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -58,7 +58,7 @@ public class WorkerSlot implements Comparable<WorkerSlot>, 
Serializable {
         result = prime * result + port;
         return result;
     }
-    
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj)
@@ -77,12 +77,12 @@ public class WorkerSlot implements Comparable<WorkerSlot>, 
Serializable {
             return false;
         return true;
     }
-    
+
     @Override
     public String toString() {
         return this.nodeId + ":" + this.port;
     }
-    
+
     @Override
     public int compareTo(WorkerSlot o) {
         String otherNode = o.getNodeId();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java
 
b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java
index 3053b5b..8064a0d 100755
--- 
a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java
+++ 
b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java
@@ -36,184 +36,183 @@ import backtype.storm.scheduler.WorkerSlot;
  * A pool of machines that anyone can use, but topologies are not isolated
  */
 public class DefaultPool extends NodePool {
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class);
-  private Set<Node> _nodes = new HashSet<Node>();
-  private HashMap<String, TopologyDetails> _tds = new HashMap<String, 
TopologyDetails>();
-  
-  @Override
-  public void addTopology(TopologyDetails td) {
-    String topId = td.getId();
-    LOG.debug("Adding in Topology {}", topId);
-    _tds.put(topId, td);
-    SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
-    if (assignment != null) {
-      for (WorkerSlot ws: assignment.getSlots()) {
-        Node n = _nodeIdToNode.get(ws.getNodeId());
-        _nodes.add(n);
-      }
-    }
-  }
-
-  @Override
-  public boolean canAdd(TopologyDetails td) {
-    return true;
-  }
-
-  @Override
-  public Collection<Node> takeNodes(int nodesNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
-    Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
-    for (Node n: sortedNodes) {
-      if (nodesNeeded <= ret.size()) {
-        break;
-      }
-      if (n.isAlive()) {
-        n.freeAllSlots(_cluster);
-        _nodes.remove(n);
-        ret.add(n);
-      }
-    }
-    return ret;
-  }
-  
-  @Override
-  public int nodesAvailable() {
-    int total = 0;
-    for (Node n: _nodes) {
-      if (n.isAlive()) total++;
-    }
-    return total;
-  }
-  
-  @Override
-  public int slotsAvailable() {
-    return Node.countTotalSlotsAlive(_nodes);
-  }
-
-  @Override
-  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
-    int nodesFound = 0;
-    int slotsFound = 0;
-    LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
-    Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
-    for (Node n: sortedNodes) {
-      if (slotsNeeded <= 0) {
-        break;
-      }
-      if (n.isAlive()) {
-        nodesFound++;
-        int totalSlotsFree = n.totalSlots();
-        slotsFound += totalSlotsFree;
-        slotsNeeded -= totalSlotsFree;
-      }
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultPool.class);
+    private Set<Node> _nodes = new HashSet<Node>();
+    private HashMap<String, TopologyDetails> _tds = new HashMap<String, 
TopologyDetails>();
+
+    @Override
+    public void addTopology(TopologyDetails td) {
+        String topId = td.getId();
+        LOG.debug("Adding in Topology {}", topId);
+        _tds.put(topId, td);
+        SchedulerAssignment assignment = _cluster.getAssignmentById(topId);
+        if (assignment != null) {
+            for (WorkerSlot ws : assignment.getSlots()) {
+                Node n = _nodeIdToNode.get(ws.getNodeId());
+                _nodes.add(n);
+            }
+        }
     }
-    return new NodeAndSlotCounts(nodesFound, slotsFound);
-  }
-  
-  @Override
-  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
-    Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
-    for (Node n: sortedNodes) {
-      if (slotsNeeded <= 0) {
-        break;
-      }
-      if (n.isAlive()) {
-        n.freeAllSlots(_cluster);
-        _nodes.remove(n);
-        ret.add(n);
-        slotsNeeded -= n.totalSlotsFree();
-      }
+
+    @Override
+    public boolean canAdd(TopologyDetails td) {
+        return true;
     }
-    return ret;
-  }
-
-  @Override
-  public void scheduleAsNeeded(NodePool... lesserPools) {
-    for (TopologyDetails td : _tds.values()) {
-      String topId = td.getId();
-      if (_cluster.needsScheduling(td)) {
-        LOG.debug("Scheduling topology {}",topId);
-        int totalTasks = td.getExecutors().size();
-        int origRequest = td.getNumWorkers();
-        int slotsRequested = Math.min(totalTasks, origRequest);
-        int slotsUsed = Node.countSlotsUsed(topId, _nodes);
-        int slotsFree = Node.countFreeSlotsAlive(_nodes);
-        //Check to see if we have enough slots before trying to get them
-        int slotsAvailable = 0;
-        if (slotsRequested > slotsFree) {
-          slotsAvailable = NodePool.slotsAvailable(lesserPools);
-        }
-        int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + 
slotsAvailable);
-        int executorsNotRunning = _cluster.getUnassignedExecutors(td).size();
-        LOG.debug("Slots... requested {} used {} free {} available {} to be 
used {}, executors not running {}", 
-            new Object[] {slotsRequested, slotsUsed, slotsFree, 
slotsAvailable, slotsToUse, executorsNotRunning}); 
-        if (slotsToUse <= 0) {
-          if (executorsNotRunning > 0) {
-            _cluster.setStatus(topId,"Not fully scheduled (No free slots in 
default pool) "+executorsNotRunning+" executors not scheduled");
-          } else {
-            if (slotsUsed < slotsRequested) {
-              _cluster.setStatus(topId,"Running with fewer slots than 
requested ("+slotsUsed+"/"+origRequest+")");
-            } else { //slotsUsed < origRequest
-              _cluster.setStatus(topId,"Fully Scheduled (requested 
"+origRequest+" slots, but could only use "+slotsUsed+")");
+
+    @Override
+    public Collection<Node> takeNodes(int nodesNeeded) {
+        HashSet<Node> ret = new HashSet<Node>();
+        LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+        for (Node n : sortedNodes) {
+            if (nodesNeeded <= ret.size()) {
+                break;
+            }
+            if (n.isAlive()) {
+                n.freeAllSlots(_cluster);
+                _nodes.remove(n);
+                ret.add(n);
             }
-          }
-          continue;
         }
+        return ret;
+    }
 
-        int slotsNeeded = slotsToUse - slotsFree;
-        if (slotsNeeded > 0) {
-          _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools));
+    @Override
+    public int nodesAvailable() {
+        int total = 0;
+        for (Node n : _nodes) {
+            if (n.isAlive())
+                total++;
         }
+        return total;
+    }
+
+    @Override
+    public int slotsAvailable() {
+        return Node.countTotalSlotsAlive(_nodes);
+    }
 
-        if (executorsNotRunning <= 0) {
-          //There are free slots that we can take advantage of now.
-          for (Node n: _nodes) {
-            n.freeTopology(topId, _cluster); 
-          }
-          slotsFree = Node.countFreeSlotsAlive(_nodes);
-          slotsToUse = Math.min(slotsRequested, slotsFree);
+    @Override
+    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
+        int nodesFound = 0;
+        int slotsFound = 0;
+        LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+        for (Node n : sortedNodes) {
+            if (slotsNeeded <= 0) {
+                break;
+            }
+            if (n.isAlive()) {
+                nodesFound++;
+                int totalSlotsFree = n.totalSlots();
+                slotsFound += totalSlotsFree;
+                slotsNeeded -= totalSlotsFree;
+            }
         }
-        
-        RoundRobinSlotScheduler slotSched = 
-          new RoundRobinSlotScheduler(td, slotsToUse, _cluster);
-        
-        LinkedList<Node> nodes = new LinkedList<Node>(_nodes);
-        while (true) {
-          Node n = null;
-          do {
-            if (nodes.isEmpty()) {
-              throw new IllegalStateException("This should not happen, we" +
-              " messed up and did not get enough slots");
+        return new NodeAndSlotCounts(nodesFound, slotsFound);
+    }
+
+    @Override
+    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+        HashSet<Node> ret = new HashSet<Node>();
+        LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes);
+        Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC);
+        for (Node n : sortedNodes) {
+            if (slotsNeeded <= 0) {
+                break;
             }
-            n = nodes.peekFirst();
-            if (n.totalSlotsFree() == 0) {
-              nodes.remove();
-              n = null;
+            if (n.isAlive()) {
+                n.freeAllSlots(_cluster);
+                _nodes.remove(n);
+                ret.add(n);
+                slotsNeeded -= n.totalSlotsFree();
             }
-          } while (n == null);
-          if (!slotSched.assignSlotTo(n)) {
-            break;
-          }
         }
-        int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes);
-        if (afterSchedSlotsUsed < slotsRequested) {
-          _cluster.setStatus(topId,"Running with fewer slots than requested 
("+afterSchedSlotsUsed+"/"+origRequest+")");
-        } else if (afterSchedSlotsUsed < origRequest) {
-          _cluster.setStatus(topId,"Fully Scheduled (requested "+origRequest+" 
slots, but could only use "+afterSchedSlotsUsed+")");
-        } else {
-          _cluster.setStatus(topId,"Fully Scheduled");
+        return ret;
+    }
+
+    @Override
+    public void scheduleAsNeeded(NodePool... lesserPools) {
+        for (TopologyDetails td : _tds.values()) {
+            String topId = td.getId();
+            if (_cluster.needsScheduling(td)) {
+                LOG.debug("Scheduling topology {}", topId);
+                int totalTasks = td.getExecutors().size();
+                int origRequest = td.getNumWorkers();
+                int slotsRequested = Math.min(totalTasks, origRequest);
+                int slotsUsed = Node.countSlotsUsed(topId, _nodes);
+                int slotsFree = Node.countFreeSlotsAlive(_nodes);
+                // Check to see if we have enough slots before trying to get 
them
+                int slotsAvailable = 0;
+                if (slotsRequested > slotsFree) {
+                    slotsAvailable = NodePool.slotsAvailable(lesserPools);
+                }
+                int slotsToUse = Math.min(slotsRequested - slotsUsed, 
slotsFree + slotsAvailable);
+                int executorsNotRunning = 
_cluster.getUnassignedExecutors(td).size();
+                LOG.debug("Slots... requested {} used {} free {} available {} 
to be used {}, executors not running {}", new Object[] { slotsRequested,
+                        slotsUsed, slotsFree, slotsAvailable, slotsToUse, 
executorsNotRunning });
+                if (slotsToUse <= 0) {
+                    if (executorsNotRunning > 0) {
+                        _cluster.setStatus(topId, "Not fully scheduled (No 
free slots in default pool) " + executorsNotRunning + " executors not 
scheduled");
+                    } else {
+                        if (slotsUsed < slotsRequested) {
+                            _cluster.setStatus(topId, "Running with fewer 
slots than requested (" + slotsUsed + "/" + origRequest + ")");
+                        } else { // slotsUsed < origRequest
+                            _cluster.setStatus(topId, "Fully Scheduled 
(requested " + origRequest + " slots, but could only use " + slotsUsed + ")");
+                        }
+                    }
+                    continue;
+                }
+
+                int slotsNeeded = slotsToUse - slotsFree;
+                if (slotsNeeded > 0) {
+                    _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, 
lesserPools));
+                }
+
+                if (executorsNotRunning <= 0) {
+                    // There are free slots that we can take advantage of now.
+                    for (Node n : _nodes) {
+                        n.freeTopology(topId, _cluster);
+                    }
+                    slotsFree = Node.countFreeSlotsAlive(_nodes);
+                    slotsToUse = Math.min(slotsRequested, slotsFree);
+                }
+
+                RoundRobinSlotScheduler slotSched = new 
RoundRobinSlotScheduler(td, slotsToUse, _cluster);
+
+                LinkedList<Node> nodes = new LinkedList<Node>(_nodes);
+                while (true) {
+                    Node n = null;
+                    do {
+                        if (nodes.isEmpty()) {
+                            throw new IllegalStateException("This should not 
happen, we" + " messed up and did not get enough slots");
+                        }
+                        n = nodes.peekFirst();
+                        if (n.totalSlotsFree() == 0) {
+                            nodes.remove();
+                            n = null;
+                        }
+                    } while (n == null);
+                    if (!slotSched.assignSlotTo(n)) {
+                        break;
+                    }
+                }
+                int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes);
+                if (afterSchedSlotsUsed < slotsRequested) {
+                    _cluster.setStatus(topId, "Running with fewer slots than 
requested (" + afterSchedSlotsUsed + "/" + origRequest + ")");
+                } else if (afterSchedSlotsUsed < origRequest) {
+                    _cluster.setStatus(topId, "Fully Scheduled (requested " + 
origRequest + " slots, but could only use " + afterSchedSlotsUsed + ")");
+                } else {
+                    _cluster.setStatus(topId, "Fully Scheduled");
+                }
+            } else {
+                _cluster.setStatus(topId, "Fully Scheduled");
+            }
         }
-      } else {
-        _cluster.setStatus(topId,"Fully Scheduled");
-      }
     }
-  }
-  
-  @Override
-  public String toString() {
-    return "DefaultPool  " + _nodes.size() + " nodes " + _tds.size() + " 
topologies";
-  }
+
+    @Override
+    public String toString() {
+        return "DefaultPool  " + _nodes.size() + " nodes " + _tds.size() + " 
topologies";
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java 
b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java
index c625895..239e529 100755
--- 
a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java
+++ 
b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java
@@ -34,92 +34,92 @@ import backtype.storm.scheduler.TopologyDetails;
  * All of the machines that currently have nothing assigned to them
  */
 public class FreePool extends NodePool {
-  private static final Logger LOG = LoggerFactory.getLogger(FreePool.class);
-  private Set<Node> _nodes = new HashSet<Node>();
-  private int _totalSlots = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(FreePool.class);
+    private Set<Node> _nodes = new HashSet<Node>();
+    private int _totalSlots = 0;
 
-  @Override
-  public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
-    super.init(cluster, nodeIdToNode);
-    for (Node n: nodeIdToNode.values()) {
-      if(n.isTotallyFree() && n.isAlive()) {
-        _nodes.add(n);
-        _totalSlots += n.totalSlotsFree();
-      }
+    @Override
+    public void init(Cluster cluster, Map<String, Node> nodeIdToNode) {
+        super.init(cluster, nodeIdToNode);
+        for (Node n : nodeIdToNode.values()) {
+            if (n.isTotallyFree() && n.isAlive()) {
+                _nodes.add(n);
+                _totalSlots += n.totalSlotsFree();
+            }
+        }
+        LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots);
     }
-    LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots);
-  }
-  
-  @Override
-  public void addTopology(TopologyDetails td) {
-    throw new IllegalArgumentException("The free pool cannot run any 
topologies");
-  }
 
-  @Override
-  public boolean canAdd(TopologyDetails td) {
-    // The free pool never has anything running
-    return false;
-  }
-  
-  @Override
-  public Collection<Node> takeNodes(int nodesNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    Iterator<Node> it = _nodes.iterator();
-    while (it.hasNext() && nodesNeeded > ret.size()) {
-      Node n = it.next();
-      ret.add(n);
-      _totalSlots -= n.totalSlotsFree();
-      it.remove();
+    @Override
+    public void addTopology(TopologyDetails td) {
+        throw new IllegalArgumentException("The free pool cannot run any 
topologies");
     }
-    return ret;
-  }
-  
-  @Override
-  public int nodesAvailable() {
-    return _nodes.size();
-  }
 
-  @Override
-  public int slotsAvailable() {
-    return _totalSlots;
-  }
+    @Override
+    public boolean canAdd(TopologyDetails td) {
+        // The free pool never has anything running
+        return false;
+    }
+
+    @Override
+    public Collection<Node> takeNodes(int nodesNeeded) {
+        HashSet<Node> ret = new HashSet<Node>();
+        Iterator<Node> it = _nodes.iterator();
+        while (it.hasNext() && nodesNeeded > ret.size()) {
+            Node n = it.next();
+            ret.add(n);
+            _totalSlots -= n.totalSlotsFree();
+            it.remove();
+        }
+        return ret;
+    }
 
-  @Override
-  public Collection<Node> takeNodesBySlots(int slotsNeeded) {
-    HashSet<Node> ret = new HashSet<Node>();
-    Iterator<Node> it = _nodes.iterator();
-    while (it.hasNext() && slotsNeeded > 0) {
-      Node n = it.next();
-      ret.add(n);
-      _totalSlots -= n.totalSlotsFree();
-      slotsNeeded -= n.totalSlotsFree();
-      it.remove();
+    @Override
+    public int nodesAvailable() {
+        return _nodes.size();
     }
-    return ret;
-  }
-  
-  @Override
-  public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
-    int slotsFound = 0;
-    int nodesFound = 0;
-    Iterator<Node> it = _nodes.iterator();
-    while (it.hasNext() && slotsNeeded > 0) {
-      Node n = it.next();
-      nodesFound++;
-      int totalSlots = n.totalSlots();
-      slotsFound += totalSlots;
-      slotsNeeded -= totalSlots;
+
+    @Override
+    public int slotsAvailable() {
+        return _totalSlots;
+    }
+
+    @Override
+    public Collection<Node> takeNodesBySlots(int slotsNeeded) {
+        HashSet<Node> ret = new HashSet<Node>();
+        Iterator<Node> it = _nodes.iterator();
+        while (it.hasNext() && slotsNeeded > 0) {
+            Node n = it.next();
+            ret.add(n);
+            _totalSlots -= n.totalSlotsFree();
+            slotsNeeded -= n.totalSlotsFree();
+            it.remove();
+        }
+        return ret;
     }
-    return new NodeAndSlotCounts(nodesFound, slotsFound);
-  }
 
-  @Override
-  public void scheduleAsNeeded(NodePool... lesserPools) {
-    //No topologies running so NOOP
-  }
-  
-  @Override
-  public String toString() {
-    return "FreePool of "+_nodes.size()+" nodes with "+_totalSlots+" slots";
-  }
+    @Override
+    public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int 
slotsNeeded) {
+        int slotsFound = 0;
+        int nodesFound = 0;
+        Iterator<Node> it = _nodes.iterator();
+        while (it.hasNext() && slotsNeeded > 0) {
+            Node n = it.next();
+            nodesFound++;
+            int totalSlots = n.totalSlots();
+            slotsFound += totalSlots;
+            slotsNeeded -= totalSlots;
+        }
+        return new NodeAndSlotCounts(nodesFound, slotsFound);
+    }
+
+    @Override
+    public void scheduleAsNeeded(NodePool... lesserPools) {
+        // No topologies running so NOOP
+    }
+
+    @Override
+    public String toString() {
+        return "FreePool of " + _nodes.size() + " nodes with " + _totalSlots + 
" slots";
+    }
 }

Reply via email to