http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
index 2e7ed02..94f2fed 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -1,19 +1,12 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.metricstore.rocksdb;
@@ -25,7 +18,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.metricstore.AggLevel;
@@ -48,9 +40,9 @@ import org.slf4j.LoggerFactory;
 
 
 public class RocksDbStore implements MetricStore, AutoCloseable {
+    static final int INVALID_METADATA_STRING_ID = 0;
     private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbStore.class);
     private static final int MAX_QUEUE_CAPACITY = 4000;
-    static final int INVALID_METADATA_STRING_ID = 0;
     RocksDB db;
     private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
     private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
@@ -58,10 +50,6 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
     private MetricsCleaner metricsCleaner = null;
     private Meter failureMeter = null;
 
-    interface RocksDbScanCallback {
-        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop 
scan
-    }
-
     /**
      * Create metric store instance using the configurations provided via the 
config map.
      *
@@ -132,7 +120,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
 
         if 
(!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
             throw new MetricException("Not a vaild RocksDB configuration - 
Does not specify creation policy "
-                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
+                                      + 
DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
         }
 
         // validate path defined
@@ -147,17 +135,17 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
 
         if 
(!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY)))
 {
             throw new MetricException("Not a valid RocksDB configuration - 
Missing metadata string cache size "
-                    + 
DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
+                                      + 
DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
         }
 
         if 
(!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
             throw new MetricException("Not a valid RocksDB configuration - 
Missing metric retention "
-                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
+                                      + 
DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
         }
     }
 
     private String getRocksDbAbsoluteDir(Map<String, Object> conf) throws 
MetricException {
-        String storePath = 
(String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
+        String storePath = (String) 
conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
         if (storePath == null) {
             throw new MetricException("Not a vaild RocksDB configuration - 
Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
         } else {
@@ -201,7 +189,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
      * Fill out the numeric values for a metric.
      *
      * @param metric  Metric to populate
-     * @return   true if the metric was populated, false otherwise
+     * @return true if the metric was populated, false otherwise
      * @throws MetricException  if read from database fails
      */
     @Override
@@ -234,7 +222,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
         }
 
         RocksDbKey key = RocksDbKey.createMetricKey(metric.getAggLevel(), 
topologyId, metric.getTimestamp(), metricId,
-                componentId, executorId, hostId, metric.getPort(), streamId);
+                                                    componentId, executorId, 
hostId, metric.getPort(), streamId);
 
         return populateFromKey(key, metric);
     }
@@ -461,9 +449,9 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
         for (AggLevel aggLevel : filter.getAggLevels()) {
 
             RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, 
startTopologyId, startTime, startMetricId,
-                    startComponentId, startExecutorId, startHostId, startPort, 
startStreamId);
+                                                             startComponentId, 
startExecutorId, startHostId, startPort, startStreamId);
             RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, 
endTopologyId, endTime, endMetricId,
-                    endComponentId, endExecutorId, endHostId, endPort, 
endStreamId);
+                                                           endComponentId, 
endExecutorId, endHostId, endPort, endStreamId);
 
             RocksIterator iterator = db.newIterator(ro);
             for (iterator.seek(startKey.getRaw()); iterator.isValid(); 
iterator.next()) {
@@ -519,7 +507,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
                         String streamId = 
metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), 
idToStringCache);
 
                         Metric metric = new Metric(metricName, timestamp, 
topologyId, 0.0, componentId, executorId, hostname,
-                                streamId, key.getPort(), aggLevel);
+                                                   streamId, key.getPort(), 
aggLevel);
 
                         val.populateMetric(metric);
 
@@ -561,7 +549,7 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
             s = rdbValue.getMetdataString();
             lookupCache.put(id, s);
             return s;
-        }  catch (RocksDBException e) {
+        } catch (RocksDBException e) {
             if (this.failureMeter != null) {
                 this.failureMeter.mark();
             }
@@ -635,5 +623,9 @@ public class RocksDbStore implements MetricStore, 
AutoCloseable {
             }
         }
     }
+
+    interface RocksDbScanCallback {
+        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop 
scan
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
index 58b2c76..0da7b92 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
@@ -1,19 +1,12 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.metricstore.rocksdb;
@@ -50,11 +43,11 @@ import org.apache.storm.metricstore.Metric;
  */
 
 class RocksDbValue {
-    private static int METRIC_VALUE_SIZE = 41;
-    private byte[] value;
     private static final byte CURRENT_METADATA_VERSION = 0;
     private static final byte CURRENT_METRIC_VERSION = 0;
+    private static int METRIC_VALUE_SIZE = 41;
     private static int MIN_METADATA_VALUE_SIZE = 9;
+    private byte[] value;
 
     /**
      * Constructor from raw data.
@@ -98,7 +91,7 @@ class RocksDbValue {
     /**
      * Get the metadata string portion of the value.  Assumes the value is 
metadata.
      *
-     * @return   the metadata string
+     * @return the metadata string
      */
     String getMetdataString() {
         if (this.value.length < MIN_METADATA_VALUE_SIZE) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
index 6f54a58..d890582 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
@@ -1,19 +1,12 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.metricstore.rocksdb;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
index 7ce8435..d70076f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
@@ -1,19 +1,12 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.metricstore.rocksdb;
@@ -39,12 +32,23 @@ import org.slf4j.LoggerFactory;
  */
 
 public class StringMetadataCache implements 
LruMap.CacheEvictionCallback<String, StringMetadata>,
-        WritableStringMetadataCache, ReadOnlyStringMetadataCache {
+                                            WritableStringMetadataCache, 
ReadOnlyStringMetadataCache {
     private static final Logger LOG = 
LoggerFactory.getLogger(StringMetadataCache.class);
+    private static StringMetadataCache instance = null;
     private Map<String, StringMetadata> lruStringCache;
     private Map<Integer, String> hashToString = new ConcurrentHashMap<>();
     private RocksDbMetricsWriter dbWriter;
-    private static StringMetadataCache instance = null;
+
+    /**
+     * Constructor to create a cache.
+     *
+     * @param dbWriter   The rocks db writer instance the cache should use 
when evicting data
+     * @param capacity  The cache size
+     */
+    private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) {
+        lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, 
this));
+        this.dbWriter = dbWriter;
+    }
 
     /**
      * Initializes the cache instance.
@@ -87,22 +91,15 @@ public class StringMetadataCache implements 
LruMap.CacheEvictionCallback<String,
         }
     }
 
-    /**
-     * Constructor to create a cache.
-     *
-     * @param dbWriter   The rocks db writer instance the cache should use 
when evicting data
-     * @param capacity  The cache size
-     */
-    private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) {
-        lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, 
this));
-        this.dbWriter = dbWriter;
+    static void cleanUp() {
+        instance = null;
     }
 
     /**
      * Get the string metadata from the cache.
      *
      * @param s   The string to look for
-     * @return   the metadata associated with the string or null if not found
+     * @return the metadata associated with the string or null if not found
      */
     public StringMetadata get(String s) {
         return lruStringCache.get(s);
@@ -169,7 +166,7 @@ public class StringMetadataCache implements 
LruMap.CacheEvictionCallback<String,
      * Determines if a string Id is contained in the cache.
      *
      * @param stringId   The string Id to check
-     * @return   true if the Id is in the cache, false otherwise
+     * @return true if the Id is in the cache, false otherwise
      */
     public boolean contains(Integer stringId) {
         return hashToString.containsKey(stringId);
@@ -179,7 +176,7 @@ public class StringMetadataCache implements 
LruMap.CacheEvictionCallback<String,
      * Returns the string matching the string Id if in the cache.
      *
      * @param stringId   The string Id to check
-     * @return   the associated string if the Id is in the cache, null 
otherwise
+     * @return the associated string if the Id is in the cache, null otherwise
      */
     public String getMetadataString(Integer stringId) {
         return hashToString.get(stringId);
@@ -188,15 +185,11 @@ public class StringMetadataCache implements 
LruMap.CacheEvictionCallback<String,
     /**
      * Get the map of the cache contents.  Provided to allow writing the data 
to RocksDB on shutdown.
      *
-     * @return   the string metadata map entrySet
+     * @return the string metadata map entrySet
      */
     public Set<Map.Entry<String, StringMetadata>> entrySet() {
         return lruStringCache.entrySet();
     }
 
-    static void cleanUp() {
-        instance = null;
-    }
-
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
index 2d4165f..ab47fdf 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
@@ -1,26 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.metricstore.rocksdb;
 
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.http.annotation.NotThreadSafe;
 import org.apache.storm.metricstore.MetricException;
 
@@ -48,7 +40,7 @@ public interface WritableStringMetadataCache extends 
ReadOnlyStringMetadataCache
     /**
      * Get the map of the cache contents.  Provided to allow writing the data 
to RocksDB on shutdown.
      *
-     * @return   the string metadata map entrySet
+     * @return the string metadata map entrySet
      */
     Set<Map.Entry<String, StringMetadata>> entrySet();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 558e570..b912513 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
@@ -27,11 +21,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorAssignments;
-import org.apache.storm.scheduler.SupervisorDetails;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
@@ -95,6 +87,17 @@ public class AssignmentDistributionService implements 
Closeable {
     private boolean isLocalMode = false; // boolean cache for local mode 
decision
 
     /**
+     * Factory method for initialize a instance.
+     * @param conf config.
+     * @return an instance of {@link AssignmentDistributionService}
+     */
+    public static AssignmentDistributionService getInstance(Map conf) {
+        AssignmentDistributionService service = new 
AssignmentDistributionService();
+        service.prepare(conf);
+        return service;
+    }
+
+    /**
      * Function for initialization.
      *
      * @param conf config
@@ -163,6 +166,47 @@ public class AssignmentDistributionService implements 
Closeable {
         }
     }
 
+    public void addLocalSupervisor(Supervisor supervisor) {
+        this.localSupervisors.put(supervisor.getId(), supervisor);
+    }
+
+    private Integer nextQueueId() {
+        return this.random.nextInt(threadsNum);
+    }
+
+    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
+        return this.assignmentsQueue.get(nextQueueId());
+    }
+
+    private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer 
queueIndex) {
+        return this.assignmentsQueue.get(queueIndex);
+    }
+
+    /**
+     * Get an assignments from the target queue with the specific index.
+     * @param queueIndex index of the queue
+     * @return an {@link NodeAssignments}
+     * @throws InterruptedException
+     */
+    public NodeAssignments nextAssignments(Integer queueIndex) throws 
InterruptedException {
+        NodeAssignments target = null;
+        while (true) {
+            target = getQueueById(queueIndex).poll();
+            if (target != null) {
+                return target;
+            }
+            Time.sleep(100L);
+        }
+    }
+
+    public boolean isActive() {
+        return this.active;
+    }
+
+    public Map getConf() {
+        return this.conf;
+    }
+
     static class NodeAssignments {
         private String node;
         private String host;
@@ -177,7 +221,7 @@ public class AssignmentDistributionService implements 
Closeable {
         }
 
         public static NodeAssignments getInstance(String node, String host, 
Integer serverPort,
-            SupervisorAssignments assignments) {
+                                                  SupervisorAssignments 
assignments) {
             return new NodeAssignments(node, host, serverPort, assignments);
         }
 
@@ -237,12 +281,12 @@ public class AssignmentDistributionService implements 
Closeable {
                     
supervisor.sendSupervisorAssignments(assignments.getAssignments());
                 } else {
                     LOG.error("Can not find node {} for assignments 
distribution", assignments.getNode());
-                    throw new RuntimeException("null for node " + 
assignments.getNode() +  " supervisor instance.");
+                    throw new RuntimeException("null for node " + 
assignments.getNode() + " supervisor instance.");
                 }
             } else {
                 // distributed mode
                 try (SupervisorClient client = 
SupervisorClient.getConfiguredClient(service.getConf(),
-                    assignments.getHost(), assignments.getServerPort())){
+                                                                               
     assignments.getHost(), assignments.getServerPort())) {
                     try {
                         
client.getClient().sendSupervisorAssignments(assignments.getAssignments());
                     } catch (Exception e) {
@@ -257,56 +301,4 @@ public class AssignmentDistributionService implements 
Closeable {
             }
         }
     }
-
-    public void addLocalSupervisor(Supervisor supervisor) {
-        this.localSupervisors.put(supervisor.getId(), supervisor);
-    }
-
-    private Integer nextQueueId() {
-        return this.random.nextInt(threadsNum);
-    }
-
-    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
-        return this.assignmentsQueue.get(nextQueueId());
-    }
-
-    private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer 
queueIndex) {
-        return this.assignmentsQueue.get(queueIndex);
-    }
-
-    /**
-     * Get an assignments from the target queue with the specific index.
-     * @param queueIndex index of the queue
-     * @return an {@link NodeAssignments}
-     * @throws InterruptedException
-     */
-    public NodeAssignments nextAssignments(Integer queueIndex) throws 
InterruptedException {
-        NodeAssignments target = null;
-        while (true) {
-            target = getQueueById(queueIndex).poll();
-            if (target != null) {
-                return target;
-            }
-            Time.sleep(100L);
-        }
-    }
-
-    public boolean isActive() {
-        return this.active;
-    }
-
-    public Map getConf() {
-        return this.conf;
-    }
-
-    /**
-     * Factory method for initialize a instance.
-     * @param conf config.
-     * @return an instance of {@link AssignmentDistributionService}
-     */
-    public static AssignmentDistributionService getInstance(Map conf) {
-        AssignmentDistributionService service = new 
AssignmentDistributionService();
-        service.prepare(conf);
-        return service;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java
index ab164ca..7fef7bf 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
@@ -30,7 +24,7 @@ public class DefaultTopologyValidator implements 
ITopologyValidator {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultTopologyValidator.class);
 
     @Override
-    public void prepare(Map<String, Object> StormConf){
+    public void prepare(Map<String, Object> StormConf) {
     }
 
     @Override
@@ -63,5 +57,5 @@ public class DefaultTopologyValidator implements 
ITopologyValidator {
                 }
             }
         }
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java 
b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
index e362fbc..05cdbc1 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/ILeaderElector.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.nimbus;
 
 import java.io.Closeable;
@@ -62,7 +57,7 @@ public interface ILeaderElector extends Closeable {
      * Get list of current nimbus addresses.
      * @return list of current nimbus addresses, includes leader.
      */
-    List<NimbusInfo> getAllNimbuses()throws Exception;
+    List<NimbusInfo> getAllNimbuses() throws Exception;
 
     /**
      * Method called to allow for cleanup. once close this object can not be 
reused.

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java
index f36a4a8..8247a85 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyActionNotifierPlugin.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.nimbus;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java 
b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java
index 46c4073..88d756d 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/ITopologyValidator.java
@@ -1,30 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.nimbus;
 
+import java.util.Map;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.generated.StormTopology;
-import java.util.Map;
 
 public interface ITopologyValidator {
 
     void prepare(Map<String, Object> StormConf);
 
     void validate(String topologyName, Map<String, Object> topologyConf, 
StormTopology topology)
-            throws InvalidTopologyException;
+        throws InvalidTopologyException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
index bde58dc..8457de8 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/IWorkerHeartbeatsRecoveryStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index 59cd462..68a2391 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -1,34 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
-
-import javax.security.auth.Subject;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-
+import javax.security.auth.Subject;
 import org.apache.commons.io.IOUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
@@ -54,21 +46,17 @@ import org.slf4j.LoggerFactory;
  */
 public class LeaderListenerCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(LeaderListenerCallback.class);
-
+    private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
+    private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
+    private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
     private final BlobStore blobStore;
     private final TopoCache tc;
     private final IStormClusterState clusterState;
-
     private final CuratorFramework zk;
     private final LeaderLatch leaderLatch;
-
     private final Map conf;
     private final List<ACL> acls;
 
-    private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
-    private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
-    private static final String STORM_CONF_SUFFIX = "-stormconf.ser";
-
     /**
      * Constructor for {@LeaderListenerCallback}.
      * @param conf config
@@ -103,7 +91,7 @@ public class LeaderListenerCallback {
         clusterState.setAssignmentsBackendSynchronized();
 
         Set<String> activeTopologyIds = new 
TreeSet<>(ClientZookeeper.getChildren(zk,
-            ClusterUtils.STORMS_SUBTREE, false));
+                                                                               
   ClusterUtils.STORMS_SUBTREE, false));
 
         Set<String> activeTopologyBlobKeys = 
populateTopologyBlobKeys(activeTopologyIds);
         Set<String> activeTopologyCodeKeys = 
filterTopologyCodeKeys(activeTopologyBlobKeys);
@@ -113,8 +101,8 @@ public class LeaderListenerCallback {
         // this finds all active topologies blob keys from all local topology 
blob keys
         Sets.SetView<String> diffTopology = 
Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
         LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] 
diff-topology-blobs [{}]",
-                generateJoinedString(activeTopologyIds), 
generateJoinedString(allLocalTopologyBlobKeys),
-                generateJoinedString(diffTopology));
+                 generateJoinedString(activeTopologyIds), 
generateJoinedString(allLocalTopologyBlobKeys),
+                 generateJoinedString(diffTopology));
 
         if (diffTopology.isEmpty()) {
             Set<String> activeTopologyDependencies = 
getTopologyDependencyKeys(activeTopologyCodeKeys);
@@ -122,15 +110,15 @@ public class LeaderListenerCallback {
             // this finds all dependency blob keys from active topologies from 
all local blob keys
             Sets.SetView<String> diffDependencies = 
Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
             LOG.info("active-topology-dependencies [{}] local-blobs [{}] 
diff-topology-dependencies [{}]",
-                    generateJoinedString(activeTopologyDependencies), 
generateJoinedString(allLocalBlobKeys),
-                    generateJoinedString(diffDependencies));
+                     generateJoinedString(activeTopologyDependencies), 
generateJoinedString(allLocalBlobKeys),
+                     generateJoinedString(diffDependencies));
 
             if (diffDependencies.isEmpty()) {
                 LOG.info("Accepting leadership, all active topologies and 
corresponding dependencies found locally.");
                 tc.clear();
             } else {
                 LOG.info("Code for all active topologies is available locally, 
but some dependencies are not found locally, "
-                        + "giving up leadership.");
+                         + "giving up leadership.");
                 closeLatch();
             }
         } else {
@@ -174,8 +162,8 @@ public class LeaderListenerCallback {
         Set<String> topologyBlobKeys = new HashSet<>();
         for (String blobKey : blobKeys) {
             if (blobKey.endsWith(STORM_JAR_SUFFIX)
-                    || blobKey.endsWith(STORM_CODE_SUFFIX)
-                    || blobKey.endsWith(STORM_CONF_SUFFIX)) {
+                || blobKey.endsWith(STORM_CODE_SUFFIX)
+                || blobKey.endsWith(STORM_CONF_SUFFIX)) {
                 topologyBlobKeys.add(blobKey);
             }
         }
@@ -209,12 +197,12 @@ public class LeaderListenerCallback {
                 }
             } catch (AuthorizationException | KeyNotFoundException | 
IOException e) {
                 LOG.error("Exception occurs while reading blob for key: "
-                        + activeTopologyCodeKey
-                        + ", exception: "
-                        + e, e);
+                          + activeTopologyCodeKey
+                          + ", exception: "
+                          + e, e);
                 throw new RuntimeException("Exception occurs while reading 
blob for key: "
-                        + activeTopologyCodeKey
-                        + ", exception: " + e, e);
+                                           + activeTopologyCodeKey
+                                           + ", exception: " + e, e);
             }
         }
         return activeTopologyDependencies;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
index f725f5d..17d4d2f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/NimbusHeartbeatsPressureTest.java
@@ -1,33 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.storm.Config;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
@@ -51,6 +44,8 @@ public class NimbusHeartbeatsPressureTest {
     private static int THREAD_SUBMIT_NUM = 1;
     private static int MOCKED_STORM_NUM = 5000;
     private static volatile boolean[] readyFlags = new boolean[THREADS_NUM];
+    private static Random rand = new Random(47);
+    private static List<double[]> totalCostTimesBook = new ArrayList<>();
 
     static {
         for (int i = 0; i < THREADS_NUM; i++) {
@@ -58,9 +53,6 @@ public class NimbusHeartbeatsPressureTest {
         }
     }
 
-    private static Random rand = new Random(47);
-    private static List<double[]> totalCostTimesBook = new ArrayList<>();
-
     /**
      * Initialize a fake config.
      * @return conf
@@ -131,6 +123,53 @@ public class NimbusHeartbeatsPressureTest {
         }
     }
 
+    private static void printTimeCostArray(Double[] array) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        for (int i = 0; i < array.length; i++) {
+            if (i != array.length - 1) {
+                builder.append(array[i] + ",");
+            } else {
+                builder.append(array[i] + "");
+            }
+        }
+        builder.append("]");
+        System.out.println(builder.toString());
+    }
+
+    private static void printStatistics(List<double[]> data) {
+
+        List<Double> totalPoints = new ArrayList<>();
+        double total = 0D;
+        for (double[] item : data) {
+            for (Double point : item) {
+                if (point != null) {
+                    totalPoints.add(point);
+                    total += point;
+                }
+            }
+        }
+        Double[] totalPointsArray = new Double[totalPoints.size()];
+
+        totalPoints.toArray(totalPointsArray);
+        Arrays.sort(totalPointsArray);
+        // printTimeCostArray(totalPointsArray);
+        println("===== statistics ================");
+        println("===== min time cost: " + totalPointsArray[0] + " =====");
+        println("===== max time cost: " + 
totalPointsArray[totalPointsArray.length - 2] + " =====");
+
+        double meanVal = total / totalPointsArray.length;
+        println("===== mean time cost: " + meanVal + " =====");
+        int middleIndex = (int) (totalPointsArray.length * 0.5);
+        println("===== median time cost: " + totalPointsArray[middleIndex] + " 
=====");
+        int top90Index = (int) (totalPointsArray.length * 0.9);
+        println("===== top90 time cost: " + totalPointsArray[top90Index] + " 
=====");
+    }
+
+    public static void main(String[] args) {
+        testMaxThroughput();
+    }
+
     static class HeartbeatSendTask implements Runnable {
         private double[] runtimesBook;
         private int taskId;
@@ -191,51 +230,4 @@ public class NimbusHeartbeatsPressureTest {
         }
     }
 
-    private static void printTimeCostArray(Double[] array) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("[");
-        for (int i = 0; i < array.length; i++) {
-            if (i != array.length - 1) {
-                builder.append(array[i] + ",");
-            } else {
-                builder.append(array[i] + "");
-            }
-        }
-        builder.append("]");
-        System.out.println(builder.toString());
-    }
-
-    private static void printStatistics(List<double[]> data) {
-
-        List<Double> totalPoints = new ArrayList<>();
-        double total = 0D;
-        for (double[] item : data) {
-            for (Double point : item) {
-                if (point != null) {
-                    totalPoints.add(point);
-                    total += point;
-                }
-            }
-        }
-        Double[] totalPointsArray = new Double[totalPoints.size()];
-
-        totalPoints.toArray(totalPointsArray);
-        Arrays.sort(totalPointsArray);
-        // printTimeCostArray(totalPointsArray);
-        println("===== statistics ================");
-        println("===== min time cost: " + totalPointsArray[0] + " =====");
-        println("===== max time cost: " + 
totalPointsArray[totalPointsArray.length - 2] + " =====");
-
-        double meanVal = total / totalPointsArray.length;
-        println("===== mean time cost: " + meanVal + " =====");
-        int middleIndex = (int) (totalPointsArray.length * 0.5);
-        println("===== median time cost: " + totalPointsArray[middleIndex] + " 
=====");
-        int top90Index = (int) (totalPointsArray.length * 0.9);
-        println("===== top90 time cost: " + totalPointsArray[top90Index] + " 
=====");
-    }
-
-    public static void main(String[] args) {
-        testMaxThroughput();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java
index cdf22c7..b7e7117 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
@@ -30,7 +24,7 @@ public class StrictTopologyValidator implements 
ITopologyValidator {
     private static final Logger LOG = 
LoggerFactory.getLogger(StrictTopologyValidator.class);
 
     @Override
-    public void prepare(Map stormConf){
+    public void prepare(Map stormConf) {
     }
 
     @Override
@@ -63,5 +57,5 @@ public class StrictTopologyValidator implements 
ITopologyValidator {
                 }
             }
         }
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
index 78f880a..ac01aae 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/TimeOutWorkerHeartbeatsRecoveryStrategy.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
@@ -21,11 +15,9 @@ package org.apache.storm.nimbus;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.Config;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
 
b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
index 4befaf8..f656089 100644
--- 
a/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
+++ 
b/storm-server/src/main/java/org/apache/storm/nimbus/WorkerHeartbeatsRecoveryStrategyFactory.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.nimbus;
@@ -37,10 +31,10 @@ public class WorkerHeartbeatsRecoveryStrategyFactory {
         IWorkerHeartbeatsRecoveryStrategy strategy;
         if 
(conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS) != 
null) {
             Object targetObj = ReflectionUtils.newInstance((String)
-                    
conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS));
+                                                               
conf.get(DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS));
             Preconditions.checkState(targetObj instanceof 
IWorkerHeartbeatsRecoveryStrategy,
-                    "{} must implements IWorkerHeartbeatsRecoveryStrategy",
-                    
DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS);
+                                     "{} must implements 
IWorkerHeartbeatsRecoveryStrategy",
+                                     
DaemonConfig.NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS);
             strategy = ((IWorkerHeartbeatsRecoveryStrategy) targetObj);
         } else {
             strategy = new TimeOutWorkerHeartbeatsRecoveryStrategy();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
 
b/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
index 7dca237..a7ebc1e 100644
--- 
a/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/pacemaker/IServerMessageHandler.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.pacemaker;
 
 import org.apache.storm.generated.HBMessage;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java 
b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
index 095982b..ff283a4 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java
@@ -1,29 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.pacemaker;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.storm.generated.HBMessage;
 import org.apache.storm.generated.HBMessageData;
 import org.apache.storm.generated.HBNodes;
@@ -35,79 +32,81 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-
 import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 
 
 public class Pacemaker implements IServerMessageHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class);
-    private Map<String, byte[]> heartbeats;
-    private Map<String, Object> conf;
-
     private final static Meter meterSendPulseCount = 
StormMetricsRegistry.registerMeter("pacemaker:send-pulse-count");
     private final static Meter meterTotalReceivedSize = 
StormMetricsRegistry.registerMeter("pacemaker:total-receive-size");
     private final static Meter meterGetPulseCount = 
StormMetricsRegistry.registerMeter("pacemaker:get-pulse=count");
     private final static Meter meterTotalSentSize = 
StormMetricsRegistry.registerMeter("pacemaker:total-sent-size");
-    private final static Histogram histogramHeartbeatSize = 
StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size", new 
ExponentiallyDecayingReservoir());
+    private final static Histogram histogramHeartbeatSize =
+        StormMetricsRegistry.registerHistogram("pacemaker:heartbeat-size", new 
ExponentiallyDecayingReservoir());
+    private Map<String, byte[]> heartbeats;
+    private Map<String, Object> conf;
 
 
     public Pacemaker(Map<String, Object> conf) {
         heartbeats = new ConcurrentHashMap();
         this.conf = conf;
         StormMetricsRegistry.registerGauge("pacemaker:size-total-keys",
-                new Callable() {
-                    @Override
-                    public Integer call() throws Exception {
-                        return heartbeats.size();
-                    }
-                });
+                                           new Callable() {
+                                               @Override
+                                               public Integer call() throws 
Exception {
+                                                   return heartbeats.size();
+                                               }
+                                           });
         StormMetricsRegistry.startMetricsReporters(conf);
     }
 
+    public static void main(String[] args) {
+        SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
+        Map<String, Object> conf = 
ConfigUtils.overrideLoginConfigWithSystemProperty(Utils.readStormConfig());
+        final Pacemaker serverHandler = new Pacemaker(conf);
+        serverHandler.launchServer();
+    }
+
     @Override
     public HBMessage handleMessage(HBMessage m, boolean authenticated) {
         HBMessage response = null;
         HBMessageData data = m.get_data();
         switch (m.get_type()) {
-        case CREATE_PATH:
-            response = createPath(data.get_path());
-            break;
-        case EXISTS:
-            response = pathExists(data.get_path(), authenticated);
-            break;
-        case SEND_PULSE:
-            response = sendPulse(data.get_pulse());
-            break;
-        case GET_ALL_PULSE_FOR_PATH:
-            response = getAllPulseForPath(data.get_path(), authenticated);
-            break;
-        case GET_ALL_NODES_FOR_PATH:
-            response = getAllNodesForPath(data.get_path(), authenticated);
-            break;
-        case GET_PULSE:
-            response = getPulse(data.get_path(), authenticated);
-            break;
-        case DELETE_PATH:
-            response = deletePath(data.get_path());
-            break;
-        case DELETE_PULSE_ID:
-            response = deletePulseId(data.get_path());
-            break;
-        default:
-            LOG.info("Got Unexpected Type: {}", m.get_type());
-            break;
+            case CREATE_PATH:
+                response = createPath(data.get_path());
+                break;
+            case EXISTS:
+                response = pathExists(data.get_path(), authenticated);
+                break;
+            case SEND_PULSE:
+                response = sendPulse(data.get_pulse());
+                break;
+            case GET_ALL_PULSE_FOR_PATH:
+                response = getAllPulseForPath(data.get_path(), authenticated);
+                break;
+            case GET_ALL_NODES_FOR_PATH:
+                response = getAllNodesForPath(data.get_path(), authenticated);
+                break;
+            case GET_PULSE:
+                response = getPulse(data.get_path(), authenticated);
+                break;
+            case DELETE_PATH:
+                response = deletePath(data.get_path());
+                break;
+            case DELETE_PULSE_ID:
+                response = deletePulseId(data.get_path());
+                break;
+            default:
+                LOG.info("Got Unexpected Type: {}", m.get_type());
+                break;
         }
-        if (response != null)
+        if (response != null) {
             response.set_message_id(m.get_message_id());
+        }
         return response;
     }
 
-
     private HBMessage createPath(String path) {
         return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null);
     }
@@ -192,8 +191,9 @@ public class Pacemaker implements IServerMessageHandler {
         String prefix = path.endsWith("/") ? path : (path + "/");
         for (String key : heartbeats.keySet()) {
             String checkKey = key + "/";
-            if (checkKey.indexOf(prefix) == 0)
+            if (checkKey.indexOf(prefix) == 0) {
                 deletePulseId(key);
+            }
         }
         return new HBMessage(HBServerMessageType.DELETE_PATH_RESPONSE, null);
     }
@@ -209,11 +209,4 @@ public class Pacemaker implements IServerMessageHandler {
         return new PacemakerServer(this, conf);
     }
 
-    public static void main(String[] args) {
-        SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
-        Map<String, Object> conf = 
ConfigUtils.overrideLoginConfigWithSystemProperty(Utils.readStormConfig());
-        final Pacemaker serverHandler = new Pacemaker(conf);
-        serverHandler.launchServer();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java 
b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
index 2f050db..e4e3c67 100644
--- a/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
+++ b/storm-server/src/main/java/org/apache/storm/pacemaker/PacemakerServer.java
@@ -1,36 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.pacemaker;
 
-import org.apache.storm.Config;
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.generated.HBMessage;
-import org.apache.storm.messaging.netty.ISaslServer;
-import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
-import org.apache.storm.security.auth.AuthUtils;
-import java.lang.InterruptedException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import javax.security.auth.login.Configuration;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.HBMessage;
+import org.apache.storm.messaging.netty.ISaslServer;
+import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
 import org.apache.storm.pacemaker.codec.ThriftNettyServerCodec;
+import org.apache.storm.security.auth.AuthUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -55,48 +49,47 @@ class PacemakerServer implements ISaslServer {
     private ConcurrentSkipListSet<Channel> authenticated_channels = new 
ConcurrentSkipListSet<Channel>();
     private ThriftNettyServerCodec.AuthMethod authMethod;
 
-    public PacemakerServer(IServerMessageHandler handler, Map<String, Object> 
config){
-        int maxWorkers = (int)config.get(DaemonConfig.PACEMAKER_MAX_THREADS);
-        this.port = (int)config.get(Config.PACEMAKER_PORT);
+    public PacemakerServer(IServerMessageHandler handler, Map<String, Object> 
config) {
+        int maxWorkers = (int) config.get(DaemonConfig.PACEMAKER_MAX_THREADS);
+        this.port = (int) config.get(Config.PACEMAKER_PORT);
         this.handler = handler;
         this.topo_name = "pacemaker_server";
 
-        String auth = (String)config.get(Config.PACEMAKER_AUTH_METHOD);
-        switch(auth) {
-
-        case "DIGEST":
-            Configuration login_conf = AuthUtils.GetConfiguration(config);
-            authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST;
-            this.secret = AuthUtils.makeDigestPayload(login_conf, 
AuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
-            if(this.secret == null) {
-                LOG.error("Can't start pacemaker server without digest 
secret.");
-                throw new RuntimeException("Can't start pacemaker server 
without digest secret.");
-            }
-            break;
-
-        case "KERBEROS":
-            authMethod = ThriftNettyServerCodec.AuthMethod.KERBEROS;
-            break;
-
-        case "NONE":
-            authMethod = ThriftNettyServerCodec.AuthMethod.NONE;
-            break;
-
-        default:
-            LOG.error("Can't start pacemaker server without proper 
PACEMAKER_AUTH_METHOD.");
-            throw new RuntimeException("Can't start pacemaker server without 
proper PACEMAKER_AUTH_METHOD.");
+        String auth = (String) config.get(Config.PACEMAKER_AUTH_METHOD);
+        switch (auth) {
+
+            case "DIGEST":
+                Configuration login_conf = AuthUtils.GetConfiguration(config);
+                authMethod = ThriftNettyServerCodec.AuthMethod.DIGEST;
+                this.secret = AuthUtils.makeDigestPayload(login_conf, 
AuthUtils.LOGIN_CONTEXT_PACEMAKER_DIGEST);
+                if (this.secret == null) {
+                    LOG.error("Can't start pacemaker server without digest 
secret.");
+                    throw new RuntimeException("Can't start pacemaker server 
without digest secret.");
+                }
+                break;
+
+            case "KERBEROS":
+                authMethod = ThriftNettyServerCodec.AuthMethod.KERBEROS;
+                break;
+
+            case "NONE":
+                authMethod = ThriftNettyServerCodec.AuthMethod.NONE;
+                break;
+
+            default:
+                LOG.error("Can't start pacemaker server without proper 
PACEMAKER_AUTH_METHOD.");
+                throw new RuntimeException("Can't start pacemaker server 
without proper PACEMAKER_AUTH_METHOD.");
         }
 
         ThreadFactory bossFactory = new 
NettyRenameThreadFactory("server-boss");
         ThreadFactory workerFactory = new 
NettyRenameThreadFactory("server-worker");
         NioServerSocketChannelFactory factory;
-        if(maxWorkers > 0) {
+        if (maxWorkers > 0) {
             factory =
                 new 
NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                                                   
Executors.newCachedThreadPool(workerFactory),
                                                   maxWorkers);
-        }
-        else {
+        } else {
             factory =
                 new 
NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                                                   
Executors.newCachedThreadPool(workerFactory));
@@ -106,10 +99,10 @@ class PacemakerServer implements ISaslServer {
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", FIVE_MB_IN_BYTES);
         bootstrap.setOption("keepAlive", true);
-        int thriftMessageMaxSize = 
(Integer)config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
+        int thriftMessageMaxSize = (Integer) 
config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
         ChannelPipelineFactory pipelineFactory =
             new ThriftNettyServerCodec(this, config, authMethod, 
thriftMessageMaxSize)
-            .pipelineFactory();
+                .pipelineFactory();
         bootstrap.setPipelineFactory(pipelineFactory);
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
@@ -123,11 +116,10 @@ class PacemakerServer implements ISaslServer {
 
     public void cleanPipeline(Channel channel) {
         boolean authenticated = authenticated_channels.contains(channel);
-        if(!authenticated) {
-            if(channel.getPipeline().get(ThriftNettyServerCodec.SASL_HANDLER) 
!= null) {
+        if (!authenticated) {
+            if (channel.getPipeline().get(ThriftNettyServerCodec.SASL_HANDLER) 
!= null) {
                 
channel.getPipeline().remove(ThriftNettyServerCodec.SASL_HANDLER);
-            }
-            else 
if(channel.getPipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) {
+            } else if 
(channel.getPipeline().get(ThriftNettyServerCodec.KERBEROS_HANDLER) != null) {
                 
channel.getPipeline().remove(ThriftNettyServerCodec.KERBEROS_HANDLER);
             }
         }
@@ -137,15 +129,14 @@ class PacemakerServer implements ISaslServer {
         cleanPipeline(channel);
 
         boolean authenticated = (authMethod == 
ThriftNettyServerCodec.AuthMethod.NONE) || 
authenticated_channels.contains(channel);
-        HBMessage m = (HBMessage)mesg;
+        HBMessage m = (HBMessage) mesg;
         LOG.debug("received message. Passing to handler. {} : {} : {}",
                   handler.toString(), m.toString(), channel.toString());
         HBMessage response = handler.handleMessage(m, authenticated);
-        if(response != null) {
+        if (response != null) {
             LOG.debug("Got Response from handler: {}", response);
             channel.write(response);
-        }
-        else {
+        } else {
             LOG.info("Got null response from handler handling message: {}", m);
         }
     }

Reply via email to