http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index 10d81e2..b4d6442 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -8,7 +8,7 @@
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,26 +33,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.generated.WorkerMetricPoint;
 import org.apache.storm.generated.WorkerMetricList;
+import org.apache.storm.generated.WorkerMetricPoint;
 import org.apache.storm.generated.WorkerMetrics;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.metricstore.MetricException;
 import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -66,50 +63,6 @@ public abstract class Container implements Killable {
     private static final String SYSTEM_COMPONENT_ID = "System";
     private static final String INVALID_EXECUTOR_ID = "-1";
     private static final String INVALID_STREAM_ID = "None";
-
-    public static enum ContainerType {
-        LAUNCH(false, false),
-        RECOVER_FULL(true, false),
-        RECOVER_PARTIAL(true, true);
-
-        private final boolean _recovery;
-        private final boolean _onlyKillable;
-        
-        ContainerType(boolean recovery, boolean onlyKillable) {
-            _recovery = recovery;
-            _onlyKillable = onlyKillable;
-        }
-        
-        public boolean isRecovery() {
-            return _recovery;
-        }
-        
-        public void assertFull() {
-            if (_onlyKillable) {
-                throw new IllegalStateException("Container is only Killable.");
-            }
-        }
-        
-        public boolean isOnlyKillable() {
-            return _onlyKillable;
-        }
-    }
-
-    private static class TopoAndMemory {
-        public final String topoId;
-        public final long memory;
-
-        public TopoAndMemory(String id, long mem) {
-            topoId = id;
-            memory = mem;
-        }
-
-        @Override
-        public String toString() {
-            return "{TOPO: " + topoId + " at " + memory + " MB}";
-        }
-    }
-
     private static final ConcurrentHashMap<Integer, TopoAndMemory> _usedMemory 
=
         new ConcurrentHashMap<>();
     private static final ConcurrentHashMap<Integer, TopoAndMemory> 
_reservedMemory =
@@ -139,10 +92,9 @@ public abstract class Container implements Killable {
                 return ret;
             });
     }
-    
+
     protected final Map<String, Object> _conf;
     protected final Map<String, Object> _topoConf; //Not set if RECOVER_PARTIAL
-    protected String _workerId; 
     protected final String _topologyId; //Not set if RECOVER_PARTIAL
     protected final String _supervisorId;
     protected final int _supervisorPort;
@@ -150,38 +102,39 @@ public abstract class Container implements Killable {
     protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL
     protected final AdvancedFSOps _ops;
     protected final ResourceIsolationInterface _resourceIsolationManager;
-    protected ContainerType _type;
     protected final boolean _symlinksDisabled;
+    protected String _workerId;
+    protected ContainerType _type;
     private long lastMetricProcessTime = 0L;
-
     /**
      * Create a new Container.
-     * @param type the type of container being made.
-     * @param conf the supervisor config
-     * @param supervisorId the ID of the supervisor this is a part of.
-     * @param supervisorPort the thrift server port of the supervisor this is 
a part of.
-     * @param port the port the container is on.  Should be <= 0 if only a 
partial recovery
-     * @param assignment the assignment for this container. Should be null if 
only a partial recovery.
+     *
+     * @param type                     the type of container being made.
+     * @param conf                     the supervisor config
+     * @param supervisorId             the ID of the supervisor this is a part 
of.
+     * @param supervisorPort           the thrift server port of the 
supervisor this is a part of.
+     * @param port                     the port the container is on.  Should 
be <= 0 if only a partial recovery
+     * @param assignment               the assignment for this container. 
Should be null if only a partial recovery.
      * @param resourceIsolationManager used to isolate resources for a 
container can be null if no isolation is used.
-     * @param workerId the id of the worker to use.  Must not be null if doing 
a partial recovery.
-     * @param topoConf the config of the topology (mostly for testing) if null 
-     * and not a partial recovery the real conf is read.
-     * @param ops file system operations (mostly for testing) if null a new 
one is made
+     * @param workerId                 the id of the worker to use.  Must not 
be null if doing a partial recovery.
+     * @param topoConf                 the config of the topology (mostly for 
testing) if null and not a partial recovery the real conf is
+     *                                 read.
+     * @param ops                      file system operations (mostly for 
testing) if null a new one is made
      * @throws IOException on any error.
      */
     protected Container(ContainerType type, Map<String, Object> conf, String 
supervisorId, int supervisorPort,
-            int port, LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager,
-            String workerId, Map<String, Object> topoConf,  AdvancedFSOps ops) 
throws IOException {
-        assert(type != null);
-        assert(conf != null);
-        assert(supervisorId != null);
-        
-        _symlinksDisabled = 
(boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
-        
+                        int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
+                        String workerId, Map<String, Object> topoConf, 
AdvancedFSOps ops) throws IOException {
+        assert (type != null);
+        assert (conf != null);
+        assert (supervisorId != null);
+
+        _symlinksDisabled = (boolean) 
conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
+
         if (ops == null) {
             ops = AdvancedFSOps.make(conf);
         }
-        
+
         _workerId = workerId;
         _type = type;
         _port = port;
@@ -191,20 +144,22 @@ public abstract class Container implements Killable {
         _supervisorPort = supervisorPort;
         _resourceIsolationManager = resourceIsolationManager;
         _assignment = assignment;
-        
+
         if (_type.isOnlyKillable()) {
-            assert(_assignment == null);
-            assert(_port <= 0);
-            assert(_workerId != null);
+            assert (_assignment == null);
+            assert (_port <= 0);
+            assert (_workerId != null);
             _topologyId = null;
             _topoConf = null;
         } else {
-            assert(assignment != null);
-            assert(port > 0);
+            assert (assignment != null);
+            assert (port > 0);
             _topologyId = assignment.get_topology_id();
             if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
-                LOG.info("Missing topology storm code, so can't launch  worker 
with assignment {} for this supervisor {} on port {} with id {}", _assignment,
-                        _supervisorId, _port, _workerId);
+                LOG.info(
+                    "Missing topology storm code, so can't launch  worker with 
assignment {} for this supervisor {} on port {} with id {}",
+                    _assignment,
+                    _supervisorId, _port, _workerId);
                 throw new ContainerRecoveryException("Missing required 
topology files...");
             }
             if (topoConf == null) {
@@ -215,35 +170,37 @@ public abstract class Container implements Killable {
             }
         }
     }
-    
+
     @Override
     public String toString() {
         return "topo:" + _topologyId + " worker:" + _workerId;
     }
-    
+
     protected Map<String, Object> readTopoConf() throws IOException {
-        assert(_topologyId != null);
+        assert (_topologyId != null);
         return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
     }
-    
+
     /**
      * Kill a given process.
+     *
      * @param pid the id of the process to kill
      * @throws IOException
      */
     protected void kill(long pid) throws IOException {
         ServerUtils.killProcessWithSigTerm(String.valueOf(pid));
     }
-    
+
     /**
      * Kill a given process.
+     *
      * @param pid the id of the process to kill
      * @throws IOException
      */
     protected void forceKill(long pid) throws IOException {
         ServerUtils.forceKillProcess(String.valueOf(pid));
     }
-    
+
     @Override
     public void kill() throws IOException {
         LOG.info("Killing {}:{}", _supervisorId, _workerId);
@@ -253,20 +210,22 @@ public abstract class Container implements Killable {
             kill(pid);
         }
     }
-    
+
     @Override
     public void forceKill() throws IOException {
         LOG.info("Force Killing {}:{}", _supervisorId, _workerId);
         Set<Long> pids = getAllPids();
-        
+
         for (Long pid : pids) {
             forceKill(pid);
         }
     }
-    
+
     /**
      * Read the Heartbeat for the current container.
+     *
      * @return the Heartbeat
+     *
      * @throws IOException on any error
      */
     public LSWorkerHeartbeat readHeartbeat() throws IOException {
@@ -278,9 +237,11 @@ public abstract class Container implements Killable {
 
     /**
      * Is a process alive and running?.
-     * @param pid the PID of the running process
+     *
+     * @param pid  the PID of the running process
      * @param user the user that is expected to own that process
      * @return true if it is, else false
+     *
      * @throws IOException on any error
      */
     protected boolean isProcessAlive(long pid, String user) throws IOException 
{
@@ -289,7 +250,7 @@ public abstract class Container implements Killable {
         }
         return isPosixProcessAlive(pid, user);
     }
-    
+
     private boolean isWindowsProcessAlive(long pid, String user) throws 
IOException {
         boolean ret = false;
         ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", 
"/fi", "pid eq " + pid, "/v");
@@ -302,16 +263,17 @@ public abstract class Container implements Killable {
                     //This line contains the user name for the pid we're 
looking up
                     //Example line: "User Name:    exampleDomain\exampleUser"
                     List<String> userNameLineSplitOnWhitespace = 
Arrays.asList(read.split(":"));
-                    if(userNameLineSplitOnWhitespace.size() == 2){
+                    if (userNameLineSplitOnWhitespace.size() == 2) {
                         List<String> userAndMaybeDomain = 
Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
                         String processUser = userAndMaybeDomain.size() == 2 ? 
userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
-                        if(user.equals(processUser)){
+                        if (user.equals(processUser)) {
                             ret = true;
                         } else {
                             LOG.info("Found {} running as {}, but expected it 
to be {}", pid, processUser, user);
                         }
                     } else {
-                        LOG.error("Received unexpected output from tasklist 
command. Expected one colon in user name line. Line was {}", read);
+                        LOG.error("Received unexpected output from tasklist 
command. Expected one colon in user name line. Line was {}",
+                                  read);
                     }
                     break;
                 }
@@ -319,7 +281,7 @@ public abstract class Container implements Killable {
         }
         return ret;
     }
-    
+
     private boolean isPosixProcessAlive(long pid, String user) throws 
IOException {
         boolean ret = false;
         ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", 
String.valueOf(pid));
@@ -327,7 +289,7 @@ public abstract class Container implements Killable {
         Process p = pb.start();
         try (BufferedReader in = new BufferedReader(new 
InputStreamReader(p.getInputStream()))) {
             String first = in.readLine();
-            assert("USER".equals(first));
+            assert ("USER".equals(first));
             String processUser;
             while ((processUser = in.readLine()) != null) {
                 if (user.equals(processUser)) {
@@ -340,14 +302,14 @@ public abstract class Container implements Killable {
         }
         return ret;
     }
-    
+
     @Override
     public boolean areAllProcessesDead() throws IOException {
         Set<Long> pids = getAllPids();
         String user = getWorkerUser();
-        
+
         boolean allDead = true;
-        for (Long pid: pids) {
+        for (Long pid : pids) {
             if (!isProcessAlive(pid, user)) {
                 LOG.debug("{}: PID {} is dead", _workerId, pid);
             } else {
@@ -364,42 +326,43 @@ public abstract class Container implements Killable {
         _reservedMemory.remove(_port);
         cleanUpForRestart();
     }
-    
+
     /**
-     * Setup the container to run.  By default this creates the needed 
directories/links in the
-     * local file system
-     * PREREQUISITE: All needed blobs and topology, jars/configs have been 
downloaded and
-     * placed in the appropriate locations
+     * Setup the container to run.  By default this creates the needed 
directories/links in the local file system PREREQUISITE: All needed
+     * blobs and topology, jars/configs have been downloaded and placed in the 
appropriate locations
+     *
      * @throws IOException on any error
      */
     protected void setup() throws IOException {
         _type.assertFull();
         if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
-            LOG.info("Missing topology storm code, so can't launch  worker 
with assignment {} for this supervisor {} on port {} with id {}", _assignment,
-                    _supervisorId, _port, _workerId);
+            LOG.info("Missing topology storm code, so can't launch  worker 
with assignment {} for this supervisor {} on port {} with id {}",
+                     _assignment,
+                     _supervisorId, _port, _workerId);
             throw new IllegalStateException("Not all needed files are 
here!!!!");
-        } 
+        }
         LOG.info("Setting up {}:{}", _supervisorId, _workerId);
 
         _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, 
_workerId)));
         _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
         _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, 
_workerId)));
-        
+
         File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port));
         if (!_ops.fileExists(workerArtifacts)) {
             _ops.forceMkdir(workerArtifacts);
             _ops.setupWorkerArtifactsDir(_assignment.get_owner(), 
workerArtifacts);
         }
-    
+
         String user = getWorkerUser();
         writeLogMetadata(user);
         saveWorkerUser(user);
         createArtifactsLink();
         createBlobstoreLinks();
     }
-    
+
     /**
      * Write out the file used by the log viewer to allow/reject log access.
+     *
      * @param user the user this is going to run as
      * @throws IOException on any error
      */
@@ -413,7 +376,7 @@ public abstract class Container implements Killable {
         Set<String> logsGroups = new HashSet<>();
         if (_topoConf.get(DaemonConfig.LOGS_GROUPS) != null) {
             List<String> groups = (List<String>) 
_topoConf.get(DaemonConfig.LOGS_GROUPS);
-            for (String group : groups){
+            for (String group : groups) {
                 logsGroups.add(group);
             }
         }
@@ -426,13 +389,13 @@ public abstract class Container implements Killable {
         Set<String> logsUsers = new HashSet<>();
         if (_topoConf.get(DaemonConfig.LOGS_USERS) != null) {
             List<String> logUsers = (List<String>) 
_topoConf.get(DaemonConfig.LOGS_USERS);
-            for (String logUser : logUsers){
+            for (String logUser : logUsers) {
                 logsUsers.add(logUser);
             }
         }
         if (_topoConf.get(Config.TOPOLOGY_USERS) != null) {
             List<String> topUsers = (List<String>) 
_topoConf.get(Config.TOPOLOGY_USERS);
-            for (String logUser : topUsers){
+            for (String logUser : topUsers) {
                 logsUsers.add(logUser);
             }
         }
@@ -445,9 +408,10 @@ public abstract class Container implements Killable {
             yaml.dump(data, writer);
         }
     }
-    
+
     /**
      * Create symlink from the containers directory/artifacts to the artifacts 
directory.
+     *
      * @throws IOException on any error
      */
     protected void createArtifactsLink() throws IOException {
@@ -461,17 +425,17 @@ public abstract class Container implements Killable {
             }
         }
     }
-    
+
     /**
-     * Create symlinks for each of the blobs from the container's directory to
-     * corresponding links in the storm dist directory.
+     * Create symlinks for each of the blobs from the container's directory to 
corresponding links in the storm dist directory.
+     *
      * @throws IOException on any error.
      */
     protected void createBlobstoreLinks() throws IOException {
         _type.assertFull();
         String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId);
         String workerRoot = ConfigUtils.workerRoot(_conf, _workerId);
-        
+
         @SuppressWarnings("unchecked")
         Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) _topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
         List<String> blobFileNames = new ArrayList<>();
@@ -496,41 +460,44 @@ public abstract class Container implements Killable {
         resourceFileNames.addAll(blobFileNames);
 
         if (!_symlinksDisabled) {
-            LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", _workerId, _topologyId, resourceFileNames.size(), 
resourceFileNames);
+            LOG.info("Creating symlinks for worker-id: {} storm-id: {} for 
files({}): {}", _workerId, _topologyId, resourceFileNames.size(),
+                     resourceFileNames);
             if (targetResourcesDir.exists()) {
-                _ops.createSymlink(new File(workerRoot, 
ServerConfigUtils.RESOURCES_SUBDIR),  targetResourcesDir );
+                _ops.createSymlink(new File(workerRoot, 
ServerConfigUtils.RESOURCES_SUBDIR), targetResourcesDir);
             } else {
-                LOG.info("Topology jar for worker-id: {} storm-id: {} does not 
contain re sources directory {}." , _workerId, _topologyId, 
targetResourcesDir.toString() );
+                LOG.info("Topology jar for worker-id: {} storm-id: {} does not 
contain re sources directory {}.", _workerId, _topologyId,
+                         targetResourcesDir.toString());
             }
             for (String fileName : blobFileNames) {
                 _ops.createSymlink(new File(workerRoot, fileName),
-                        new File(stormRoot, fileName));
+                                   new File(stormRoot, fileName));
             }
         } else if (blobFileNames.size() > 0) {
             LOG.warn("Symlinks are disabled, no symlinks created for blobs 
{}", blobFileNames);
         }
     }
-    
+
     /**
      * @return all of the pids that are a part of this container.
      */
     protected Set<Long> getAllPids() throws IOException {
         Set<Long> ret = new HashSet<>();
-        for (String listing: 
ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) {
+        for (String listing : 
ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) {
             ret.add(Long.valueOf(listing));
         }
-        
+
         if (_resourceIsolationManager != null) {
             Set<Long> morePids = 
_resourceIsolationManager.getRunningPids(_workerId);
-            assert(morePids != null);
+            assert (morePids != null);
             ret.addAll(morePids);
         }
-        
+
         return ret;
     }
-    
-    /** 
+
+    /**
      * @return the user that some operations should be done as.
+     *
      * @throws IOException on any error
      */
     protected String getWorkerUser() throws IOException {
@@ -552,40 +519,39 @@ public abstract class Container implements Killable {
             throw new IllegalStateException("Could not recover the user for " 
+ _workerId);
         }
     }
-    
+
     protected void saveWorkerUser(String user) throws IOException {
         _type.assertFull();
         LOG.info("SET worker-user {} {}", _workerId, user);
         _ops.dump(new File(ConfigUtils.workerUserFile(_conf, _workerId)), 
user);
     }
-    
+
     protected void deleteSavedWorkerUser() throws IOException {
         LOG.info("REMOVE worker-user {}", _workerId);
         _ops.deleteIfExists(new File(ConfigUtils.workerUserFile(_conf, 
_workerId)));
     }
-    
+
     /**
-     * Clean up the container partly preparing for restart.
-     * By default delete all of the temp directories we are going
-     * to get a new worker_id anyways.
-     * POST CONDITION: the workerId will be set to null
+     * Clean up the container partly preparing for restart. By default delete 
all of the temp directories we are going to get a new
+     * worker_id anyways. POST CONDITION: the workerId will be set to null
+     *
      * @throws IOException on any error
      */
     public void cleanUpForRestart() throws IOException {
         LOG.info("Cleaning up {}:{}", _supervisorId, _workerId);
         Set<Long> pids = getAllPids();
         String user = getWorkerUser();
-        
+
         for (Long pid : pids) {
             File path = new File(ConfigUtils.workerPidPath(_conf, _workerId, 
pid));
             _ops.deleteIfExists(path, user, _workerId);
         }
-        
+
         //clean up for resource isolation if enabled
         if (_resourceIsolationManager != null) {
             _resourceIsolationManager.releaseResourcesForWorker(_workerId);
         }
-        
+
         //Always make sure to clean up everything else before worker directory
         //is removed since that is what is going to trigger the retry for 
cleanup
         _ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(_conf, 
_workerId)), user, _workerId);
@@ -597,8 +563,9 @@ public abstract class Container implements Killable {
     }
 
     /**
-     * Check if the container is over its memory limit AND needs to be killed. 
This does not necessarily mean
-     * that it just went over the limit.
+     * Check if the container is over its memory limit AND needs to be killed. 
This does not necessarily mean that it just went over the
+     * limit.
+     *
      * @throws IOException on any error
      */
     public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) 
throws IOException {
@@ -674,16 +641,14 @@ public abstract class Container implements Killable {
     }
 
     /**
-     * Launch the process for the first time.
-     * PREREQUISITE: setup has run and passed
+     * Launch the process for the first time. PREREQUISITE: setup has run and 
passed
      *
      * @throws IOException on any error
      */
     public abstract void launch() throws IOException;
 
     /**
-     * Restart the processes in this container.
-     * PREREQUISITE: cleanUpForRestart has run and passed
+     * Restart the processes in this container. PREREQUISITE: 
cleanUpForRestart has run and passed
      *
      * @throws IOException on any error
      */
@@ -698,9 +663,10 @@ public abstract class Container implements Killable {
      * Run a profiling request.
      *
      * @param request the request to run
-     * @param stop is this a stop request?
+     * @param stop    is this a stop request?
      * @return true if it succeeded, else false
-     * @throws IOException on any error
+     *
+     * @throws IOException          on any error
      * @throws InterruptedException if running the command is interrupted.
      */
     public abstract boolean runProfiling(ProfileRequest request, boolean stop) 
throws IOException, InterruptedException;
@@ -731,7 +697,7 @@ public abstract class Container implements Killable {
                 long timestamp = System.currentTimeMillis();
                 double value = _usedMemory.get(_port).memory;
                 WorkerMetricPoint workerMetric = new 
WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID,
-                    INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
+                                                                       
INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
 
                 WorkerMetricList metricList = new WorkerMetricList();
                 metricList.add_to_metrics(workerMetric);
@@ -751,4 +717,47 @@ public abstract class Container implements Killable {
             this.lastMetricProcessTime = System.currentTimeMillis();
         }
     }
+
+    public static enum ContainerType {
+        LAUNCH(false, false),
+        RECOVER_FULL(true, false),
+        RECOVER_PARTIAL(true, true);
+
+        private final boolean _recovery;
+        private final boolean _onlyKillable;
+
+        ContainerType(boolean recovery, boolean onlyKillable) {
+            _recovery = recovery;
+            _onlyKillable = onlyKillable;
+        }
+
+        public boolean isRecovery() {
+            return _recovery;
+        }
+
+        public void assertFull() {
+            if (_onlyKillable) {
+                throw new IllegalStateException("Container is only Killable.");
+            }
+        }
+
+        public boolean isOnlyKillable() {
+            return _onlyKillable;
+        }
+    }
+
+    private static class TopoAndMemory {
+        public final String topoId;
+        public final long memory;
+
+        public TopoAndMemory(String id, long mem) {
+            topoId = id;
+            memory = mem;
+        }
+
+        @Override
+        public String toString() {
+            return "{TOPO: " + topoId + " at " + memory + " MB}";
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
index 527b321..2f32e38 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
@@ -1,33 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,9 +31,13 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class ContainerLauncher {
     private static final Logger LOG = 
LoggerFactory.getLogger(ContainerLauncher.class);
-    
+
+    protected ContainerLauncher() {
+        //Empty
+    }
+
     /**
-     * Factory to create the right container launcher 
+     * Factory to create the right container launcher
      * for the config and the environment.
      * @param conf the config
      * @param supervisorId the ID of the supervisor
@@ -49,16 +47,17 @@ public abstract class ContainerLauncher {
      * @throws IOException on any error
      */
     public static ContainerLauncher make(Map<String, Object> conf, String 
supervisorId, int supervisorPort,
-        IContext sharedContext) throws IOException {
+                                         IContext sharedContext) throws 
IOException {
         if (ConfigUtils.isLocalMode(conf)) {
             return new LocalContainerLauncher(conf, supervisorId, 
supervisorPort, sharedContext);
         }
-        
+
         ResourceIsolationInterface resourceIsolationManager = null;
         if (ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
             resourceIsolationManager = ReflectionUtils.newInstance((String) 
conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN));
             resourceIsolationManager.prepare(conf);
-            LOG.info("Using resource isolation plugin {} {}", 
conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN), 
resourceIsolationManager);
+            LOG.info("Using resource isolation plugin {} {}", 
conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN),
+                     resourceIsolationManager);
         }
 
         if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
@@ -66,10 +65,6 @@ public abstract class ContainerLauncher {
         }
         return new BasicContainerLauncher(conf, supervisorId, supervisorPort, 
resourceIsolationManager);
     }
-    
-    protected ContainerLauncher() {
-        //Empty
-    }
 
     /**
      * Launch a container in a given slot
@@ -80,7 +75,7 @@ public abstract class ContainerLauncher {
      * @throws IOException on any error 
      */
     public abstract Container launchContainer(int port, LocalAssignment 
assignment, LocalState state) throws IOException;
-    
+
     /**
      * Recover a container for a running process
      * @param port the port the assignment is running on
@@ -90,8 +85,9 @@ public abstract class ContainerLauncher {
      * @throws IOException on any error
      * @throws ContainerRecoveryException if the Container could not be 
recovered
      */
-    public abstract Container recoverContainer(int port, LocalAssignment 
assignment, LocalState state) throws IOException, ContainerRecoveryException;
-    
+    public abstract Container recoverContainer(int port, LocalAssignment 
assignment, LocalState state) throws IOException,
+        ContainerRecoveryException;
+
     /**
      * Try to recover a container using just the worker ID.  
      * The result is really only useful for killing the container

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
index 7ab6e67..491ffdc 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
index 8785f86..7b7ff1b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import org.apache.storm.utils.Utils;
@@ -23,6 +18,7 @@ import org.slf4j.LoggerFactory;
 
 public class DefaultUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultUncaughtExceptionHandler.class);
+
     @Override
     public void uncaughtException(Thread t, Throwable e) {
         LOG.error("Error when processing event", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
index 79df800..09a7bfb 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import org.apache.storm.event.EventManager;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java
index 8d6d8e0..420f277 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java
@@ -1,46 +1,41 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 
 public interface Killable {
-    
+
     /**
      * Kill the processes in this container nicely.
      * kill -15 equivalent
      * @throws IOException on any error
      */
     public void kill() throws IOException;
-    
+
     /**
      * Kill the processes in this container violently.
      * kill -9 equivalent
      * @throws IOException on any error
      */
     public void forceKill() throws IOException;
-    
+
     /**
      * @return true if all of the processes are dead, else false
      * @throws IOException on any error
      */
     public boolean areAllProcessesDead() throws IOException;
-    
+
     /**
      * Clean up the container. It is not coming back.
      * by default do the same thing as when restarting.

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
index 4afaffe..e972feb 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.storm.ProcessSimulator;
 import org.apache.storm.daemon.worker.Worker;
 import org.apache.storm.generated.LocalAssignment;
@@ -31,11 +25,11 @@ import org.slf4j.LoggerFactory;
 
 public class LocalContainer extends Container {
     private static final Logger LOG = 
LoggerFactory.getLogger(LocalContainer.class);
-    private volatile boolean _isAlive = false;
     private final IContext _sharedContext;
-    
+    private volatile boolean _isAlive = false;
+
     public LocalContainer(Map<String, Object> conf, String supervisorId, int 
supervisorPort, int port,
-            LocalAssignment assignment, IContext sharedContext) throws 
IOException {
+                          LocalAssignment assignment, IContext sharedContext) 
throws IOException {
         super(ContainerType.LAUNCH, conf, supervisorId, supervisorPort, port, 
assignment, null, null, null, null);
         _sharedContext = sharedContext;
         _workerId = Utils.uuid();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
index 1c43128..c2ff66f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.utils.LocalState;
@@ -34,7 +28,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
     private final IContext _sharedContext;
 
     public LocalContainerLauncher(Map<String, Object> conf, String 
supervisorId, int supervisorPort,
-        IContext sharedContext) {
+                                  IContext sharedContext) {
         _conf = conf;
         _supervisorId = supervisorId;
         _supervisorPort = supervisorPort;

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
index 7dc9b0b..a0d35d9 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
@@ -25,8 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This allows you to submit a Runnable with a key.  If the previous 
submission for that key has not yet run,
- * it will be replaced with the latest one.
+ * This allows you to submit a Runnable with a key.  If the previous 
submission for that key has not yet run, it will be replaced with the
+ * latest one.
  */
 public class OnlyLatestExecutor<K> {
     private static final Logger LOG = 
LoggerFactory.getLogger(OnlyLatestExecutor.class);
@@ -40,8 +40,9 @@ public class OnlyLatestExecutor<K> {
 
     /**
      * Run something in the future, but replace it with the latest if it is 
taking too long
+     *
      * @param key what to use to dedupe things.
-     * @param r what you want to run.
+     * @param r   what you want to run.
      */
     public void execute(final K key, Runnable r) {
         Runnable old = latest.put(key, r);
@@ -55,6 +56,6 @@ public class OnlyLatestExecutor<K> {
             });
         } else {
             LOG.debug("Replacing runnable for {} - {}", key, r);
-       }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index e065f38..b617345 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.supervisor;
@@ -28,7 +22,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Slot.MachineState;
@@ -51,7 +44,17 @@ import org.slf4j.LoggerFactory;
 
 public class ReadClusterState implements Runnable, AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
-    
+    private static final long ERROR_MILLIS = 60_000; //1 min.  This really means something is wrong.  Even on a very slow node
+    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = (slot) -> {
+        throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut down slot " + slot);
+    };
+    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = (slot) -> {
+        LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, 
Utils.threadDump());
+        DEFAULT_ON_ERROR_TIMEOUT.call(slot);
+    };
+    private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second.  Workers commit suicide after this
+    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT =
+        (slot) -> LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS, slot);
     private final Map<String, Object> superConf;
     private final IStormClusterState stormClusterState;
     private final Map<Integer, Slot> slots = new HashMap<>();
@@ -90,14 +93,14 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
         }
 
         @SuppressWarnings("unchecked")
-        List<Number> ports = 
(List<Number>)superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS);
-        for (Number port: ports) {
+        List<Number> ports = (List<Number>) 
superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS);
+        for (Number port : ports) {
             slots.put(port.intValue(), mkSlot(port.intValue()));
         }
-        
+
         try {
             Collection<String> workers = 
SupervisorUtils.supervisorWorkerIds(superConf);
-            for (Slot slot: slots.values()) {
+            for (Slot slot : slots.values()) {
                 String workerId = slot.getWorkerId();
                 if (workerId != null) {
                     workers.remove(workerId);
@@ -109,35 +112,35 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
         } catch (Exception e) {
             LOG.warn("Error trying to clean up old workers", e);
         }
-        
-        for (Slot slot: slots.values()) {
+
+        for (Slot slot : slots.values()) {
             slot.start();
         }
     }
 
     private Slot mkSlot(int port) throws Exception {
         return new Slot(localizer, superConf, launcher, host, port,
-                localState, stormClusterState, iSuper, cachedAssignments, 
metricsExec, metricsProcessor);
+                        localState, stormClusterState, iSuper, 
cachedAssignments, metricsExec, metricsProcessor);
     }
-    
+
     @Override
     public synchronized void run() {
         try {
             List<String> stormIds = stormClusterState.assignments(null);
             Map<String, Assignment> assignmentsSnapshot = 
getAssignmentsSnapshot(stormClusterState);
-            
+
             Map<Integer, LocalAssignment> allAssignments = 
readAssignments(assignmentsSnapshot);
             if (allAssignments == null) {
                 //Something odd happened try again later
                 return;
             }
             Map<String, List<ProfileRequest>> topoIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
-            
+
             HashSet<Integer> assignedPorts = new HashSet<>();
             LOG.debug("Synchronizing supervisor");
             LOG.debug("All assignment: {}", allAssignments);
             LOG.debug("Topology Ids -> Profiler Actions {}", 
topoIdToProfilerActions);
-            for (Integer port: allAssignments.keySet()) {
+            for (Integer port : allAssignments.keySet()) {
                 if (iSuper.confirmAssigned(port)) {
                     assignedPorts.add(port);
                 }
@@ -145,12 +148,12 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
             HashSet<Integer> allPorts = new HashSet<>(assignedPorts);
             iSuper.assigned(allPorts);
             allPorts.addAll(slots.keySet());
-            
+
             Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
-            for (Entry<String, List<ProfileRequest>> entry: 
topoIdToProfilerActions.entrySet()) {
+            for (Entry<String, List<ProfileRequest>> entry : 
topoIdToProfilerActions.entrySet()) {
                 String topoId = entry.getKey();
                 if (entry.getValue() != null) {
-                    for (ProfileRequest req: entry.getValue()) {
+                    for (ProfileRequest req : entry.getValue()) {
                         NodeInfo ni = req.get_nodeInfo();
                         if (host.equals(ni.get_node())) {
                             Long port = ni.get_port().iterator().next();
@@ -164,8 +167,8 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                     }
                 }
             }
-            
-            for (Integer port: allPorts) {
+
+            for (Integer port : allPorts) {
                 Slot slot = slots.get(port);
                 if (slot == null) {
                     slot = mkSlot(port);
@@ -175,18 +178,19 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                 slot.setNewAssignment(allAssignments.get(port));
                 slot.addProfilerActions(filtered.get(port));
             }
-            
+
         } catch (Exception e) {
             LOG.error("Failed to Sync Supervisor", e);
             throw new RuntimeException(e);
         }
     }
-    
+
     protected Map<String, Assignment> 
getAssignmentsSnapshot(IStormClusterState stormClusterState) throws Exception {
         return stormClusterState.assignmentsInfo();
     }
-    
-    protected Map<String, List<ProfileRequest>> 
getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) 
throws Exception {
+
+    protected Map<String, List<ProfileRequest>> 
getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) 
throws
+        Exception {
         Map<String, List<ProfileRequest>> ret = new HashMap<String, 
List<ProfileRequest>>();
         for (String stormId : stormIds) {
             List<ProfileRequest> profileRequests = 
stormClusterState.getTopologyProfileRequests(stormId);
@@ -194,7 +198,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
         }
         return ret;
     }
-    
+
     protected Map<Integer, LocalAssignment> readAssignments(Map<String, 
Assignment> assignmentsSnapshot) {
         try {
             Map<Integer, LocalAssignment> portLA = new HashMap<>();
@@ -214,7 +218,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                         portLA.put(port, la);
                     } else {
                         throw new RuntimeException("Should not have multiple topologies assigned to one port "
-                          + port + " " + la + " " + portLA);
+                                                   + port + " " + la + " " + 
portLA);
                     }
                 }
             }
@@ -230,7 +234,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
             return null;
         }
     }
-    
+
     protected Map<Integer, LocalAssignment> readMyExecutors(String topoId, 
String assignmentId, Assignment assignment) {
         Map<Integer, LocalAssignment> portTasks = new HashMap<>();
         Map<Long, WorkerResources> slotsResources = new HashMap<>();
@@ -276,7 +280,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                         }
                         List<ExecutorInfo> executorInfoList = 
localAssignment.get_executors();
                         executorInfoList.add(new 
ExecutorInfo(entry.getKey().get(0).intValue(),
-                                entry.getKey().get(entry.getKey().size() - 
1).intValue()));
+                                                              
entry.getKey().get(entry.getKey().size() - 1).intValue()));
                     }
                 }
             }
@@ -284,34 +288,23 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
         return portTasks;
     }
 
-    private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second. 
 Workers commit suicide after this
-    private static final long ERROR_MILLIS = 60_000; //1 min.  This really 
means something is wrong.  Even on a very slow node
-    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = (slot) -> {
-        throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms 
to shut down slot " + slot);
-    };
-    public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = (slot) -> 
LOG.warn("It has taken {}ms so far and {} is still not shut down.", 
WARN_MILLIS, slot);
-    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = (slot) -> {
-        LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, 
Utils.threadDump());
-        DEFAULT_ON_ERROR_TIMEOUT.call(slot);
-    };
-    
     public synchronized void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, 
UniFunc<Slot> onErrorTimeout) {
-        for (Slot slot: slots.values()) {
+        for (Slot slot : slots.values()) {
             LOG.info("Setting {} assignment to null", slot);
             slot.setNewAssignment(null);
         }
-        
+
         if (onWarnTimeout == null) {
             onWarnTimeout = DEFAULT_ON_WARN_TIMEOUT;
         }
-        
+
         if (onErrorTimeout == null) {
             onErrorTimeout = DEFAULT_ON_ERROR_TIMEOUT;
         }
-        
+
         long startTime = Time.currentTimeMillis();
         Exception exp = null;
-        for (Slot slot: slots.values()) {
+        for (Slot slot : slots.values()) {
             LOG.info("Waiting for {} to be EMPTY, currently {}", slot, 
slot.getMachineState());
             try {
                 while (slot.getMachineState() != MachineState.EMPTY) {
@@ -319,7 +312,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
                     if (timeSpentMillis > ERROR_MILLIS) {
                         onErrorTimeout.call(slot);
                     }
-                    
+
                     if (timeSpentMillis > WARN_MILLIS) {
                         onWarnTimeout.call(slot);
                     }
@@ -335,15 +328,15 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
         }
         if (exp != null) {
             if (exp instanceof RuntimeException) {
-                throw (RuntimeException)exp;
+                throw (RuntimeException) exp;
             }
             throw new RuntimeException(exp);
         }
     }
-    
+
     @Override
     public void close() {
-        for (Slot slot: slots.values()) {
+        for (Slot slot : slots.values()) {
             try {
                 slot.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
index 2559d7a..eadd635 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.File;
@@ -22,12 +23,11 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.LocalState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,15 +39,15 @@ public class RunAsUserContainer extends BasicContainer {
                               ResourceIsolationInterface 
resourceIsolationManager, LocalState localState,
                               String workerId) throws IOException {
         this(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, localState, workerId,
-                null, null, null);
+             null, null, null);
     }
-    
+
     RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, 
String supervisorId, int supervisorPort,
                        int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
                        LocalState localState, String workerId, Map<String, 
Object> topoConf, AdvancedFSOps ops,
                        String profileCmd) throws IOException {
         super(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, localState,
-                workerId, topoConf, ops, profileCmd);
+              workerId, topoConf, ops, profileCmd);
         if (Utils.isOnWindows()) {
             throw new UnsupportedOperationException("ERROR: Windows doesn't 
support running workers as different users yet");
         }
@@ -56,22 +56,23 @@ public class RunAsUserContainer extends BasicContainer {
     private void signal(long pid, int signal) throws IOException {
         List<String> commands = Arrays.asList("signal", String.valueOf(pid), 
String.valueOf(signal));
         String user = getWorkerUser();
-        String logPrefix = "kill -"+signal+" " + pid;
+        String logPrefix = "kill -" + signal + " " + pid;
         ClientSupervisorUtils.processLauncherAndWait(_conf, user, commands, 
null, logPrefix);
     }
-    
+
     @Override
     protected void kill(long pid) throws IOException {
         signal(pid, 15);
     }
-    
+
     @Override
     protected void forceKill(long pid) throws IOException {
         signal(pid, 9);
     }
-    
+
     @Override
-    protected boolean runProfilingCommand(List<String> command, Map<String, 
String> env, String logPrefix, File targetDir) throws IOException, 
InterruptedException {
+    protected boolean runProfilingCommand(List<String> command, Map<String, 
String> env, String logPrefix, File targetDir) throws
+        IOException, InterruptedException {
         String user = this.getWorkerUser();
         String td = targetDir.getAbsolutePath();
         LOG.info("Running as user: {} command: {}", user, command);
@@ -90,8 +91,8 @@ public class RunAsUserContainer extends BasicContainer {
     }
 
     @Override
-    protected void launchWorkerProcess(List<String> command, Map<String, 
String> env, 
-            String logPrefix, ExitCodeCallback processExitCallback, File 
targetDir) throws IOException {
+    protected void launchWorkerProcess(List<String> command, Map<String, 
String> env,
+                                       String logPrefix, ExitCodeCallback 
processExitCallback, File targetDir) throws IOException {
         String workerDir = targetDir.getAbsolutePath();
         String user = this.getWorkerUser();
         List<String> args = Arrays.asList("worker", workerDir, 
ServerUtils.writeScript(workerDir, command, env));

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
index e6439db..c0bb47f 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
@@ -1,38 +1,32 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.daemon.supervisor.Container.ContainerType;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.utils.LocalState;
 
 public class RunAsUserContainerLauncher extends ContainerLauncher {
+    protected final ResourceIsolationInterface _resourceIsolationManager;
     private final Map<String, Object> _conf;
     private final String _supervisorId;
     private final int _supervisorPort;
-    protected final ResourceIsolationInterface _resourceIsolationManager;
-    
+
     public RunAsUserContainerLauncher(Map<String, Object> conf, String 
supervisorId, int supervisorPort,
-            ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+                                      ResourceIsolationInterface 
resourceIsolationManager) throws IOException {
         _conf = conf;
         _supervisorId = supervisorId;
         _supervisorPort = supervisorPort;
@@ -42,7 +36,7 @@ public class RunAsUserContainerLauncher extends 
ContainerLauncher {
     @Override
     public Container launchContainer(int port, LocalAssignment assignment, 
LocalState state) throws IOException {
         Container container = new RunAsUserContainer(ContainerType.LAUNCH, 
_conf, _supervisorId, _supervisorPort, port,
-                assignment, _resourceIsolationManager, state, null, null, 
null, null);
+                                                     assignment, 
_resourceIsolationManager, state, null, null, null, null);
         container.setup();
         container.launch();
         return container;
@@ -51,13 +45,13 @@ public class RunAsUserContainerLauncher extends 
ContainerLauncher {
     @Override
     public Container recoverContainer(int port, LocalAssignment assignment, 
LocalState state) throws IOException {
         return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, 
_supervisorId, _supervisorPort, port,
-                assignment, _resourceIsolationManager, state, null, null, 
null, null);
+                                      assignment, _resourceIsolationManager, 
state, null, null, null, null);
     }
-    
+
     @Override
     public Killable recoverContainer(String workerId, LocalState localState) 
throws IOException {
         return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, 
_supervisorId, _supervisorPort, -1, null,
-                _resourceIsolationManager, localState, workerId, null, null, 
null);
+                                      _resourceIsolationManager, localState, 
workerId, null, null, null);
     }
 
 }

Reply via email to