http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
index 2b9de22..0b0e70c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
@@ -1,32 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.nimbus;
 
-import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import javax.security.auth.Subject;
-
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.generated.AccessControl;
@@ -41,6 +31,8 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
+
 /**
  * Cache topologies and topology confs from the blob store.
  * Makes reading this faster because it can skip
@@ -48,22 +40,10 @@ import org.slf4j.LoggerFactory;
  */
 public class TopoCache {
     public static final Logger LOG = LoggerFactory.getLogger(TopoCache.class);
-
-    private static final class WithAcl<T> {
-        public final List<AccessControl> acl;
-        public final T data;
-
-        public WithAcl(List<AccessControl> acl, T data) {
-            this.acl = acl;
-            this.data = data;
-        }
-    }
-
     private final BlobStore store;
     private final BlobStoreAclHandler aclHandler;
     private final ConcurrentHashMap<String, WithAcl<StormTopology>> topos = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, WithAcl<Map<String, Object>>> confs = new ConcurrentHashMap<>();
-
     public TopoCache(BlobStore store, Map<String, Object> conf) {
         this.store = store;
         aclHandler = new BlobStoreAclHandler(conf);
@@ -243,4 +223,14 @@ public class TopoCache {
         confs.clear();
         topos.clear();
     }
+
+    private static final class WithAcl<T> {
+        public final List<AccessControl> acl;
+        public final T data;
+
+        public WithAcl(List<AccessControl> acl, T data) {
+            this.acl = acl;
+            this.data = data;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java
index 6a4a67a..bc41da5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.nimbus;
 
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
index 2812cce..bb932ed 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java
@@ -1,26 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.nimbus;
 
 import java.util.Collection;
 import java.util.Map;
-
 import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.WorkerResources;
@@ -29,44 +22,6 @@ import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.WorkerSlot;
 
 public final class TopologyResources {
-    private static Collection<WorkerResources> 
getWorkerResources(SchedulerAssignment assignment) {
-        Collection<WorkerResources> ret = null;
-        if (assignment != null) {
-            Map<WorkerSlot, WorkerResources> allResources = 
assignment.getScheduledResources();
-            if (allResources != null) {
-                ret = allResources.values();
-            }
-        }
-        return ret;
-    }
-
-    private static Collection<WorkerResources> getWorkerResources(Assignment 
assignment) {
-        Collection<WorkerResources> ret = null;
-        if (assignment != null) {
-            Map<NodeInfo, WorkerResources> allResources = 
assignment.get_worker_resources();
-            if (allResources != null) {
-                ret = allResources.values();
-            }
-        }
-        return ret;
-    }
-
-    private static Map<String, Double> 
getNodeIdToSharedOffHeap(SchedulerAssignment assignment) {
-        Map<String, Double> ret = null;
-        if (assignment != null) {
-            ret = assignment.getNodeIdToTotalSharedOffHeapMemory();
-        }
-        return ret;
-    }
-
-    private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment 
assignment) {
-        Map<String, Double> ret = null;
-        if (assignment != null) {
-            ret = assignment.get_total_shared_off_heap();
-        }
-        return ret;
-    }
-
     private final double requestedMemOnHeap;
     private final double requestedMemOffHeap;
     private final double requestedSharedMemOnHeap;
@@ -81,7 +36,6 @@ public final class TopologyResources {
     private double assignedNonSharedMemOnHeap;
     private double assignedNonSharedMemOffHeap;
     private double assignedCpu;
-
     private TopologyResources(TopologyDetails td, Collection<WorkerResources> 
workers,
                               Map<String, Double> sharedOffHeap) {
         requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
@@ -125,15 +79,12 @@ public final class TopologyResources {
             assignedMemOffHeap += sharedOff;
         }
     }
-
     public TopologyResources(TopologyDetails td, SchedulerAssignment 
assignment) {
         this(td, getWorkerResources(assignment), 
getNodeIdToSharedOffHeap(assignment));
     }
-
     public TopologyResources(TopologyDetails td, Assignment assignment) {
         this(td, getWorkerResources(assignment), 
getNodeIdToSharedOffHeap(assignment));
     }
-
     public TopologyResources() {
         this(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
     }
@@ -169,6 +120,44 @@ public final class TopologyResources {
         this.assignedCpu = assignedCpu;
     }
 
+    private static Collection<WorkerResources> 
getWorkerResources(SchedulerAssignment assignment) {
+        Collection<WorkerResources> ret = null;
+        if (assignment != null) {
+            Map<WorkerSlot, WorkerResources> allResources = 
assignment.getScheduledResources();
+            if (allResources != null) {
+                ret = allResources.values();
+            }
+        }
+        return ret;
+    }
+
+    private static Collection<WorkerResources> getWorkerResources(Assignment 
assignment) {
+        Collection<WorkerResources> ret = null;
+        if (assignment != null) {
+            Map<NodeInfo, WorkerResources> allResources = 
assignment.get_worker_resources();
+            if (allResources != null) {
+                ret = allResources.values();
+            }
+        }
+        return ret;
+    }
+
+    private static Map<String, Double> 
getNodeIdToSharedOffHeap(SchedulerAssignment assignment) {
+        Map<String, Double> ret = null;
+        if (assignment != null) {
+            ret = assignment.getNodeIdToTotalSharedOffHeapMemory();
+        }
+        return ret;
+    }
+
+    private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment 
assignment) {
+        Map<String, Double> ret = null;
+        if (assignment != null) {
+            ret = assignment.get_total_shared_off_heap();
+        }
+        return ret;
+    }
+
     public double getRequestedMemOnHeap() {
         return requestedMemOnHeap;
     }
@@ -209,6 +198,10 @@ public final class TopologyResources {
         return assignedSharedMemOnHeap;
     }
 
+    public void setAssignedSharedMemOnHeap(double assignedSharedMemOnHeap) {
+        this.assignedSharedMemOnHeap = assignedSharedMemOnHeap;
+    }
+
     public double getRequestedSharedMemOnHeap() {
         return requestedSharedMemOnHeap;
     }
@@ -225,10 +218,6 @@ public final class TopologyResources {
         return requestedNonSharedMemOffHeap;
     }
 
-    public void setAssignedSharedMemOnHeap(double assignedSharedMemOnHeap) {
-        this.assignedSharedMemOnHeap = assignedSharedMemOnHeap;
-    }
-
     public double getAssignedSharedMemOffHeap() {
         return assignedSharedMemOffHeap;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
index 39151e4..37019b9 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.nimbus;
 
 import org.apache.storm.generated.StormBase;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index e2ecbfb..2358f2a 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -8,7 +8,7 @@
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,12 +18,8 @@
 
 package org.apache.storm.daemon.supervisor;
 
-import static org.apache.storm.daemon.nimbus.Nimbus.MIN_VERSION_SUPPORT_RPC_HEARTBEAT;
-import static org.apache.storm.utils.Utils.OR;
-
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
-
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -38,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
@@ -60,84 +55,75 @@ import org.apache.storm.utils.VersionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.storm.daemon.nimbus.Nimbus.MIN_VERSION_SUPPORT_RPC_HEARTBEAT;
+import static org.apache.storm.utils.Utils.OR;
+
 /**
  * A container that runs processes on the local box.
  */
 public class BasicContainer extends Container {
+    static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache();
     private static final Logger LOG = 
LoggerFactory.getLogger(BasicContainer.class);
     private static final FilenameFilter jarFilter = (dir, name) -> 
name.endsWith(".jar");
-    private static final Joiner CPJ = 
-            Joiner.on(File.pathSeparator).skipNulls();
-    
+    private static final Joiner CPJ =
+        Joiner.on(File.pathSeparator).skipNulls();
     protected final LocalState _localState;
     protected final String _profileCmd;
     protected final String _stormHome = System.getProperty("storm.home");
-    protected volatile boolean _exitedEarly = false;
-    protected volatile long memoryLimitMB;
-    protected volatile long memoryLimitExceededStart;
     protected final double hardMemoryLimitMultiplier;
     protected final long hardMemoryLimitOver;
     protected final long lowMemoryThresholdMB;
     protected final long mediumMemoryThresholdMb;
     protected final long mediumMemoryGracePeriodMs;
-
-    private class ProcessExitCallback implements ExitCodeCallback {
-        private final String _logPrefix;
-
-        public ProcessExitCallback(String logPrefix) {
-            _logPrefix = logPrefix;
-        }
-
-        @Override
-        public void call(int exitCode) {
-            LOG.info("{} exited with code: {}", _logPrefix, exitCode);
-            _exitedEarly = true;
-        }
-    }
+    protected volatile boolean _exitedEarly = false;
+    protected volatile long memoryLimitMB;
+    protected volatile long memoryLimitExceededStart;
 
     /**
      * Create a new BasicContainer.
-     * @param type the type of container being made.
-     * @param conf the supervisor config
-     * @param supervisorId the ID of the supervisor this is a part of.
-     * @param supervisorPort the thrift server port of the supervisor this is 
a part of.
-     * @param port the port the container is on.  Should be <= 0 if only a 
partial recovery
-     * @param assignment the assignment for this container. Should be null if 
only a partial recovery.
+     *
+     * @param type                     the type of container being made.
+     * @param conf                     the supervisor config
+     * @param supervisorId             the ID of the supervisor this is a part 
of.
+     * @param supervisorPort           the thrift server port of the 
supervisor this is a part of.
+     * @param port                     the port the container is on.  Should 
be <= 0 if only a partial recovery
+     * @param assignment               the assignment for this container. 
Should be null if only a partial recovery.
      * @param resourceIsolationManager used to isolate resources for a 
container can be null if no isolation is used.
-     * @param localState the local state of the supervisor.  May be null if 
partial recovery
-     * @param workerId the id of the worker to use.  Must not be null if doing 
a partial recovery.
+     * @param localState               the local state of the supervisor.  May 
be null if partial recovery
+     * @param workerId                 the id of the worker to use.  Must not 
be null if doing a partial recovery.
      */
     public BasicContainer(ContainerType type, Map<String, Object> conf, String 
supervisorId, int supervisorPort,
-            int port, LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager,
-            LocalState localState, String workerId) throws IOException {
+                          int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
+                          LocalState localState, String workerId) throws 
IOException {
         this(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, localState,
-                workerId, null, null, null);
+             workerId, null, null, null);
     }
 
     /**
      * Create a new BasicContainer.
-     * @param type the type of container being made.
-     * @param conf the supervisor config
-     * @param supervisorId the ID of the supervisor this is a part of.
-     * @param supervisorPort the thrift server port of the supervisor this is 
a part of.
-     * @param port the port the container is on.  Should be <= 0 if only a 
partial recovery
-     * @param assignment the assignment for this container. Should be null if 
only a partial recovery.
+     *
+     * @param type                     the type of container being made.
+     * @param conf                     the supervisor config
+     * @param supervisorId             the ID of the supervisor this is a part 
of.
+     * @param supervisorPort           the thrift server port of the 
supervisor this is a part of.
+     * @param port                     the port the container is on.  Should 
be <= 0 if only a partial recovery
+     * @param assignment               the assignment for this container. 
Should be null if only a partial recovery.
      * @param resourceIsolationManager used to isolate resources for a 
container can be null if no isolation is used.
-     * @param localState the local state of the supervisor.  May be null if 
partial recovery
-     * @param workerId the id of the worker to use.  Must not be null if doing 
a partial recovery.
-     * @param ops file system operations (mostly for testing) if null a new 
one is made
-     * @param topoConf the config of the topology (mostly for testing) if null
-     * and not a partial recovery the real conf is read.
-     * @param profileCmd the command to use when profiling (used for testing)
-     * @throws IOException on any error
+     * @param localState               the local state of the supervisor.  May 
be null if partial recovery
+     * @param workerId                 the id of the worker to use.  Must not 
be null if doing a partial recovery.
+     * @param ops                      file system operations (mostly for 
testing) if null a new one is made
+     * @param topoConf                 the config of the topology (mostly for 
testing) if null and not a partial recovery the real conf is
+     *                                 read.
+     * @param profileCmd               the command to use when profiling (used 
for testing)
+     * @throws IOException                on any error
      * @throws ContainerRecoveryException if the Container could not be 
recovered.
      */
     BasicContainer(ContainerType type, Map<String, Object> conf, String 
supervisorId, int supervisorPort, int port,
-            LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager,
-            LocalState localState, String workerId, Map<String, Object> 
topoConf,
-            AdvancedFSOps ops, String profileCmd) throws IOException {
+                   LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager,
+                   LocalState localState, String workerId, Map<String, Object> 
topoConf,
+                   AdvancedFSOps ops, String profileCmd) throws IOException {
         super(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, workerId, topoConf, ops);
-        assert(localState != null);
+        assert (localState != null);
         _localState = localState;
 
         if (type.isRecovery() && !type.isOnlyKillable()) {
@@ -161,7 +147,7 @@ public class BasicContainer extends Container {
 
         if (profileCmd == null) {
             profileCmd = _stormHome + File.separator + "bin" + File.separator
-                    + conf.get(DaemonConfig.WORKER_PROFILER_COMMAND);
+                         + conf.get(DaemonConfig.WORKER_PROFILER_COMMAND);
         }
         _profileCmd = profileCmd;
 
@@ -181,14 +167,33 @@ public class BasicContainer extends Container {
         }
     }
 
+    private static void removeWorkersOn(Map<String, Integer> workerToPort, int 
_port) {
+        for (Iterator<Entry<String, Integer>> i = 
workerToPort.entrySet().iterator(); i.hasNext(); ) {
+            Entry<String, Integer> found = i.next();
+            if (_port == found.getValue().intValue()) {
+                LOG.warn("Deleting worker {} from state", found.getKey());
+                i.remove();
+            }
+        }
+    }
+
+    public static List<String> getDependencyLocationsFor(final Map<String, 
Object> conf, final String topologyId, final AdvancedFSOps ops,
+                                                         String stormRoot) 
throws IOException {
+        return TOPO_META_CACHE.get(conf, topologyId, ops, 
stormRoot).getDepLocs();
+    }
+
+    public static String getStormVersionFor(final Map<String, Object> conf, 
final String topologyId, final AdvancedFSOps ops,
+                                            String stormRoot) throws 
IOException {
+        return TOPO_META_CACHE.get(conf, topologyId, ops, 
stormRoot).getStormVersion();
+    }
+
     /**
-     * Create a new worker ID for this process and store in in this object and
-     * in the local state.  Never call this if a worker is currently up and 
running.
-     * We will lose track of the process.
+     * Create a new worker ID for this process and store in in this object and 
in the local state.  Never call this if a worker is currently
+     * up and running. We will lose track of the process.
      */
     protected void createNewWorkerId() {
         _type.assertFull();
-        assert(_workerId == null);
+        assert (_workerId == null);
         synchronized (_localState) {
             _workerId = Utils.uuid();
             Map<String, Integer> workerToPort = 
_localState.getApprovedWorkers();
@@ -202,16 +207,6 @@ public class BasicContainer extends Container {
         }
     }
 
-    private static void removeWorkersOn(Map<String, Integer> workerToPort, int 
_port) {
-        for (Iterator<Entry<String, Integer>> i = 
workerToPort.entrySet().iterator(); i.hasNext();) {
-            Entry<String, Integer> found = i.next();
-            if (_port == found.getValue().intValue()) {
-                LOG.warn("Deleting worker {} from state", found.getKey());
-                i.remove();
-            }
-        }
-    }
-
     @Override
     public void cleanUpForRestart() throws IOException {
         String origWorkerId = _workerId;
@@ -243,22 +238,17 @@ public class BasicContainer extends Container {
     /**
      * Run the given command for profiling.
      *
-     * @param command
-     *            the command to run
-     * @param env
-     *            the environment to run the command
-     * @param logPrefix
-     *            the prefix to include in the logs
-     * @param targetDir
-     *            the working directory to run the command in
+     * @param command   the command to run
+     * @param env       the environment to run the command
+     * @param logPrefix the prefix to include in the logs
+     * @param targetDir the working directory to run the command in
      * @return true if it ran successfully, else false
-     * @throws IOException
-     *             on any error
-     * @throws InterruptedException
-     *             if interrupted wile waiting for the process to exit.
+     *
+     * @throws IOException          on any error
+     * @throws InterruptedException if interrupted wile waiting for the 
process to exit.
      */
     protected boolean runProfilingCommand(List<String> command, Map<String, 
String> env, String logPrefix,
-            File targetDir) throws IOException, InterruptedException {
+                                          File targetDir) throws IOException, 
InterruptedException {
         _type.assertFull();
         Process p = ClientSupervisorUtils.launchProcess(command, env, 
logPrefix, null, targetDir);
         int ret = p.waitFor();
@@ -282,7 +272,7 @@ public class BasicContainer extends Container {
 
         ProfileAction profileAction = request.get_action();
         String logPrefix = "ProfilerAction process " + _topologyId + ":" + 
_port + " PROFILER_ACTION: " + profileAction
-                + " ";
+                           + " ";
 
         List<String> command = mkProfileCommand(profileAction, stop, 
workerPid, targetDir);
 
@@ -296,14 +286,15 @@ public class BasicContainer extends Container {
 
     /**
      * Get the command to run when doing profiling.
-     * @param action the profiling action to perform
-     * @param stop if this is meant to stop the profiling or start it
+     *
+     * @param action    the profiling action to perform
+     * @param stop      if this is meant to stop the profiling or start it
      * @param workerPid the PID of the process to profile
      * @param targetDir the current working directory of the worker process
      * @return the command to run for profiling.
      */
     private List<String> mkProfileCommand(ProfileAction action, boolean stop, 
String workerPid, String targetDir) {
-        switch(action) {
+        switch (action) {
             case JMAP_DUMP:
                 return jmapDumpCmd(workerPid, targetDir);
             case JSTACK_DUMP:
@@ -347,10 +338,11 @@ public class BasicContainer extends Container {
     }
 
     /**
-     * Compute the java.library.path that should be used for the worker.
-     * This helps it to load JNI libraries that are packaged in the uber jar.
+     * Compute the java.library.path that should be used for the worker. This 
helps it to load JNI libraries that are packaged in the uber
+     * jar.
+     *
      * @param stormRoot the root directory of the worker process
-     * @param conf the config for the supervisor.
+     * @param conf      the config for the supervisor.
      * @return the java.library.path/LD_LIBRARY_PATH to use so native 
libraries load correctly.
      */
     protected String javaLibraryPath(String stormRoot, Map<String, Object> 
conf) {
@@ -359,13 +351,13 @@ public class BasicContainer extends Container {
         String arch = System.getProperty("os.arch");
         String archResourceRoot = resourceRoot + File.separator + os + "-" + 
arch;
         String ret = CPJ.join(archResourceRoot, resourceRoot,
-                conf.get(DaemonConfig.JAVA_LIBRARY_PATH));
+                              conf.get(DaemonConfig.JAVA_LIBRARY_PATH));
         return ret;
     }
 
     /**
-     * Returns a path with a wildcard as the final element, so that the JVM 
will expand
-     * that to all JARs in the directory.
+     * Returns a path with a wildcard as the final element, so that the JVM 
will expand that to all JARs in the directory.
+     *
      * @param dir the directory to which a wildcard will be appended
      * @return the path with wildcard ("*") suffix
      */
@@ -376,7 +368,7 @@ public class BasicContainer extends Container {
     protected List<String> frameworkClasspath(SimpleVersion topoVersion) {
         File stormWorkerLibDir = new File(_stormHome, "lib-worker");
         String topoConfDir =
-                System.getenv("STORM_CONF_DIR") != null ?
+            System.getenv("STORM_CONF_DIR") != null ?
                 System.getenv("STORM_CONF_DIR") :
                 new File(_stormHome, "conf").getAbsolutePath();
         File stormExtlibDir = new File(_stormHome, "extlib");
@@ -401,7 +393,7 @@ public class BasicContainer extends Container {
             //Have not moved to a java worker yet
             defaultWorkerGuess = "org.apache.storm.daemon.worker";
         }
-        NavigableMap<SimpleVersion,String> mains = 
Utils.getConfiguredWorkerMainVersions(_conf);
+        NavigableMap<SimpleVersion, String> mains = 
Utils.getConfiguredWorkerMainVersions(_conf);
         return Utils.getCompatibleVersion(mains, topoVersion, "worker main 
class", defaultWorkerGuess);
     }
 
@@ -411,25 +403,26 @@ public class BasicContainer extends Container {
             //Prior to the org.apache change
             defaultGuess = "backtype.storm.LogWriter";
         }
-        NavigableMap<SimpleVersion,String> mains = 
Utils.getConfiguredWorkerLogWriterVersions(_conf);
+        NavigableMap<SimpleVersion, String> mains = 
Utils.getConfiguredWorkerLogWriterVersions(_conf);
         return Utils.getCompatibleVersion(mains, topoVersion, "worker log 
writer class", defaultGuess);
     }
 
     @SuppressWarnings("unchecked")
     private List<String> asStringList(Object o) {
         if (o instanceof String) {
-            return Arrays.asList((String)o);
+            return Arrays.asList((String) o);
         } else if (o instanceof List) {
-            return (List<String>)o;
+            return (List<String>) o;
         }
         return Collections.EMPTY_LIST;
     }
 
     /**
      * Compute the classpath for the worker process.
-     * @param stormJar the topology jar
+     *
+     * @param stormJar            the topology jar
      * @param dependencyLocations any dependencies from the topology
-     * @param topoVersion the version of the storm framework to use
+     * @param topoVersion         the version of the storm framework to use
      * @return the full classpath
      */
     protected String getWorkerClassPath(String stormJar, List<String> 
dependencyLocations, SimpleVersion topoVersion) {
@@ -469,7 +462,7 @@ public class BasicContainer extends Container {
             String string = substituteChildOptsInternal((String) value, 
memOnheap);
             if (StringUtils.isNotBlank(string)) {
                 String[] strings = string.split("\\s+");
-                for (String s: strings) {
+                for (String s : strings) {
                     if (StringUtils.isNotBlank(s)) {
                         rets.add(s);
                     }
@@ -491,24 +484,19 @@ public class BasicContainer extends Container {
     /**
      * Launch the worker process (non-blocking).
      *
-     * @param command
-     *            the command to run
-     * @param env
-     *            the environment to run the command
-     * @param processExitCallback
-     *            a callback for when the process exits
-     * @param logPrefix
-     *            the prefix to include in the logs
-     * @param targetDir
-     *            the working directory to run the command in
+     * @param command             the command to run
+     * @param env                 the environment to run the command
+     * @param processExitCallback a callback for when the process exits
+     * @param logPrefix           the prefix to include in the logs
+     * @param targetDir           the working directory to run the command in
      * @return true if it ran successfully, else false
-     * @throws IOException
-     *             on any error
+     *
+     * @throws IOException on any error
      */
     protected void launchWorkerProcess(List<String> command, Map<String, 
String> env, String logPrefix,
-            ExitCodeCallback processExitCallback, File targetDir) throws 
IOException {
+                                       ExitCodeCallback processExitCallback, 
File targetDir) throws IOException {
         if (_resourceIsolationManager != null) {
-          command = _resourceIsolationManager.getLaunchCommand(_workerId, 
command);
+            command = _resourceIsolationManager.getLaunchCommand(_workerId, 
command);
         }
         ClientSupervisorUtils.launchProcess(command, env, logPrefix, 
processExitCallback, targetDir);
     }
@@ -530,107 +518,12 @@ public class BasicContainer extends Container {
         return log4jConfigurationDir + File.separator + "worker.xml";
     }
 
-    private static class TopologyMetaData {
-        private boolean _dataCached = false;
-        private List<String> _depLocs = null;
-        private String _stormVersion = null;
-        private final Map<String, Object> _conf;
-        private final String _topologyId;
-        private final AdvancedFSOps _ops;
-        private final String _stormRoot;
-
-        public TopologyMetaData(final Map<String, Object> conf, final String 
topologyId, final AdvancedFSOps ops, final String stormRoot) {
-            _conf = conf;
-            _topologyId = topologyId;
-            _ops = ops;
-            _stormRoot = stormRoot;
-        }
-
-        public String toString() {
-            List<String> data;
-            String stormVersion;
-            synchronized(this) {
-                data = _depLocs;
-                stormVersion = _stormVersion;
-            }
-            return "META for " + _topologyId +" DEP_LOCS => " + data + " 
STORM_VERSION => " + stormVersion;
-        }
-
-        private synchronized void readData() throws IOException {
-            final StormTopology stormTopology = 
ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
-            final List<String> dependencyLocations = new ArrayList<>();
-            if (stormTopology.get_dependency_jars() != null) {
-                for (String dependency : stormTopology.get_dependency_jars()) {
-                    dependencyLocations.add(new File(_stormRoot, 
dependency).getAbsolutePath());
-                }
-            }
-
-            if (stormTopology.get_dependency_artifacts() != null) {
-                for (String dependency : 
stormTopology.get_dependency_artifacts()) {
-                    dependencyLocations.add(new File(_stormRoot, 
dependency).getAbsolutePath());
-                }
-            }
-            _depLocs = dependencyLocations;
-            _stormVersion = stormTopology.get_storm_version();
-            _dataCached = true;
-        }
-
-        public synchronized List<String> getDepLocs() throws IOException {
-            if (!_dataCached) {
-                readData();
-            }
-            return _depLocs;
-        }
-
-        public synchronized String getStormVersion() throws IOException {
-            if (!_dataCached) {
-                readData();
-            }
-            return _stormVersion;
-        }
-    }
-
-    static class TopoMetaLRUCache {
-        public final int _maxSize = 100; //We could make this configurable in 
the future...
-
-        @SuppressWarnings("serial")
-        private LinkedHashMap<String, TopologyMetaData> _cache = new 
LinkedHashMap<String, TopologyMetaData>() {
-            @Override
-            protected boolean 
removeEldestEntry(Map.Entry<String,TopologyMetaData> eldest) {
-                return (size() > _maxSize);
-            }
-        };
-
-        public synchronized TopologyMetaData get(final Map<String, Object> 
conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) {
-            //Only go off of the topology id for now.
-            TopologyMetaData dl = _cache.get(topologyId);
-            if (dl == null) {
-                _cache.putIfAbsent(topologyId, new TopologyMetaData(conf, 
topologyId, ops, stormRoot));
-                dl = _cache.get(topologyId);
-            }
-            return dl;
-        }
-
-        public synchronized void clear() {
-            _cache.clear();
-        }
-    }
-
-    static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache();
-
-    public static List<String> getDependencyLocationsFor(final Map<String, 
Object> conf, final String topologyId, final AdvancedFSOps ops, String 
stormRoot) throws IOException {
-        return TOPO_META_CACHE.get(conf, topologyId, ops, 
stormRoot).getDepLocs();
-    }
-
-    public static String getStormVersionFor(final Map<String, Object> conf, 
final String topologyId, final AdvancedFSOps ops, String stormRoot) throws 
IOException {
-        return TOPO_META_CACHE.get(conf, topologyId, ops, 
stormRoot).getStormVersion();
-    }
-
     /**
-     * Get parameters for the class path of the worker process.  Also used by 
the
-     * log Writer.
+     * Get parameters for the class path of the worker process.  Also used by 
the log Writer.
+     *
      * @param stormRoot the root dist dir for the topology
      * @return the classpath for the topology as command line arguments.
+     *
      * @throws IOException on any error.
      */
     private List<String> getClassPathParams(final String stormRoot, final 
SimpleVersion topoVersion) throws IOException {
@@ -645,8 +538,9 @@ public class BasicContainer extends Container {
     }
 
     /**
-     * Get a set of java properties that are common to both the log writer and 
the worker processes.
-     * These are mostly system properties that are used by logging.
+     * Get a set of java properties that are common to both the log writer and 
the worker processes. These are mostly system properties that
+     * are used by logging.
+     *
      * @return a list of command line options
      */
     private List<String> getCommonParams() {
@@ -667,7 +561,7 @@ public class BasicContainer extends Container {
         
commonParams.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
         commonParams.add("-Dstorm.local.dir=" + 
_conf.get(Config.STORM_LOCAL_DIR));
         if (memoryLimitMB > 0) {
-            commonParams.add("-Dworker.memory_limit_mb="+ memoryLimitMB);
+            commonParams.add("-Dworker.memory_limit_mb=" + memoryLimitMB);
         }
         return commonParams;
     }
@@ -675,7 +569,7 @@ public class BasicContainer extends Container {
     private int getMemOnHeap(WorkerResources resources) {
         int memOnheap = 0;
         if (resources != null && resources.is_set_mem_on_heap() &&
-                resources.get_mem_on_heap() > 0) {
+            resources.get_mem_on_heap() > 0) {
             memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
         } else {
             // set the default heap memory size for supervisor-test
@@ -705,21 +599,23 @@ public class BasicContainer extends Container {
 
     /**
      * Create the command to launch the worker process.
+     *
      * @param memOnheap the on heap memory for the worker
      * @param stormRoot the root dist dir for the topology
-     * @param jlp java library path for the topology
+     * @param jlp       java library path for the topology
      * @return the command to run
+     *
      * @throws IOException on any error.
      */
     private List<String> mkLaunchCommand(final int memOnheap, final String 
stormRoot,
-            final String jlp) throws IOException {
+                                         final String jlp) throws IOException {
         final String javaCmd = javaCmd("java");
         final String stormOptions = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
         final String topoConfFile = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
         final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, 
_workerId);
         String topoVersionString = getStormVersionFor(_conf, _topologyId, 
_ops, stormRoot);
         if (topoVersionString == null) {
-            topoVersionString = 
(String)_conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, 
VersionInfo.getVersion());
+            topoVersionString = (String) 
_conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, 
VersionInfo.getVersion());
         }
         final SimpleVersion topoVersion = new SimpleVersion(topoVersionString);
 
@@ -744,8 +640,8 @@ public class BasicContainer extends Container {
         
commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), 
memOnheap));
         
commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS),
 memOnheap));
         commandList.addAll(substituteChildopts(Utils.OR(
-                _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
-                _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
+            _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
+            _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
         commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
         commandList.add("-Djava.library.path=" + jlp);
         commandList.add("-Dstorm.conf.file=" + topoConfFile);
@@ -759,112 +655,112 @@ public class BasicContainer extends Container {
         // supervisor port should be only presented to worker which supports 
RPC heartbeat
         // unknown version should be treated as "current version", which 
supports RPC heartbeat
         if ((topoVersion.getMajor() == -1 && topoVersion.getMinor() == -1) ||
-                topoVersion.compareTo(MIN_VERSION_SUPPORT_RPC_HEARTBEAT) >= 0) 
{
+            topoVersion.compareTo(MIN_VERSION_SUPPORT_RPC_HEARTBEAT) >= 0) {
             commandList.add(String.valueOf(_supervisorPort));
         }
 
         commandList.add(String.valueOf(_port));
         commandList.add(_workerId);
-        
+
         return commandList;
     }
-    
-  @Override
-  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) 
throws IOException {
-    if (super.isMemoryLimitViolated(withUpdatedLimits)) {
-      return true;
-    }
-    if (_resourceIsolationManager != null) {
-      // In the short term the goal is to not shoot anyone unless we really 
need to.
-      // The on heap should limit the memory usage in most cases to a 
reasonable amount
-      // If someone is using way more than they requested this is a bug and we 
should
-      // not allow it
-      long usageMb;
-      long memoryLimitMb;
-      long hardMemoryLimitOver;
-      String typeOfCheck;
-
-      if (withUpdatedLimits.is_set_total_node_shared()) {
-        //We need to do enforcement on a topology level, not a single worker 
level...
-        // Because in for cgroups each page in shared memory goes to the 
worker that touched it
-        // first. We may need to make this more plugable in the future and let 
the resource
-        // isolation manager tell us what to do
-        usageMb = getTotalTopologyMemoryUsed();
-        memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
-        hardMemoryLimitOver = this.hardMemoryLimitOver * 
getTotalWorkersForThisTopology();
-        typeOfCheck = "TOPOLOGY " + _topologyId;
-      } else {
-        usageMb = getMemoryUsageMb();
-        memoryLimitMb = this.memoryLimitMB;
-        hardMemoryLimitOver = this.hardMemoryLimitOver;
-        typeOfCheck = "WORKER " + _workerId;
-      }
-      LOG.debug(
-          "Enforcing memory usage for {} with usage of {} out of {} total and 
a hard limit of {}",
-          typeOfCheck,
-          usageMb,
-          memoryLimitMb,
-          hardMemoryLimitOver);
-
-      if (usageMb <= 0) {
-        //Looks like usage might not be supported
-        return false;
-      }
-      long hardLimitMb = Math.max((long)(memoryLimitMb * 
hardMemoryLimitMultiplier), memoryLimitMb + hardMemoryLimitOver);
-      if (usageMb > hardLimitMb) {
-        LOG.warn(
-            "{} is using {} MB > adjusted hard limit {} MB", typeOfCheck, 
usageMb, hardLimitMb);
-        return true;
-      }
-      if (usageMb > memoryLimitMb) {
-        //For others using too much it is really a question of how much memory 
is free in the system
-        // to be use. If we cannot calculate it assume that it is bad
-        long systemFreeMemoryMb = 0;
-        try {
-          systemFreeMemoryMb = 
_resourceIsolationManager.getSystemFreeMemoryMb();
-        } catch (IOException e) {
-          LOG.warn("Error trying to calculate free memory on the system {}", 
e);
-        }
-        LOG.debug("SYSTEM MEMORY FREE {} MB", systemFreeMemoryMb);
-        //If the system is low on memory we cannot be kind and need to shoot 
something
-        if (systemFreeMemoryMb <= lowMemoryThresholdMB) {
-          LOG.warn(
-              "{} is using {} MB > memory limit {} MB and system is low on 
memory {} free",
-              typeOfCheck,
-              usageMb,
-              memoryLimitMb,
-              systemFreeMemoryMb);
-          return true;
-        }
-
-        //If the system still has some free memory give them a grace period to
-        // drop back down.
-        if (systemFreeMemoryMb < mediumMemoryThresholdMb) {
-          if (memoryLimitExceededStart < 0) {
-            memoryLimitExceededStart = Time.currentTimeMillis();
-          } else {
-            long timeInViolation = Time.currentTimeMillis() - 
memoryLimitExceededStart;
-            if (timeInViolation > mediumMemoryGracePeriodMs) {
-              LOG.warn(
-                  "{} is using {} MB > memory limit {} MB for {} seconds",
-                  typeOfCheck,
-                  usageMb,
-                  memoryLimitMb,
-                  timeInViolation / 1000);
-              return true;
+
+    @Override
+    public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) 
throws IOException {
+        if (super.isMemoryLimitViolated(withUpdatedLimits)) {
+            return true;
+        }
+        if (_resourceIsolationManager != null) {
+            // In the short term the goal is to not shoot anyone unless we 
really need to.
+            // The on heap should limit the memory usage in most cases to a 
reasonable amount
+            // If someone is using way more than they requested this is a bug 
and we should
+            // not allow it
+            long usageMb;
+            long memoryLimitMb;
+            long hardMemoryLimitOver;
+            String typeOfCheck;
+
+            if (withUpdatedLimits.is_set_total_node_shared()) {
+                //We need to do enforcement on a topology level, not a single 
worker level...
+                // Because in for cgroups each page in shared memory goes to 
the worker that touched it
+                // first. We may need to make this more plugable in the future 
and let the resource
+                // isolation manager tell us what to do
+                usageMb = getTotalTopologyMemoryUsed();
+                memoryLimitMb = 
getTotalTopologyMemoryReserved(withUpdatedLimits);
+                hardMemoryLimitOver = this.hardMemoryLimitOver * 
getTotalWorkersForThisTopology();
+                typeOfCheck = "TOPOLOGY " + _topologyId;
+            } else {
+                usageMb = getMemoryUsageMb();
+                memoryLimitMb = this.memoryLimitMB;
+                hardMemoryLimitOver = this.hardMemoryLimitOver;
+                typeOfCheck = "WORKER " + _workerId;
+            }
+            LOG.debug(
+                "Enforcing memory usage for {} with usage of {} out of {} 
total and a hard limit of {}",
+                typeOfCheck,
+                usageMb,
+                memoryLimitMb,
+                hardMemoryLimitOver);
+
+            if (usageMb <= 0) {
+                //Looks like usage might not be supported
+                return false;
+            }
+            long hardLimitMb = Math.max((long) (memoryLimitMb * 
hardMemoryLimitMultiplier), memoryLimitMb + hardMemoryLimitOver);
+            if (usageMb > hardLimitMb) {
+                LOG.warn(
+                    "{} is using {} MB > adjusted hard limit {} MB", 
typeOfCheck, usageMb, hardLimitMb);
+                return true;
+            }
+            if (usageMb > memoryLimitMb) {
+                //For others using too much it is really a question of how 
much memory is free in the system
+                // to be use. If we cannot calculate it assume that it is bad
+                long systemFreeMemoryMb = 0;
+                try {
+                    systemFreeMemoryMb = 
_resourceIsolationManager.getSystemFreeMemoryMb();
+                } catch (IOException e) {
+                    LOG.warn("Error trying to calculate free memory on the 
system {}", e);
+                }
+                LOG.debug("SYSTEM MEMORY FREE {} MB", systemFreeMemoryMb);
+                //If the system is low on memory we cannot be kind and need to 
shoot something
+                if (systemFreeMemoryMb <= lowMemoryThresholdMB) {
+                    LOG.warn(
+                        "{} is using {} MB > memory limit {} MB and system is 
low on memory {} free",
+                        typeOfCheck,
+                        usageMb,
+                        memoryLimitMb,
+                        systemFreeMemoryMb);
+                    return true;
+                }
+
+                //If the system still has some free memory give them a grace 
period to
+                // drop back down.
+                if (systemFreeMemoryMb < mediumMemoryThresholdMb) {
+                    if (memoryLimitExceededStart < 0) {
+                        memoryLimitExceededStart = Time.currentTimeMillis();
+                    } else {
+                        long timeInViolation = Time.currentTimeMillis() - 
memoryLimitExceededStart;
+                        if (timeInViolation > mediumMemoryGracePeriodMs) {
+                            LOG.warn(
+                                "{} is using {} MB > memory limit {} MB for {} 
seconds",
+                                typeOfCheck,
+                                usageMb,
+                                memoryLimitMb,
+                                timeInViolation / 1000);
+                            return true;
+                        }
+                    }
+                } else {
+                    //Otherwise don't bother them
+                    LOG.debug("{} is using {} MB > memory limit {} MB", 
typeOfCheck, usageMb, memoryLimitMb);
+                    memoryLimitExceededStart = -1;
+                }
+            } else {
+                memoryLimitExceededStart = -1;
             }
-          }
-        } else {
-          //Otherwise don't bother them
-          LOG.debug("{} is using {} MB > memory limit {} MB", typeOfCheck, 
usageMb, memoryLimitMb);
-          memoryLimitExceededStart = -1;
         }
-      } else {
-        memoryLimitExceededStart = -1;
-      }
+        return false;
     }
-    return false;
-  }
 
     @Override
     public long getMemoryUsageMb() {
@@ -883,35 +779,35 @@ public class BasicContainer extends Container {
         }
     }
 
-  @Override
-  public long getMemoryReservationMb() {
-    return memoryLimitMB;
-  }
+    @Override
+    public long getMemoryReservationMb() {
+        return memoryLimitMB;
+    }
 
-  private long calculateMemoryLimit(final WorkerResources resources, final int 
memOnHeap) {
-    long ret = memOnHeap;
-    if (_resourceIsolationManager != null) {
-      final int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
-      final int extraMem =
-          (int)
-              (Math.ceil(
-                  ObjectReader.getDouble(
-                      
_conf.get(DaemonConfig.STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
-                      0.0)));
-      ret += memoffheap + extraMem;
+    private long calculateMemoryLimit(final WorkerResources resources, final 
int memOnHeap) {
+        long ret = memOnHeap;
+        if (_resourceIsolationManager != null) {
+            final int memoffheap = (int) 
Math.ceil(resources.get_mem_off_heap());
+            final int extraMem =
+                (int)
+                    (Math.ceil(
+                        ObjectReader.getDouble(
+                            
_conf.get(DaemonConfig.STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
+                            0.0)));
+            ret += memoffheap + extraMem;
+        }
+        return ret;
     }
-    return ret;
-  }
-    
+
     @Override
     public void launch() throws IOException {
         _type.assertFull();
         LOG.info("Launching worker with assignment {} for this supervisor {} 
on port {} with id {}", _assignment,
-                _supervisorId, _port, _workerId);
+                 _supervisorId, _port, _workerId);
         String logPrefix = "Worker Process " + _workerId;
         ProcessExitCallback processExitCallback = new 
ProcessExitCallback(logPrefix);
         _exitedEarly = false;
-        
+
         final WorkerResources resources = _assignment.get_resources();
         final int memOnHeap = getMemOnHeap(resources);
         memoryLimitMB = calculateMemoryLimit(resources, memOnHeap);
@@ -929,7 +825,7 @@ public class BasicContainer extends Container {
         if (ld_library_path != null) {
             jlp = jlp + System.getProperty("path.separator") + ld_library_path;
         }
-        
+
         topEnvironment.put("LD_LIBRARY_PATH", jlp);
 
         if (_resourceIsolationManager != null) {
@@ -939,11 +835,112 @@ public class BasicContainer extends Container {
         }
 
         List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp);
-        
+
         LOG.info("Launching worker with command: {}. ", 
ServerUtils.shellCmd(commandList));
 
         String workerDir = ConfigUtils.workerRoot(_conf, _workerId);
 
         launchWorkerProcess(commandList, topEnvironment, logPrefix, 
processExitCallback, new File(workerDir));
     }
+
+    private static class TopologyMetaData {
+        private final Map<String, Object> _conf;
+        private final String _topologyId;
+        private final AdvancedFSOps _ops;
+        private final String _stormRoot;
+        private boolean _dataCached = false;
+        private List<String> _depLocs = null;
+        private String _stormVersion = null;
+
+        public TopologyMetaData(final Map<String, Object> conf, final String 
topologyId, final AdvancedFSOps ops, final String stormRoot) {
+            _conf = conf;
+            _topologyId = topologyId;
+            _ops = ops;
+            _stormRoot = stormRoot;
+        }
+
+        public String toString() {
+            List<String> data;
+            String stormVersion;
+            synchronized (this) {
+                data = _depLocs;
+                stormVersion = _stormVersion;
+            }
+            return "META for " + _topologyId + " DEP_LOCS => " + data + " 
STORM_VERSION => " + stormVersion;
+        }
+
+        private synchronized void readData() throws IOException {
+            final StormTopology stormTopology = 
ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops);
+            final List<String> dependencyLocations = new ArrayList<>();
+            if (stormTopology.get_dependency_jars() != null) {
+                for (String dependency : stormTopology.get_dependency_jars()) {
+                    dependencyLocations.add(new File(_stormRoot, 
dependency).getAbsolutePath());
+                }
+            }
+
+            if (stormTopology.get_dependency_artifacts() != null) {
+                for (String dependency : 
stormTopology.get_dependency_artifacts()) {
+                    dependencyLocations.add(new File(_stormRoot, 
dependency).getAbsolutePath());
+                }
+            }
+            _depLocs = dependencyLocations;
+            _stormVersion = stormTopology.get_storm_version();
+            _dataCached = true;
+        }
+
+        public synchronized List<String> getDepLocs() throws IOException {
+            if (!_dataCached) {
+                readData();
+            }
+            return _depLocs;
+        }
+
+        public synchronized String getStormVersion() throws IOException {
+            if (!_dataCached) {
+                readData();
+            }
+            return _stormVersion;
+        }
+    }
+
+    static class TopoMetaLRUCache {
+        public final int _maxSize = 100; //We could make this configurable in 
the future...
+
+        @SuppressWarnings("serial")
+        private LinkedHashMap<String, TopologyMetaData> _cache = new 
LinkedHashMap<String, TopologyMetaData>() {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, 
TopologyMetaData> eldest) {
+                return (size() > _maxSize);
+            }
+        };
+
+        public synchronized TopologyMetaData get(final Map<String, Object> 
conf, final String topologyId, final AdvancedFSOps ops,
+                                                 String stormRoot) {
+            //Only go off of the topology id for now.
+            TopologyMetaData dl = _cache.get(topologyId);
+            if (dl == null) {
+                _cache.putIfAbsent(topologyId, new TopologyMetaData(conf, 
topologyId, ops, stormRoot));
+                dl = _cache.get(topologyId);
+            }
+            return dl;
+        }
+
+        public synchronized void clear() {
+            _cache.clear();
+        }
+    }
+
+    private class ProcessExitCallback implements ExitCodeCallback {
+        private final String _logPrefix;
+
+        public ProcessExitCallback(String logPrefix) {
+            _logPrefix = logPrefix;
+        }
+
+        @Override
+        public void call(int exitCode) {
+            LOG.info("{} exited with code: {}", _logPrefix, exitCode);
+            _exitedEarly = true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
index e2043e8..715fd61 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.daemon.supervisor.Container.ContainerType;
 import org.apache.storm.generated.LocalAssignment;
@@ -29,13 +23,13 @@ import org.apache.storm.utils.LocalState;
  * Launch containers with no security using standard java commands
  */
 public class BasicContainerLauncher extends ContainerLauncher {
+    protected final ResourceIsolationInterface _resourceIsolationManager;
     private final Map<String, Object> _conf;
     private final String _supervisorId;
     private final int _supervisorPort;
-    protected final ResourceIsolationInterface _resourceIsolationManager;
-    
+
     public BasicContainerLauncher(Map<String, Object> conf, String 
supervisorId, int supervisorPort,
-        ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+                                  ResourceIsolationInterface 
resourceIsolationManager) throws IOException {
         _conf = conf;
         _supervisorId = supervisorId;
         _supervisorPort = supervisorPort;
@@ -45,7 +39,7 @@ public class BasicContainerLauncher extends ContainerLauncher 
{
     @Override
     public Container launchContainer(int port, LocalAssignment assignment, 
LocalState state) throws IOException {
         Container container = new BasicContainer(ContainerType.LAUNCH, _conf, 
_supervisorId, _supervisorPort, port,
-                assignment, _resourceIsolationManager, state, null);
+                                                 assignment, 
_resourceIsolationManager, state, null);
         container.setup();
         container.launch();
         return container;
@@ -54,12 +48,12 @@ public class BasicContainerLauncher extends 
ContainerLauncher {
     @Override
     public Container recoverContainer(int port, LocalAssignment assignment, 
LocalState state) throws IOException {
         return new BasicContainer(ContainerType.RECOVER_FULL, _conf, 
_supervisorId, _supervisorPort, port, assignment,
-                _resourceIsolationManager, state, null);
+                                  _resourceIsolationManager, state, null);
     }
 
     @Override
     public Killable recoverContainer(String workerId, LocalState localState) 
throws IOException {
         return new BasicContainer(ContainerType.RECOVER_PARTIAL, _conf, 
_supervisorId, _supervisorPort, -1, null,
-                    _resourceIsolationManager, localState, workerId);
+                                  _resourceIsolationManager, localState, 
workerId);
     }
 }

Reply via email to