http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 5a629ad..1dda41c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -1,19 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
 
 package org.apache.storm.daemon.supervisor;
@@ -72,212 +66,85 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         
StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-hb-null");
     private static final Meter numForceKill =
         
StormMetricsRegistry.registerMeter("supervisor:num-workers-force-kill");
+    private static final long ONE_SEC_IN_NANO = 
TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+    private final AtomicReference<LocalAssignment> newAssignment = new 
AtomicReference<>();
+    private final AtomicReference<Set<TopoProfileAction>> profiling = new 
AtomicReference<>(new HashSet<>());
 
-    static enum MachineState {
-        EMPTY,
-        RUNNING,
-        WAITING_FOR_WORKER_START,
-        KILL_AND_RELAUNCH,
-        KILL,
-        KILL_BLOB_UPDATE,
-        WAITING_FOR_BLOB_LOCALIZATION,
-        WAITING_FOR_BLOB_UPDATE;
-    }
-    
-    static class StaticState {
-        public final AsyncLocalizer localizer;
-        public final long hbTimeoutMs;
-        public final long firstHbTimeoutMs;
-        public final long killSleepMs;
-        public final long monitorFreqMs;
-        public final ContainerLauncher containerLauncher;
-        public final int port;
-        public final String host;
-        public final ISupervisor iSupervisor;
-        public final LocalState localState;
-        public final BlobChangingCallback changingCallback;
-        public final OnlyLatestExecutor<Integer> metricsExec;
-        public final WorkerMetricsProcessor metricsProcessor;
-
-        StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
-                    long killSleepMs, long monitorFreqMs,
-                    ContainerLauncher containerLauncher, String host, int port,
-                    ISupervisor iSupervisor, LocalState localState,
-                    BlobChangingCallback changingCallback,
-                    OnlyLatestExecutor<Integer> metricsExec,
-                    WorkerMetricsProcessor metricsProcessor) {
-            this.localizer = localizer;
-            this.hbTimeoutMs = hbTimeoutMs;
-            this.firstHbTimeoutMs = firstHbTimeoutMs;
-            this.containerLauncher = containerLauncher;
-            this.killSleepMs = killSleepMs;
-            this.monitorFreqMs = monitorFreqMs;
-            this.host = host;
-            this.port = port;
-            this.iSupervisor = iSupervisor;
-            this.localState = localState;
-            this.changingCallback = changingCallback;
-            this.metricsExec = metricsExec;
-            this.metricsProcessor = metricsProcessor;
-        }
-    }
+    ;
+    private final BlockingQueue<BlobChanging> changingBlobs = new 
LinkedBlockingQueue<>();
+    private final StaticState staticState;
+    private final IStormClusterState clusterState;
+    private final AtomicReference<Map<Long, LocalAssignment>> 
cachedCurrentAssignments;
+    private final OnlyLatestExecutor<Integer> metricsExec;
+    private volatile boolean done = false;
+    private volatile DynamicState dynamicState;
 
-    static class DynamicState {
-        public final MachineState state;
-        public final LocalAssignment newAssignment;
-        public final LocalAssignment currentAssignment;
-        public final Container container;
-        public final LocalAssignment pendingLocalization;
-        public final Future<Void> pendingDownload;
-        public final Set<TopoProfileAction> profileActions;
-        public final Set<TopoProfileAction> pendingStopProfileActions;
-        public final Set<BlobChanging> changingBlobs;
-        public final LocalAssignment pendingChangingBlobsAssignment;
-        public final Set<Future<Void>> pendingChangingBlobs;
-        
-        /**
-         * The last time that WAITING_FOR_WORKER_START, KILL, or 
KILL_AND_RELAUNCH were entered into.
-         */
-        public final long startTime;
-        
-        public DynamicState(final LocalAssignment currentAssignment, Container 
container, final LocalAssignment newAssignment) {
-            this.currentAssignment = currentAssignment;
-            this.container = container;
-            if ((currentAssignment == null) ^ (container == null)) {
-                throw new IllegalArgumentException("Container and current 
assignment must both be null, or neither can be null");
-            }
-            
-            if (currentAssignment == null) {
-                state = MachineState.EMPTY;
-            } else {
-                state = MachineState.RUNNING;
-            }
-            
-            this.startTime = System.currentTimeMillis();
-            this.newAssignment = newAssignment;
-            this.pendingLocalization = null;
-            this.pendingDownload = null;
-            this.profileActions = Collections.emptySet();
-            this.pendingStopProfileActions = Collections.emptySet();
-            this.changingBlobs = Collections.emptySet();
-            this.pendingChangingBlobsAssignment = null;
-            this.pendingChangingBlobs = Collections.emptySet();
-        }
+    public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
+                ContainerLauncher containerLauncher, String host,
+                int port, LocalState localState,
+                IStormClusterState clusterState,
+                ISupervisor iSupervisor,
+                AtomicReference<Map<Long, LocalAssignment>> 
cachedCurrentAssignments,
+                OnlyLatestExecutor<Integer> metricsExec,
+                WorkerMetricsProcessor metricsProcessor) throws Exception {
+        super("SLOT_" + port);
+        this.metricsExec = metricsExec;
 
-        public DynamicState(final MachineState state, final LocalAssignment 
newAssignment,
-                            final Container container, final LocalAssignment 
currentAssignment,
-                            final LocalAssignment pendingLocalization, final 
long startTime,
-                            final Future<Void> pendingDownload, final 
Set<TopoProfileAction> profileActions,
-                            final Set<TopoProfileAction> 
pendingStopProfileActions,
-                            final Set<BlobChanging> changingBlobs,
-                            final Set<Future<Void>> pendingChangingBlobs, 
final LocalAssignment pendingChaningBlobsAssignment) {
-            assert pendingChangingBlobs != null;
-            assert !(pendingChangingBlobs.isEmpty() ^ 
(pendingChaningBlobsAssignment == null));
-            this.state = state;
-            this.newAssignment = newAssignment;
-            this.currentAssignment = currentAssignment;
-            this.container = container;
-            this.pendingLocalization = pendingLocalization;
-            this.startTime = startTime;
-            this.pendingDownload = pendingDownload;
-            this.profileActions = profileActions;
-            this.pendingStopProfileActions = pendingStopProfileActions;
-            this.changingBlobs = changingBlobs;
-            this.pendingChangingBlobs = pendingChangingBlobs;
-            this.pendingChangingBlobsAssignment = 
pendingChaningBlobsAssignment;
+        this.cachedCurrentAssignments = cachedCurrentAssignments;
+        this.clusterState = clusterState;
+        Map<Integer, LocalAssignment> assignments = 
localState.getLocalAssignmentsMap();
+        LocalAssignment currentAssignment = null;
+        if (assignments != null) {
+            currentAssignment = assignments.get(port);
         }
-        
-        public String toString() {
-            StringBuffer sb = new StringBuffer();
-            sb.append(state);
-            sb.append(" msInState: ");
-            sb.append(Time.currentTimeMillis() - startTime);
-            if (container != null) {
-                sb.append(" ");
-                sb.append(container);
+        Container container = null;
+        if (currentAssignment != null) {
+            try {
+                // For now we do not make a transaction when removing a 
topology assignment from local, so an overdue
+                // assignment may be left on local disk.
+                // So we should check if the local disk assignment is valid 
when initializing:
+                // if the topology files do not exist, the worker [possibly 
alive] will be reassigned if it is timed out;
+                // if the topology files exist but the topology id is invalid, 
just let Supervisor make a sync;
+                // if the topology files exist and the topology files are valid, 
recover the container.
+                if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, 
currentAssignment.get_topology_id())) {
+                    container = containerLauncher.recoverContainer(port, 
currentAssignment, localState);
+                } else {
+                    // Make the assignment null to let slot clean up the disk 
assignment.
+                    currentAssignment = null;
+                }
+            } catch (ContainerRecoveryException e) {
+                //We could not recover; container will be null.
             }
-            return sb.toString();
-        }
-
-        /**
-         * Set the new assignment for the state.  This should never be called 
from within the state machine.
-         * It is an input from outside.
-         * @param newAssignment the new assignment to set
-         * @return the updated DynamicState.
-         */
-        public DynamicState withNewAssignment(LocalAssignment newAssignment) {
-            return new DynamicState(this.state, newAssignment,
-                this.container, this.currentAssignment,
-                this.pendingLocalization, this.startTime,
-                this.pendingDownload, this.profileActions,
-                this.pendingStopProfileActions, this.changingBlobs,
-                this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
-        }
-
-        public DynamicState withPendingLocalization(LocalAssignment 
pendingLocalization, Future<Void> pendingDownload) {
-            return new DynamicState(this.state, this.newAssignment,
-                this.container, this.currentAssignment,
-                pendingLocalization, this.startTime,
-                pendingDownload, this.profileActions,
-                this.pendingStopProfileActions, this.changingBlobs,
-                this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
-        }
-        
-        public DynamicState withPendingLocalization(Future<Void> 
pendingDownload) {
-            return withPendingLocalization(this.pendingLocalization, 
pendingDownload);
-        }
-
-        public DynamicState withState(final MachineState state) {
-            long newStartTime = Time.currentTimeMillis();
-            return new DynamicState(state, this.newAssignment,
-                this.container, this.currentAssignment,
-                this.pendingLocalization, newStartTime,
-                this.pendingDownload, this.profileActions,
-                this.pendingStopProfileActions, this.changingBlobs,
-                this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
-        }
-
-        public DynamicState withCurrentAssignment(final Container container, 
final LocalAssignment currentAssignment) {
-            return new DynamicState(this.state, this.newAssignment,
-                container, currentAssignment,
-                this.pendingLocalization, this.startTime,
-                this.pendingDownload, this.profileActions,
-                this.pendingStopProfileActions, this.changingBlobs,
-                this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
-        }
-
-        public DynamicState withProfileActions(Set<TopoProfileAction> 
profileActions, Set<TopoProfileAction> pendingStopProfileActions) {
-            return new DynamicState(this.state, this.newAssignment,
-                this.container, this.currentAssignment,
-                this.pendingLocalization, this.startTime,
-                this.pendingDownload, profileActions,
-                pendingStopProfileActions, this.changingBlobs,
-                this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
         }
 
-        public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) 
{
-            if (changingBlobs == this.changingBlobs) {
-                return this;
-            }
-            return new DynamicState(this.state, this.newAssignment,
-                this.container, this.currentAssignment,
-                this.pendingLocalization, this.startTime,
-                this.pendingDownload, profileActions,
-                this.pendingStopProfileActions, changingBlobs,
-                this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        LocalAssignment newAssignment = currentAssignment;
+        if (currentAssignment != null && container == null) {
+            currentAssignment = null;
+            //Assigned something but it is not running
         }
 
-        public DynamicState withPendingChangingBlobs(Set<Future<Void>> 
pendingChangingBlobs,
-                                                     LocalAssignment 
pendingChangingBlobsAssignment) {
-            return new DynamicState(this.state, this.newAssignment,
-                this.container, this.currentAssignment,
-                this.pendingLocalization, this.startTime,
-                this.pendingDownload, profileActions,
-                this.pendingStopProfileActions, this.changingBlobs,
-                pendingChangingBlobs,
-                pendingChangingBlobsAssignment);
+        dynamicState = new DynamicState(currentAssignment, container, 
newAssignment);
+        staticState = new StaticState(localizer,
+                                      
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
+                                      
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS))
 * 1000,
+                                      
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS))
 * 1000,
+                                      
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 
1000,
+                                      containerLauncher,
+                                      host,
+                                      port,
+                                      iSupervisor,
+                                      localState,
+                                      this,
+                                      metricsExec, metricsProcessor);
+        this.newAssignment.set(dynamicState.newAssignment);
+        if (MachineState.RUNNING == dynamicState.state) {
+            //We are running so we should recover the blobs.
+            staticState.localizer.recoverRunningTopology(currentAssignment, 
port, this);
+            saveNewAssignment(currentAssignment);
         }
-    };
+        LOG.info("SLOT {}:{} Starting in state {} - assignment {}", 
staticState.host, staticState.port, dynamicState.state,
+                 dynamicState.currentAssignment);
+    }
 
     //In some cases the new LocalAssignment may be equivalent to the old, but
     // It is not equal.  In those cases we want to update the current 
assignment to
@@ -291,55 +158,6 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         return dynamicState;
     }
 
-    static class TopoProfileAction {
-        public final String topoId;
-        public final ProfileRequest request;
-
-        public TopoProfileAction(String topoId, ProfileRequest request) {
-            this.topoId = topoId;
-            this.request = request;
-        }
-
-        @Override
-        public int hashCode() {
-            return (37 * topoId.hashCode()) + request.hashCode();
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (!(other instanceof TopoProfileAction)) {
-                return false;
-            }
-            TopoProfileAction o = (TopoProfileAction) other;
-            return topoId.equals(o.topoId) && request.equals(o.request);
-        }
-
-        @Override
-        public String toString() {
-            return "{ " + topoId + ": " + request + " }";
-        }
-    }
-
-    /**
-     * Holds the information about a blob that is changing.
-     */
-    static class BlobChanging {
-        private final LocalAssignment assignment;
-        private final LocallyCachedBlob blob;
-        private final GoodToGo.GoodToGoLatch latch;
-
-        public BlobChanging(LocalAssignment assignment, LocallyCachedBlob 
blob, GoodToGo.GoodToGoLatch latch) {
-            this.assignment = assignment;
-            this.blob = blob;
-            this.latch = latch;
-        }
-
-        @Override
-        public String toString() {
-            return "BLOB CHANGING " + blob + " " + assignment;
-        }
-    }
-
     @VisibleForTesting
     static boolean forSameTopology(LocalAssignment a, LocalAssignment b) {
         if (a == null && b == null) {
@@ -378,7 +196,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         }
         return false;
     }
-    
+
     static DynamicState stateMachineStep(DynamicState dynamicState, 
StaticState staticState) throws Exception {
         LOG.debug("STATE {}", dynamicState.state);
         switch (dynamicState.state) {
@@ -399,10 +217,10 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
             case WAITING_FOR_BLOB_UPDATE:
                 return handleWaitingForBlobUpdate(dynamicState, staticState);
             default:
-                throw new IllegalStateException("Code not ready to handle a 
state of "+dynamicState.state);
+                throw new IllegalStateException("Code not ready to handle a 
state of " + dynamicState.state);
         }
     }
-    
+
     /**
      * Prepare for a new assignment by downloading new required blobs, or 
going to empty if there is nothing to download.
      * PRECONDITION: The slot should be empty
@@ -412,34 +230,34 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
      * @throws IOException on any error
      */
     static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState 
dynamicState, StaticState staticState) throws IOException {
-        assert(dynamicState.container == null);
-        
+        assert (dynamicState.container == null);
+
         if (dynamicState.newAssignment == null) {
             return dynamicState.withState(MachineState.EMPTY);
         }
         Future<Void> pendingDownload = 
staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment,
-            staticState.port, staticState.changingCallback);
+                                                                               
           staticState.port, staticState.changingCallback);
         return 
dynamicState.withPendingLocalization(dynamicState.newAssignment, 
pendingDownload)
-            .withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION);
+                           
.withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION);
     }
-    
+
     /**
      * Kill the current container and start downloading what the new 
assignment needs, if there is a new assignment.
      * PRECONDITION: container != null
      * @param dynamicState current state
      * @param staticState static data
      * @return the next state
-     * @throws Exception 
+     * @throws Exception
      */
     static DynamicState killContainerForChangedAssignment(DynamicState 
dynamicState, StaticState staticState) throws Exception {
-        assert(dynamicState.container != null);
+        assert (dynamicState.container != null);
 
         staticState.iSupervisor.killedWorker(staticState.port);
         dynamicState.container.kill();
         Future<Void> pendingDownload = null;
         if (dynamicState.newAssignment != null) {
             pendingDownload = 
staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment, 
staticState.port,
-                staticState.changingCallback);
+                                                                               
  staticState.changingCallback);
         }
         dynamicState = drainAllChangingBlobs(dynamicState);
         Time.sleep(staticState.killSleepMs);
@@ -454,7 +272,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
      * @return the next state
      */
     private static DynamicState killContainerForChangedBlobs(DynamicState 
dynamicState, StaticState staticState) throws Exception {
-        assert(dynamicState.container != null);
+        assert (dynamicState.container != null);
 
         staticState.iSupervisor.killedWorker(staticState.port);
         dynamicState.container.kill();
@@ -469,20 +287,20 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
      * @param dynamicState current state
      * @param staticState static data
      * @return the next state
-     * @throws Exception 
+     * @throws Exception
      */
     static DynamicState killAndRelaunchContainer(DynamicState dynamicState, 
StaticState staticState) throws Exception {
-        assert(dynamicState.container != null);
-        
+        assert (dynamicState.container != null);
+
         dynamicState.container.kill();
         Time.sleep(staticState.killSleepMs);
-        
+
         //any stop profile actions that hadn't timed out yet, we should 
restart after the worker is running again.
         HashSet<TopoProfileAction> mod = new 
HashSet<>(dynamicState.profileActions);
         mod.addAll(dynamicState.pendingStopProfileActions);
         return 
dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, 
Collections.emptySet());
     }
-    
+
     /**
      * Clean up a container.
      * PRECONDITION: All of the processes have died.
@@ -491,11 +309,12 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
      * @param nextState the next MachineState to go to.
      * @return the next state.
      */
-    static DynamicState cleanupCurrentContainer(DynamicState dynamicState, 
StaticState staticState, MachineState nextState) throws Exception {
-        assert(dynamicState.container != null);
-        assert(dynamicState.currentAssignment != null);
-        assert(dynamicState.container.areAllProcessesDead());
-        
+    static DynamicState cleanupCurrentContainer(DynamicState dynamicState, 
StaticState staticState, MachineState nextState) throws
+        Exception {
+        assert (dynamicState.container != null);
+        assert (dynamicState.currentAssignment != null);
+        assert (dynamicState.container.areAllProcessesDead());
+
         dynamicState.container.cleanUp();
         staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, 
staticState.port);
         DynamicState ret = dynamicState.withCurrentAssignment(null, null);
@@ -544,13 +363,13 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
         }
         //Otherwise they will just be replaced
 
-        for (BlobChanging rc: dynamicState.changingBlobs) {
+        for (BlobChanging rc : dynamicState.changingBlobs) {
             futures.add(rc.latch.countDown());
         }
 
         LOG.debug("found changing blobs {} moving them to pending...", 
dynamicState.changingBlobs);
         return dynamicState.withChangingBlobs(Collections.emptySet())
-            .withPendingChangingBlobs(futures, assignment);
+                           .withPendingChangingBlobs(futures, assignment);
     }
 
     /**
@@ -567,7 +386,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         }
 
         HashSet<BlobChanging> savedBlobs = new 
HashSet<>(dynamicState.changingBlobs.size());
-        for (BlobChanging rc: dynamicState.changingBlobs) {
+        for (BlobChanging rc : dynamicState.changingBlobs) {
             if (forSameTopology(assignment, rc.assignment)) {
                 savedBlobs.add(rc);
             } else {
@@ -587,10 +406,10 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
      * @throws Exception on any error
      */
     static DynamicState handleWaitingForBlobLocalization(DynamicState 
dynamicState, StaticState staticState) throws Exception {
-        assert(dynamicState.pendingLocalization != null);
-        assert(dynamicState.pendingDownload != null);
-        assert(dynamicState.container == null);
-        
+        assert (dynamicState.pendingLocalization != null);
+        assert (dynamicState.pendingDownload != null);
+        assert (dynamicState.container == null);
+
         //Ignore changes to scheduling while downloading the topology blobs
         // We don't support canceling the download through the future yet,
         // so to keep everything in sync, just wait
@@ -608,19 +427,20 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
                 //Scheduling changed
                 
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, 
staticState.port);
                 return 
prepareForNewAssignmentNoWorkersRunning(dynamicState.withPendingChangingBlobs(Collections.emptySet(),
 null),
-                    staticState);
+                                                               staticState);
             }
 
             if (!dynamicState.pendingChangingBlobs.isEmpty()) {
                 LOG.info("There are pending changes, waiting for them to 
finish before launching container...");
                 //We cannot launch the container yet the resources may still 
be updating
                 return 
dynamicState.withState(MachineState.WAITING_FOR_BLOB_UPDATE)
-                    .withPendingLocalization(null, null);
+                                   .withPendingLocalization(null, null);
             }
 
             dynamicState = updateAssignmentIfNeeded(dynamicState);
             numWorkersLaunched.mark();
-            Container c = 
staticState.containerLauncher.launchContainer(staticState.port, 
dynamicState.pendingLocalization, staticState.localState);
+            Container c =
+                
staticState.containerLauncher.launchContainer(staticState.port, 
dynamicState.pendingLocalization, staticState.localState);
             return dynamicState
                 .withCurrentAssignment(c, 
dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START)
                 .withPendingLocalization(null, null);
@@ -643,8 +463,6 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         }
     }
 
-    private static final long ONE_SEC_IN_NANO = 
TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
-
     /**
      * State Transitions for WAITING_FOR_BLOB_UPDATE state.
      *
@@ -667,14 +485,14 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
             //We were rescheduled while waiting for the resources to be 
updated,
             // but the container is already not running.
             LOG.info("SLOT {}: Assignment Changed from {} to {}", 
staticState.port,
-                dynamicState.currentAssignment, dynamicState.newAssignment);
+                     dynamicState.currentAssignment, 
dynamicState.newAssignment);
             if (dynamicState.currentAssignment != null) {
                 
staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, 
staticState.port);
             }
             
staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment,
 staticState.port);
             return 
prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null,
 null)
-                    .withPendingChangingBlobs(Collections.emptySet(), null),
-                staticState);
+                                                                       
.withPendingChangingBlobs(Collections.emptySet(), null),
+                                                           staticState);
         }
 
         dynamicState = filterChangingBlobsFor(dynamicState, 
dynamicState.pendingChangingBlobsAssignment);
@@ -685,7 +503,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         //We only have a set amount of time we can wait for before looping 
around again
         long start = Time.nanoTime();
         try {
-            for (Future<Void> pending: dynamicState.pendingChangingBlobs) {
+            for (Future<Void> pending : dynamicState.pendingChangingBlobs) {
                 long now = Time.nanoTime();
                 long timeLeft = ONE_SEC_IN_NANO - (now - start);
                 if (timeLeft <= 0) {
@@ -695,7 +513,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
             }
             //All done we can launch the worker now
             Container c = 
staticState.containerLauncher.launchContainer(staticState.port, 
dynamicState.pendingChangingBlobsAssignment,
-                staticState.localState);
+                                                                        
staticState.localState);
             return dynamicState
                 .withCurrentAssignment(c, 
dynamicState.pendingChangingBlobsAssignment).withState(MachineState.WAITING_FOR_WORKER_START)
                 .withPendingChangingBlobs(Collections.emptySet(), null);
@@ -714,13 +532,14 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
      * @throws Exception on any error
      */
     static DynamicState handleKill(DynamicState dynamicState, StaticState 
staticState) throws Exception {
-        assert(dynamicState.container != null);
-        assert(dynamicState.currentAssignment != null);
+        assert (dynamicState.container != null);
+        assert (dynamicState.currentAssignment != null);
 
         if (dynamicState.container.areAllProcessesDead()) {
             LOG.info("SLOT {} all processes are dead...", staticState.port);
             return cleanupCurrentContainer(dynamicState, staticState,
-                dynamicState.pendingLocalization == null ? MachineState.EMPTY 
: MachineState.WAITING_FOR_BLOB_LOCALIZATION);
+                                           dynamicState.pendingLocalization ==
+                                           null ? MachineState.EMPTY : 
MachineState.WAITING_FOR_BLOB_LOCALIZATION);
         }
 
         LOG.warn("SLOT {} force kill and wait...", staticState.port);
@@ -740,9 +559,9 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
      * @throws Exception on any error
      */
     static DynamicState handleKillAndRelaunch(DynamicState dynamicState, 
StaticState staticState) throws Exception {
-        assert(dynamicState.container != null);
-        assert(dynamicState.currentAssignment != null);
-        
+        assert (dynamicState.container != null);
+        assert (dynamicState.currentAssignment != null);
+
         if (dynamicState.container.areAllProcessesDead()) {
             if (equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
                 dynamicState.container.cleanUpForRestart();
@@ -773,8 +592,8 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
      * @throws Exception on any error
      */
     private static DynamicState handleKillBlobUpdate(DynamicState 
dynamicState, StaticState staticState) throws Exception {
-        assert(dynamicState.container != null);
-        assert(dynamicState.currentAssignment != null);
+        assert (dynamicState.container != null);
+        assert (dynamicState.currentAssignment != null);
 
         //Release things that don't need to wait for us
         dynamicState = filterChangingBlobsFor(dynamicState, 
dynamicState.currentAssignment);
@@ -807,9 +626,9 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
      * @throws Exception on any error
      */
     static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, 
StaticState staticState) throws Exception {
-        assert(dynamicState.container != null);
-        assert(dynamicState.currentAssignment != null);
-        
+        assert (dynamicState.container != null);
+        assert (dynamicState.currentAssignment != null);
+
         LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
         if (hb != null) {
             long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 
1000;
@@ -817,18 +636,19 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
                 return dynamicState.withState(MachineState.RUNNING);
             }
         }
-        
+
         if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
             //We were rescheduled while waiting for the worker to come up
             LOG.info("SLOT {}: Assignment Changed from {} to {}", 
staticState.port, dynamicState.currentAssignment,
-                dynamicState.newAssignment);
+                     dynamicState.newAssignment);
             return killContainerForChangedAssignment(dynamicState, 
staticState);
         }
         dynamicState = updateAssignmentIfNeeded(dynamicState);
 
         long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
         if (timeDiffms > staticState.firstHbTimeoutMs) {
-            LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", 
staticState.port, dynamicState.container, staticState.firstHbTimeoutMs);
+            LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", 
staticState.port, dynamicState.container,
+                     staticState.firstHbTimeoutMs);
             return killAndRelaunchContainer(dynamicState, staticState);
         }
 
@@ -850,12 +670,12 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
      * @throws Exception on any error
      */
     static DynamicState handleRunning(DynamicState dynamicState, StaticState 
staticState) throws Exception {
-        assert(dynamicState.container != null);
-        assert(dynamicState.currentAssignment != null);
-        
+        assert (dynamicState.container != null);
+        assert (dynamicState.currentAssignment != null);
+
         if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
             LOG.info("SLOT {}: Assignment Changed from {} to {}", 
staticState.port, dynamicState.currentAssignment,
-                dynamicState.newAssignment);
+                     dynamicState.newAssignment);
             //Scheduling changed while running...
             return killContainerForChangedAssignment(dynamicState, 
staticState);
         }
@@ -878,7 +698,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
             LOG.warn("SLOT {}: violated memory limits", staticState.port);
             return killAndRelaunchContainer(dynamicState, staticState);
         }
-        
+
         LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
         if (hb == null) {
             numWorkersKilledHBNull.mark();
@@ -887,14 +707,14 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
             // worker that never came up.
             return killAndRelaunchContainer(dynamicState, staticState);
         }
-        
+
         long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
         if (timeDiffMs > staticState.hbTimeoutMs) {
             numWorkersKilledHBTimeout.mark();
             LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, 
timeDiffMs, staticState.hbTimeoutMs);
             return killAndRelaunchContainer(dynamicState, staticState);
         }
-        
+
         //The worker is up and running check for profiling requests
         if (!dynamicState.profileActions.isEmpty()) {
             HashSet<TopoProfileAction> mod = new 
HashSet<>(dynamicState.profileActions);
@@ -943,114 +763,37 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
             }
             dynamicState = dynamicState.withProfileActions(mod, modPending);
         }
-
-        dynamicState.container.processMetrics(staticState.metricsExec, 
staticState.metricsProcessor);
-
-        Time.sleep(staticState.monitorFreqMs);
-        return dynamicState;
-    }
-
-    static DynamicState handleEmpty(DynamicState dynamicState, StaticState 
staticState) throws InterruptedException, IOException {
-        assert dynamicState.changingBlobs.isEmpty();
-        assert dynamicState.pendingChangingBlobsAssignment == null;
-        if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
-            return prepareForNewAssignmentNoWorkersRunning(dynamicState, 
staticState);
-        }
-        dynamicState = updateAssignmentIfNeeded(dynamicState);
-        
-        //Both assignments are null, just wait
-        if (dynamicState.profileActions != null && 
!dynamicState.profileActions.isEmpty()) {
-            //Nothing is scheduled here so throw away all of the profileActions
-            LOG.warn("Dropping {} no topology is running", 
dynamicState.profileActions);
-            dynamicState = 
dynamicState.withProfileActions(Collections.emptySet(), Collections.emptySet());
-        }
-        //Drop the change notifications we are not running anything right now
-        dynamicState = drainAllChangingBlobs(dynamicState);
-        Time.sleep(1000);
-        return dynamicState;
-    }
-
-    private final AtomicReference<LocalAssignment> newAssignment = new 
AtomicReference<>();
-    private final AtomicReference<Set<TopoProfileAction>> profiling = new 
AtomicReference<>(new HashSet<>());
-    private final BlockingQueue<BlobChanging> changingBlobs = new 
LinkedBlockingQueue<>();
-    private final StaticState staticState;
-    private final IStormClusterState clusterState;
-    private volatile boolean done = false;
-    private volatile DynamicState dynamicState;
-    private final AtomicReference<Map<Long, LocalAssignment>> 
cachedCurrentAssignments;
-    private final OnlyLatestExecutor<Integer> metricsExec;
-
-    public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
-                ContainerLauncher containerLauncher, String host,
-                int port, LocalState localState,
-                IStormClusterState clusterState,
-                ISupervisor iSupervisor,
-                AtomicReference<Map<Long, LocalAssignment>> 
cachedCurrentAssignments,
-                OnlyLatestExecutor<Integer> metricsExec,
-                WorkerMetricsProcessor metricsProcessor) throws Exception {
-        super("SLOT_"+port);
-        this.metricsExec = metricsExec;
-
-        this.cachedCurrentAssignments = cachedCurrentAssignments;
-        this.clusterState = clusterState;
-        Map<Integer, LocalAssignment> assignments = 
localState.getLocalAssignmentsMap();
-        LocalAssignment currentAssignment = null;
-        if (assignments != null) {
-            currentAssignment = assignments.get(port);
-        }
-        Container container = null;
-        if (currentAssignment != null) { 
-            try {
-                // For now we do not make a transaction when removing a 
topology assignment from local, an overdue
-                // assignment may be left on local disk.
-                // So we should check if the local disk assignment is valid 
when initializing:
-                // if topology files does not exist, the worker[possibly 
alive] will be reassigned if it is timed-out;
-                // if topology files exist but the topology id is invalid, 
just let Supervisor make a sync;
-                // if topology files exist and topology files is valid, 
recover the container.
-                if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, 
currentAssignment.get_topology_id())) {
-                    container = containerLauncher.recoverContainer(port, 
currentAssignment, localState);
-                } else {
-                    // Make the assignment null to let slot clean up the disk 
assignment.
-                    currentAssignment = null;
-                }
-            } catch (ContainerRecoveryException e) {
-                //We could not recover container will be null.
-            }
-        }
-        
-        LocalAssignment newAssignment = currentAssignment;
-        if (currentAssignment != null && container == null) {
-            currentAssignment = null;
-            //Assigned something but it is not running
+
+        dynamicState.container.processMetrics(staticState.metricsExec, 
staticState.metricsProcessor);
+
+        Time.sleep(staticState.monitorFreqMs);
+        return dynamicState;
+    }
+
+    static DynamicState handleEmpty(DynamicState dynamicState, StaticState 
staticState) throws InterruptedException, IOException {
+        assert dynamicState.changingBlobs.isEmpty();
+        assert dynamicState.pendingChangingBlobsAssignment == null;
+        if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
+            return prepareForNewAssignmentNoWorkersRunning(dynamicState, 
staticState);
         }
-        
-        dynamicState = new DynamicState(currentAssignment, container, 
newAssignment);
-        staticState = new StaticState(localizer,
-            
ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
-            
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS))
 * 1000,
-            
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS))
 * 1000,
-            
ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 
1000,
-            containerLauncher,
-            host,
-            port,
-            iSupervisor,
-            localState,
-            this,
-            metricsExec, metricsProcessor);
-        this.newAssignment.set(dynamicState.newAssignment);
-        if (MachineState.RUNNING == dynamicState.state) {
-            //We are running so we should recover the blobs.
-            staticState.localizer.recoverRunningTopology(currentAssignment, 
port, this);
-            saveNewAssignment(currentAssignment);
+        dynamicState = updateAssignmentIfNeeded(dynamicState);
+
+        //Both assignments are null, just wait
+        if (dynamicState.profileActions != null && 
!dynamicState.profileActions.isEmpty()) {
+            //Nothing is scheduled here so throw away all of the profileActions
+            LOG.warn("Dropping {} no topology is running", 
dynamicState.profileActions);
+            dynamicState = 
dynamicState.withProfileActions(Collections.emptySet(), Collections.emptySet());
         }
-        LOG.info("SLOT {}:{} Starting in state {} - assignment {}", 
staticState.host, staticState.port, dynamicState.state,
-            dynamicState.currentAssignment);
+        //Drop the change notifications we are not running anything right now
+        dynamicState = drainAllChangingBlobs(dynamicState);
+        Time.sleep(1000);
+        return dynamicState;
     }
-    
+
     public MachineState getMachineState() {
         return dynamicState.state;
     }
-    
+
     /**
      * Set a new assignment asynchronously.
      * @param newAssignment the new assignment for this slot to run, null to 
run nothing
@@ -1072,7 +815,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
 
     public void addProfilerActions(Set<TopoProfileAction> actions) {
         if (actions != null) {
-            while(true) {
+            while (true) {
                 Set<TopoProfileAction> orig = profiling.get();
                 Set<TopoProfileAction> newActions = new HashSet<>(orig);
                 newActions.addAll(actions);
@@ -1082,7 +825,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
             }
         }
     }
-    
+
     public String getWorkerId() {
         String workerId = null;
         Container c = dynamicState.container;
@@ -1091,9 +834,9 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         }
         return workerId;
     }
-    
+
     private void saveNewAssignment(LocalAssignment assignment) {
-        synchronized(staticState.localState) {
+        synchronized (staticState.localState) {
             Map<Integer, LocalAssignment> assignments = 
staticState.localState.getLocalAssignmentsMap();
             if (assignments == null) {
                 assignments = new HashMap<>();
@@ -1118,10 +861,10 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
             }
         } while (!cachedCurrentAssignments.compareAndSet(orig, update));
     }
-    
+
     public void run() {
         try {
-            while(!done) {
+            while (!done) {
                 Set<TopoProfileAction> origProfileActions = new 
HashSet<>(profiling.get());
                 Set<TopoProfileAction> removed = new 
HashSet<>(origProfileActions);
 
@@ -1132,7 +875,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
                     Iterator<BlobChanging> it = 
changingResourcesToHandle.iterator();
 
                     //Remove/Clean up changed requests that are not for us
-                    while(it.hasNext()) {
+                    while (it.hasNext()) {
                         BlobChanging rc = it.next();
                         if (!forSameTopology(rc.assignment, 
dynamicState.currentAssignment) &&
                             !forSameTopology(rc.assignment, 
dynamicState.newAssignment)) {
@@ -1144,16 +887,17 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
 
                 DynamicState nextState =
                     
stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
-                        .withProfileActions(origProfileActions, 
dynamicState.pendingStopProfileActions)
-                        .withChangingBlobs(changingResourcesToHandle), 
staticState);
+                                                 
.withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions)
+                                                 
.withChangingBlobs(changingResourcesToHandle), staticState);
 
                 if (LOG.isDebugEnabled() || dynamicState.state != 
nextState.state) {
                     LOG.info("STATE {} -> {}", dynamicState, nextState);
                 }
                 //Save the current state for recovery
                 if ((nextState.currentAssignment != null && 
!nextState.currentAssignment.equals(dynamicState.currentAssignment)) ||
-                        (dynamicState.currentAssignment != null && 
!dynamicState.currentAssignment.equals(nextState.currentAssignment))) {
-                    LOG.info("SLOT {}: Changing current assignment from {} to 
{}", staticState.port, dynamicState.currentAssignment, 
nextState.currentAssignment);
+                    (dynamicState.currentAssignment != null && 
!dynamicState.currentAssignment.equals(nextState.currentAssignment))) {
+                    LOG.info("SLOT {}: Changing current assignment from {} to 
{}", staticState.port, dynamicState.currentAssignment,
+                             nextState.currentAssignment);
                     saveNewAssignment(nextState.currentAssignment);
                 }
 
@@ -1167,11 +911,11 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
                     saveNewAssignment(nextState.newAssignment);
                     nextState = 
nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
                 }
-                
+
                 // clean up the profiler actions that are not being processed
                 removed.removeAll(dynamicState.profileActions);
                 removed.removeAll(dynamicState.pendingStopProfileActions);
-                for (TopoProfileAction action: removed) {
+                for (TopoProfileAction action : removed) {
                     try {
                         
clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
                     } catch (Exception e) {
@@ -1200,4 +944,259 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
         this.interrupt();
         this.join();
     }
+
+    static enum MachineState {
+        EMPTY,
+        RUNNING,
+        WAITING_FOR_WORKER_START,
+        KILL_AND_RELAUNCH,
+        KILL,
+        KILL_BLOB_UPDATE,
+        WAITING_FOR_BLOB_LOCALIZATION,
+        WAITING_FOR_BLOB_UPDATE;
+    }
+
+    static class StaticState {
+        public final AsyncLocalizer localizer;
+        public final long hbTimeoutMs;
+        public final long firstHbTimeoutMs;
+        public final long killSleepMs;
+        public final long monitorFreqMs;
+        public final ContainerLauncher containerLauncher;
+        public final int port;
+        public final String host;
+        public final ISupervisor iSupervisor;
+        public final LocalState localState;
+        public final BlobChangingCallback changingCallback;
+        public final OnlyLatestExecutor<Integer> metricsExec;
+        public final WorkerMetricsProcessor metricsProcessor;
+
+        StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
+                    long killSleepMs, long monitorFreqMs,
+                    ContainerLauncher containerLauncher, String host, int port,
+                    ISupervisor iSupervisor, LocalState localState,
+                    BlobChangingCallback changingCallback,
+                    OnlyLatestExecutor<Integer> metricsExec,
+                    WorkerMetricsProcessor metricsProcessor) {
+            this.localizer = localizer;
+            this.hbTimeoutMs = hbTimeoutMs;
+            this.firstHbTimeoutMs = firstHbTimeoutMs;
+            this.containerLauncher = containerLauncher;
+            this.killSleepMs = killSleepMs;
+            this.monitorFreqMs = monitorFreqMs;
+            this.host = host;
+            this.port = port;
+            this.iSupervisor = iSupervisor;
+            this.localState = localState;
+            this.changingCallback = changingCallback;
+            this.metricsExec = metricsExec;
+            this.metricsProcessor = metricsProcessor;
+        }
+    }
+
+    static class DynamicState {
+        public final MachineState state;
+        public final LocalAssignment newAssignment;
+        public final LocalAssignment currentAssignment;
+        public final Container container;
+        public final LocalAssignment pendingLocalization;
+        public final Future<Void> pendingDownload;
+        public final Set<TopoProfileAction> profileActions;
+        public final Set<TopoProfileAction> pendingStopProfileActions;
+        public final Set<BlobChanging> changingBlobs;
+        public final LocalAssignment pendingChangingBlobsAssignment;
+        public final Set<Future<Void>> pendingChangingBlobs;
+
+        /**
+         * The last time that WAITING_FOR_WORKER_START, KILL, or 
KILL_AND_RELAUNCH were entered into.
+         */
+        public final long startTime;
+
+        public DynamicState(final LocalAssignment currentAssignment, Container 
container, final LocalAssignment newAssignment) {
+            this.currentAssignment = currentAssignment;
+            this.container = container;
+            if ((currentAssignment == null) ^ (container == null)) {
+                throw new IllegalArgumentException("Container and current 
assignment must both be null, or neither can be null");
+            }
+
+            if (currentAssignment == null) {
+                state = MachineState.EMPTY;
+            } else {
+                state = MachineState.RUNNING;
+            }
+
+            this.startTime = System.currentTimeMillis();
+            this.newAssignment = newAssignment;
+            this.pendingLocalization = null;
+            this.pendingDownload = null;
+            this.profileActions = Collections.emptySet();
+            this.pendingStopProfileActions = Collections.emptySet();
+            this.changingBlobs = Collections.emptySet();
+            this.pendingChangingBlobsAssignment = null;
+            this.pendingChangingBlobs = Collections.emptySet();
+        }
+
+        public DynamicState(final MachineState state, final LocalAssignment 
newAssignment,
+                            final Container container, final LocalAssignment 
currentAssignment,
+                            final LocalAssignment pendingLocalization, final 
long startTime,
+                            final Future<Void> pendingDownload, final 
Set<TopoProfileAction> profileActions,
+                            final Set<TopoProfileAction> 
pendingStopProfileActions,
+                            final Set<BlobChanging> changingBlobs,
+                            final Set<Future<Void>> pendingChangingBlobs, 
final LocalAssignment pendingChaningBlobsAssignment) {
+            assert pendingChangingBlobs != null;
+            assert !(pendingChangingBlobs.isEmpty() ^ 
(pendingChaningBlobsAssignment == null));
+            this.state = state;
+            this.newAssignment = newAssignment;
+            this.currentAssignment = currentAssignment;
+            this.container = container;
+            this.pendingLocalization = pendingLocalization;
+            this.startTime = startTime;
+            this.pendingDownload = pendingDownload;
+            this.profileActions = profileActions;
+            this.pendingStopProfileActions = pendingStopProfileActions;
+            this.changingBlobs = changingBlobs;
+            this.pendingChangingBlobs = pendingChangingBlobs;
+            this.pendingChangingBlobsAssignment = 
pendingChaningBlobsAssignment;
+        }
+
+        public String toString() {
+            StringBuffer sb = new StringBuffer();
+            sb.append(state);
+            sb.append(" msInState: ");
+            sb.append(Time.currentTimeMillis() - startTime);
+            if (container != null) {
+                sb.append(" ");
+                sb.append(container);
+            }
+            return sb.toString();
+        }
+
+        /**
+         * Set the new assignment for the state.  This should never be called 
from within the state machine.
+         * It is an input from outside.
+         * @param newAssignment the new assignment to set
+         * @return the updated DynamicState.
+         */
+        public DynamicState withNewAssignment(LocalAssignment newAssignment) {
+            return new DynamicState(this.state, newAssignment,
+                                    this.container, this.currentAssignment,
+                                    this.pendingLocalization, this.startTime,
+                                    this.pendingDownload, this.profileActions,
+                                    this.pendingStopProfileActions, 
this.changingBlobs,
+                                    this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        }
+
+        public DynamicState withPendingLocalization(LocalAssignment 
pendingLocalization, Future<Void> pendingDownload) {
+            return new DynamicState(this.state, this.newAssignment,
+                                    this.container, this.currentAssignment,
+                                    pendingLocalization, this.startTime,
+                                    pendingDownload, this.profileActions,
+                                    this.pendingStopProfileActions, 
this.changingBlobs,
+                                    this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        }
+
+        public DynamicState withPendingLocalization(Future<Void> 
pendingDownload) {
+            return withPendingLocalization(this.pendingLocalization, 
pendingDownload);
+        }
+
+        public DynamicState withState(final MachineState state) {
+            long newStartTime = Time.currentTimeMillis();
+            return new DynamicState(state, this.newAssignment,
+                                    this.container, this.currentAssignment,
+                                    this.pendingLocalization, newStartTime,
+                                    this.pendingDownload, this.profileActions,
+                                    this.pendingStopProfileActions, 
this.changingBlobs,
+                                    this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        }
+
+        public DynamicState withCurrentAssignment(final Container container, 
final LocalAssignment currentAssignment) {
+            return new DynamicState(this.state, this.newAssignment,
+                                    container, currentAssignment,
+                                    this.pendingLocalization, this.startTime,
+                                    this.pendingDownload, this.profileActions,
+                                    this.pendingStopProfileActions, 
this.changingBlobs,
+                                    this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        }
+
+        public DynamicState withProfileActions(Set<TopoProfileAction> 
profileActions, Set<TopoProfileAction> pendingStopProfileActions) {
+            return new DynamicState(this.state, this.newAssignment,
+                                    this.container, this.currentAssignment,
+                                    this.pendingLocalization, this.startTime,
+                                    this.pendingDownload, profileActions,
+                                    pendingStopProfileActions, 
this.changingBlobs,
+                                    this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        }
+
+        public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) 
{
+            if (changingBlobs == this.changingBlobs) {
+                return this;
+            }
+            return new DynamicState(this.state, this.newAssignment,
+                                    this.container, this.currentAssignment,
+                                    this.pendingLocalization, this.startTime,
+                                    this.pendingDownload, profileActions,
+                                    this.pendingStopProfileActions, 
changingBlobs,
+                                    this.pendingChangingBlobs, 
this.pendingChangingBlobsAssignment);
+        }
+
+        public DynamicState withPendingChangingBlobs(Set<Future<Void>> 
pendingChangingBlobs,
+                                                     LocalAssignment 
pendingChangingBlobsAssignment) {
+            return new DynamicState(this.state, this.newAssignment,
+                                    this.container, this.currentAssignment,
+                                    this.pendingLocalization, this.startTime,
+                                    this.pendingDownload, profileActions,
+                                    this.pendingStopProfileActions, 
this.changingBlobs,
+                                    pendingChangingBlobs,
+                                    pendingChangingBlobsAssignment);
+        }
+    }
+
+    static class TopoProfileAction {
+        public final String topoId;
+        public final ProfileRequest request;
+
+        public TopoProfileAction(String topoId, ProfileRequest request) {
+            this.topoId = topoId;
+            this.request = request;
+        }
+
+        @Override
+        public int hashCode() {
+            return (37 * topoId.hashCode()) + request.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TopoProfileAction)) {
+                return false;
+            }
+            TopoProfileAction o = (TopoProfileAction) other;
+            return topoId.equals(o.topoId) && request.equals(o.request);
+        }
+
+        @Override
+        public String toString() {
+            return "{ " + topoId + ": " + request + " }";
+        }
+    }
+
+    /**
+     * Holds the information about a blob that is changing.
+     */
+    static class BlobChanging {
+        private final LocalAssignment assignment;
+        private final LocallyCachedBlob blob;
+        private final GoodToGo.GoodToGoLatch latch;
+
+        public BlobChanging(LocalAssignment assignment, LocallyCachedBlob 
blob, GoodToGo.GoodToGoLatch latch) {
+            this.assignment = assignment;
+            this.blob = blob;
+            this.latch = latch;
+        }
+
+        @Override
+        public String toString() {
+            return "BLOB CHANGING " + blob + " " + assignment;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
index 1332a21..0730957 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java
@@ -1,32 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.daemon.supervisor;
 
-import org.apache.storm.DaemonConfig;
-import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.LocalState;
+package org.apache.storm.daemon.supervisor;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
 
 public class StandaloneSupervisor implements ISupervisor {
     private String supervisorId;
@@ -79,7 +73,7 @@ public class StandaloneSupervisor implements ISupervisor {
 
     }
 
-    public String generateSupervisorId(){
+    public String generateSupervisorId() {
         String extraPart = "";
         try {
             extraPart = "-" + InetAddress.getLocalHost().getHostAddress();

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index 4e5b861..c445383 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -8,7 +8,7 @@
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,7 @@
 
 package org.apache.storm.daemon.supervisor;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
@@ -31,8 +32,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
@@ -66,8 +65,6 @@ import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.ThriftConnectionType;
 import org.apache.storm.security.auth.ThriftServer;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.ServerConfigUtils;
-import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
@@ -76,7 +73,6 @@ import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +81,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private final Map<String, Object> conf;
     private final IContext sharedContext;
     private final IAuthorizer authorizationHandler;
-    private volatile boolean active;
     private final ISupervisor iSupervisor;
     private final Utils.UptimeComputer upTime;
     private final String stormVersion;
@@ -105,12 +100,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     // to really make this work well.
     private final ExecutorService heartbeatExecutor;
     private final AsyncLocalizer asyncLocalizer;
+    private volatile boolean active;
     private EventManager eventManager;
     private ReadClusterState readState;
     private ThriftServer thriftServer;
     //used for local cluster heartbeating
     private Nimbus.Iface localNimbus;
-    
+
     private Supervisor(ISupervisor iSupervisor)
         throws IOException, IllegalAccessException, InstantiationException, 
ClassNotFoundException {
         this(Utils.readStormConfig(), null, iSupervisor);
@@ -118,13 +114,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
     /**
      * Constructor for supervisor daemon.
-     * @param conf config
+     *
+     * @param conf          config
      * @param sharedContext {@link IContext}
-     * @param iSupervisor {@link ISupervisor}
+     * @param iSupervisor   {@link ISupervisor}
      * @throws IOException
      */
     public Supervisor(Map<String, Object> conf, IContext sharedContext, 
ISupervisor iSupervisor)
-        throws IOException, IllegalAccessException, ClassNotFoundException, 
InstantiationException{
+        throws IOException, IllegalAccessException, ClassNotFoundException, 
InstantiationException {
         this.conf = conf;
         this.iSupervisor = iSupervisor;
         this.active = true;
@@ -133,17 +130,17 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.sharedContext = sharedContext;
         this.heartbeatExecutor = Executors.newFixedThreadPool(1);
         this.authorizationHandler = StormCommon.mkAuthorizationHandler(
-                                        (String) 
conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf);
+            (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf);
         if (authorizationHandler == null && 
conf.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) {
             throw new IllegalStateException("It looks like authorization is 
turned on for nimbus but not for the "
-                                                + "supervisor....");
+                                            + "supervisor....");
         }
-        
+
         iSupervisor.prepare(conf, 
ServerConfigUtils.supervisorIsupervisorDir(conf));
 
         try {
             this.stormClusterState = ClusterUtils.mkStormClusterState(conf,
-                new ClusterStateContext(DaemonType.SUPERVISOR, conf));
+                                                                      new 
ClusterStateContext(DaemonType.SUPERVISOR, conf));
         } catch (Exception e) {
             LOG.error("supervisor can't create stormClusterState");
             throw Utils.wrapInRuntime(e);
@@ -175,6 +172,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     }
 
     /**
+     * supervisor daemon enter entrance.
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        @SuppressWarnings("resource")
+        Supervisor instance = new Supervisor(new StandaloneSupervisor());
+        instance.launchDaemon();
+    }
+
+    /**
      * Get the executor service that is supposed to be used for heart-beats.
      */
     public ExecutorService getHeartbeatExecutor() {
@@ -184,7 +193,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     public String getId() {
         return supervisorId;
     }
-    
+
     IContext getSharedContext() {
         return sharedContext;
     }
@@ -232,11 +241,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
         return currAssignment;
     }
-    
+
     AsyncLocalizer getAsyncLocalizer() {
         return asyncLocalizer;
     }
-    
+
     EventManager getEventManger() {
         return eventManager;
     }
@@ -245,14 +254,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         return this;
     }
 
+    public Nimbus.Iface getLocalNimbus() {
+        return this.localNimbus;
+    }
+
     public void setLocalNimbus(Nimbus.Iface nimbus) {
         this.localNimbus = nimbus;
     }
 
-    public Nimbus.Iface getLocalNimbus() {
-        return this.localNimbus;
-    }
-    
     /**
      * Launch the supervisor.
      */
@@ -276,7 +285,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             // This isn't strictly necessary, but it doesn't hurt and ensures 
that the machine stays up
             // to date even if callbacks don't all work exactly right
             eventTimer.scheduleRecurring(0, 10,
-                    new EventManagerPushCallback(new 
SynchronizeAssignments(this, null, readState), eventManager));
+                                         new EventManagerPushCallback(new 
SynchronizeAssignments(this, null, readState), eventManager));
 
             // supervisor health check
             eventTimer.scheduleRecurring(30, 30, new 
SupervisorHealthCheck(this));
@@ -323,7 +332,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
     @VisibleForTesting
     public void checkAuthorization(String topoName, Map<String, Object> 
topoConf, String operation, ReqContext context)
-            throws AuthorizationException {
+        throws AuthorizationException {
         IAuthorizer aclHandler = authorizationHandler;
         if (context == null) {
             context = ReqContext.context();
@@ -337,19 +346,19 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
         if (context.isImpersonating()) {
             LOG.warn("principal: {} is trying to impersonate principal: {}", 
context.realPrincipal(),
-                    context.principal());
+                     context.principal());
             throw new AuthorizationException("Supervisor does not support 
impersonation");
         }
 
         if (aclHandler != null) {
             if (!aclHandler.permit(context, operation, checkConf)) {
                 ThriftAccessLogger.logAccess(context.requestID(), 
context.remoteAddress(), context.principal(),
-                        operation, topoName, "access-denied");
-                throw new AuthorizationException( operation + (topoName != 
null ? " on topology " + topoName : "") +
-                        " is not authorized");
+                                             operation, topoName, 
"access-denied");
+                throw new AuthorizationException(operation + (topoName != null 
? " on topology " + topoName : "") +
+                                                 " is not authorized");
             } else {
                 ThriftAccessLogger.logAccess(context.requestID(), 
context.remoteAddress(), context.principal(),
-                        operation, topoName, "access-granted");
+                                             operation, topoName, 
"access-granted");
             }
         }
     }
@@ -366,58 +375,59 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
 
         TProcessor processor = new 
org.apache.storm.generated.Supervisor.Processor(
-                new org.apache.storm.generated.Supervisor.Iface() {
-                    @Override
-                    public void 
sendSupervisorAssignments(SupervisorAssignments assignments)
-                            throws AuthorizationException, TException {
-                        checkAuthorization("sendSupervisorAssignments");
-                        LOG.info("Got an assignments from master, will start 
to sync with assignments: {}", assignments);
-                        SynchronizeAssignments syn = new 
SynchronizeAssignments(getSupervisor(), assignments,
-                                                             
getReadClusterState());
-                        getEventManger().add(syn);
-                    }
+            new org.apache.storm.generated.Supervisor.Iface() {
+                @Override
+                public void sendSupervisorAssignments(SupervisorAssignments 
assignments)
+                    throws AuthorizationException, TException {
+                    checkAuthorization("sendSupervisorAssignments");
+                    LOG.info("Got an assignments from master, will start to 
sync with assignments: {}", assignments);
+                    SynchronizeAssignments syn = new 
SynchronizeAssignments(getSupervisor(), assignments,
+                                                                            
getReadClusterState());
+                    getEventManger().add(syn);
+                }
 
-                    @Override
-                    public Assignment getLocalAssignmentForStorm(String id)
-                            throws NotAliveException, AuthorizationException, 
TException {
-                        Map<String, Object> topoConf = null;
-                        try {
-                            topoConf = 
ConfigUtils.readSupervisorStormConf(conf, id);
-                        } catch (IOException e) {
-                            LOG.warn("Topology config is not localized 
yet...");
-                        }
-                        checkAuthorization(id, topoConf, 
"getLocalAssignmentForStorm");
-                        Assignment assignment = 
getStormClusterState().assignmentInfo(id, null);
-                        if (null == assignment) {
-                            throw new NotAliveException("No local assignment 
assigned for storm: "
-                                                            + id
-                                                            + " for node: "
-                                                            + getHostName());
-                        }
-                        return assignment;
+                @Override
+                public Assignment getLocalAssignmentForStorm(String id)
+                    throws NotAliveException, AuthorizationException, 
TException {
+                    Map<String, Object> topoConf = null;
+                    try {
+                        topoConf = ConfigUtils.readSupervisorStormConf(conf, 
id);
+                    } catch (IOException e) {
+                        LOG.warn("Topology config is not localized yet...");
                     }
+                    checkAuthorization(id, topoConf, 
"getLocalAssignmentForStorm");
+                    Assignment assignment = 
getStormClusterState().assignmentInfo(id, null);
+                    if (null == assignment) {
+                        throw new NotAliveException("No local assignment 
assigned for storm: "
+                                                    + id
+                                                    + " for node: "
+                                                    + getHostName());
+                    }
+                    return assignment;
+                }
 
-                    @Override
-                    public void 
sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
-                            throws AuthorizationException, NotAliveException, 
TException {
-                        // do nothing except validate heartbeat for now.
-                        String id = heartbeat.get_storm_id();
-                        Map<String, Object> topoConf = null;
-                        try {
-                            topoConf = 
ConfigUtils.readSupervisorStormConf(conf, id);
-                        } catch (IOException e) {
-                            LOG.warn("Topology config is not localized 
yet...");
-                            throw new NotAliveException(id + " does not appear 
to be alive, you should probably exit");
-                        }
-                        checkAuthorization(id, topoConf, 
"sendSupervisorWorkerHeartbeat");
+                @Override
+                public void 
sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
+                    throws AuthorizationException, NotAliveException, 
TException {
+                    // do nothing except validate heartbeat for now.
+                    String id = heartbeat.get_storm_id();
+                    Map<String, Object> topoConf = null;
+                    try {
+                        topoConf = ConfigUtils.readSupervisorStormConf(conf, 
id);
+                    } catch (IOException e) {
+                        LOG.warn("Topology config is not localized yet...");
+                        throw new NotAliveException(id + " does not appear to 
be alive, you should probably exit");
                     }
-                });
+                    checkAuthorization(id, topoConf, 
"sendSupervisorWorkerHeartbeat");
+                }
+            });
         this.thriftServer = new ThriftServer(conf, processor, 
ThriftConnectionType.SUPERVISOR);
         this.thriftServer.serve();
     }
 
     /**
      * Used for local cluster assignments distribution.
+     *
      * @param assignments {@link SupervisorAssignments}
      */
     public void sendSupervisorAssignments(SupervisorAssignments assignments) {
@@ -438,7 +448,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             }
         });
     }
-    
+
     @Override
     public void close() {
         try {
@@ -455,14 +465,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             }
             asyncLocalizer.close();
             getStormClusterState().disconnect();
-            if(thriftServer != null) {
+            if (thriftServer != null) {
                 this.thriftServer.stop();
             }
         } catch (Exception e) {
             LOG.error("Error Shutting down", e);
         }
     }
-    
+
     void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) 
throws InterruptedException, IOException {
         HashSet<Killable> containers = new HashSet<>();
         for (String workerId : workerIds) {
@@ -482,14 +492,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         if (!containers.isEmpty()) {
             Time.sleepSecs(shutdownSleepSecs);
         }
-        for (Killable k: containers) {
+        for (Killable k : containers) {
             try {
                 k.forceKill();
                 long start = Time.currentTimeMillis();
                 while (!k.areAllProcessesDead()) {
                     if ((Time.currentTimeMillis() - start) > 10_000) {
-                        throw new RuntimeException("Giving up on killing " + k 
-                                + " after " + (Time.currentTimeMillis() - 
start) + " ms");
+                        throw new RuntimeException("Giving up on killing " + k
+                                                   + " after " + 
(Time.currentTimeMillis() - start) + " ms");
                     }
                     Time.sleep(100);
                     k.forceKill();
@@ -507,7 +517,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         } else {
             try {
                 ContainerLauncher launcher = ContainerLauncher.make(getConf(), 
getId(), getThriftServerPort(),
-                    getSharedContext());
+                                                                    
getSharedContext());
                 killWorkers(SupervisorUtils.supervisorWorkerIds(conf), 
launcher);
             } catch (Exception e) {
                 throw Utils.wrapInRuntime(e);
@@ -522,20 +532,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
 
         return heartbeatTimer.isTimerWaiting()
-                && workerHeartbeatTimer.isTimerWaiting()
-                && eventTimer.isTimerWaiting()
-                && eventManager.waiting();
-    }
-
-    /**
-     * supervisor daemon enter entrance.
-     *
-     * @param args
-     */
-    public static void main(String[] args) throws Exception {
-        Utils.setupDefaultUncaughtExceptionHandler();
-        @SuppressWarnings("resource")
-        Supervisor instance = new Supervisor(new StandaloneSupervisor());
-        instance.launchDaemon();
+               && workerHeartbeatTimer.isTimerWaiting()
+               && eventTimer.isTimerWaiting()
+               && eventManager.waiting();
     }
 }

Reply via email to