http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java
new file mode 100644
index 0000000..c92db0c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java
@@ -0,0 +1,785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+    
+    static enum MachineState {
+        EMPTY,
+        RUNNING,
+        WAITING_FOR_WORKER_START,
+        KILL_AND_RELAUNCH,
+        KILL,
+        WAITING_FOR_BASIC_LOCALIZATION,
+        WAITING_FOR_BLOB_LOCALIZATION;
+    };
+    
+    static class StaticState {
+        public final ILocalizer localizer;
+        public final long hbTimeoutMs;
+        public final long firstHbTimeoutMs;
+        public final long killSleepMs;
+        public final long monitorFreqMs;
+        public final ContainerLauncher containerLauncher;
+        public final int port;
+        public final String host;
+        public final ISupervisor iSupervisor;
+        public final LocalState localState;
+        
+        StaticState(ILocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
+                long killSleepMs, long monitorFreqMs,
+                ContainerLauncher containerLauncher, String host, int port,
+                ISupervisor iSupervisor, LocalState localState) {
+            this.localizer = localizer;
+            this.hbTimeoutMs = hbTimeoutMs;
+            this.firstHbTimeoutMs = firstHbTimeoutMs;
+            this.containerLauncher = containerLauncher;
+            this.killSleepMs = killSleepMs;
+            this.monitorFreqMs = monitorFreqMs;
+            this.host = host;
+            this.port = port;
+            this.iSupervisor = iSupervisor;
+            this.localState = localState;
+        }
+    }
+    
+    static class DynamicState {
+        public final MachineState state;
+        public final LocalAssignment newAssignment;
+        public final LocalAssignment currentAssignment;
+        public final Container container;
+        public final LocalAssignment pendingLocalization;
+        public final Future<Void> pendingDownload;
+        public final Set<TopoProfileAction> profileActions;
+        public final Set<TopoProfileAction> pendingStopProfileActions;
+        
+        /**
+         * The last time that WAITING_FOR_WORKER_START, KILL, or 
KILL_AND_RELAUNCH were entered into.
+         */
+        public final long startTime;
+        
+        public DynamicState(final LocalAssignment currentAssignment, Container 
container, final LocalAssignment newAssignment) {
+            this.currentAssignment = currentAssignment;
+            this.container = container;
+            if ((currentAssignment == null) ^ (container == null)) {
+                throw new IllegalArgumentException("Container and current 
assignment must both be null, or neither can be null");
+            }
+            
+            if (currentAssignment == null) {
+                state = MachineState.EMPTY;
+            } else {
+                state = MachineState.RUNNING;
+            }
+            
+            this.startTime = System.currentTimeMillis();
+            this.newAssignment = newAssignment;
+            this.pendingLocalization = null;
+            this.pendingDownload = null;
+            this.profileActions = new HashSet<>();
+            this.pendingStopProfileActions = new HashSet<>();
+        }
+        
+        public DynamicState(final MachineState state, final LocalAssignment 
newAssignment,
+                final Container container, final LocalAssignment 
currentAssignment,
+                final LocalAssignment pendingLocalization, final long 
startTime,
+                final Future<Void> pendingDownload, final 
Set<TopoProfileAction> profileActions, 
+                final Set<TopoProfileAction> pendingStopProfileActions) {
+            this.state = state;
+            this.newAssignment = newAssignment;
+            this.currentAssignment = currentAssignment;
+            this.container = container;
+            this.pendingLocalization = pendingLocalization;
+            this.startTime = startTime;
+            this.pendingDownload = pendingDownload;
+            this.profileActions = profileActions;
+            this.pendingStopProfileActions = pendingStopProfileActions;
+        }
+        
+        public String toString() {
+            StringBuffer sb = new StringBuffer();
+            sb.append(state);
+            sb.append(" msInState: ");
+            sb.append(Time.currentTimeMillis() - startTime);
+            if (container != null) {
+                sb.append(" ");
+                sb.append(container);
+            }
+            return sb.toString();
+        }
+
+        /**
+         * Set the new assignment for the state.  This should never be called 
from within the state machine.
+         * It is an input from outside.
+         * @param newAssignment the new assignment to set
+         * @return the updated DynamicState.
+         */
+        public DynamicState withNewAssignment(LocalAssignment newAssignment) {
+            return new DynamicState(this.state, newAssignment,
+                    this.container, this.currentAssignment,
+                    this.pendingLocalization, this.startTime,
+                    this.pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+        
+        public DynamicState withPendingLocalization(LocalAssignment 
pendingLocalization, Future<Void> pendingDownload) {
+            return new DynamicState(this.state, this.newAssignment,
+                    this.container, this.currentAssignment,
+                    pendingLocalization, this.startTime,
+                    pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+        
+        public DynamicState withPendingLocalization(Future<Void> 
pendingDownload) {
+            return withPendingLocalization(this.pendingLocalization, 
pendingDownload);
+        }
+        
+        public DynamicState withState(final MachineState state) {
+            long newStartTime = Time.currentTimeMillis();
+            return new DynamicState(state, this.newAssignment,
+                    this.container, this.currentAssignment,
+                    this.pendingLocalization, newStartTime,
+                    this.pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+
+        public DynamicState withCurrentAssignment(final Container container, 
final LocalAssignment currentAssignment) {
+            return new DynamicState(this.state, this.newAssignment,
+                    container, currentAssignment,
+                    this.pendingLocalization, this.startTime,
+                    this.pendingDownload, this.profileActions,
+                    this.pendingStopProfileActions);
+        }
+
+        public DynamicState withProfileActions(Set<TopoProfileAction> 
profileActions, Set<TopoProfileAction> pendingStopProfileActions) {
+            return new DynamicState(this.state, this.newAssignment,
+                    this.container, this.currentAssignment,
+                    this.pendingLocalization, this.startTime,
+                    this.pendingDownload, profileActions,
+                    pendingStopProfileActions);
+        }
+    };
+    
+    static class TopoProfileAction {
+        public final String topoId;
+        public final ProfileRequest request;
+        
+        public TopoProfileAction(String topoId, ProfileRequest request) {
+            this.topoId = topoId;
+            this.request = request;
+        }
+        
+        @Override
+        public int hashCode() {
+            return (37 * topoId.hashCode()) + request.hashCode(); 
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof TopoProfileAction)) {
+                return false;
+            }
+            TopoProfileAction o = (TopoProfileAction) other;
+            return topoId.equals(o.topoId) && request.equals(o.request);
+        }
+        
+        @Override
+        public String toString() {
+            return "{ " + topoId + ": " + request + " }";
+        }
+    }
+    
+    static boolean equivalent(LocalAssignment a, LocalAssignment b) {
+        if (a == null && b == null) {
+            return true;
+        }
+        if (a != null && b != null) {
+            if (a.get_topology_id().equals(b.get_topology_id())) {
+                Set<ExecutorInfo> aexec = new HashSet<>(a.get_executors());
+                Set<ExecutorInfo> bexec = new HashSet<>(b.get_executors());
+                if (aexec.equals(bexec)) {
+                    boolean aHasResources = a.is_set_resources();
+                    boolean bHasResources = b.is_set_resources();
+                    if (!aHasResources && !bHasResources) {
+                        return true;
+                    }
+                    if (aHasResources && bHasResources) {
+                        if (a.get_resources().equals(b.get_resources())) {
+                            return true;
+                        }
+                    }
+                }
+            }
+        }
+        return false;
+    }
+    
+    static DynamicState stateMachineStep(DynamicState dynamicState, 
StaticState staticState) throws Exception {
+        LOG.debug("STATE {}", dynamicState.state);
+        switch (dynamicState.state) {
+            case EMPTY:
+                return handleEmpty(dynamicState, staticState);
+            case RUNNING:
+                return handleRunning(dynamicState, staticState);
+            case WAITING_FOR_WORKER_START:
+                return handleWaitingForWorkerStart(dynamicState, staticState);
+            case KILL_AND_RELAUNCH:
+                return handleKillAndRelaunch(dynamicState, staticState);
+            case KILL:
+                return handleKill(dynamicState, staticState);
+            case WAITING_FOR_BASIC_LOCALIZATION:
+                return handleWaitingForBasicLocalization(dynamicState, 
staticState);
+            case WAITING_FOR_BLOB_LOCALIZATION:
+                return handleWaitingForBlobLocalization(dynamicState, 
staticState);
+            default:
+                throw new IllegalStateException("Code not ready to handle a 
state of "+dynamicState.state);
+        }
+    }
+    
+    /**
+     * Prepare for a new assignment by downloading new required blobs, or 
going to empty if there is nothing to download.
+     * PRECONDITION: The slot should be empty
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws IOException on any error
+     */
+    static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState 
dynamicState, StaticState staticState) throws IOException {
+        assert(dynamicState.container == null);
+        
+        if (dynamicState.newAssignment == null) {
+            return dynamicState.withState(MachineState.EMPTY);
+        }
+        Future<Void> pendingDownload = 
staticState.localizer.requestDownloadBaseTopologyBlobs(dynamicState.newAssignment,
 staticState.port);
+        return 
dynamicState.withPendingLocalization(dynamicState.newAssignment, 
pendingDownload).withState(MachineState.WAITING_FOR_BASIC_LOCALIZATION);
+    }
+    
+    /**
+     * Kill the current container and start downloading what the new 
assignment needs, if there is a new assignment
+     * PRECONDITION: container != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception 
+     */
+    static DynamicState killContainerForChangedAssignment(DynamicState 
dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        
+        staticState.iSupervisor.killedWorker(staticState.port);
+        dynamicState.container.kill();
+        Future<Void> pendingDownload = null;
+        if (dynamicState.newAssignment != null) {
+            pendingDownload = 
staticState.localizer.requestDownloadBaseTopologyBlobs(dynamicState.newAssignment,
 staticState.port);
+        }
+        Time.sleep(staticState.killSleepMs);
+        return 
dynamicState.withPendingLocalization(dynamicState.newAssignment, 
pendingDownload).withState(MachineState.KILL);
+    }
+    
+    /**
+     * Kill the current container and relaunch it.  (Something odd happened)
+     * PRECONDITION: container != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception 
+     */
+    static DynamicState killAndRelaunchContainer(DynamicState dynamicState, 
StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        
+        dynamicState.container.kill();
+        Time.sleep(staticState.killSleepMs);
+        
+        //any stop profile actions that hadn't timed out yet, we should 
restart after the worker is running again.
+        HashSet<TopoProfileAction> mod = new 
HashSet<>(dynamicState.profileActions);
+        mod.addAll(dynamicState.pendingStopProfileActions);
+        return 
dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, 
Collections.<TopoProfileAction> emptySet());
+    }
+    
+    /**
+     * Clean up a container
+     * PRECONDITION: All of the processes have died.
+     * @param dynamicState current state
+     * @param staticState static data
+     * @param nextState the next MachineState to go to.
+     * @return the next state.
+     */
+    static DynamicState cleanupCurrentContainer(DynamicState dynamicState, 
StaticState staticState, MachineState nextState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        assert(dynamicState.container.areAllProcessesDead());
+        
+        dynamicState.container.cleanUp();
+        staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, 
staticState.port);
+        DynamicState ret = dynamicState.withCurrentAssignment(null, null);
+        if (nextState != null) {
+            ret = ret.withState(nextState);
+        }
+        return ret;
+    }
+    
+    /**
+     * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
+     * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
+     * PRECONDITION: The slot should be empty
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleWaitingForBlobLocalization(DynamicState 
dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.pendingLocalization != null);
+        assert(dynamicState.pendingDownload != null);
+        assert(dynamicState.container == null);
+        
+        //Ignore changes to scheduling while downloading the topology blobs
+        // We don't support canceling the download through the future yet,
+        // so to keep everything in sync, just wait
+        try {
+            dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
+            //Downloading of all blobs finished.
+            if (!equivalent(dynamicState.newAssignment, 
dynamicState.pendingLocalization)) {
+                //Scheduling changed
+                
staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, 
staticState.port);
+                return prepareForNewAssignmentNoWorkersRunning(dynamicState, 
staticState);
+            }
+            Container c = 
staticState.containerLauncher.launchContainer(staticState.port, 
dynamicState.pendingLocalization, staticState.localState);
+            return dynamicState.withCurrentAssignment(c, 
dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START).withPendingLocalization(null,
 null);
+        } catch (TimeoutException e) {
+            //We waited for 1 second loop around and try again....
+            return dynamicState;
+        }
+    }
+    
+    /**
+     * State Transitions for WAITING_FOR_BASIC_LOCALIZATION state.
+     * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
+     * PRECONDITION: The slot should be empty
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleWaitingForBasicLocalization(DynamicState 
dynamicState, StaticState staticState) throws Exception {
+        assert(dynamicState.pendingLocalization != null);
+        assert(dynamicState.pendingDownload != null);
+        assert(dynamicState.container == null);
+        
+        //Ignore changes to scheduling while downloading the topology code
+        // We don't support canceling the download through the future yet,
+        // so to keep everything in sync, just wait
+        try {
+            dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
+            Future<Void> pendingDownload = 
staticState.localizer.requestDownloadTopologyBlobs(dynamicState.pendingLocalization,
 staticState.port);
+            return 
dynamicState.withPendingLocalization(pendingDownload).withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION);
+        } catch (TimeoutException e) {
+            return dynamicState;
+        }
+    }
+
+    /**
+     * State Transitions for KILL state.
+     * PRECONDITION: container.kill() was called
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleKill(DynamicState dynamicState, StaticState 
staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        if (dynamicState.container.areAllProcessesDead()) {
+            LOG.warn("SLOT {} all processes are dead...", staticState.port);
+            return cleanupCurrentContainer(dynamicState, staticState, 
+                    dynamicState.pendingLocalization == null ? 
MachineState.EMPTY : MachineState.WAITING_FOR_BASIC_LOCALIZATION);
+        }
+
+        LOG.warn("SLOT {} force kill and wait...", staticState.port);
+        dynamicState.container.forceKill();
+        Time.sleep(staticState.killSleepMs);
+        return dynamicState;
+    }
+
+    /**
+     * State Transitions for KILL_AND_RELAUNCH state.
+     * PRECONDITION: container.kill() was called
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleKillAndRelaunch(DynamicState dynamicState, 
StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        if (dynamicState.container.areAllProcessesDead()) {
+            if (equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
+                dynamicState.container.cleanUpForRestart();
+                dynamicState.container.relaunch();
+                return 
dynamicState.withState(MachineState.WAITING_FOR_WORKER_START);
+            }
+            //Scheduling changed after we killed all of the processes
+            return 
prepareForNewAssignmentNoWorkersRunning(cleanupCurrentContainer(dynamicState, 
staticState, null), staticState);
+        }
+        //The child processes typically exit in < 1 sec.  If 2 mins later they 
are still around something is wrong
+        if ((Time.currentTimeMillis() - dynamicState.startTime) > 120_000) {
+            throw new RuntimeException("Not all processes in " + 
dynamicState.container + " exited after 120 seconds");
+        }
+        dynamicState.container.forceKill();
+        Time.sleep(staticState.killSleepMs);
+        return dynamicState;
+    }
+
+    /**
+     * State Transitions for WAITING_FOR_WORKER_START state.
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, 
StaticState staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
+        if (hb != null) {
+            long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 
1000;
+            if (hbAgeMs <= staticState.hbTimeoutMs) {
+                return dynamicState.withState(MachineState.RUNNING);
+            }
+        }
+        
+        if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
+            //We were rescheduled while waiting for the worker to come up
+            LOG.warn("SLOT {}: Assignment Changed from {} to {}", 
staticState.port, dynamicState.currentAssignment, dynamicState.newAssignment);
+            return killContainerForChangedAssignment(dynamicState, 
staticState);
+        }
+        
+        long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
+        if (timeDiffms > staticState.firstHbTimeoutMs) {
+            LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", 
staticState.port, dynamicState.container, staticState.firstHbTimeoutMs);
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        Time.sleep(1000);
+        return dynamicState;
+    }
+
+    /**
+     * State Transitions for RUNNING state.
+     * PRECONDITION: container != null && currentAssignment != null
+     * @param dynamicState current state
+     * @param staticState static data
+     * @return the next state
+     * @throws Exception on any error
+     */
+    static DynamicState handleRunning(DynamicState dynamicState, StaticState 
staticState) throws Exception {
+        assert(dynamicState.container != null);
+        assert(dynamicState.currentAssignment != null);
+        
+        if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
+            LOG.warn("SLOT {}: Assignment Changed from {} to {}", 
staticState.port, dynamicState.currentAssignment, dynamicState.newAssignment);
+            //Scheduling changed while running...
+            return killContainerForChangedAssignment(dynamicState, 
staticState);
+        }
+        if (dynamicState.container.didMainProcessExit()) {
+            LOG.warn("SLOT {}: main process has exited", staticState.port);
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        
+        LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
+        if (hb == null) {
+            LOG.warn("SLOT {}: HB returned as null", staticState.port);
+            //This can happen if the supervisor crashed after launching a
+            // worker that never came up.
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        
+        long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
+        if (timeDiffMs > staticState.hbTimeoutMs) {
+            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, 
timeDiffMs, staticState.hbTimeoutMs);
+            return killAndRelaunchContainer(dynamicState, staticState);
+        }
+        
+        //The worker is up and running check for profiling requests
+        if (!dynamicState.profileActions.isEmpty()) {
+            HashSet<TopoProfileAction> mod = new 
HashSet<>(dynamicState.profileActions);
+            HashSet<TopoProfileAction> modPending = new 
HashSet<>(dynamicState.pendingStopProfileActions);
+            Iterator<TopoProfileAction> iter = mod.iterator();
+            while (iter.hasNext()) {
+                TopoProfileAction action = iter.next();
+                if 
(!action.topoId.equals(dynamicState.currentAssignment.get_topology_id())) {
+                    iter.remove();
+                    LOG.warn("Dropping {} wrong topology is running", action);
+                    //Not for this topology so skip it
+                } else {
+                    if (modPending.contains(action)) {
+                        boolean isTimeForStop = Time.currentTimeMillis() > 
action.request.get_time_stamp();
+                        if (isTimeForStop) {
+                            if 
(dynamicState.container.runProfiling(action.request, true)) {
+                                LOG.debug("Stopped {} action finished", 
action);
+                                iter.remove();
+                                modPending.remove(action);
+                            } else {
+                                LOG.warn("Stopping {} failed, will be 
retried", action);
+                            }
+                        } else {
+                            LOG.debug("Still pending {} now: {}", action, 
Time.currentTimeMillis());
+                        }
+                    } else {
+                        //J_PROFILE_START is not used.  When you see a 
J_PROFILE_STOP
+                        // start profiling and save it away to stop when 
timeout happens
+                        if (action.request.get_action() == 
ProfileAction.JPROFILE_STOP) {
+                            if 
(dynamicState.container.runProfiling(action.request, false)) {
+                                modPending.add(action);
+                                LOG.debug("Started {} now: {}", action, 
Time.currentTimeMillis());
+                            } else {
+                                LOG.warn("Starting {} failed, will be 
retried", action);
+                            }
+                        } else {
+                            if 
(dynamicState.container.runProfiling(action.request, false)) {
+                                LOG.debug("Started {} action finished", 
action);
+                                iter.remove();
+                            } else {
+                                LOG.warn("Starting {} failed, will be 
retried", action);
+                            }
+                        }
+                    }
+                }
+            }
+            dynamicState = dynamicState.withProfileActions(mod, modPending);
+        }
+        Time.sleep(staticState.monitorFreqMs);
+        return dynamicState;
+    }
+
+    static DynamicState handleEmpty(DynamicState dynamicState, StaticState 
staticState) throws InterruptedException, IOException {
+        if (!equivalent(dynamicState.newAssignment, 
dynamicState.currentAssignment)) {
+            return prepareForNewAssignmentNoWorkersRunning(dynamicState, 
staticState);
+        }
+        //Both assignments are null, just wait
+        if (dynamicState.profileActions != null && 
!dynamicState.profileActions.isEmpty()) {
+            //Nothing is scheduled here so throw away all of the profileActions
+            LOG.warn("Dropping {} no topology is running", 
dynamicState.profileActions);
+            dynamicState = 
dynamicState.withProfileActions(Collections.<TopoProfileAction> emptySet(), 
Collections.<TopoProfileAction> emptySet());
+        }
+        Time.sleep(1000);
+        return dynamicState;
+    }
+    
+    private final AtomicReference<LocalAssignment> newAssignment = new 
AtomicReference<>();
+    private final AtomicReference<Set<TopoProfileAction>> profiling =
+            new AtomicReference<Set<TopoProfileAction>>(new 
HashSet<TopoProfileAction>());
+    private final StaticState staticState;
+    private final IStormClusterState clusterState;
+    private volatile boolean done = false;
+    private volatile DynamicState dynamicState;
+    private final AtomicReference<Map<Long, LocalAssignment>> 
cachedCurrentAssignments;
+    
+    /**
+     * Creates the slot thread for one supervisor port and restores what local state says was assigned to it.
+     * If a container for the stored assignment can be recovered it becomes the current assignment;
+     * otherwise the stored assignment is kept only as the new (desired) assignment. When the resulting
+     * DynamicState reports RUNNING, the topology's blobs are recovered and the assignment is re-saved.
+     * @param localizer localizer stored in the static state; used here to recover a running topology's blobs
+     * @param conf supervisor config supplying the worker, start, shutdown-sleep and monitor timeouts (seconds)
+     * @param containerLauncher used to recover an existing container for the stored assignment
+     * @param host host name of this supervisor
+     * @param port port managed by this slot (also used to name the thread)
+     * @param localState local state holding the per-port assignments
+     * @param clusterState cluster state used to clean up profiler requests
+     * @param iSupervisor the supervisor implementation, stored in the static state
+     * @param cachedCurrentAssignments shared port-to-assignment map kept in sync with this slot's assignment
+     * @throws Exception propagated from local state, the static state or the localizer
+     */
+    public Slot(ILocalizer localizer, Map<String, Object> conf,
+            ContainerLauncher containerLauncher, String host,
+            int port, LocalState localState,
+            IStormClusterState clusterState,
+            ISupervisor iSupervisor,
+            AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+        super("SLOT_" + port);
+
+        this.cachedCurrentAssignments = cachedCurrentAssignments;
+        this.clusterState = clusterState;
+        Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap();
+        LocalAssignment currentAssignment = null;
+        if (assignments != null) {
+            currentAssignment = assignments.get(port);
+        }
+        Container container = null;
+        if (currentAssignment != null) {
+            try {
+                container = containerLauncher.recoverContainer(port, currentAssignment, localState);
+            } catch (ContainerRecoveryException e) {
+                // Not fatal: container stays null and the assignment is treated as not running below.
+                LOG.warn("SLOT {}: Could not recover container for {}; treating it as not running",
+                        port, currentAssignment, e);
+            }
+        }
+
+        // Whatever was stored is still what this slot should end up running.
+        LocalAssignment newAssignment = currentAssignment;
+        if (currentAssignment != null && container == null) {
+            //Assigned something but it is not running
+            currentAssignment = null;
+        }
+
+        dynamicState = new DynamicState(currentAssignment, container, newAssignment);
+        staticState = new StaticState(localizer,
+                Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000,
+                Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000,
+                Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000,
+                Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000,
+                containerLauncher,
+                host,
+                port,
+                iSupervisor,
+                localState);
+        this.newAssignment.set(dynamicState.newAssignment);
+        if (MachineState.RUNNING == dynamicState.state) {
+            //We are running so we should recover the blobs.
+            staticState.localizer.recoverRunningTopology(currentAssignment, port);
+            saveNewAssignment(currentAssignment);
+        }
+        LOG.warn("SLOT {}:{} Starting in state {} - assignment {}", staticState.host, staticState.port,
+                dynamicState.state, dynamicState.currentAssignment);
+    }
+    
+    /**
+     * @return the state the slot's state machine was in after its most recently published step
+     */
+    public MachineState getMachineState() {
+        final DynamicState snapshot = dynamicState;
+        return snapshot.state;
+    }
+    
+    /**
+     * Requests that this slot move to a different assignment. The request is only recorded here;
+     * the slot thread reads it at the start of its next step in {@link #run()}.
+     * @param newAssignment the assignment this slot should run, or null if it should run nothing
+     */
+    public void setNewAssignment(LocalAssignment newAssignment) {
+        this.newAssignment.set(newAssignment);
+    }
+    
+    /**
+     * Queues additional profiler actions for this slot. A null argument is ignored.
+     * The queued set is replaced atomically by a copy that also contains the given actions.
+     * @param actions actions to add, may be null
+     */
+    public void addProfilerActions(Set<TopoProfileAction> actions) {
+        if (actions == null) {
+            return;
+        }
+        profiling.updateAndGet(current -> {
+            Set<TopoProfileAction> merged = new HashSet<>(current);
+            merged.addAll(actions);
+            return merged;
+        });
+    }
+    
+    /**
+     * @return the worker id of the container this slot currently holds, or null when it holds none
+     */
+    public String getWorkerId() {
+        final Container current = dynamicState.container;
+        return current == null ? null : current.getWorkerId();
+    }
+    
+    private void saveNewAssignment(LocalAssignment assignment) {
+        synchronized(staticState.localState) {
+            Map<Integer, LocalAssignment> assignments = 
staticState.localState.getLocalAssignmentsMap();
+            if (assignments == null) {
+                assignments = new HashMap<>();
+            }
+            if (assignment == null) {
+                assignments.remove(staticState.port);
+            } else {
+                assignments.put(staticState.port, assignment);
+            }
+            staticState.localState.setLocalAssignmentsMap(assignments);
+        }
+        Map<Long, LocalAssignment> update = null;
+        Map<Long, LocalAssignment> orig = null;
+        do {
+            Long lport = new Long(staticState.port);
+            orig = cachedCurrentAssignments.get();
+            update = new HashMap<>(orig);
+            if (assignment == null) {
+                update.remove(lport);
+            } else {
+                update.put(lport, assignment);
+            }
+        } while (!cachedCurrentAssignments.compareAndSet(orig, update));
+    }
+    
+    /**
+     * Main loop of the slot thread. Until close() sets {@code done}, each iteration advances the
+     * state machine one step using the most recently requested assignment and a snapshot of the
+     * queued profiler actions, persists any change of the current assignment, deletes profiler
+     * requests that are no longer tracked and then publishes the new state.
+     * A Throwable for which Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e) is false
+     * exits the process with code 20; otherwise (e.g. the interrupt sent by close()) the loop ends quietly.
+     */
+    public void run() {
+        try {
+            while(!done) {
+                // Snapshot the queued profiler actions for this step.
+                Set<TopoProfileAction> origProfileActions = new 
HashSet<>(profiling.get());
+                Set<TopoProfileAction> removed = new 
HashSet<>(origProfileActions);
+                
+                DynamicState nextState = 
+                        
stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
+                                .withProfileActions(origProfileActions, 
dynamicState.pendingStopProfileActions), staticState);
+
+                // Log (at INFO) every state change, or every step when debug is enabled.
+                if (LOG.isDebugEnabled() || dynamicState.state != 
nextState.state) {
+                    LOG.info("STATE {} -> {}", dynamicState, nextState);
+                }
+                //Save the current state for recovery
+                if (!equivalent(nextState.currentAssignment, 
dynamicState.currentAssignment)) {
+                    LOG.info("SLOT {}: Changing current assignment from {} to 
{}", staticState.port, dynamicState.currentAssignment, 
nextState.currentAssignment);
+                    saveNewAssignment(nextState.currentAssignment);
+                }
+                
+                // clean up the profiler actions that are not being processed
+                // NOTE(review): this subtracts the actions tracked by dynamicState (the state
+                // before this step), not nextState; confirm that is intended.
+                removed.removeAll(dynamicState.profileActions);
+                removed.removeAll(dynamicState.pendingStopProfileActions);
+                for (TopoProfileAction action: removed) {
+                    try {
+                        
clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
+                    } catch (Exception e) {
+                        LOG.error("Error trying to remove profiling request, 
it will be retried", e);
+                    }
+                }
+                // Drop the handled actions from the queue; the CAS retry keeps actions that
+                // addProfilerActions added concurrently.
+                Set<TopoProfileAction> orig, copy;
+                do {
+                    orig = profiling.get();
+                    copy = new HashSet<>(orig);
+                    copy.removeAll(removed);
+                } while (!profiling.compareAndSet(orig, copy));
+                // Publish the new state for getMachineState()/getWorkerId() and the next step.
+                dynamicState = nextState;
+            }
+        } catch (Throwable e) {
+            if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, 
e)) {
+                LOG.error("Error when processing event", e);
+                Utils.exitProcess(20, "Error when processing an event");
+            }
+        }
+    }
+
+    /**
+     * Stops the slot thread: flags the run() loop to exit, interrupts the thread and waits for it to finish.
+     * @throws Exception if the calling thread is interrupted while joining
+     */
+    @Override
+    public void close() throws Exception {
+        this.done = true;
+        interrupt();
+        join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
deleted file mode 100644
index 28dffd7..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-public enum State {
-    VALID, DISALLOWED, NOT_STARTED, TIMED_OUT;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
deleted file mode 100644
index f4f40a1..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StateHeartbeat.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.storm.generated.LSWorkerHeartbeat;
-
-public class StateHeartbeat {
-    private State state;
-    private final LSWorkerHeartbeat hb;
-
-    public StateHeartbeat(State state, LSWorkerHeartbeat hb) {
-        this.state = state;
-        this.hb = hb;
-    }
-
-    public State getState() {
-        return this.state;
-    }
-
-    public LSWorkerHeartbeat getHeartbeat() {
-        return this.hb;
-    }
-
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.SHORT_PREFIX_STYLE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
index a3ad488..24af100 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -17,135 +17,231 @@
  */
 package org.apache.storm.daemon.supervisor;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.StormTimer;
-import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.DaemonCommon;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
 import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
 import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManager;
 import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.AsyncLocalizer;
+import org.apache.storm.localizer.ILocalizer;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.InterruptedIOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-public class Supervisor {
+public class Supervisor implements DaemonCommon, AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(Supervisor.class);
+    // Constructor arguments; sharedContext is null when built via the ISupervisor-only constructor.
+    private final Map<String, Object> conf;
+    private final IContext sharedContext;
+    // True from construction until close(); isWaiting() returns true once it is false.
+    private volatile boolean active;
+    private final ISupervisor iSupervisor;
+    private final Utils.UptimeComputer upTime;
+    private final String stormVersion;
+    private final IStormClusterState stormClusterState;
+    private final LocalState localState;
+    private final String supervisorId;
+    private final String assignmentId;
+    private final String hostName;
+    // used for reporting used ports when heartbeating
+    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
+    // Created in the constructor; launch() schedules work on them and close() closes them.
+    private final StormTimer heartbeatTimer;
+    private final StormTimer eventTimer;
+    private final StormTimer blobUpdateTimer;
+    private final Localizer localizer;
+    private final ILocalizer asyncLocalizer;
+    // Created in launch(), so null until then (close() and shutdownAllWorkers() check for that).
+    private EventManager eventManager;
+    private ReadClusterState readState;
+    
+    /**
+     * Creates a supervisor from the configuration returned by Utils.readStormConfig(), with no shared
+     * messaging context. Used by main().
+     * @param iSupervisor the ISupervisor implementation to run
+     * @throws IOException as declared by the delegated constructor
+     */
+    private Supervisor(ISupervisor iSupervisor) throws IOException {
+        this(Utils.readStormConfig(), null, iSupervisor);
+    }
+    
+    /**
+     * Builds the supervisor's state. It prepares the ISupervisor and creates the cluster state client,
+     * using ZK ACLs only when ZK authentication is configured. It then opens local state and the
+     * localizers, resolves the host name and creates the timers. No work is scheduled here; that
+     * happens in launch().
+     * @param conf the supervisor configuration
+     * @param sharedContext shared messaging context; null when created via the ISupervisor-only constructor
+     * @param iSupervisor supplies the supervisor id and assignment id
+     * @throws IOException declared for callers; failures creating cluster state, local state or the
+     *         localizer, or resolving the host name, are rethrown via Utils.wrapInRuntime
+     */
+    public Supervisor(Map<String, Object> conf, IContext sharedContext, 
ISupervisor iSupervisor) throws IOException {
+        this.conf = conf;
+        this.iSupervisor = iSupervisor;
+        this.active = true;
+        this.upTime = Utils.makeUptimeComputer();
+        this.stormVersion = VersionInfo.getVersion();
+        this.sharedContext = sharedContext;
+        
+        iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+        
+        // ACLs are only supplied when ZK authentication is configured; otherwise null is passed on.
+        List<ACL> acls = null;
+        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+            acls = SupervisorUtils.supervisorZkAcls();
+        }
+
+        try {
+            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, 
acls, new ClusterStateContext(DaemonType.SUPERVISOR));
+        } catch (Exception e) {
+            // NOTE(review): the cause is not logged here; it is only carried by the rethrown exception.
+            LOG.error("supervisor can't create stormClusterState");
+            throw Utils.wrapInRuntime(e);
+        }
+
+        try {
+            this.localState = ConfigUtils.supervisorState(conf);
+            this.localizer = Utils.createLocalizer(conf, 
ConfigUtils.supervisorLocalDir(conf));
+            this.asyncLocalizer = new AsyncLocalizer(conf, this.localizer);
+        } catch (IOException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+        this.supervisorId = iSupervisor.getSupervisorId();
+        this.assignmentId = iSupervisor.getAssignmentId();
+
+        try {
+            this.hostName = Utils.hostname(conf);
+        } catch (UnknownHostException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        this.currAssignment = new AtomicReference<Map<Long, 
LocalAssignment>>(new HashMap<Long,LocalAssignment>());
+
+        // Timers are only created here; launch() schedules work on them and close() closes them.
+        this.heartbeatTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
+
+        this.eventTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
+
+        this.blobUpdateTimer = new StormTimer("blob-update-timer", new 
DefaultUncaughtExceptionHandler());
+    }
+    
+    /** @return the supervisor id obtained from the ISupervisor at construction */
+    public String getId() {
+        return this.supervisorId;
+    }
     
-    private SyncProcessEvent localSyncProcess;
+    /** @return the shared messaging context, or null when none was supplied */
+    IContext getSharedContext() {
+        return this.sharedContext;
+    }
+
+    /** @return the supervisor configuration */
+    public Map<String, Object> getConf() {
+        return this.conf;
+    }
+
+    /** @return the ISupervisor backing this daemon */
+    public ISupervisor getiSupervisor() {
+        return this.iSupervisor;
+    }
+
+    /** @return the uptime computer created at construction */
+    public Utils.UptimeComputer getUpTime() {
+        return this.upTime;
+    }
+
+    /** @return the storm version captured at construction */
+    public String getStormVersion() {
+        return this.stormVersion;
+    }
+
+    /** @return the cluster state client created at construction */
+    public IStormClusterState getStormClusterState() {
+        return this.stormClusterState;
+    }
 
-    public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
-        this.localSyncProcess = localSyncProcess;
+    /** @return the supervisor's local state, opened from conf in the constructor */
+    LocalState getLocalState() {
+        return localState;
     }
 
+    /** @return the assignment id obtained from the ISupervisor at construction */
+    public String getAssignmentId() {
+        return this.assignmentId;
+    }
+
+    /** @return the host name resolved from conf at construction */
+    public String getHostName() {
+        return this.hostName;
+    }
+
+    /** @return the shared port-to-assignment map used when heartbeating */
+    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
+        return this.currAssignment;
+    }
+
+    /** @return the localizer created at construction */
+    public Localizer getLocalizer() {
+        return this.localizer;
+    }
+    
+    /** @return the AsyncLocalizer wrapping {@link #getLocalizer()} */
+    ILocalizer getAsyncLocalizer() {
+        return this.asyncLocalizer;
+    }
+    
+    /** @return the event manager, or null before launch() has been called */
+    EventManager getEventManger() {
+        return this.eventManager;
+    }
+    
     /**
-     * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
-     * 
-     * @param conf
-     * @param sharedContext
-     * @param iSupervisor
-     * @return
-     * @throws Exception
+     * Launch the supervisor
+     * Cleans the supervisor tmp dir, heartbeats once and schedules recurring heartbeats, creates the
+     * event manager and ReadClusterState, re-registers blob references for already downloaded
+     * topologies and starts the localizer cleaner. When SUPERVISOR_ENABLE is set it also schedules
+     * ReadClusterState every 10 s, blob updates every 30 s (after 30 s) and a health check every 300 s
+     * (after 300 s).
+     * @throws Exception propagated from any of these steps
      */
-    public SupervisorManager mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
-        SupervisorManager supervisorManager = null;
-        try {
-            LOG.info("Starting Supervisor with conf {}", conf);
-            iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
-            String path = ConfigUtils.supervisorTmpDir(conf);
-            FileUtils.cleanDirectory(new File(path));
-
-            final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
-            Localizer localizer = supervisorData.getLocalizer();
-
-            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
-            hb.run();
-            // should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
-            Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
-            supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
-
-            Set<String> downloadedStormIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
-            for (String stormId : downloadedStormIds) {
-                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
-            }
-            // do this after adding the references so we don't try to clean 
things being used
-            localizer.startCleaner();
+    public void launch() throws Exception {
+        LOG.info("Starting Supervisor with conf {}", conf);
+        String path = ConfigUtils.supervisorTmpDir(conf);
+        FileUtils.cleanDirectory(new File(path));
 
-            EventManagerImp syncSupEventManager = new EventManagerImp(false);
-            EventManagerImp syncProcessManager = new EventManagerImp(false);
-
-            SyncProcessEvent syncProcessEvent = null;
-            if (ConfigUtils.isLocalMode(conf)) {
-                localSyncProcess.init(supervisorData);
-                syncProcessEvent = localSyncProcess;
-            } else {
-                syncProcessEvent = new SyncProcessEvent(supervisorData);
-            }
+        Localizer localizer = getLocalizer();
 
-            SyncSupervisorEvent syncSupervisorEvent = new 
SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, 
syncProcessManager);
-            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
-            RunProfilerActions runProfilerActionThread = new 
RunProfilerActions(supervisorData);
+        SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
+        hb.run();
+        // should synchronize supervisor so it doesn't launch anything after 
being down (optimization)
+        Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+        heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);
 
-            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
-                StormTimer eventTimer = supervisorData.getEventTimer();
-                // This isn't strictly necessary, but it doesn't hurt and 
ensures that the machine stays up
-                // to date even if callbacks don't all work exactly right
-                eventTimer.scheduleRecurring(0, 10, new 
EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));
+        // Created here rather than in the constructor, so both stay null until launch() runs.
+        this.eventManager = new EventManagerImp(false);
+        this.readState = new ReadClusterState(this);
+        
+        Set<String> downloadedTopoIds = 
SupervisorUtils.readDownloadedTopologyIds(conf);
+        for (String topoId : downloadedTopoIds) {
+            SupervisorUtils.addBlobReferences(localizer, topoId, conf);
+        }
+        // do this after adding the references so we don't try to clean things 
being used
+        localizer.startCleaner();
 
-                eventTimer.scheduleRecurring(0, 
Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)),
-                        new EventManagerPushCallback(syncProcessEvent, 
syncProcessManager));
+        UpdateBlobs updateBlobsThread = new UpdateBlobs(this);
 
-                // Blob update thread. Starts with 30 seconds delay, every 30 
seconds
-                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, 
new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));
+        // NOTE(review): the (Boolean) unboxing throws NullPointerException if SUPERVISOR_ENABLE is missing from conf.
+        if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
+            // This isn't strictly necessary, but it doesn't hurt and ensures 
that the machine stays up
+            // to date even if callbacks don't all work exactly right
+            eventTimer.scheduleRecurring(0, 10, new 
EventManagerPushCallback(readState, eventManager));
 
-                // supervisor health check
-                eventTimer.scheduleRecurring(300, 300, new 
SupervisorHealthCheck(supervisorData));
+            // Blob update thread. Starts with 30 seconds delay, every 30 
seconds
+            blobUpdateTimer.scheduleRecurring(30, 30, new 
EventManagerPushCallback(updateBlobsThread, eventManager));
 
-                // Launch a thread that Runs profiler commands . Starts with 
30 seconds delay, every 30 seconds
-                eventTimer.scheduleRecurring(30, 30, new 
EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
-            }
-            LOG.info("Starting supervisor with id {} at host {}.", 
supervisorData.getSupervisorId(), supervisorData.getHostName());
-            supervisorManager = new SupervisorManager(supervisorData, 
syncSupEventManager, syncProcessManager);
-        } catch (Throwable t) {
-            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, 
t)) {
-                throw t;
-            } else if 
(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
-                throw t;
-            } else {
-                LOG.error("Error on initialization of server supervisor: {}", 
t);
-                Utils.exitProcess(13, "Error on initialization");
-            }
+            // supervisor health check
+            eventTimer.scheduleRecurring(300, 300, new 
SupervisorHealthCheck(this));
         }
-        return supervisorManager;
+        LOG.info("Starting supervisor with id {} at host {}.", getId(), 
getHostName());
     }
 
     /**
      * start distribute supervisor
      */
-    private void launch(ISupervisor iSupervisor) {
+    private void launchDaemon() {
         LOG.info("Starting supervisor for storm version '{}'.", 
VersionInfo.getVersion());
-        SupervisorManager supervisorManager;
         try {
-            Map<Object, Object> conf = Utils.readStormConfig();
+            Map<String, Object> conf = getConf();
             if (ConfigUtils.isLocalMode(conf)) {
                 throw new IllegalArgumentException("Cannot start server in 
local mode!");
             }
-            supervisorManager = mkSupervisor(conf, null, iSupervisor);
-            if (supervisorManager != null)
-                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
+            launch();
+            Utils.addShutdownHookWithForceKillIn1Sec(() -> {this.close();});
             registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf);
             StormMetricsRegistry.startMetricsReporters(conf);
         } catch (Exception e) {
@@ -154,7 +250,7 @@ public class Supervisor {
         }
     }
 
-    private void registerWorkerNumGauge(String name, final Map conf) {
+    private void registerWorkerNumGauge(String name, final Map<String, Object> 
conf) {
         StormMetricsRegistry.registerGauge(name, new Callable<Integer>() {
             @Override
             public Integer call() throws Exception {
@@ -163,15 +259,99 @@ public class Supervisor {
             }
         });
     }
+    
+    /**
+     * Shuts the supervisor down. It marks the supervisor inactive and closes the heartbeat, event and
+     * blob-update timers. It then closes the event manager and ReadClusterState (when launch() created
+     * them) and disconnects from cluster state.
+     * Every step is attempted even if an earlier one fails; failures are logged, never thrown.
+     */
+    @Override
+    public void close() {
+        LOG.info("Shutting down supervisor {}", getId());
+        this.active = false;
+        closeQuietly("heartbeat timer", heartbeatTimer::close);
+        closeQuietly("event timer", eventTimer::close);
+        closeQuietly("blob update timer", blobUpdateTimer::close);
+        // Both are only created by launch(); skip them if it never ran.
+        final EventManager em = eventManager;
+        if (em != null) {
+            closeQuietly("event manager", em::close);
+        }
+        final ReadClusterState rs = readState;
+        if (rs != null) {
+            closeQuietly("cluster state reader", rs::close);
+        }
+        closeQuietly("cluster state connection", getStormClusterState()::disconnect);
+    }
+
+    /**
+     * Runs one shutdown step, logging instead of propagating any failure so the remaining steps still run.
+     * @param what short description of the step, used in the error log
+     * @param step the shutdown action to run
+     */
+    private static void closeQuietly(String what, AutoCloseable step) {
+        try {
+            step.close();
+        } catch (Exception e) {
+            LOG.error("Error Shutting down {}", what, e);
+        }
+    }
+    
+    /**
+     * Kills the given workers and cleans up after them. Each worker is first recovered through the
+     * launcher. Workers whose processes are already dead are cleaned up immediately; the rest are sent
+     * kill() and given SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS (default 1) to exit. They are then
+     * force-killed repeatedly for up to about 10 seconds before being cleaned up.
+     * A failure for one worker is logged and does not stop the others.
+     * @param workerIds ids of the workers to kill
+     * @param launcher launcher used to recover a Killable for each worker id
+     * @throws InterruptedException if interrupted during the shutdown grace period or while waiting
+     *         for force-killed processes to exit
+     * @throws IOException declared for callers; per-worker failures are logged, not thrown
+     */
+    void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException {
+        Set<Killable> containers = new HashSet<>();
+        for (String workerId : workerIds) {
+            try {
+                Killable k = launcher.recoverContainer(workerId, localState);
+                if (!k.areAllProcessesDead()) {
+                    k.kill();
+                    containers.add(k);
+                } else {
+                    k.cleanUp();
+                }
+            } catch (Exception e) {
+                LOG.error("Error trying to kill {}", workerId, e);
+            }
+        }
+        int shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS), 1);
+        if (!containers.isEmpty()) {
+            Time.sleepSecs(shutdownSleepSecs);
+        }
+        for (Killable k : containers) {
+            try {
+                k.forceKill();
+                long start = Time.currentTimeMillis();
+                while (!k.areAllProcessesDead()) {
+                    if ((Time.currentTimeMillis() - start) > 10_000) {
+                        throw new RuntimeException("Giving up on killing " + k
+                                + " after " + (Time.currentTimeMillis() - start) + " ms");
+                    }
+                    Time.sleep(100);
+                    k.forceKill();
+                }
+                k.cleanUp();
+            } catch (InterruptedException e) {
+                // Propagate like the grace-period sleep above instead of swallowing the interrupt.
+                throw e;
+            } catch (Exception e) {
+                LOG.error("Error trying to clean up {}", k, e);
+            }
+        }
+    }
+
+    /**
+     * Kills every worker of this supervisor. Once launch() has created the ReadClusterState the work
+     * is delegated to it. Otherwise the worker ids returned by SupervisorUtils.supervisorWorkerIds are
+     * killed through a freshly made ContainerLauncher.
+     * @throws RuntimeException wrapping any failure of that fallback path
+     */
+    public void shutdownAllWorkers() {
+        if (readState != null) {
+            readState.shutdownAllWorkers();
+            return;
+        }
+        try {
+            ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getSharedContext());
+            killWorkers(SupervisorUtils.supervisorWorkerIds(getConf()), launcher);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status before wrapping so callers can still observe it.
+            Thread.currentThread().interrupt();
+            throw Utils.wrapInRuntime(e);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    /**
+     * Reports whether the supervisor currently has no pending work.
+     * @return true if the supervisor has been closed; otherwise true only when the heartbeat and
+     *         event timers report waiting and the event manager, if launch() has created it, reports waiting
+     */
+    @Override
+    public boolean isWaiting() {
+        if (!active) {
+            return true;
+        }
+        // eventManager is only created by launch(); before that it is null and must not be dereferenced.
+        return heartbeatTimer.isTimerWaiting()
+                && eventTimer.isTimerWaiting()
+                && (eventManager == null || eventManager.waiting());
+    }
 
     /**
      * supervisor daemon enter entrance
      *
      * @param args
+     * @throws IOException if constructing the supervisor fails with an IOException
      */
-    public static void main(String[] args) {
+    public static void main(String[] args) throws IOException {
         Utils.setupDefaultUncaughtExceptionHandler();
-        Supervisor instance = new Supervisor();
-        instance.launch(new StandaloneSupervisor());
+        // Not closed here: after a successful launch, launchDaemon() registers a shutdown hook that calls close().
+        @SuppressWarnings("resource")
+        Supervisor instance = new Supervisor(new StandaloneSupervisor());
+        instance.launchDaemon();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
deleted file mode 100644
index 115c7c6..0000000
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import java.util.Map;
-
-public interface SupervisorDaemon {
-    String getId();
-
-    Map getConf();
-
-    void shutdownAllWorkers();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
deleted file mode 100644
index da4102c..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormTimer;
-import org.apache.storm.cluster.ClusterStateContext;
-import org.apache.storm.cluster.ClusterUtils;
-import org.apache.storm.cluster.DaemonType;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
-import org.apache.storm.generated.LocalAssignment;
-import org.apache.storm.generated.ProfileRequest;
-import org.apache.storm.localizer.Localizer;
-import org.apache.storm.messaging.IContext;
-import org.apache.storm.scheduler.ISupervisor;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.VersionInfo;
-import org.apache.zookeeper.data.ACL;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class SupervisorData {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class);
-
-    private final Map conf;
-    private final IContext sharedContext;
-    private volatile boolean active;
-    private final ISupervisor iSupervisor;
-    private final Utils.UptimeComputer upTime;
-    private final String stormVersion;
-    private final ConcurrentHashMap<String, String> workerThreadPids; // for local mode
-    private final IStormClusterState stormClusterState;
-    private final LocalState localState;
-    private final String supervisorId;
-    private final String assignmentId;
-    private final String hostName;
-    // used for reporting used ports when heartbeating
-    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
-    private final StormTimer heartbeatTimer;
-    private final StormTimer eventTimer;
-    private final StormTimer blobUpdateTimer;
-    private final Localizer localizer;
-    private final AtomicReference<Map<String, Map<String, Object>>> assignmentVersions;
-    private final AtomicInteger syncRetry;
-    private final Object downloadLock = new Object();
-    private final AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfilerActions;
-    private final ConcurrentHashSet<String> deadWorkers;
-    private final IWorkerManager workerManager;
-
-    public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) {
-        this.conf = conf;
-        this.sharedContext = sharedContext;
-        this.iSupervisor = iSupervisor;
-        this.active = true;
-        this.upTime = Utils.makeUptimeComputer();
-        this.stormVersion = VersionInfo.getVersion();
-        this.workerThreadPids = new ConcurrentHashMap<String, String>();
-        this.deadWorkers = new ConcurrentHashSet();
-
-        List<ACL> acls = null;
-        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
-            acls = SupervisorUtils.supervisorZkAcls();
-        }
-
-        try {
-            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
-        } catch (Exception e) {
-            LOG.error("supervisor can't create stormClusterState");
-            throw Utils.wrapInRuntime(e);
-        }
-
-        try {
-            this.localState = ConfigUtils.supervisorState(conf);
-            this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
-        } catch (IOException e) {
-            throw Utils.wrapInRuntime(e);
-        }
-        this.supervisorId = iSupervisor.getSupervisorId();
-        this.assignmentId = iSupervisor.getAssignmentId();
-
-        try {
-            this.hostName = Utils.hostname(conf);
-        } catch (UnknownHostException e) {
-            throw Utils.wrapInRuntime(e);
-        }
-
-        this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>());
-
-        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
-
-        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
-
-        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());
-
-        this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>());
-        this.syncRetry = new AtomicInteger(0);
-        this.stormIdToProfilerActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
-        this.workerManager =  Utils.newInstance((String) conf.get(Config.STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN));
-        this.workerManager.prepareWorker(conf, localizer);
-    }
-
-    public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfilerActions() {
-        return stormIdToProfilerActions;
-    }
-
-    public void setStormIdToProfilerActions(Map<String, List<ProfileRequest>> stormIdToProfilerActions) {
-        this.stormIdToProfilerActions.set(stormIdToProfilerActions);
-    }
-
-    public Map getConf() {
-        return conf;
-    }
-
-    public IContext getSharedContext() {
-        return sharedContext;
-    }
-
-    public boolean isActive() {
-        return active;
-    }
-
-    public void setActive(boolean active) {
-        this.active = active;
-    }
-
-    public ISupervisor getiSupervisor() {
-        return iSupervisor;
-    }
-
-    public Utils.UptimeComputer getUpTime() {
-        return upTime;
-    }
-
-    public String getStormVersion() {
-        return stormVersion;
-    }
-
-    public ConcurrentHashMap<String, String> getWorkerThreadPids() {
-        return workerThreadPids;
-    }
-
-    public IStormClusterState getStormClusterState() {
-        return stormClusterState;
-    }
-
-    public LocalState getLocalState() {
-        return localState;
-    }
-
-    public String getSupervisorId() {
-        return supervisorId;
-    }
-
-    public String getAssignmentId() {
-        return assignmentId;
-    }
-
-    public String getHostName() {
-        return hostName;
-    }
-
-    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
-        return currAssignment;
-    }
-
-    public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
-        this.currAssignment.set(currAssignment);
-    }
-
-    public StormTimer getHeartbeatTimer() {
-        return heartbeatTimer;
-    }
-
-    public StormTimer getEventTimer() {
-        return eventTimer;
-    }
-
-    public StormTimer getBlobUpdateTimer() {
-        return blobUpdateTimer;
-    }
-
-    public Localizer getLocalizer() {
-        return localizer;
-    }
-
-    public AtomicInteger getSyncRetry() {
-        return syncRetry;
-    }
-
-    public AtomicReference<Map<String, Map<String, Object>>> getAssignmentVersions() {
-        return assignmentVersions;
-    }
-
-    public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) {
-        this.assignmentVersions.set(assignmentVersions);
-    }
-
-    public ConcurrentHashSet getDeadWorkers() {
-        return deadWorkers;
-    }
-
-    public IWorkerManager getWorkerManager() {
-        return workerManager;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
deleted file mode 100644
index 70363fa..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.supervisor;
-
-import org.apache.storm.daemon.DaemonCommon;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
-import org.apache.storm.event.EventManager;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class);
-    private final EventManager eventManager;
-    private final EventManager processesEventManager;
-    private final SupervisorData supervisorData;
-
-    public SupervisorManager(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
-        this.eventManager = eventManager;
-        this.supervisorData = supervisorData;
-        this.processesEventManager = processesEventManager;
-    }
-
-    public void shutdown() {
-        LOG.info("Shutting down supervisor {}", supervisorData.getSupervisorId());
-        supervisorData.setActive(false);
-        try {
-            supervisorData.getHeartbeatTimer().close();
-            supervisorData.getEventTimer().close();
-            supervisorData.getBlobUpdateTimer().close();
-            eventManager.close();
-            processesEventManager.close();
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
-        }
-        supervisorData.getStormClusterState().disconnect();
-    }
-
-    @Override
-    public void shutdownAllWorkers() {
-        IWorkerManager workerManager = supervisorData.getWorkerManager();
-        SupervisorUtils.shutdownAllWorkers(supervisorData.getConf(), supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(),
-                supervisorData.getDeadWorkers(), workerManager);
-    }
-
-    @Override
-    public Map getConf() {
-        return supervisorData.getConf();
-    }
-
-    @Override
-    public String getId() {
-        return supervisorData.getSupervisorId();
-    }
-
-    @Override
-    public boolean isWaiting() {
-        if (!supervisorData.isActive()) {
-            return true;
-        }
-
-        if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
-                && processesEventManager.waiting()) {
-            return true;
-        }
-        return false;
-    }
-
-    public void run() {
-        shutdown();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 33b73cf..0784631 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -19,14 +19,11 @@ package org.apache.storm.daemon.supervisor;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
-import org.apache.storm.ProcessSimulator;
-import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
-import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -36,8 +33,14 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URLDecoder;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
 
 public class SupervisorUtils {
 
@@ -52,8 +55,8 @@ public class SupervisorUtils {
         _instance = INSTANCE;
     }
 
-    public static Process processLauncher(Map conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
-                                          final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+    static Process processLauncher(Map<String, Object> conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix,
+                                          final ExitCodeCallback exitCodeCallback, File dir) throws IOException {
         if (StringUtils.isBlank(user)) {
             throw new IllegalArgumentException("User cannot be blank when calling processLauncher.");
         }
@@ -73,10 +76,10 @@ public class SupervisorUtils {
         commands.add(user);
         commands.addAll(args);
         LOG.info("Running as user: {} command: {}", user, commands);
-        return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+        return SupervisorUtils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
     }
 
-    public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+    public static int processLauncherAndWait(Map<String, Object> conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
             throws IOException {
         int ret = 0;
         Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null);
@@ -91,7 +94,7 @@ public class SupervisorUtils {
         return ret;
     }
 
-    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
+    public static void setupStormCodeDir(Map<String, Object> conf, Map<String, Object> stormConf, String dir) throws IOException {
         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
             String logPrefix = "setup conf for " + dir;
             List<String> commands = new ArrayList<>();
@@ -101,7 +104,7 @@ public class SupervisorUtils {
         }
     }
 
-    public static void rmrAsUser(Map conf, String id, String path) throws IOException {
+    public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
         String user = Utils.getFileOwner(path);
         String logPreFix = "rmr " + id;
         List<String> commands = new ArrayList<>();
@@ -148,8 +151,8 @@ public class SupervisorUtils {
      * @param stormId
      * @param conf
      */
-    public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException {
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+    static void addBlobReferences(Localizer localizer, String stormId, Map<String, Object> conf) throws IOException {
+        Map<String, Object> stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
         Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
         String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
         String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
@@ -159,7 +162,7 @@ public class SupervisorUtils {
         }
     }
 
-    public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
+    public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf) throws IOException {
         Set<String> stormIds = new HashSet<>();
         String path = ConfigUtils.supervisorStormDistRoot(conf);
         Collection<String> rets = Utils.readDirContents(path);
@@ -169,12 +172,12 @@ public class SupervisorUtils {
         return stormIds;
     }
 
-    public static Collection<String> supervisorWorkerIds(Map conf) {
+    public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
         String workerRoot = ConfigUtils.workerRoot(conf);
         return Utils.readDirContents(workerRoot);
     }
 
-    public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException {
+    static boolean doRequiredTopoFilesExist(Map<String, Object> conf, String stormId) throws IOException {
         String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
         String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
         String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
@@ -197,11 +200,11 @@ public class SupervisorUtils {
      * @return
      * @throws Exception
      */
-    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
+    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
         return _instance.readWorkerHeartbeatsImpl(conf);
     }
 
-    public  Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception {
+    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
         Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
@@ -223,11 +226,11 @@ public class SupervisorUtils {
      * @return
      * @throws IOException
      */
-    public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
+    private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
         return _instance.readWorkerHeartbeatImpl(conf, workerId);
     }
 
-    public  LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) {
+    protected LSWorkerHeartbeat readWorkerHeartbeatImpl(Map<String, Object> conf, String workerId) {
         try {
             LocalState localState = ConfigUtils.workerState(conf, workerId);
             return localState.getWorkerHeartBeat();
@@ -237,89 +240,72 @@ public class SupervisorUtils {
         }
     }
 
-    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) {
+    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
         return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
     }
 
-    public  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) {
+    private  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
         return (now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
     }
-
-    public static String javaCmd(String cmd) {
-        return _instance.javaCmdImpl(cmd);
-    }
-
-    public String javaCmdImpl(String cmd) {
-        String ret = null;
-        String javaHome = System.getenv().get("JAVA_HOME");
-        if (StringUtils.isNotBlank(javaHome)) {
-            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
-        } else {
-            ret = cmd;
+    
+    /**
+     * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
+     * callback.
+     * @param command the command to be executed in the new process
+     * @param environment the environment to be applied to the process. Can be
+     *                    null.
+     * @param logPrefix a prefix for log entries from the output of the process.
+     *                  Can be null.
+     * @param exitCodeCallback code to be called passing the exit code value
+     *                         when the process completes
+     * @param dir the working directory of the new process
+     * @return the new process
+     * @throws IOException
+     * @see java.lang.ProcessBuilder
+     */
+    public static Process launchProcess(List<String> command,
+                                        Map<String,String> environment,
+                                        final String logPrefix,
+                                        final ExitCodeCallback exitCodeCallback,
+                                        File dir)
+            throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        Map<String,String> procEnv = builder.environment();
+        if (dir != null) {
+            builder.directory(dir);
         }
-        return ret;
+        builder.redirectErrorStream(true);
+        if (environment != null) {
+            procEnv.putAll(environment);
+        }
+        final Process process = builder.start();
+        if (logPrefix != null || exitCodeCallback != null) {
+            Utils.asyncLoop(new Callable<Object>() {
+                public Object call() {
+                    if (logPrefix != null ) {
+                        Utils.readAndLogStream(logPrefix,
+                                process.getInputStream());
+                    }
+                    if (exitCodeCallback != null) {
+                        try {
+                            process.waitFor();
+                            exitCodeCallback.call(process.exitValue());
+                        } catch (InterruptedException ie) {
+                            LOG.info("{} interrupted", logPrefix);
+                            exitCodeCallback.call(-1);
+                        }
+                    }
+                    return null; // Run only once.
+                }
+            });
+        }
+        return process;
     }
     
-    public static List<ACL> supervisorZkAcls() {
+    static List<ACL> supervisorZkAcls() {
         final List<ACL> acls = new ArrayList<>();
         acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
         acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
         return acls;
     }
-
-    public static void shutdownAllWorkers(Map conf, String supervisorId, Map<String, String> workerThreadPids, Set<String> deadWorkers,
-            IWorkerManager workerManager) {
-        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
-        try {
-            for (String workerId : workerIds) {
-                workerManager.shutdownWorker(supervisorId, workerId, workerThreadPids);
-                boolean success = workerManager.cleanupWorker(workerId);
-                if (success) {
-                    deadWorkers.remove(workerId);
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("shutWorker failed");
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
-
-    /**
-     * Remove a reference to a blob when its no longer needed.
-     *
-     * @param localizer
-     * @param stormId
-     * @param conf
-     */
-    public static void removeBlobReferences(Localizer localizer, String stormId, Map conf) throws Exception {
-        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
-        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
-        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
-        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
-        if (blobstoreMap != null) {
-            for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) {
-                String key = entry.getKey();
-                Map<String, Object> blobInfo = entry.getValue();
-                localizer.removeBlobReference(key, user, topoName, shouldUncompressBlob(blobInfo));
-            }
-        }
-    }
-
-    public static void rmTopoFiles(Map conf, String stormId, Localizer localizer, boolean isrmBlobRefs) throws IOException {
-        String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
-        try {
-            if (isrmBlobRefs) {
-                removeBlobReferences(localizer, stormId, conf);
-            }
-            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
-                SupervisorUtils.rmrAsUser(conf, stormId, path);
-            } else {
-                Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, stormId));
-            }
-        } catch (Exception e) {
-            LOG.info("Exception removing: {} ", stormId, e);
-        }
-    }
-
 }

Reply via email to