http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 43c125d..90d68dc 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -24,15 +24,14 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.LocalState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,9 +41,11 @@ public class SupervisorUtils {
 
     private static final SupervisorUtils INSTANCE = new SupervisorUtils();
     private static SupervisorUtils _instance = INSTANCE;
+
     public static void setInstance(SupervisorUtils u) {
         _instance = u;
     }
+
     public static void resetInstance() {
         _instance = INSTANCE;
     }
@@ -62,9 +63,9 @@ public class SupervisorUtils {
     }
 
     /**
-     * Given the blob information returns the value of the uncompress field, 
handling it either being a string or a boolean value, or if it's not specified 
then
-     * returns false
-     * 
+     * Given the blob information returns the value of the uncompress field, 
handling it either being a string or a boolean value, or if
+     * it's not specified then returns false
+     *
      * @param blobInfo
      * @return
      */
@@ -73,8 +74,8 @@ public class SupervisorUtils {
     }
 
     /**
-     * Given the blob information returns the value of the workerRestart 
field, handling it either being a string or a boolean value, or
-     * if it's not specified then returns false.
+     * Given the blob information returns the value of the workerRestart 
field, handling it either being a string or a boolean value, or if
+     * it's not specified then returns false.
      *
      * @param blobInfo the info for the blob.
      * @return true if the blob needs a worker restart by way of the callback 
else false.
@@ -85,7 +86,7 @@ public class SupervisorUtils {
 
     /**
      * Returns a list of LocalResources based on the blobstore-map passed in
-     * 
+     *
      * @param blobstoreMap
      * @return
      */
@@ -94,7 +95,8 @@ public class SupervisorUtils {
         if (blobstoreMap != null) {
             for (Map.Entry<String, Map<String, Object>> map : 
blobstoreMap.entrySet()) {
                 Map<String, Object> blobConf = map.getValue();
-                LocalResource localResource = new LocalResource(map.getKey(), 
shouldUncompressBlob(blobConf), blobNeedsWorkerRestart(blobConf));
+                LocalResource localResource =
+                    new LocalResource(map.getKey(), 
shouldUncompressBlob(blobConf), blobNeedsWorkerRestart(blobConf));
                 localResourceList.add(localResource);
             }
         }
@@ -111,12 +113,30 @@ public class SupervisorUtils {
      *
      * @param conf
      * @return
+     *
      * @throws Exception
      */
     public static Map<String, LSWorkerHeartbeat> 
readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
         return _instance.readWorkerHeartbeatsImpl(conf);
     }
 
+    /**
+     * get worker heartbeat by workerId
+     *
+     * @param conf
+     * @param workerId
+     * @return
+     *
+     * @throws IOException
+     */
+    private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> 
conf, String workerId) {
+        return _instance.readWorkerHeartbeatImpl(conf, workerId);
+    }
+
+    public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, 
Map<String, Object> conf) {
+        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
+    }
+
     public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, 
Object> conf) throws Exception {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
@@ -130,19 +150,6 @@ public class SupervisorUtils {
         return workerHeartbeats;
     }
 
-
-    /**
-     * get worker heartbeat by workerId
-     *
-     * @param conf
-     * @param workerId
-     * @return
-     * @throws IOException
-     */
-    private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> 
conf, String workerId) {
-        return _instance.readWorkerHeartbeatImpl(conf, workerId);
-    }
-
     protected LSWorkerHeartbeat readWorkerHeartbeatImpl(Map<String, Object> 
conf, String workerId) {
         try {
             LocalState localState = ConfigUtils.workerState(conf, workerId);
@@ -153,11 +160,7 @@ public class SupervisorUtils {
         }
     }
 
-    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, 
Map<String, Object> conf) {
-        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
-    }
-
-    private  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, 
Map<String, Object> conf) {
+    private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, 
Map<String, Object> conf) {
         return (now - whb.get_time_secs()) > 
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java
index 9662af5..e531cb6 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/UniFunc.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 public interface UniFunc<T> {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
index c01819b..6e4814c 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.supervisor.timer;
@@ -21,7 +15,6 @@ package org.apache.storm.daemon.supervisor.timer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.generated.LSWorkerHeartbeat;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
index 9ffce91..f68cb8c 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java
@@ -19,7 +19,6 @@
 package org.apache.storm.daemon.supervisor.timer;
 
 import java.util.Map;
-
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.healthcheck.HealthChecker;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
index 14ecf94..24d4471 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.supervisor.timer;
@@ -56,7 +50,7 @@ public class SupervisorHeartbeat implements Runnable {
         List<Long> usedPorts = new ArrayList<>();
         usedPorts.addAll(supervisor.getCurrAssignment().get().keySet());
         supervisorInfo.set_used_ports(usedPorts);
-        List metaDatas = (List)supervisor.getiSupervisor().getMetadata();
+        List metaDatas = (List) supervisor.getiSupervisor().getMetadata();
         List<Long> portList = new ArrayList<>();
         if (metaDatas != null) {
             for (Object data : metaDatas) {
@@ -85,8 +79,8 @@ public class SupervisorHeartbeat implements Runnable {
 
 
         // If configs are present in Generic map and legacy - the legacy 
values will be overwritten
-        Map<String, Number> rawResourcesMap = (Map<String,Number>) 
conf.getOrDefault(
-                Config.SUPERVISOR_RESOURCES_MAP, Collections.emptyMap()
+        Map<String, Number> rawResourcesMap = (Map<String, Number>) 
conf.getOrDefault(
+            Config.SUPERVISOR_RESOURCES_MAP, Collections.emptyMap()
         );
 
         for (Map.Entry<String, Number> stringNumberEntry : 
rawResourcesMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
index ba8d133..639c9db 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
@@ -1,26 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.supervisor.timer;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.ReadClusterState;
 import org.apache.storm.daemon.supervisor.Supervisor;
@@ -56,6 +49,18 @@ public class SynchronizeAssignments implements Runnable {
         this.readClusterState = readClusterState;
     }
 
+    private static void assignedAssignmentsToLocal(IStormClusterState 
clusterState, SupervisorAssignments assignments) {
+        if (null == assignments) {
+            //unknown error, just skip
+            return;
+        }
+        Map<String, byte[]> serAssignments = new HashMap<>();
+        for (Map.Entry<String, Assignment> entry : 
assignments.get_storm_assignment().entrySet()) {
+            serAssignments.put(entry.getKey(), 
Utils.serialize(entry.getValue()));
+        }
+        clusterState.syncRemoteAssignments(serAssignments);
+    }
+
     @Override
     public void run() {
         // first sync assignments to local, then sync processes.
@@ -118,16 +123,4 @@ public class SynchronizeAssignments implements Runnable {
             }
         }
     }
-
-    private static void assignedAssignmentsToLocal(IStormClusterState 
clusterState, SupervisorAssignments assignments) {
-        if (null == assignments) {
-            //unknown error, just skip
-            return;
-        }
-        Map<String, byte[]> serAssignments = new HashMap<>();
-        for (Map.Entry<String, Assignment> entry : 
assignments.get_storm_assignment().entrySet()) {
-            serAssignments.put(entry.getKey(), 
Utils.serialize(entry.getValue()));
-        }
-        clusterState.syncRemoteAssignments(serAssignments);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/event/EventManager.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/event/EventManager.java 
b/storm-server/src/main/java/org/apache/storm/event/EventManager.java
index 64536c1..ff7342b 100644
--- a/storm-server/src/main/java/org/apache/storm/event/EventManager.java
+++ b/storm-server/src/main/java/org/apache/storm/event/EventManager.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.event;
 
 public interface EventManager extends AutoCloseable {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java 
b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
index 6b9d4f1..a527315 100644
--- a/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
+++ b/storm-server/src/main/java/org/apache/storm/event/EventManagerImp.java
@@ -1,32 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.event;
 
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.event;
 
 import java.io.InterruptedIOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class EventManagerImp implements EventManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(EventManagerImp.class);
@@ -55,7 +49,7 @@ public class EventManagerImp implements EventManager {
                         r.run();
                         proccessInc();
                     } catch (Throwable t) {
-                        if 
(Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t) ) {
+                        if 
(Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) {
                             LOG.info("Event manager interrupted while doing 
IO");
                         } else if 
(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, t)) {
                             LOG.info("Event manager interrupted while doing 
NIO");

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java 
b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
index c373b05..83faf13 100644
--- a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
+++ b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
@@ -18,12 +18,6 @@
 
 package org.apache.storm.healthcheck;
 
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.ServerConfigUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStream;
@@ -32,6 +26,11 @@ import java.nio.channels.ClosedByInterruptException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HealthChecker {
 
@@ -50,8 +49,9 @@ public class HealthChecker {
             if (parentFile.exists()) {
                 File[] list = parentFile.listFiles();
                 for (File f : list) {
-                    if (!f.isDirectory() && f.canExecute())
+                    if (!f.isDirectory() && f.canExecute()) {
                         healthScripts.add(f.getAbsolutePath());
+                    }
                 }
             }
             for (String script : healthScripts) {
@@ -66,7 +66,7 @@ public class HealthChecker {
         // to execute properly, not that the system is unhealthy, in which case
         // we don't want to start killing things.
 
-        if (results.contains(FAILED)|| results.contains(FAILED_WITH_EXIT_CODE)
+        if (results.contains(FAILED) || results.contains(FAILED_WITH_EXIT_CODE)
             || results.contains(TIMEOUT)) {
             LOG.warn("The supervisor healthchecks failed!!!");
             return 1;
@@ -120,8 +120,9 @@ public class HealthChecker {
             LOG.warn("Script failed with exception: ", e);
             return FAILED_WITH_EXIT_CODE;
         } finally {
-            if (interruptThread != null)
+            if (interruptThread != null) {
                 interruptThread.interrupt();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java 
b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 1cdcaad..8b41c68 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -29,7 +29,6 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -68,18 +67,19 @@ public class AsyncLocalizer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLocalizer.class);
 
     private static final CompletableFuture<Void> ALL_DONE_FUTURE = new 
CompletableFuture<>();
+    private static final int ATTEMPTS_INTERVAL_TIME = 100;
 
     static {
         ALL_DONE_FUTURE.complete(null);
     }
 
-    private final boolean isLocalMode;
     // track resources - user to resourceSet
     //ConcurrentHashMap is explicitly used everywhere in this class because it 
uses locks to guarantee atomicity for compute and
     // computeIfAbsent where as ConcurrentMap allows for a retry of the 
function passed in, and would require the function to have
     // no side effects.
     protected final ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> userFiles = new ConcurrentHashMap<>();
     protected final ConcurrentHashMap<String, ConcurrentHashMap<String, 
LocalizedResource>> userArchives = new ConcurrentHashMap<>();
+    private final boolean isLocalMode;
     // topology to tracking of topology dir and resources
     private final ConcurrentHashMap<String, CompletableFuture<Void>> 
blobPending;
     private final Map<String, Object> conf;
@@ -87,16 +87,14 @@ public class AsyncLocalizer implements AutoCloseable {
     private final boolean symlinksDisabled;
     private final ConcurrentHashMap<String, LocallyCachedBlob> topologyBlobs = 
new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, CompletableFuture<Void>> 
topologyBasicDownloaded = new ConcurrentHashMap<>();
-
     private final Path localBaseDir;
     private final int blobDownloadRetries;
     private final ScheduledExecutorService execService;
-
+    private final long cacheCleanupPeriod;
     // cleanup
     @VisibleForTesting
     protected long cacheTargetSize;
-    private final long cacheCleanupPeriod;
-    
+
     @VisibleForTesting
     AsyncLocalizer(Map<String, Object> conf, AdvancedFSOps ops, String 
baseDir) throws IOException {
 
@@ -106,7 +104,7 @@ public class AsyncLocalizer implements AutoCloseable {
         localBaseDir = Paths.get(baseDir);
         // default cache size 10GB, converted to Bytes
         cacheTargetSize = 
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
-            10 * 1024).longValue() << 20;
+                                              10 * 1024).longValue() << 20;
         // default 30 seconds. (we cache the size so it is cheap to do)
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 
1000).longValue();
@@ -117,10 +115,10 @@ public class AsyncLocalizer implements AutoCloseable {
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
 
         execService = Executors.newScheduledThreadPool(threadPoolSize,
-            new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor 
- %d").build());
+                                                       new 
ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
         reconstructLocalizedResources();
 
-        symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, 
false);
+        symlinksDisabled = (boolean) 
conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
         blobPending = new ConcurrentHashMap<>();
     }
 
@@ -131,40 +129,43 @@ public class AsyncLocalizer implements AutoCloseable {
     @VisibleForTesting
     LocallyCachedBlob getTopoJar(final String topologyId) {
         return 
topologyBlobs.computeIfAbsent(ConfigUtils.masterStormJarKey(topologyId),
-            (tjk) -> {
-                try {
-                    return new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
-                        LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
+                                             (tjk) -> {
+                                                 try {
+                                                     return new 
LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
+                                                                               
           LocallyCachedTopologyBlob.TopologyBlobType
+                                                                               
               .TOPO_JAR);
+                                                 } catch (IOException e) {
+                                                     throw new 
RuntimeException(e);
+                                                 }
+                                             });
     }
 
     @VisibleForTesting
     LocallyCachedBlob getTopoCode(final String topologyId) {
         return 
topologyBlobs.computeIfAbsent(ConfigUtils.masterStormCodeKey(topologyId),
-            (tck) -> {
-                try {
-                    return new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
-                        LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
+                                             (tck) -> {
+                                                 try {
+                                                     return new 
LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
+                                                                               
           LocallyCachedTopologyBlob.TopologyBlobType
+                                                                               
               .TOPO_CODE);
+                                                 } catch (IOException e) {
+                                                     throw new 
RuntimeException(e);
+                                                 }
+                                             });
     }
 
     @VisibleForTesting
     LocallyCachedBlob getTopoConf(final String topologyId) {
         return 
topologyBlobs.computeIfAbsent(ConfigUtils.masterStormConfKey(topologyId),
-            (tck) -> {
-                try {
-                    return new LocallyCachedTopologyBlob(topologyId, 
isLocalMode, conf, fsOps,
-                        LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
+                                             (tck) -> {
+                                                 try {
+                                                     return new 
LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
+                                                                               
           LocallyCachedTopologyBlob.TopologyBlobType
+                                                                               
               .TOPO_CONF);
+                                                 } catch (IOException e) {
+                                                     throw new 
RuntimeException(e);
+                                                 }
+                                             });
     }
 
     private LocalizedResource getUserArchive(String user, String key) {
@@ -181,33 +182,35 @@ public class AsyncLocalizer implements AutoCloseable {
 
     /**
      * Request that all of the blobs necessary for this topology be downloaded.
+     *
      * @param assignment the assignment that needs the blobs
-     * @param port the port the assignment is a part of
-     * @param cb a callback for when the blobs change.  This is only for blobs 
that are tied to the lifetime of the worker.
+     * @param port       the port the assignment is a part of
+     * @param cb         a callback for when the blobs change.  This is only 
for blobs that are tied to the lifetime of the worker.
      * @return a Future that indicates when they are all downloaded.
+     *
      * @throws IOException if there was an error while trying doing it.
      */
     public CompletableFuture<Void> requestDownloadTopologyBlobs(final 
LocalAssignment assignment, final int port,
-                                                                             
final BlobChangingCallback cb) throws IOException {
+                                                                final 
BlobChangingCallback cb) throws IOException {
         final PortAndAssignment pna = new PortAndAssignment(port, assignment);
         final String topologyId = pna.getToplogyId();
 
         CompletableFuture<Void> baseBlobs = 
requestDownloadBaseTopologyBlobs(pna, cb);
         return baseBlobs.thenComposeAsync((v) ->
-            blobPending.compute(topologyId, (tid, old) -> {
-                CompletableFuture<Void> ret = old;
-                if (ret == null) {
-                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, 
cb), execService);
-                } else {
-                    try {
-                        addReferencesToBlobs(pna, cb);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-                LOG.debug("Reserved blobs {} {}", topologyId, ret);
-                return ret;
-            }));
+                                              blobPending.compute(topologyId, 
(tid, old) -> {
+                                                  CompletableFuture<Void> ret 
= old;
+                                                  if (ret == null) {
+                                                      ret = 
CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+                                                  } else {
+                                                      try {
+                                                          
addReferencesToBlobs(pna, cb);
+                                                      } catch (Exception e) {
+                                                          throw new 
RuntimeException(e);
+                                                      }
+                                                  }
+                                                  LOG.debug("Reserved blobs {} 
{}", topologyId, ret);
+                                                  return ret;
+                                              }));
     }
 
     @VisibleForTesting
@@ -225,19 +228,17 @@ public class AsyncLocalizer implements AutoCloseable {
         topoConf.addReference(pna, cb);
 
         return topologyBasicDownloaded.computeIfAbsent(topologyId,
-            (tid) -> downloadOrUpdate(topoJar, topoCode, topoConf));
+                                                       (tid) -> 
downloadOrUpdate(topoJar, topoCode, topoConf));
     }
 
-    private static final int ATTEMPTS_INTERVAL_TIME = 100;
-
-    private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob ... 
blobs) {
+    private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob... 
blobs) {
         return downloadOrUpdate(Arrays.asList(blobs));
     }
 
     private CompletableFuture<Void> downloadOrUpdate(Collection<? extends 
LocallyCachedBlob> blobs) {
-        CompletableFuture<Void> [] all = new CompletableFuture[blobs.size()];
+        CompletableFuture<Void>[] all = new CompletableFuture[blobs.size()];
         int i = 0;
-        for (final LocallyCachedBlob blob: blobs) {
+        for (final LocallyCachedBlob blob : blobs) {
             all[i] = CompletableFuture.runAsync(() -> {
                 LOG.debug("STARTING download of {}", blob);
                 try (ClientBlobStore blobStore = getClientBlobStore()) {
@@ -296,7 +297,7 @@ public class AsyncLocalizer implements AutoCloseable {
                 futures.add(downloadOrUpdate(map.values()));
             }
         }
-        for (CompletableFuture<?> f: futures) {
+        for (CompletableFuture<?> f : futures) {
             try {
                 f.get();
             } catch (Exception e) {
@@ -312,8 +313,7 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     /**
-     * Start any background threads needed.  This includes updating blobs and 
cleaning up
-     * unused blobs over the configured size limit.
+     * Start any background threads needed.  This includes updating blobs and 
cleaning up unused blobs over the configured size limit.
      */
     public void start() {
         execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, 
TimeUnit.SECONDS);
@@ -366,76 +366,17 @@ public class AsyncLocalizer implements AutoCloseable {
         }
     }
 
-    private class DownloadBlobs implements Supplier<Void> {
-        private final PortAndAssignment pna;
-        private final BlobChangingCallback cb;
-
-        public DownloadBlobs(PortAndAssignment pna, BlobChangingCallback cb) {
-            this.pna = pna;
-            this.cb = cb;
-        }
-
-        @Override
-        public Void get() {
-            try {
-                String topologyId = pna.getToplogyId();
-                String topoOwner = pna.getOwner();
-                String stormroot = ConfigUtils.supervisorStormDistRoot(conf, 
topologyId);
-                Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(conf, topologyId);
-
-                @SuppressWarnings("unchecked")
-                Map<String, Map<String, Object>> blobstoreMap =
-                    (Map<String, Map<String, Object>>) 
topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-
-                List<LocalResource> localResourceList = getLocalResources(pna);
-                if (!localResourceList.isEmpty()) {
-                    File userDir = getLocalUserFileCacheDir(topoOwner);
-                    if (!fsOps.fileExists(userDir)) {
-                        fsOps.forceMkdir(userDir);
-                    }
-                    List<LocalizedResource> localizedResources = 
getBlobs(localResourceList, pna, cb);
-                    fsOps.setupBlobPermissions(userDir, topoOwner);
-                    if (!symlinksDisabled) {
-                        for (LocalizedResource localizedResource : 
localizedResources) {
-                            String keyName = localizedResource.getKey();
-                            //The sym link we are pointing to
-                            File rsrcFilePath = 
localizedResource.getCurrentSymlinkPath().toFile();
-
-                            String symlinkName = null;
-                            if (blobstoreMap != null) {
-                                Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
-                                if (blobInfo != null && 
blobInfo.containsKey("localname")) {
-                                    symlinkName = (String) 
blobInfo.get("localname");
-                                } else {
-                                    symlinkName = keyName;
-                                }
-                            } else {
-                                // all things are from dependencies
-                                symlinkName = keyName;
-                            }
-                            fsOps.createSymlink(new File(stormroot, 
symlinkName), rsrcFilePath);
-                        }
-                    }
-                }
-
-                return null;
-            } catch (Exception e) {
-                LOG.warn("Caught Exception While Downloading (rethrowing)... 
", e);
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
     /**
      * Do everything needed to recover the state in the AsyncLocalizer for a 
running topology.
+     *
      * @param currentAssignment the assignment for the topology.
-     * @param port the port the assignment is on.
-     * @param cb a callback for when the blobs are updated.  This will only be 
for blobs that
-     *     indicate that if they change the worker should be restarted.
+     * @param port              the port the assignment is on.
+     * @param cb                a callback for when the blobs are updated.  
This will only be for blobs that indicate that if they change
+     *                          the worker should be restarted.
      * @throws IOException on any error trying to recover the state.
      */
     public void recoverRunningTopology(final LocalAssignment 
currentAssignment, final int port,
-                                                    final BlobChangingCallback 
cb) throws IOException {
+                                       final BlobChangingCallback cb) throws 
IOException {
         final PortAndAssignment pna = new PortAndAssignment(port, 
currentAssignment);
         final String topologyId = pna.getToplogyId();
 
@@ -463,7 +404,7 @@ public class AsyncLocalizer implements AutoCloseable {
      * Remove this assignment/port as blocking resources from being cleaned up.
      *
      * @param assignment the assignment the resources are for
-     * @param port the port the topology is running on
+     * @param port       the port the topology is running on
      * @throws IOException on any error
      */
     public void releaseSlotFor(LocalAssignment assignment, int port) throws 
IOException {
@@ -512,13 +453,13 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     private void recoverLocalizedArchivesForUser(String user) throws 
IOException {
-        for (String key: 
LocalizedResource.getLocalizedArchiveKeys(localBaseDir, user)) {
+        for (String key : 
LocalizedResource.getLocalizedArchiveKeys(localBaseDir, user)) {
             getUserArchive(user, key);
         }
     }
 
     private void recoverLocalizedFilesForUser(String user) throws IOException {
-        for (String key: LocalizedResource.getLocalizedFileKeys(localBaseDir, 
user)) {
+        for (String key : LocalizedResource.getLocalizedFileKeys(localBaseDir, 
user)) {
             getUserFile(user, key);
         }
     }
@@ -544,7 +485,7 @@ public class AsyncLocalizer implements AutoCloseable {
 
     // ignores invalid user/topo/key
     void removeBlobReference(String key, PortAndAssignment pna,
-                                          boolean uncompress) throws 
AuthorizationException, KeyNotFoundException {
+                             boolean uncompress) throws 
AuthorizationException, KeyNotFoundException {
         String user = pna.getOwner();
         String topo = pna.getToplogyId();
         ConcurrentMap<String, LocalizedResource> lrsrcSet = uncompress ? 
userArchives.get(user) : userFiles.get(user);
@@ -555,11 +496,11 @@ public class AsyncLocalizer implements AutoCloseable {
                 lrsrc.removeReference(pna);
             } else {
                 LOG.warn("trying to remove non-existent blob, key: " + key + " 
for user: " + user
-                    + " topo: " + topo);
+                         + " topo: " + topo);
             }
         } else {
             LOG.warn("trying to remove blob for non-existent resource set for 
user: " + user + " key: "
-                + key + " topo: " + topo);
+                     + key + " topo: " + topo);
         }
     }
 
@@ -568,9 +509,8 @@ public class AsyncLocalizer implements AutoCloseable {
     }
 
     /**
-     * This function either returns the blobs in the existing cache or if they 
don't exist in the
-     * cache, it downloads them in parallel (up to 
SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT)
-     * and will block until all of them have been downloaded.
+     * This function either returns the blobs in the existing cache or if they 
don't exist in the cache, it downloads them in parallel (up
+     * to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) and will block until all 
of them have been downloaded.
      */
     List<LocalizedResource> getBlobs(List<LocalResource> localResources, 
PortAndAssignment pna, BlobChangingCallback cb)
         throws AuthorizationException, KeyNotFoundException, IOException {
@@ -582,7 +522,7 @@ public class AsyncLocalizer implements AutoCloseable {
         List<CompletableFuture<?>> futures = new ArrayList<>();
 
         try {
-            for (LocalResource localResource: localResources) {
+            for (LocalResource localResource : localResources) {
                 String key = localResource.getBlobName();
                 boolean uncompress = localResource.shouldUncompress();
                 LocalizedResource lrsrc = uncompress ? getUserArchive(user, 
key) : getUserFile(user, key);
@@ -595,7 +535,7 @@ public class AsyncLocalizer implements AutoCloseable {
                 lrsrc.addReference(pna, localResource.needsCallback() ? cb : 
null);
             }
 
-            for (CompletableFuture<?> futureRsrc: futures) {
+            for (CompletableFuture<?> futureRsrc : futures) {
                 futureRsrc.get();
             }
         } catch (ExecutionException e) {
@@ -610,10 +550,6 @@ public class AsyncLocalizer implements AutoCloseable {
         return results;
     }
 
-    private interface ConsumePathAndId {
-        void accept(Path path, String topologyId) throws IOException;
-    }
-
     private void forEachTopologyDistDir(ConsumePathAndId consumer) throws 
IOException {
         Path stormCodeRoot = 
Paths.get(ConfigUtils.supervisorStormDistRoot(conf));
         if (Files.exists(stormCodeRoot) && Files.isDirectory(stormCodeRoot)) {
@@ -669,7 +605,7 @@ public class AsyncLocalizer implements AutoCloseable {
         LOG.debug("Resource cleanup: {}", toClean);
         Set<String> allUsers = new HashSet<>(userArchives.keySet());
         allUsers.addAll(userFiles.keySet());
-        for (String user: allUsers) {
+        for (String user : allUsers) {
             ConcurrentMap<String, LocalizedResource> filesForUser = 
userFiles.get(user);
             ConcurrentMap<String, LocalizedResource> archivesForUser = 
userArchives.get(user);
             if ((filesForUser == null || filesForUser.size() == 0)
@@ -686,4 +622,68 @@ public class AsyncLocalizer implements AutoCloseable {
             }
         }
     }
+
+    private interface ConsumePathAndId {
+        void accept(Path path, String topologyId) throws IOException;
+    }
+
+    private class DownloadBlobs implements Supplier<Void> {
+        private final PortAndAssignment pna;
+        private final BlobChangingCallback cb;
+
+        public DownloadBlobs(PortAndAssignment pna, BlobChangingCallback cb) {
+            this.pna = pna;
+            this.cb = cb;
+        }
+
+        @Override
+        public Void get() {
+            try {
+                String topologyId = pna.getToplogyId();
+                String topoOwner = pna.getOwner();
+                String stormroot = ConfigUtils.supervisorStormDistRoot(conf, 
topologyId);
+                Map<String, Object> topoConf = 
ConfigUtils.readSupervisorStormConf(conf, topologyId);
+
+                @SuppressWarnings("unchecked")
+                Map<String, Map<String, Object>> blobstoreMap =
+                    (Map<String, Map<String, Object>>) 
topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+
+                List<LocalResource> localResourceList = getLocalResources(pna);
+                if (!localResourceList.isEmpty()) {
+                    File userDir = getLocalUserFileCacheDir(topoOwner);
+                    if (!fsOps.fileExists(userDir)) {
+                        fsOps.forceMkdir(userDir);
+                    }
+                    List<LocalizedResource> localizedResources = 
getBlobs(localResourceList, pna, cb);
+                    fsOps.setupBlobPermissions(userDir, topoOwner);
+                    if (!symlinksDisabled) {
+                        for (LocalizedResource localizedResource : 
localizedResources) {
+                            String keyName = localizedResource.getKey();
+                            //The sym link we are pointing to
+                            File rsrcFilePath = 
localizedResource.getCurrentSymlinkPath().toFile();
+
+                            String symlinkName = null;
+                            if (blobstoreMap != null) {
+                                Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
+                                if (blobInfo != null && 
blobInfo.containsKey("localname")) {
+                                    symlinkName = (String) 
blobInfo.get("localname");
+                                } else {
+                                    symlinkName = keyName;
+                                }
+                            } else {
+                                // all things are from dependencies
+                                symlinkName = keyName;
+                            }
+                            fsOps.createSymlink(new File(stormroot, 
symlinkName), rsrcFilePath);
+                        }
+                    }
+                }
+
+                return null;
+            } catch (Exception e) {
+                LOG.warn("Caught Exception While Downloading (rethrowing)... 
", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java
 
b/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java
index e99f95b..ebbbdc5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/BlobChangingCallback.java
@@ -26,15 +26,15 @@ import org.apache.storm.generated.LocalAssignment;
 public interface BlobChangingCallback {
 
     /**
-     * Informs the listener that a blob has changed and is ready to update and 
replace a localized blob that has been marked as
-     * tied to the life cycle of the worker process.
+     * Informs the listener that a blob has changed and is ready to update and 
replace a localized blob that has been marked as tied to the
+     * life cycle of the worker process.
+     *
+     * If `go.getLatch()` is never called before the method completes it is 
assumed that the listener is good with the blob changing.
      *
-     * If `go.getLatch()` is never called before the method completes it is 
assumed that
-     * the listener is good with the blob changing.
      * @param assignment the assignment this resource and callback are 
registered with.
-     * @param port the port that this resource and callback are registered 
with.
-     * @param blob the blob that is going to change.
-     * @param go a way to indicate if the listener is ready for the resource 
to change.
+     * @param port       the port that this resource and callback are 
registered with.
+     * @param blob       the blob that is going to change.
+     * @param go         a way to indicate if the listener is ready for the 
resource to change.
      */
     void blobChanging(LocalAssignment assignment, int port, LocallyCachedBlob 
blob, GoodToGo go);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java 
b/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java
index 2fffe21..9217388 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/GoodToGo.java
@@ -22,39 +22,20 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 
 /**
- * Used as a way to give feedback that the listener is ready for the caller to 
change the blob.
- * By calling @{link GoodToGo#getLatch()} the listener indicates that it wants 
to block
- * changing the blob until the CountDownLatch is triggered with a call to 
@{link CountDownLatch#countDown()}.
+ * Used as a way to give feedback that the listener is ready for the caller to 
change the blob. By calling @{link GoodToGo#getLatch()} the
+ * listener indicates that it wants to block changing the blob until the 
CountDownLatch is triggered with a call to @{link
+ * CountDownLatch#countDown()}.
  */
 public class GoodToGo {
-    public static class GoodToGoLatch {
-        private final CountDownLatch latch;
-        private final Future<Void> doneChanging;
-        private boolean wasCounted = false;
-
-        public GoodToGoLatch(CountDownLatch latch, Future<Void> doneChanging) {
-            this.latch = latch;
-            this.doneChanging = doneChanging;
-        }
-
-        public synchronized Future<Void> countDown() {
-            if (!wasCounted) {
-                latch.countDown();
-                wasCounted = true;
-            }
-            return doneChanging;
-        }
-    }
-
     private final GoodToGoLatch latch;
     private boolean gotLatch = false;
-
     public GoodToGo(CountDownLatch latch, Future<Void> doneChanging) {
         this.latch = new GoodToGoLatch(latch, doneChanging);
     }
 
     /**
      * Get the latch and indicate that you want to block the blob being 
changed.
+     *
      * @return the latch to use when you are ready.
      */
     public synchronized GoodToGoLatch getLatch() {
@@ -67,4 +48,23 @@ public class GoodToGo {
             latch.countDown();
         }
     }
+
+    public static class GoodToGoLatch {
+        private final CountDownLatch latch;
+        private final Future<Void> doneChanging;
+        private boolean wasCounted = false;
+
+        public GoodToGoLatch(CountDownLatch latch, Future<Void> doneChanging) {
+            this.latch = latch;
+            this.doneChanging = doneChanging;
+        }
+
+        public synchronized Future<Void> countDown() {
+            if (!wasCounted) {
+                latch.countDown();
+                wasCounted = true;
+            }
+            return doneChanging;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
index 862349b..f5dfc77 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalResource.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.localizer;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 75a2d6d..6e0061d 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -18,8 +18,6 @@
 
 package org.apache.storm.localizer;
 
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-
 import com.google.common.annotations.VisibleForTesting;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -57,13 +55,13 @@ import org.apache.storm.utils.ShellUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+
 /**
- * Represents a resource that is localized on the supervisor.
- * A localized resource has a .current symlink to the current version file 
which is named
- * filename.{current version}. There is also a filename.version which contains 
the latest version.
+ * Represents a resource that is localized on the supervisor. A localized 
resource has a .current symlink to the current version file which
+ * is named filename.{current version}. There is also a filename.version which 
contains the latest version.
  */
 public class LocalizedResource extends LocallyCachedBlob {
-    private static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResource.class);
     @VisibleForTesting
     static final String CURRENT_BLOB_SUFFIX = ".current";
     @VisibleForTesting
@@ -77,7 +75,34 @@ public class LocalizedResource extends LocallyCachedBlob {
     static final String FILESDIR = "files";
     @VisibleForTesting
     static final String ARCHIVESDIR = "archives";
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResource.class);
     private static final String TO_UNCOMPRESS = "_tmp_";
+    private static final Pattern VERSION_FILE_PATTERN = 
Pattern.compile("^(.+)\\.(\\d+)$");
+    // filesystem path to the resource
+    private final Path baseDir;
+    private final Path versionFilePath;
+    private final Path symlinkPath;
+    private final boolean uncompressed;
+    private final IAdvancedFSOps fsOps;
+    private final String user;
+    private final Map<String, Object> conf;
+    // size of the resource
+    private long size = -1;
+
+    LocalizedResource(String key, Path localBaseDir, boolean uncompressed, 
IAdvancedFSOps fsOps, Map<String, Object> conf,
+                      String user) {
+        super(key + (uncompressed ? " archive" : " file"), key);
+        Path base = getLocalUserFileCacheDir(localBaseDir, user);
+        this.baseDir = uncompressed ? getCacheDirForArchives(base) : 
getCacheDirForFiles(base);
+        this.conf = conf;
+        this.user = user;
+        this.fsOps = fsOps;
+        versionFilePath = constructVersionFileName(baseDir, key);
+        symlinkPath = constructBlobCurrentSymlinkName(baseDir, key);
+        this.uncompressed = uncompressed;
+        //Set the size in case we are recovering an already downloaded object
+        setSize();
+    }
 
     private static Path constructVersionFileName(Path baseDir, String key) {
         return baseDir.resolve(key + BLOB_VERSION_SUFFIX);
@@ -141,16 +166,16 @@ public class LocalizedResource extends LocallyCachedBlob {
             return Collections.emptyList();
         }
         return Files.list(dir)
-            .map((p) -> p.getFileName().toString())
-            .filter((name) -> name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX))
-            .map((key) -> {
-                int p = key.lastIndexOf('.');
-                if (p > 0) {
-                    key = key.substring(0, p);
-                }
-                return key;
-            })
-            .collect(Collectors.toList());
+                    .map((p) -> p.getFileName().toString())
+                    .filter((name) -> 
name.toLowerCase().endsWith(CURRENT_BLOB_SUFFIX))
+                    .map((key) -> {
+                        int p = key.lastIndexOf('.');
+                        if (p > 0) {
+                            key = key.substring(0, p);
+                        }
+                        return key;
+                    })
+                    .collect(Collectors.toList());
     }
 
     // baseDir/supervisor/usercache/
@@ -179,32 +204,6 @@ public class LocalizedResource extends LocallyCachedBlob {
         return dir.resolve(ARCHIVESDIR);
     }
 
-    // filesystem path to the resource
-    private final Path baseDir;
-    private final Path versionFilePath;
-    private final Path symlinkPath;
-    private final boolean uncompressed;
-    private final IAdvancedFSOps fsOps;
-    private final String user;
-    // size of the resource
-    private long size = -1;
-    private final Map<String, Object> conf;
-
-    LocalizedResource(String key, Path localBaseDir, boolean uncompressed, 
IAdvancedFSOps fsOps, Map<String, Object> conf,
-                             String user) {
-        super(key + (uncompressed ? " archive" : " file"), key);
-        Path base = getLocalUserFileCacheDir(localBaseDir, user);
-        this.baseDir = uncompressed ? getCacheDirForArchives(base) : 
getCacheDirForFiles(base);
-        this.conf = conf;
-        this.user = user;
-        this.fsOps = fsOps;
-        versionFilePath = constructVersionFileName(baseDir, key);
-        symlinkPath = constructBlobCurrentSymlinkName(baseDir, key);
-        this.uncompressed = uncompressed;
-        //Set the size in case we are recovering an already downloaded object
-        setSize();
-    }
-
     Path getCurrentSymlinkPath() {
         return symlinkPath;
     }
@@ -331,7 +330,7 @@ public class LocalizedResource extends LocallyCachedBlob {
             LOG.warn("Exit code from worker-launcher is: {}", exitCode, e);
             LOG.debug("output: {}", shExec.getOutput());
             throw new IOException("Setting blob permissions failed"
-                + " (exitCode=" + exitCode + ") with output: " + 
shExec.getOutput(), e);
+                                  + " (exitCode=" + exitCode + ") with output: 
" + shExec.getOutput(), e);
         }
     }
 
@@ -343,8 +342,6 @@ public class LocalizedResource extends LocallyCachedBlob {
         return baseDir.resolve(Paths.get(LocalizedResource.TO_UNCOMPRESS + 
getKey() + CURRENT_BLOB_SUFFIX));
     }
 
-    private static final Pattern VERSION_FILE_PATTERN = 
Pattern.compile("^(.+)\\.(\\d+)$");
-
     @Override
     public void cleanupOrphanedData() throws IOException {
         //There are a few possible files that we would want to clean up
@@ -434,14 +431,14 @@ public class LocalizedResource extends LocallyCachedBlob {
     @Override
     public boolean isFullyDownloaded() {
         return Files.exists(getFilePathWithVersion())
-            && Files.exists(getCurrentSymlinkPath())
-            && Files.exists(versionFilePath);
+               && Files.exists(getCurrentSymlinkPath())
+               && Files.exists(versionFilePath);
     }
 
     @Override
     public boolean equals(Object other) {
         if (other instanceof LocalizedResource) {
-            LocalizedResource l = (LocalizedResource)other;
+            LocalizedResource l = (LocalizedResource) other;
             return getKey().equals(l.getKey()) && uncompressed == 
l.uncompressed && baseDir.equals(l.baseDir);
         }
         return false;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
index 936dbc1..15eba02 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResourceRetentionSet.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.localizer;
@@ -37,11 +31,11 @@ import org.slf4j.LoggerFactory;
  */
 public class LocalizedResourceRetentionSet {
     public static final Logger LOG = 
LoggerFactory.getLogger(LocalizedResourceRetentionSet.class);
+    @VisibleForTesting
+    final SortedMap<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> noReferences;
     private long currentSize;
     // targetSize in Bytes
     private long targetSize;
-    @VisibleForTesting
-    final SortedMap<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> noReferences;
 
     LocalizedResourceRetentionSet(long targetSize) {
         this(targetSize, new LRUComparator());
@@ -68,7 +62,7 @@ public class LocalizedResourceRetentionSet {
      *     if they are deleted on disk too.
      */
     public void addResources(ConcurrentMap<String, ? extends 
LocallyCachedBlob> blobs) {
-        for (LocallyCachedBlob b: blobs.values()) {
+        for (LocallyCachedBlob b : blobs.values()) {
             currentSize += b.getSizeOnDisk();
             if (b.isUsed()) {
                 LOG.debug("NOT going to clean up {}, {} depends on it", 
b.getKey(), b.getDependencies());
@@ -90,7 +84,7 @@ public class LocalizedResourceRetentionSet {
         long bytesOver = currentSize - targetSize;
         //First delete everything that no longer exists...
         for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>>> i = noReferences.entrySet().iterator();
-             i.hasNext();) {
+             i.hasNext(); ) {
             Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> rsrc = i.next();
             LocallyCachedBlob resource = rsrc.getKey();
             try {
@@ -109,7 +103,7 @@ public class LocalizedResourceRetentionSet {
         }
 
         for (Iterator<Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>>> i = noReferences.entrySet().iterator();
-             bytesOver > 0 && i.hasNext();) {
+             bytesOver > 0 && i.hasNext(); ) {
             Map.Entry<LocallyCachedBlob, Map<String, ? extends 
LocallyCachedBlob>> rsrc = i.next();
             LocallyCachedBlob resource = rsrc.getKey();
             Map<String, ? extends LocallyCachedBlob> set = rsrc.getValue();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 1cbb221..d112ee5 100644
--- 
a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ 
b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.localizer;
@@ -43,16 +37,15 @@ import org.slf4j.LoggerFactory;
  * Represents a blob that is cached locally on disk by the supervisor.
  */
 public abstract class LocallyCachedBlob {
-    private static final Logger LOG = 
LoggerFactory.getLogger(LocallyCachedBlob.class);
     public static final long NOT_DOWNLOADED_VERSION = -1;
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocallyCachedBlob.class);
     // A callback that does nothing.
     private static final BlobChangingCallback NOOP_CB = (assignment, port, 
resource, go) -> {
     };
-
-    private long lastUsed = Time.currentTimeMillis();
     private final Map<PortAndAssignment, BlobChangingCallback> references = 
new HashMap<>();
     private final String blobDescription;
     private final String blobKey;
+    private long lastUsed = Time.currentTimeMillis();
     private CompletableFuture<Void> doneUpdating = null;
 
     /**
@@ -66,28 +59,8 @@ public abstract class LocallyCachedBlob {
         this.blobKey = blobKey;
     }
 
-    /**
-     * Get the version of the blob cached locally.  If the version is unknown 
or it has not been downloaded NOT_DOWNLOADED_VERSION should be
-     * returned. PRECONDITION: this can only be called with a lock on this 
instance held.
-     */
-    public abstract long getLocalVersion();
-
-    /**
-     * Get the version of the blob in the blob store. PRECONDITION: this can 
only be called with a lock on this instance held.
-     */
-    public abstract long getRemoteVersion(ClientBlobStore store) throws 
KeyNotFoundException, AuthorizationException;
-
-    /**
-     * Download the latest version to a temp location. This may also include 
unzipping some or all of the data to a temp location.
-     * PRECONDITION: this can only be called with a lock on this instance held.
-     *
-     * @param store the store to us to download the data.
-     * @return the version that was downloaded.
-     */
-    public abstract long downloadToTempLocation(ClientBlobStore store) throws 
IOException, KeyNotFoundException, AuthorizationException;
-
     protected static long downloadToTempLocation(ClientBlobStore store, String 
key, long currentVersion, IAdvancedFSOps fsOps,
-                                          Function<Long, Path> getTempPath)
+                                                 Function<Long, Path> 
getTempPath)
         throws KeyNotFoundException, AuthorizationException, IOException {
         try (InputStreamWithMeta in = store.getBlob(key)) {
             long newVersion = in.getVersion();
@@ -116,6 +89,51 @@ public abstract class LocallyCachedBlob {
     }
 
     /**
+     * Get the size of p in bytes.
+     * @param p the path to read.
+     * @return the size of p in bytes.
+     */
+    protected static long getSizeOnDisk(Path p) throws IOException {
+        if (!Files.exists(p)) {
+            return 0;
+        } else if (Files.isRegularFile(p)) {
+            return Files.size(p);
+        } else {
+            //We will not follow sym links
+            return Files.walk(p)
+                        .filter((subp) -> Files.isRegularFile(subp, 
LinkOption.NOFOLLOW_LINKS))
+                        .mapToLong((subp) -> {
+                            try {
+                                return Files.size(subp);
+                            } catch (IOException e) {
+                                LOG.warn("Could not get the size of ");
+                            }
+                            return 0;
+                        }).sum();
+        }
+    }
+
+    /**
+     * Get the version of the blob cached locally.  If the version is unknown 
or it has not been downloaded NOT_DOWNLOADED_VERSION should be
+     * returned. PRECONDITION: this can only be called with a lock on this 
instance held.
+     */
+    public abstract long getLocalVersion();
+
+    /**
+     * Get the version of the blob in the blob store. PRECONDITION: this can 
only be called with a lock on this instance held.
+     */
+    public abstract long getRemoteVersion(ClientBlobStore store) throws 
KeyNotFoundException, AuthorizationException;
+
+    /**
+     * Download the latest version to a temp location. This may also include 
unzipping some or all of the data to a temp location.
+     * PRECONDITION: this can only be called with a lock on this instance held.
+     *
+     * @param store the store to us to download the data.
+     * @return the version that was downloaded.
+     */
+    public abstract long downloadToTempLocation(ClientBlobStore store) throws 
IOException, KeyNotFoundException, AuthorizationException;
+
+    /**
      * Commit the new version and make it available for the end user.
      * PRECONDITION: uncompressToTempLocationIfNeeded will have been called.
      * PRECONDITION: this can only be called with a lock on this instance held.
@@ -145,31 +163,6 @@ public abstract class LocallyCachedBlob {
     public abstract long getSizeOnDisk();
 
     /**
-     * Get the size of p in bytes.
-     * @param p the path to read.
-     * @return the size of p in bytes.
-     */
-    protected static long getSizeOnDisk(Path p) throws IOException {
-        if (!Files.exists(p)) {
-            return 0;
-        } else if (Files.isRegularFile(p)) {
-            return Files.size(p);
-        } else {
-            //We will not follow sym links
-            return Files.walk(p)
-                .filter((subp) -> Files.isRegularFile(subp, 
LinkOption.NOFOLLOW_LINKS))
-                .mapToLong((subp) -> {
-                    try {
-                        return Files.size(subp);
-                    } catch (IOException e) {
-                        LOG.warn("Could not get the size of ");
-                    }
-                    return 0;
-                }).sum();
-        }
-    }
-
-    /**
      * Updates the last updated time.  This should be called when references 
are added or removed.
      */
     protected synchronized void touch() {

Reply via email to