http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index 2b0ca40..af9c681 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -36,16 +36,51 @@ import org.slf4j.LoggerFactory;
 public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy 
implements IStrategy {
     private static final Logger LOG = 
LoggerFactory.getLogger(GenericResourceAwareStrategy.class);
 
+    /**
+     * Implementation of the sortObjectResources method so other strategies 
can reuse it.
+     */
+    public static TreeSet<ObjectResources> sortObjectResourcesImpl(
+        final AllResources allResources, ExecutorDetails exec, TopologyDetails 
topologyDetails,
+        final ExistingScheduleFunc existingScheduleFunc) {
+        AllResources affinityBasedAllResources = new 
AllResources(allResources);
+
+        TreeSet<ObjectResources> sortedObjectResources =
+            new TreeSet<>((o1, o2) -> {
+                int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
+                int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
+                if (execsScheduled1 > execsScheduled2) {
+                    return -1;
+                } else if (execsScheduled1 < execsScheduled2) {
+                    return 1;
+                } else {
+                    double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
+                    double o2Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
+
+                    if (o1Avg > o2Avg) {
+                        return -1;
+                    } else if (o1Avg < o2Avg) {
+                        return 1;
+                    } else {
+                        return o1.id.compareTo(o2.id);
+                    }
+
+                }
+            });
+        
sortedObjectResources.addAll(affinityBasedAllResources.objectResources);
+        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+        return sortedObjectResources;
+    }
+
     @Override
     public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
         prepare(cluster);
         if (nodes.getNodes().size() <= 0) {
             LOG.warn("No available nodes to schedule tasks on!");
             return SchedulingResult.failure(
-                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available 
nodes to schedule tasks on!");
+                SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available 
nodes to schedule tasks on!");
         }
         Collection<ExecutorDetails> unassignedExecutors =
-                new HashSet<>(this.cluster.getUnassignedExecutors(td));
+            new HashSet<>(this.cluster.getUnassignedExecutors(td));
         LOG.info("ExecutorsNeedScheduling: {}", unassignedExecutors);
         Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
         List<Component> spouts = this.getSpouts(td);
@@ -53,7 +88,7 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         if (spouts.size() == 0) {
             LOG.error("Cannot find a Spout!");
             return SchedulingResult.failure(
-                    SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a 
Spout!");
+                SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a 
Spout!");
         }
 
         //order executors to be scheduled
@@ -64,10 +99,10 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
 
         for (ExecutorDetails exec : orderedExecutors) {
             LOG.debug(
-                    "Attempting to schedule: {} of component {}[ REQ {} ]",
-                    exec,
-                    td.getExecutorToComponent().get(exec),
-                    td.getTaskResourceReqList(exec));
+                "Attempting to schedule: {} of component {}[ REQ {} ]",
+                exec,
+                td.getExecutorToComponent().get(exec),
+                td.getTaskResourceReqList(exec));
             final List<ObjectResources> sortedNodes = this.sortAllNodes(td, 
exec, favoredNodes, unFavoredNodes);
 
             scheduleExecutor(exec, td, scheduledTasks, sortedNodes);
@@ -86,12 +121,12 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         if (executorsNotScheduled.size() > 0) {
             LOG.error("Not all executors successfully scheduled: {}", 
executorsNotScheduled);
             result =
-                    SchedulingResult.failure(
-                            SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
-                            (td.getExecutors().size() - 
unassignedExecutors.size())
-                                    + "/"
-                                    + td.getExecutors().size()
-                                    + " executors scheduled");
+                SchedulingResult.failure(
+                    SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+                    (td.getExecutors().size() - unassignedExecutors.size())
+                    + "/"
+                    + td.getExecutors().size()
+                    + " executors scheduled");
         } else {
             LOG.debug("All resources successfully scheduled!");
             result = SchedulingResult.success("Fully Scheduled by " + 
this.getClass().getSimpleName());
@@ -99,68 +134,28 @@ public class GenericResourceAwareStrategy extends 
BaseResourceAwareStrategy impl
         return result;
     }
 
-
     /**
-     * Sort objects by the following two criteria. 1) the number executors of 
the topology that needs
-     * to be scheduled is already on the object (node or rack) in descending 
order. The reasoning to
-     * sort based on criterion 1 is so we schedule the rest of a topology on 
the same object (node or
-     * rack) as the existing executors of the topology. 2) the 
subordinate/subservient resource
-     * availability percentage of a rack in descending order We calculate the 
resource availability
-     * percentage by dividing the resource availability of the object (node or 
rack) by the resource
-     * availability of the entire rack or cluster depending on if object 
references a node or a rack.
-     * How this differs from the DefaultResourceAwareStrategy is that the 
percentage boosts the node or rack
-     * if it is requested by the executor that the sorting is being done for 
and pulls it down if it is not.
-     * By doing this calculation, objects (node or rack) that have exhausted 
or little of one of the
-     * resources mentioned above will be ranked after racks that have more 
balanced resource
-     * availability and nodes or racks that have resources that are not 
requested will be ranked below
-     * . So we will be less likely to pick a rack that have a lot of one 
resource but a
-     * low amount of another and have a lot of resources that are not 
requested by the executor.
+     * Sort objects by the following two criteria. 1) the number executors of 
the topology that needs to be scheduled is already on the
+     * object (node or rack) in descending order. The reasoning to sort based 
on criterion 1 is so we schedule the rest of a topology on the
+     * same object (node or rack) as the existing executors of the topology. 
2) the subordinate/subservient resource availability percentage
+     * of a rack in descending order We calculate the resource availability 
percentage by dividing the resource availability of the object
+     * (node or rack) by the resource availability of the entire rack or 
cluster depending on if object references a node or a rack. How
+     * this differs from the DefaultResourceAwareStrategy is that the 
percentage boosts the node or rack if it is requested by the executor
+     * that the sorting is being done for and pulls it down if it is not. By 
doing this calculation, objects (node or rack) that have
+     * exhausted or little of one of the resources mentioned above will be 
ranked after racks that have more balanced resource availability
+     * and nodes or racks that have resources that are not requested will be 
ranked below . So we will be less likely to pick a rack that
+     * have a lot of one resource but a low amount of another and have a lot 
of resources that are not requested by the executor.
      *
-     * @param allResources contains all individual ObjectResources as well as 
cumulative stats
-     * @param exec executor for which the sorting is done
-     * @param topologyDetails topologyDetails for the above executor
+     * @param allResources         contains all individual ObjectResources as 
well as cumulative stats
+     * @param exec                 executor for which the sorting is done
+     * @param topologyDetails      topologyDetails for the above executor
      * @param existingScheduleFunc a function to get existing executors 
already scheduled on this object
      * @return a sorted list of ObjectResources
      */
     @Override
     protected TreeSet<ObjectResources> sortObjectResources(
-            final AllResources allResources, ExecutorDetails exec, 
TopologyDetails topologyDetails,
-            final ExistingScheduleFunc existingScheduleFunc) {
-        return sortObjectResourcesImpl(allResources, exec, topologyDetails, 
existingScheduleFunc);
-    }
-
-    /**
-     * Implementation of the sortObjectResources method so other strategies 
can reuse it.
-     */
-    public static TreeSet<ObjectResources> sortObjectResourcesImpl(
         final AllResources allResources, ExecutorDetails exec, TopologyDetails 
topologyDetails,
         final ExistingScheduleFunc existingScheduleFunc) {
-        AllResources affinityBasedAllResources = new 
AllResources(allResources);
-
-        TreeSet<ObjectResources> sortedObjectResources =
-                new TreeSet<>((o1, o2) -> {
-                    int execsScheduled1 = 
existingScheduleFunc.getNumExistingSchedule(o1.id);
-                    int execsScheduled2 = 
existingScheduleFunc.getNumExistingSchedule(o2.id);
-                    if (execsScheduled1 > execsScheduled2) {
-                        return -1;
-                    } else if (execsScheduled1 < execsScheduled2) {
-                        return 1;
-                    } else {
-                        double o1Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o1.availableResources);
-                        double o2Avg = 
allResources.availableResourcesOverall.calculateAveragePercentageUsedBy(o2.availableResources);
-
-                        if (o1Avg > o2Avg) {
-                            return -1;
-                        } else if (o1Avg < o2Avg) {
-                            return 1;
-                        } else {
-                            return o1.id.compareTo(o2.id);
-                        }
-
-                    }
-                });
-        
sortedObjectResources.addAll(affinityBasedAllResources.objectResources);
-        LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
-        return sortedObjectResources;
+        return sortObjectResourcesImpl(allResources, exec, topologyDetails, 
existingScheduleFunc);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 53859fd..2cdbb32 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -1,25 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.resource.strategies.scheduling;
 
 import java.util.Map;
-
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.resource.SchedulingResult;
@@ -29,13 +22,13 @@ import org.apache.storm.scheduler.resource.SchedulingResult;
  * In the future strategies will be pluggable
  */
 public interface IStrategy {
-    
+
     /**
      * Prepare the Strategy for scheduling.
      * @param config the cluster configuration
      */
     void prepare(Map<String, Object> config);
-    
+
     /**
      * This method is invoked to calculate a scheduling for topology td.  
Cluster will reject any changes that are
      * not for the given topology.  Any changes made to the cluster will be 
committed if the scheduling is successful.

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
index 5d3dfbe..cfb6bb7 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;
@@ -35,9 +29,7 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
-import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
@@ -84,7 +76,7 @@ public class ArtifactoryConfigLoader implements IConfigLoader 
{
         }
         Integer thisPollTime = (Integer) 
conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_POLLTIME_SECS);
         if (thisPollTime != null) {
-            artifactoryPollTimeSecs =thisPollTime;
+            artifactoryPollTimeSecs = thisPollTime;
         }
         String thisBase = (String) 
conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY);
         if (thisBase != null) {
@@ -97,7 +89,7 @@ public class ArtifactoryConfigLoader implements IConfigLoader 
{
             try {
                 targetURI = new URI(uriString);
                 scheme = 
targetURI.getScheme().substring(ARTIFACTORY_SCHEME_PREFIX.length());
-            } catch(URISyntaxException e) {
+            } catch (URISyntaxException e) {
                 LOG.error("Failed to parse uri={}", uriString);
             }
         }
@@ -119,7 +111,7 @@ public class ArtifactoryConfigLoader implements 
IConfigLoader {
         int currentTimeSecs = Time.currentTimeSecs();
         if (lastReturnedValue != null && ((currentTimeSecs - lastReturnedTime) 
< artifactoryPollTimeSecs)) {
             LOG.debug("currentTimeSecs: {}; lastReturnedTime {}; 
artifactoryPollTimeSecs: {}. Returning our last map.",
-                    currentTimeSecs, lastReturnedTime, 
artifactoryPollTimeSecs);
+                      currentTimeSecs, lastReturnedTime, 
artifactoryPollTimeSecs);
             return (Map<String, Object>) lastReturnedValue.get(configKey);
         }
 
@@ -134,41 +126,6 @@ public class ArtifactoryConfigLoader implements 
IConfigLoader {
         return null;
     }
 
-
-    /**
-     * A private class used to check the response coming back from httpclient.
-     */
-    private static class GETStringResponseHandler implements 
ResponseHandler<String> {
-        private static GETStringResponseHandler singleton = null;
-
-        /**
-         * @return a singleton httpclient GET response handler
-         */
-        public static GETStringResponseHandler getInstance() {
-            if (singleton == null) {
-                singleton = new GETStringResponseHandler();
-            }
-            return singleton;
-        }
-
-        /**
-         * @param response The http response to verify.
-         * @return null on failure or the response string if return code is in 
200 range
-         */
-        @Override
-        public String handleResponse(final HttpResponse response) throws 
IOException {
-            int status = response.getStatusLine().getStatusCode();
-            HttpEntity entity = response.getEntity();
-            String entityString = (entity != null ? 
EntityUtils.toString(entity) : null);
-            if (status >= 200 && status < 300) {
-                return entityString;
-            } else {
-                LOG.error("Got unexpected response code {}; entity: {}", 
status, entityString);
-                return null;
-            }
-        }
-    }
-
     /**
      * @param api null if we are trying to download artifact, otherwise a 
string to call REST api,
      *        e.g. "/api/storage"
@@ -232,13 +189,6 @@ public class ArtifactoryConfigLoader implements 
IConfigLoader {
         return returnValue;
     }
 
-    private class DirEntryCompare implements Comparator<JSONObject> {
-        @Override
-        public int compare(JSONObject o1, JSONObject o2) {
-            return ((String)o1.get("uri")).compareTo((String)o2.get("uri"));
-        }
-    }
-
     private String loadMostRecentArtifact(String location, String host, 
Integer port) {
         // Is this a directory or is it a file?
         JSONObject json = getArtifactMetadata(location, host, port);
@@ -256,7 +206,7 @@ public class ArtifactoryConfigLoader implements 
IConfigLoader {
             return returnValue;
         }
 
-        // This should mean that we were pointed at a directory.  
+        // This should mean that we were pointed at a directory.
         // Find the most recent child and load that.
         JSONArray msg = (JSONArray) json.get("children");
         if (msg == null || msg.size() == 0) {
@@ -336,13 +286,13 @@ public class ArtifactoryConfigLoader implements 
IConfigLoader {
         // First make the cache dir
         String localDirName = ServerConfigUtils.masterLocalDir(conf) + 
File.separator + LOCAL_ARTIFACT_DIR;
         File dir = new File(localDirName);
-        if (! dir.exists()) {
+        if (!dir.exists()) {
             dir.mkdirs();
         }
 
         localCacheDir = localDirName + File.separator + 
location.replaceAll(File.separator, "_");
         dir = new File(localCacheDir);
-        if (! dir.exists()) {
+        if (!dir.exists()) {
             dir.mkdir();
         }
         cacheInitialized = true;
@@ -388,4 +338,45 @@ public class ArtifactoryConfigLoader implements 
IConfigLoader {
 
         return null;
     }
+
+    /**
+     * A private class used to check the response coming back from httpclient.
+     */
+    private static class GETStringResponseHandler implements 
ResponseHandler<String> {
+        private static GETStringResponseHandler singleton = null;
+
+        /**
+         * @return a singleton httpclient GET response handler
+         */
+        public static GETStringResponseHandler getInstance() {
+            if (singleton == null) {
+                singleton = new GETStringResponseHandler();
+            }
+            return singleton;
+        }
+
+        /**
+         * @param response The http response to verify.
+         * @return null on failure or the response string if return code is in 
200 range
+         */
+        @Override
+        public String handleResponse(final HttpResponse response) throws 
IOException {
+            int status = response.getStatusLine().getStatusCode();
+            HttpEntity entity = response.getEntity();
+            String entityString = (entity != null ? 
EntityUtils.toString(entity) : null);
+            if (status >= 200 && status < 300) {
+                return entityString;
+            } else {
+                LOG.error("Got unexpected response code {}; entity: {}", 
status, entityString);
+                return null;
+            }
+        }
+    }
+
+    private class DirEntryCompare implements Comparator<JSONObject> {
+        @Override
+        public int compare(JSONObject o1, JSONObject o2) {
+            return ((String) o1.get("uri")).compareTo((String) o2.get("uri"));
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
index 6494ae2..71b9eb5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
index 53df78b..4e1b3a5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
index a75e92b..8e4c9b9 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;
@@ -21,7 +15,6 @@ package org.apache.storm.scheduler.utils;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
index 9a2ac5c..c9abd1b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
index 5071133..6bfd6be 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;
@@ -27,6 +21,6 @@ public interface IConfigLoader {
      * @param configKey The key from which we want to get the scheduler config.
      * @return The scheduler configs
      */
-    Map<?,?> load(String configKey);
+    Map<?, ?> load(String configKey);
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
index a039870..115c050 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.scheduler.utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
 
b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
index dac0c91..8997e6f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
+++ 
b/storm-server/src/main/java/org/apache/storm/security/auth/workertoken/WorkerTokenManager.java
@@ -43,44 +43,21 @@ public class WorkerTokenManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(WorkerTokenManager.class);
 
     /**
-     * The length of the random keys to use in bits.
-     * This should be at least the length of 
WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM.
+     * The length of the random keys to use in bits. This should be at least 
the length of WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM.
      */
     private static final int KEY_LENGTH = 256;
-
-    /**
-     * Generate a new random secret key.
-     * @return the new key
-     */
-    protected SecretKey generateSecret() {
-        SecretKey key;
-        synchronized (keyGen) {
-            key = keyGen.generateKey();
-        }
-        return key;
-    }
-
-    /**
-     * Get the secret that should be used to sign a token.  This may either 
reuse a secret or generate a new one so any user should
-     * call this once and save the result.
-     * @return the key to use.
-     */
-    protected SecretKey getCurrentSecret() {
-        return generateSecret();
-    }
-
     /**
      * Key generator to use.
      */
     private final KeyGenerator keyGen;
     private final IStormClusterState state;
     private final long tokenLifetimeMillis;
-
     /**
-     * Constructor.  This assumes that state can store the tokens securely, 
and that they should be enabled at all.
-     * Please use AuthUtils.areWorkerTokensEnabledServer to validate this 
first.
+     * Constructor.  This assumes that state can store the tokens securely, 
and that they should be enabled at all. Please use
+     * AuthUtils.areWorkerTokensEnabledServer to validate this first.
+     *
      * @param daemonConf the config for nimbus.
-     * @param state the state used to store private keys.
+     * @param state      the state used to store private keys.
      */
     public WorkerTokenManager(Map<String, Object> daemonConf, 
IStormClusterState state) {
         this.state = state;
@@ -91,15 +68,39 @@ public class WorkerTokenManager {
             throw new IllegalArgumentException("Can't find " + 
WorkerTokenSigner.DEFAULT_HMAC_ALGORITHM + " algorithm.");
         }
         this.tokenLifetimeMillis = TimeUnit.MILLISECONDS.convert(
-            
ObjectReader.getLong(daemonConf.get(DaemonConfig.STORM_WORKER_TOKEN_LIFE_TIME_HOURS),24L),
+            
ObjectReader.getLong(daemonConf.get(DaemonConfig.STORM_WORKER_TOKEN_LIFE_TIME_HOURS),
 24L),
             TimeUnit.HOURS);
     }
 
     /**
+     * Generate a new random secret key.
+     *
+     * @return the new key
+     */
+    protected SecretKey generateSecret() {
+        SecretKey key;
+        synchronized (keyGen) {
+            key = keyGen.generateKey();
+        }
+        return key;
+    }
+
+    /**
+     * Get the secret that should be used to sign a token.  This may either 
reuse a secret or generate a new one so any user should call
+     * this once and save the result.
+     *
+     * @return the key to use.
+     */
+    protected SecretKey getCurrentSecret() {
+        return generateSecret();
+    }
+
+    /**
      * Create or update an existing key.
+     *
      * @param serviceType the type of service to create a token for
-     * @param user the user the token is for
-     * @param topologyId the topology the token is for
+     * @param user        the user the token is for
+     * @param topologyId  the topology the token is for
      * @return a newly generated token that should be good to start using form 
now until it expires.
      */
     public WorkerToken createOrUpdateTokenFor(WorkerTokenServiceType 
serviceType, String user, String topologyId) {
@@ -118,6 +119,7 @@ public class WorkerTokenManager {
 
     /**
      * Get the maximum expiration token time that should be renewed.
+     *
      * @return any token with an expiration less than the returned value 
should be renewed.
      */
     public long getMaxExpirationTimeForRenewal() {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java
 
b/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java
index 861e21a..eafb315 100644
--- 
a/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java
+++ 
b/storm-server/src/main/java/org/apache/storm/testing/CompleteTopologyParam.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.testing;
 
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.Testing;
 
@@ -26,78 +20,78 @@ import org.apache.storm.Testing;
  * The param class for the `Testing.completeTopology`.
  */
 public class CompleteTopologyParam {
-       /**
-        * The mocked spout sources
-        */
-       private MockedSources mockedSources = new MockedSources();
-       /**
-        * the config for the topology when it was submitted to the cluster
-        */
-       private Map<String, Object> topoConf = new Config();
-       /**
-        * whether cleanup the state?
-        */
-       private boolean cleanupState = true;
-       /**
-        * the topology name you want to submit to the cluster
-        */
-       private String topologyName;
+    /**
+     * The mocked spout sources
+     */
+    private MockedSources mockedSources = new MockedSources();
+    /**
+     * the config for the topology when it was submitted to the cluster
+     */
+    private Map<String, Object> topoConf = new Config();
+    /**
+     * whether cleanup the state?
+     */
+    private boolean cleanupState = true;
+    /**
+     * the topology name you want to submit to the cluster
+     */
+    private String topologyName;
+
+    /**
+     * the timeout of topology you want to submit to the cluster
+     */
+    private int timeoutMs = Testing.TEST_TIMEOUT_MS;
 
-       /**
-        * the timeout of topology you want to submit to the cluster
-        */
-       private int timeoutMs = Testing.TEST_TIMEOUT_MS;
+    public MockedSources getMockedSources() {
+        return mockedSources;
+    }
 
-       public MockedSources getMockedSources() {
-               return mockedSources;
-       }
+    public void setMockedSources(MockedSources mockedSources) {
+        if (mockedSources == null) {
+            mockedSources = new MockedSources();
+        }
 
-       public void setMockedSources(MockedSources mockedSources) {
-           if (mockedSources == null) {
-               mockedSources = new MockedSources();
-           }
-              
-               this.mockedSources = mockedSources;
-       }
+        this.mockedSources = mockedSources;
+    }
 
-       public Map<String, Object> getStormConf() {
-               return topoConf;
-       }
+    public Map<String, Object> getStormConf() {
+        return topoConf;
+    }
 
-       public void setStormConf(Map<String, Object> topoConf) {
-           if (topoConf == null) {
-               topoConf = new Config();
-           }
-               this.topoConf = topoConf;
-       }
+    public void setStormConf(Map<String, Object> topoConf) {
+        if (topoConf == null) {
+            topoConf = new Config();
+        }
+        this.topoConf = topoConf;
+    }
 
-       public boolean getCleanupState() {
-               return cleanupState;
-       }
+    public boolean getCleanupState() {
+        return cleanupState;
+    }
 
-       public void setCleanupState(Boolean cleanupState) {
-           if (cleanupState == null) {
-               cleanupState = true;
-           }
-               this.cleanupState = cleanupState;
-       }
+    public void setCleanupState(Boolean cleanupState) {
+        if (cleanupState == null) {
+            cleanupState = true;
+        }
+        this.cleanupState = cleanupState;
+    }
 
-       public String getTopologyName() {
-               return topologyName;
-       }
+    public String getTopologyName() {
+        return topologyName;
+    }
 
-       public void setTopologyName(String topologyName) {
-               this.topologyName = topologyName;
-       }
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
 
-       public Integer getTimeoutMs() {
-               return timeoutMs;
-       }
+    public Integer getTimeoutMs() {
+        return timeoutMs;
+    }
 
-       public void setTimeoutMs(Integer timeoutMs) {
-           if (timeoutMs == null) {
-               timeoutMs = Testing.TEST_TIMEOUT_MS;
-           }
-               this.timeoutMs = timeoutMs;
-       }
+    public void setTimeoutMs(Integer timeoutMs) {
+        if (timeoutMs == null) {
+            timeoutMs = Testing.TEST_TIMEOUT_MS;
+        }
+        this.timeoutMs = timeoutMs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java 
b/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java
index 032f09c..38db6da 100644
--- 
a/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java
+++ 
b/storm-server/src/main/java/org/apache/storm/testing/InProcessZookeeper.java
@@ -1,25 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.testing;
 
 import java.util.List;
-
 import org.apache.storm.zookeeper.Zookeeper;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 
@@ -30,11 +23,11 @@ import org.apache.zookeeper.server.NIOServerCnxnFactory;
  * }
  */
 public class InProcessZookeeper implements AutoCloseable {
-    
+
     private final TmpPath zkTmp;
     private final NIOServerCnxnFactory zookeeper;
     private final long zkPort;
-    
+
     public InProcessZookeeper() throws Exception {
         zkTmp = new TmpPath();
         @SuppressWarnings("unchecked")
@@ -42,7 +35,7 @@ public class InProcessZookeeper implements AutoCloseable {
         zkPort = (Long) portAndHandle.get(0);
         zookeeper = (NIOServerCnxnFactory) portAndHandle.get(1);
     }
-    
+
     /**
      * @return the port ZK is listening on (localhost)
      */
@@ -55,5 +48,5 @@ public class InProcessZookeeper implements AutoCloseable {
         Zookeeper.shutdownInprocessZookeeper(zookeeper);
         zkTmp.close();
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/TestJob.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/testing/TestJob.java 
b/storm-server/src/main/java/org/apache/storm/testing/TestJob.java
index 79288ad..9c118e8 100644
--- a/storm-server/src/main/java/org/apache/storm/testing/TestJob.java
+++ b/storm-server/src/main/java/org/apache/storm/testing/TestJob.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.testing;
 
 import org.apache.storm.ILocalCluster;
@@ -33,11 +28,11 @@ import org.apache.storm.ILocalCluster;
  * ```
  */
 public interface TestJob {
-       /**
-        * run the testing logic with the cluster.
-        * 
-        * @param cluster the cluster which created by 
<code>Testing.withSimulatedTimeLocalCluster</code>
-        *        and <code>Testing.withTrackedCluster</code>.
-        */
+    /**
+     * run the testing logic with the cluster.
+     *
+     * @param cluster the cluster which created by 
<code>Testing.withSimulatedTimeLocalCluster</code>
+     *        and <code>Testing.withTrackedCluster</code>.
+     */
     public void run(ILocalCluster cluster) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java 
b/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java
index 00bd1e7..1c6f738 100644
--- a/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java
+++ b/storm-server/src/main/java/org/apache/storm/testing/TrackedTopology.java
@@ -1,29 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.testing;
 
-import static org.apache.storm.Testing.whileTimeout;
+package org.apache.storm.testing;
 
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.storm.ILocalCluster;
 import org.apache.storm.Testing;
 import org.apache.storm.Thrift;
@@ -37,6 +29,8 @@ import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.Testing.whileTimeout;
+
 /**
  * A tracked topology keeps metrics for every bolt and spout.
  * This allows a test to know how many tuples have been fully processed.
@@ -48,88 +42,88 @@ public class TrackedTopology {
     private final StormTopology topology;
     private final AtomicInteger lastSpoutCommit;
     private final ILocalCluster cluster;
-    
+
     /**
      * Create a new topology to be tracked.
      * @param origTopo the original topology.
      * @param cluster a cluster that should have been launched with tracking 
enabled.
      */
-       public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) {
-           LOG.warn("CLUSTER {} - {}", cluster, cluster.getTrackedId());
-           this.cluster = cluster;
-           lastSpoutCommit = new AtomicInteger(0);
-           String id = cluster.getTrackedId();
-           topology = origTopo.deepCopy();
-           for (Bolt bolt: topology.get_bolts().values()) {
-               IRichBolt obj = (IRichBolt) 
Thrift.deserializeComponentObject(bolt.get_bolt_object());
-               bolt.set_bolt_object(Thrift.serializeComponentObject(new 
BoltTracker(obj, id)));
-           }
-           for (SpoutSpec spout: topology.get_spouts().values()) {
-               IRichSpout obj = (IRichSpout) 
Thrift.deserializeComponentObject(spout.get_spout_object());
-               spout.set_spout_object(Thrift.serializeComponentObject(new 
SpoutTracker(obj, id)));
-           }
-       }
+    public TrackedTopology(StormTopology origTopo, ILocalCluster cluster) {
+        LOG.warn("CLUSTER {} - {}", cluster, cluster.getTrackedId());
+        this.cluster = cluster;
+        lastSpoutCommit = new AtomicInteger(0);
+        String id = cluster.getTrackedId();
+        topology = origTopo.deepCopy();
+        for (Bolt bolt : topology.get_bolts().values()) {
+            IRichBolt obj = (IRichBolt) 
Thrift.deserializeComponentObject(bolt.get_bolt_object());
+            bolt.set_bolt_object(Thrift.serializeComponentObject(new 
BoltTracker(obj, id)));
+        }
+        for (SpoutSpec spout : topology.get_spouts().values()) {
+            IRichSpout obj = (IRichSpout) 
Thrift.deserializeComponentObject(spout.get_spout_object());
+            spout.set_spout_object(Thrift.serializeComponentObject(new 
SpoutTracker(obj, id)));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static int globalAmt(String id, String key) {
+        LOG.warn("Reading tracked metrics for ID {}", id);
+        return ((ConcurrentHashMap<String, AtomicInteger>) 
RegisteredGlobalState.getState(id)).get(key).get();
+    }
 
     public StormTopology getTopology() {
-               return topology;
+        return topology;
+    }
+
+    public ILocalCluster getCluster() {
+        return cluster;
+    }
+
+    /**
+     * Wait for 1 tuple to be fully processed
+     */
+    public void trackedWait() {
+        trackedWait(1, Testing.TEST_TIMEOUT_MS);
+    }
+
+    /**
+     * Wait for amt tuples to be fully processed.
+     */
+    public void trackedWait(int amt) {
+        trackedWait(amt, Testing.TEST_TIMEOUT_MS);
+    }
+
+    /**
+     * Wait for amt tuples to be fully processed timeoutMs happens.
+     */
+    public void trackedWait(int amt, int timeoutMs) {
+        final int target = amt + lastSpoutCommit.get();
+        final String id = cluster.getTrackedId();
+        Random rand = ThreadLocalRandom.current();
+        whileTimeout(timeoutMs,
+                     () -> {
+                         int se = globalAmt(id, "spout-emitted");
+                         int transferred = globalAmt(id, "transferred");
+                         int processed = globalAmt(id, "processed");
+                         LOG.info("emitted {} target {} transferred {} 
processed {}", se, target, transferred, processed);
+                         return (target != se) || (transferred != processed);
+                     },
+                     () -> {
+                         Time.advanceTimeSecs(1);
+                         try {
+                             Thread.sleep(rand.nextInt(200));
+                         } catch (Exception e) {
+                             throw new RuntimeException(e);
+                         }
+                     });
+        lastSpoutCommit.set(target);
     }
-       
-       public ILocalCluster getCluster() {
-           return cluster;
-       }
-       
-       /**
-        * Wait for 1 tuple to be fully processed
-        */
-       public void trackedWait() {
-           trackedWait(1, Testing.TEST_TIMEOUT_MS);
-       }
-       
-       /**
-        * Wait for amt tuples to be fully processed.
-        */
-       public void trackedWait(int amt) {
-           trackedWait(amt, Testing.TEST_TIMEOUT_MS);
-       }
-       
-       /**
-        * Wait for amt tuples to be fully processed timeoutMs happens.
-        */
-       public void trackedWait(int amt, int timeoutMs) {
-           final int target = amt + lastSpoutCommit.get();
-           final String id = cluster.getTrackedId();
-           Random rand = ThreadLocalRandom.current();
-           whileTimeout(timeoutMs,
-                   () -> {
-                       int se = globalAmt(id, "spout-emitted");
-                       int transferred = globalAmt(id, "transferred");
-                       int processed = globalAmt(id, "processed");
-                       LOG.info("emitted {} target {} transferred {} processed 
{}", se, target, transferred, processed);
-                       return (target != se) || (transferred != processed);
-                   },
-                   () -> {
-                       Time.advanceTimeSecs(1);
-                       try {
-                        Thread.sleep(rand.nextInt(200));
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                   });
-           lastSpoutCommit.set(target);
-       }
 
-       /**
+    /**
      * Read a metric from the tracked cluster (NOT JUST THIS TOPOLOGY)
      * @param key one of "spout-emitted", "processed", or "transferred"
      * @return the amount of that metric
      */
-       public int globalAmt(String key) {
-           return globalAmt(cluster.getTrackedId(), key);
-       }
-
-    @SuppressWarnings("unchecked")
-    private static int globalAmt(String id, String key) {
-        LOG.warn("Reading tracked metrics for ID {}", id);
-        return ((ConcurrentHashMap<String, 
AtomicInteger>)RegisteredGlobalState.getState(id)).get(key).get();
+    public int globalAmt(String key) {
+        return globalAmt(cluster.getTrackedId(), key);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java 
b/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java
index cd230c3..5bf91ca 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/BufferInputStream.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.utils;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java 
b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
index 3ed5d06..ea0ea54 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.utils;
@@ -40,7 +34,7 @@ public class LruMap<A, B> extends LinkedHashMap<A, B> {
         this(maxSize);
         this.evCb = evictionCallback;
     }
-    
+
     @Override
     protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) {
         boolean evict = size() > this.maxSize;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java 
b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
index dcf3d09..1df19cb 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerConfigUtils.java
@@ -27,13 +27,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 
 
-
 public class ServerConfigUtils {
     public static final String FILE_SEPARATOR = File.separator;
     public static final String NIMBUS_DO_NOT_REASSIGN = 
"NIMBUS-DO-NOT-REASSIGN";
@@ -44,9 +42,9 @@ public class ServerConfigUtils {
     private static ServerConfigUtils _instance = new ServerConfigUtils();
 
     /**
-     * Provide an instance of this class for delegates to use.  To mock out
-     * delegated methods, provide an instance of a subclass that overrides the
-     * implementation of the delegated method.
+     * Provide an instance of this class for delegates to use.  To mock out 
delegated methods, provide an instance of a subclass that
+     * overrides the implementation of the delegated method.
+     *
      * @param u a ServerConfigUtils instance
      * @return the previously set instance
      */
@@ -85,8 +83,8 @@ public class ServerConfigUtils {
 
     /* TODO: make sure test these two functions in manual tests */
     public static List<String> getTopoLogsUsers(Map<String, Object> 
topologyConf) {
-        List<String> logsUsers = 
(List<String>)topologyConf.get(DaemonConfig.LOGS_USERS);
-        List<String> topologyUsers = 
(List<String>)topologyConf.get(Config.TOPOLOGY_USERS);
+        List<String> logsUsers = (List<String>) 
topologyConf.get(DaemonConfig.LOGS_USERS);
+        List<String> topologyUsers = (List<String>) 
topologyConf.get(Config.TOPOLOGY_USERS);
         Set<String> mergedUsers = new HashSet<String>();
         if (logsUsers != null) {
             for (String user : logsUsers) {
@@ -108,8 +106,8 @@ public class ServerConfigUtils {
     }
 
     public static List<String> getTopoLogsGroups(Map<String, Object> 
topologyConf) {
-        List<String> logsGroups = 
(List<String>)topologyConf.get(DaemonConfig.LOGS_GROUPS);
-        List<String> topologyGroups = 
(List<String>)topologyConf.get(Config.TOPOLOGY_GROUPS);
+        List<String> logsGroups = (List<String>) 
topologyConf.get(DaemonConfig.LOGS_GROUPS);
+        List<String> topologyGroups = (List<String>) 
topologyConf.get(Config.TOPOLOGY_GROUPS);
         Set<String> mergedGroups = new HashSet<String>();
         if (logsGroups != null) {
             for (String group : logsGroups) {

Reply via email to