Repository: storm
Updated Branches:
  refs/heads/master b74320497 -> 3de3afc37


http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
new file mode 100644
index 0000000..af454b9
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
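+/**
+ * Syncs the worker processes on this supervisor with its local assignments:
+ * <ol>
+ * <li>the workers to kill are those among the allocated workers that are dead or disallowed;</li>
+ * <li>kill the ones that should be dead: read their pids, kill -9 them and individually remove the files,
+ * rmr the heartbeat dir, rmdir the pid dir and rmdir the id dir (catching and logging exceptions);</li>
+ * <li>of the rest, figure out which assignments aren't yet satisfied;</li>
+ * <li>generate new worker ids and write the new "approved workers" to local state;</li>
+ * <li>create a local dir for each new worker id;</li>
+ * <li>launch the new workers (giving them worker-id, port and supervisor-id);</li>
+ * <li>wait for the workers to launch.</li>
+ * </ol>
+ */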
+public class SyncProcessEvent extends ShutdownWork implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
+
+    /** Supervisor-level local state (local assignments and approved workers), taken from {@link SupervisorData}. */
+    private final LocalState localState;
+
+    /** Cluster state handle taken from {@link SupervisorData}; currently only stored, not read, by this class. */
+    private final IStormClusterState stormClusterState;
+
+    /** Shared supervisor data: configuration, local state, dead-worker set, resource isolation manager and ids. */
+    private final SupervisorData supervisorData;
+
+    /**
+     * Exit callback attached to a launched worker process. When the process exits it logs the exit
+     * code and records the worker id in the supervisor's dead-worker set, so that the next sync
+     * classifies that worker as timed out.
+     */
+    private class ProcessExitCallback implements Utils.ExitCodeCallable {
+        private final String prefix;
+        private final String id;
+
+        public ProcessExitCallback(String logPrefix, String workerId) {
+            prefix = logPrefix;
+            id = workerId;
+        }
+
+        /** No-argument variant required by the interface; does nothing and returns {@code null}. */
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        /** Logs {@code exitCode} and marks the worker as dead; always returns {@code null}. */
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} exited with code: {}", prefix, exitCode);
+            supervisorData.getDeadWorkers().add(id);
+            return null;
+        }
+    }
+
+    /**
+     * Creates a sync-process event bound to the given supervisor.
+     *
+     * @param supervisorData shared supervisor data; its local state and cluster state are cached in fields
+     */
+    public SyncProcessEvent(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+        this.localState = supervisorData.getLocalState();
+        this.stormClusterState = supervisorData.getStormClusterState();
+    }
+
+    /**
+     * Synchronizes the worker processes on this supervisor with the local assignments:
+     * <ol>
+     * <li>classifies every worker known locally via {@link #getLocalWorkerStats};</li>
+     * <li>shuts down (via {@code shutWorker}) every worker whose state is not valid;</li>
+     * <li>asks {@link #getReassignExecutors} which assignments need a launch, gives each a fresh worker id and
+     * starts it via {@link #startNewWorkers};</li>
+     * <li>stores the kept workers plus the newly launched ones as the approved workers in local state;</li>
+     * <li>waits for the newly launched workers to heartbeat via {@link #waitForWorkersLaunch}.</li>
+     * </ol>
+     * Any failure is logged and rethrown wrapped by {@code Utils.wrapInRuntime}.
+     */
+    @Override
+    public void run() {
+        LOG.debug("Syncing processes");
+        try {
+            Map conf = supervisorData.getConf();
+            Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
+            if (assignedExecutors == null) {
+                assignedExecutors = new HashMap<>();
+            }
+            int now = Time.currentTimeSecs();
+
+            Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(assignedExecutors, now);
+
+            // Valid workers are kept as-is; their ports are excluded from relaunching.
+            Set<String> keeperWorkerIds = new HashSet<>();
+            Set<Integer> keepPorts = new HashSet<>();
+            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
+                StateHeartbeat stateHeartbeat = entry.getValue();
+                if (stateHeartbeat.getState() == State.valid) {
+                    keeperWorkerIds.add(entry.getKey());
+                    keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
+                }
+            }
+            Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts);
+            Map<Integer, String> newWorkerIds = new HashMap<>();
+            for (Integer port : reassignExecutors.keySet()) {
+                newWorkerIds.put(port, Utils.uuid());
+            }
+            LOG.debug("Assigned executors: {}", assignedExecutors);
+            LOG.debug("Allocated: {}", localWorkerStats);
+
+            for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) {
+                StateHeartbeat stateHeartbeat = entry.getValue();
+                if (stateHeartbeat.getState() != State.valid) {
+                    LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
+                            stateHeartbeat.getState(), stateHeartbeat.getHeartbeat());
+                    shutWorker(supervisorData, entry.getKey());
+                }
+            }
+            // start new workers; the result maps worker id -> port
+            Map<String, Integer> newWorkerIdToPort = startNewWorkers(newWorkerIds, reassignExecutors);
+
+            // Approved workers = the kept workers (with their previously approved ports) + the new ones.
+            Map<String, Integer> allWorkerIdToPort = new HashMap<>();
+            Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
+            if (approvedWorkers != null) {
+                for (String keeper : keeperWorkerIds) {
+                    allWorkerIdToPort.put(keeper, approvedWorkers.get(keeper));
+                }
+            }
+            allWorkerIdToPort.putAll(newWorkerIdToPort);
+            localState.setApprovedWorkers(allWorkerIdToPort);
+            waitForWorkersLaunch(conf, newWorkerIdToPort.keySet());
+
+        } catch (Exception e) {
+            LOG.error("Failed Sync Process", e);
+            throw Utils.wrapInRuntime(e);
+        }
+
+    }
+
+    /**
+     * Polls every 500 ms until each worker in {@code workerIds} has written a heartbeat to its local
+     * state, or until {@link Config#NIMBUS_SUPERVISOR_TIMEOUT_SECS} seconds have passed since this call
+     * started. The timeout budget is shared by all workers, not applied per worker. Workers that never
+     * heartbeat are only logged.
+     *
+     * @param conf supervisor configuration
+     * @param workerIds ids of the workers that were just launched
+     * @throws Exception if a worker's local state cannot be read or the sleep fails
+     */
+    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
+        int startTime = Time.currentTimeSecs();
+        // Utils.getInt instead of an (int) cast: casting a boxed Long to int throws ClassCastException.
+        // NOTE(review): assumes Utils.getInt accepts any Number, as its use for WORKER_HEAP_MEMORY_MB suggests.
+        int timeOut = Utils.getInt(conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS));
+        for (String workerId : workerIds) {
+            LocalState workerState = ConfigUtils.workerState(conf, workerId);
+            LSWorkerHeartbeat hb = workerState.getWorkerHeartBeat();
+            while (hb == null && (Time.currentTimeSecs() - startTime) <= timeOut) {
+                LOG.info("{} still hasn't started", workerId);
+                Time.sleep(500);
+                hb = workerState.getWorkerHeartBeat();
+            }
+            if (hb == null) {
+                LOG.info("Worker {} failed to start", workerId);
+            }
+        }
+    }
+
+    /**
+     * Selects the assignments that need a new worker: those whose port is not already served by a
+     * valid (kept) worker.
+     *
+     * @param assignExecutors all local assignments, keyed by port
+     * @param keepPorts ports that already have a valid worker
+     * @return the entries of {@code assignExecutors} whose port is not in {@code keepPorts}
+     */
+    Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
+        Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>();
+        for (Map.Entry<Integer, LocalAssignment> entry : assignExecutors.entrySet()) {
+            // A port with a healthy worker keeps it; every other assigned port needs a launch.
+            if (!keepPorts.contains(entry.getKey())) {
+                reassignExecutors.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return reassignExecutors;
+    }
+
+    /**
+     * Classifies every worker known locally (see {@link #readWorkerHeartbeats}):
+     * <ul>
+     * <li>{@code notStarted}: no heartbeat could be read;</li>
+     * <li>{@code disallowed}: not among the approved workers, or the heartbeat does not match the local
+     * assignment for its port;</li>
+     * <li>{@code timedOut}: recorded as dead, or the last heartbeat is older than
+     * {@link Config#SUPERVISOR_WORKER_TIMEOUT_SECS};</li>
+     * <li>{@code valid}: otherwise.</li>
+     * </ul>
+     *
+     * @param assignedExecutors local assignments keyed by port
+     * @param now current supervisor time in seconds
+     * @return map from worker id to its state and heartbeat; the heartbeat is {@code null} for
+     *         {@code notStarted} workers
+     * @throws Exception propagated from reading worker heartbeats or local state
+     */
+    public Map<String, StateHeartbeat> getLocalWorkerStats(Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
+        Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>();
+        Map conf = supervisorData.getConf();
+        LocalState supervisorState = supervisorData.getLocalState();
+        Map<String, LSWorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf);
+        Map<String, Integer> approvedWorkers = supervisorState.getApprovedWorkers();
+        Set<String> approvedIds = new HashSet<>();
+        if (approvedWorkers != null) {
+            approvedIds.addAll(approvedWorkers.keySet());
+        }
+        for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) {
+            String workerId = entry.getKey();
+            LSWorkerHeartbeat whb = entry.getValue();
+            State state;
+            if (whb == null) {
+                state = State.notStarted;
+            } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
+                state = State.disallowed;
+            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
+                LOG.info("Worker Process {} has died", workerId);
+                state = State.timedOut;
+            } else if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
+                state = State.timedOut;
+            } else {
+                state = State.valid;
+            }
+            // Pass whb itself, not whb.toString(): whb is null for notStarted workers.
+            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
+            workerIdHbstate.put(workerId, new StateHeartbeat(state, whb));
+        }
+        return workerIdHbstate;
+    }
+
+    /**
+     * Checks whether a worker heartbeat agrees with the local assignment for the worker's port. The
+     * topology ids must be equal, and the heartbeat's executors must equal the assigned executors as a
+     * set once the system executor {@code ExecutorInfo(-1, -1)} is removed from the heartbeat's set.
+     *
+     * @param whb the worker heartbeat (must not be null)
+     * @param assignedExecutors local assignments keyed by port
+     * @return true if an assignment exists for the heartbeat's port and it matches
+     */
+    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
+        LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
+        // Compare ids by value; != compares String references.
+        if (localAssignment == null || !Objects.equals(localAssignment.get_topology_id(), whb.get_topology_id())) {
+            return false;
+        }
+        Set<ExecutorInfo> heartbeatExecutors = new HashSet<>(whb.get_executors());
+        // remove SYSTEM_EXECUTOR_ID
+        heartbeatExecutors.remove(new ExecutorInfo(-1, -1));
+        Set<ExecutorInfo> assignedExecutorInfos = new HashSet<>(localAssignment.get_executors());
+        // Compare contents, not references (the previous list != check was always true).
+        return heartbeatExecutors.equals(assignedExecutorInfos);
+    }
+
+    /**
+     * Reads the local heartbeat of every worker id known to this supervisor.
+     *
+     * @param conf supervisor configuration
+     * @return map from worker id to heartbeat; a value is {@code null} when no heartbeat is stored or it
+     *         could not be read
+     * @throws Exception if listing the worker ids fails
+     */
+    protected Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception {
+        Collection<String> ids = SupervisorUtils.supervisorWorkerIds(conf);
+        Map<String, LSWorkerHeartbeat> result = new HashMap<>();
+        for (String id : ids) {
+            // Null heartbeats are kept on purpose: getLocalWorkerStats treats them as "not started".
+            result.put(id, readWorkerHeartbeat(conf, id));
+        }
+        return result;
+    }
+
+    /**
+     * Reads one worker's heartbeat from its local state.
+     *
+     * @param conf supervisor configuration
+     * @param workerId id of the worker
+     * @return the heartbeat, or {@code null} if none is stored or reading failed (failures are logged, not thrown)
+     */
+    protected LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) {
+        LSWorkerHeartbeat heartbeat;
+        try {
+            heartbeat = ConfigUtils.workerState(conf, workerId).getWorkerHeartBeat();
+        } catch (Exception e) {
+            LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e);
+            heartbeat = null;
+        }
+        return heartbeat;
+    }
+
+    /**
+     * Launches a worker in local mode.
+     *
+     * <p>Not implemented yet: this is to be ported once the worker itself is ported to Java. Until then
+     * nothing is started for the worker; a warning is logged so the missing launch is visible instead
+     * of being silently skipped.
+     */
+    protected void launchLocalWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException {
+        // TODO: port this function after porting worker to java
+        LOG.warn("Launching workers in local mode is not implemented yet; worker {} for topology {} on port {} was not started",
+                workerId, stormId, port);
+    }
+
+    /**
+     * Builds the worker classpath. Starting from {@code Utils.workerClasspath()}, it adds the topology
+     * jar and then the entries of {@link Config#TOPOLOGY_CLASSPATH} (if any) via {@code Utils.addToClasspath}.
+     *
+     * @param stormJar path of the topology jar
+     * @param stormConf topology configuration
+     * @return the combined classpath
+     */
+    protected String getWorkerClassPath(String stormJar, Map stormConf) {
+        Object configured = stormConf.get(Config.TOPOLOGY_CLASSPATH);
+        List<String> topoClasspath;
+        if (configured == null) {
+            topoClasspath = new ArrayList<>();
+        } else {
+            topoClasspath = new ArrayList<>((List<String>) configured);
+        }
+        String withJar = Utils.addToClasspath(Utils.workerClasspath(), Arrays.asList(stormJar));
+        return Utils.addToClasspath(withJar, topoClasspath);
+    }
+
+    /**
+     * Generates runtime childopts by replacing the placeholders {@code %ID%} and {@code %WORKER-PORT%}
+     * (the port), {@code %WORKER-ID%}, {@code %TOPOLOGY-ID%} and {@code %HEAP-MEM%} (on-heap memory).
+     *
+     * <p>A {@code String} value is substituted and then split on whitespace, with empty tokens dropped. A
+     * {@code List} value is substituted element by element. Any other value, including {@code null},
+     * yields an empty list.
+     *
+     * @param value childopts from a configuration: a String or a List of Strings
+     * @param workerId value for %WORKER-ID%
+     * @param stormId value for %TOPOLOGY-ID%
+     * @param port value for %ID% and %WORKER-PORT%
+     * @param memOnheap value for %HEAP-MEM%
+     * @return the substituted options; never null
+     */
+    public List<String> substituteChildopts(Object value, String workerId, String stormId, Integer port, int memOnheap) {
+        List<String> rets = new ArrayList<>();
+        if (value instanceof String) {
+            String substituted = substituteChildoptsPlaceholders((String) value, workerId, stormId, port, memOnheap);
+            for (String opt : substituted.split("\\s+")) {
+                // Leading whitespace makes split() return an empty first token; never pass an empty JVM argument.
+                if (!opt.isEmpty()) {
+                    rets.add(opt);
+                }
+            }
+        } else if (value instanceof List) {
+            List<String> opts = (List<String>) value;
+            for (String opt : opts) {
+                rets.add(substituteChildoptsPlaceholders(opt, workerId, stormId, port, memOnheap));
+            }
+        }
+        return rets;
+    }
+
+    /** Replaces every childopts placeholder in {@code opt}; String.replace returns a new string, so each result is chained. */
+    private static String substituteChildoptsPlaceholders(String opt, String workerId, String stormId, Integer port, int memOnheap) {
+        return opt.replace("%ID%", String.valueOf(port))
+                .replace("%WORKER-ID%", workerId)
+                .replace("%TOPOLOGY-ID%", stormId)
+                .replace("%WORKER-PORT%", String.valueOf(port))
+                .replace("%HEAP-MEM%", String.valueOf(memOnheap));
+    }
+
+    /**
+     * Resolves a JDK executable. Returns {@code $JAVA_HOME/bin/<cmd>} when the JAVA_HOME environment
+     * variable is set and not blank; otherwise returns the bare command name, leaving lookup to the OS.
+     *
+     * @param cmd executable name, e.g. "java"
+     * @return the executable path or name
+     */
+    private String jvmCmd(String cmd) {
+        // JAVA_HOME is an environment variable, not a JVM system property.
+        String javaHome = System.getenv("JAVA_HOME");
+        if (StringUtils.isNotBlank(javaHome)) {
+            return javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd;
+        }
+        return cmd;
+    }
+
+    /**
+     * launch a worker in distributed mode
+     *
+     * @throws IOException
+     */
+    protected void launchDistributeWorker(String stormId, Integer port, String 
workerId, WorkerResources resources) throws IOException {
+
+        Map conf = supervisorData.getConf();
+        Boolean runWorkerAsUser = 
Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+        String stormHome = System.getProperty("storm.home");
+        String stormOptions = System.getProperty("storm.options");
+        String stormConfFile = System.getProperty("storm.conf.file");
+        String stormLogDir = ConfigUtils.getLogDir();
+        String stormLogConfDir = (String) 
(conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+        String stormLog4j2ConfDir;
+        if (StringUtils.isNotBlank(stormLogConfDir)) {
+            if (Utils.isAbsolutePath(stormLogConfDir)) {
+                stormLog4j2ConfDir = stormLogConfDir;
+            } else {
+                stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + 
stormLogConfDir;
+            }
+        } else {
+            stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + 
"log4j2";
+        }
+
+        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+
+        String jlp = jlp(stormRoot, conf);
+
+        String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+
+        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+        String workerClassPath = getWorkerClassPath(stormJar, stormConf);
+
+        Object topGcOptsObject = 
stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS);
+        List<String> topGcOpts = new ArrayList<>();
+        if (topGcOptsObject instanceof String) {
+            topGcOpts.add((String) topGcOptsObject);
+        } else if (topGcOptsObject instanceof List) {
+            topGcOpts.addAll((List<String>) topGcOptsObject);
+        }
+
+        int memOnheap = 0;
+        if (resources.get_mem_on_heap() > 0) {
+            memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+        } else {
+            memOnheap = 
Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB));
+        }
+
+        int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+
+        int cpu = (int) Math.ceil(resources.get_cpu());
+
+        List<String> gcOpts = null;
+
+        if (topGcOpts != null) {
+            gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, 
memOnheap);
+        } else {
+            gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), 
workerId, stormId, port, memOnheap);
+        }
+
+        Object topoWorkerLogwriterObject = 
stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
+        List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
+        if (topoWorkerLogwriterObject instanceof String) {
+            topoWorkerLogwriterChildopts.add((String) 
topoWorkerLogwriterObject);
+        } else if (topoWorkerLogwriterObject instanceof List) {
+            topoWorkerLogwriterChildopts.addAll((List<String>) 
topoWorkerLogwriterObject);
+        }
+
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+
+        String logfileName = "worker.log";
+
+        String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
+
+        String loggingSensitivity = (String) 
stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
+        if (loggingSensitivity == null) {
+            loggingSensitivity = "S3";
+        }
+
+        List<String> workerChildopts = 
substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, 
memOnheap);
+
+        List<String> topWorkerChildopts = 
substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, 
stormId, port, memOnheap);
+
+        List<String> workerProfilerChildopts = null;
+        if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) 
{
+            workerProfilerChildopts = 
substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, 
stormId, port, memOnheap);
+        }
+
+        Map<String, String> environment = new HashMap<String, String>();
+        Map<String, String> topEnvironment = (Map<String, String>) 
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+        if (topEnvironment != null) {
+            environment.putAll(topEnvironment);
+            environment.put("LD_LIBRARY_PATH", jlp);
+        } else {
+            environment.put("LD_LIBRARY_PATH", jlp);
+        }
+
+        String log4jConfigurationFile = null;
+        if (System.getProperty("os.name").startsWith("Windows") && 
!stormLog4j2ConfDir.startsWith("file:")) {
+            log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
+        } else {
+            log4jConfigurationFile = stormLog4j2ConfDir;
+        }
+        log4jConfigurationFile = log4jConfigurationFile + 
Utils.FILE_PATH_SEPARATOR + "worker.xml";
+
+        StringBuilder commandSB = new StringBuilder();
+
+        List<String> commandList = new ArrayList<>();
+        commandList.add(jvmCmd("java"));
+        commandList.add("-cp");
+        commandList.add(workerClassPath);
+        commandList.addAll(topoWorkerLogwriterChildopts);
+        commandList.add("-Dlogfile.name=" + logfileName);
+        commandList.add("-Dstorm.home=" + stormHome);
+        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+        commandList.add("-Dstorm.id=" + stormId);
+        commandList.add("-Dworker.id=" + workerId);
+        commandList.add("-Dworker.port=" + port);
+        commandList.add("-Dstorm.log.dir=" + stormLogDir);
+        commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+        
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+        commandList.add("org.apache.storm.LogWriter");
+
+        commandList.add(jvmCmd("java"));
+        commandList.add("-server");
+        commandList.addAll(workerChildopts);
+        commandList.addAll(topWorkerChildopts);
+        commandList.addAll(gcOpts);
+        commandList.addAll(workerProfilerChildopts);
+        commandList.add("-Djava.library.path=" + jlp);
+        commandList.add("-Dlogfile.name=" + logfileName);
+        commandList.add("-Dstorm.home=" + stormHome);
+        commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+        commandList.add("-Dstorm.conf.file=" + stormConfFile);
+        commandList.add("-Dstorm.options=" + stormOptions);
+        commandList.add("-Dstorm.log.dir=" + stormLogDir);
+        commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
+        commandList.add(" -Dlog4j.configurationFile=" + 
log4jConfigurationFile);
+        
commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+        commandList.add("-Dstorm.id=" + stormId);
+        commandList.add("-Dworker.id=" + workerId);
+        commandList.add("-Dworker.port=" + port);
+        commandList.add("-cp");
+        commandList.add(workerClassPath);
+        commandList.add("org.apache.storm.daemon.worker");
+        commandList.add(stormId);
+        commandList.add(supervisorData.getAssignmentId());
+        commandList.add(String.valueOf(port));
+        commandList.add(workerId);
+
+        // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf 
STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
+        if 
(Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), 
false)) {
+            int cgRoupMem = (int) (Math.ceil((double) 
conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)));
+            int memoryValue = memoffheap + memOnheap + cgRoupMem;
+            int cpuValue = cpu;
+            Map<String, Number> map = new HashMap<>();
+            map.put("cpu", cpuValue);
+            map.put("memory", memoryValue);
+            
supervisorData.getResourceIsolationManager().reserveResourcesForWorker(workerId,
 map);
+            commandList = 
supervisorData.getResourceIsolationManager().getLaunchCommand(workerId, 
commandList);
+        }
+
+        LOG.info("Launching worker with command: ", 
Utils.shellCmd(commandList));
+        writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
+        ConfigUtils.setWorkerUserWSE(conf, workerId, user);
+        createArtifactsLink(conf, stormId, port, workerId);
+
+        String logPrefix = "Worker Process " + workerId;
+        String workerDir = ConfigUtils.workerRoot(conf, workerId);
+        supervisorData.getDeadWorkers().remove(workerId);
+        createBlobstoreLinks(conf, stormId, workerId);
+
+        ProcessExitCallback processExitCallback = new 
ProcessExitCallback(logPrefix, workerId);
+        if (runWorkerAsUser) {
+            List<String> stringList = new ArrayList<>();
+            stringList.add("worker");
+            stringList.add(workerDir);
+            stringList.add(Utils.writeScript(workerDir, commandList, 
topEnvironment));
+            SupervisorUtils.workerLauncher(conf, user, stringList, null, 
logPrefix, processExitCallback, new File(workerDir));
+        } else {
+            Utils.launchProcess(commandList, topEnvironment, logPrefix, 
processExitCallback, new File(workerDir));
+        }
+    }
+
+    /**
+     * Builds the native library search path for a topology's workers. It contains the OS/arch-specific
+     * resources directory, the topology's resources directory and {@link Config#JAVA_LIBRARY_PATH},
+     * joined with the platform path separator.
+     *
+     * @param stormRoot the topology's supervisor dist root
+     * @param conf supervisor configuration
+     * @return the search path (used for -Djava.library.path and LD_LIBRARY_PATH)
+     */
+    protected String jlp(String stormRoot, Map conf) {
+        String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+        String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+        String arch = System.getProperty("os.arch");
+        String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+        // Search-path entries are separated by File.pathSeparator (":" or ";"), not by the file separator.
+        return archResourceRoot + File.pathSeparator + resourceRoot + File.pathSeparator + conf.get(Config.JAVA_LIBRARY_PATH);
+    }
+
+    /**
+     * Launches one worker per entry of {@code reassignExecutors}, using the worker id chosen for that
+     * port in {@code newWorkerIds}. A launch is skipped (and logged) when the topology's files are missing
+     * locally. Otherwise the worker's pid and heartbeat directories are created first, and the worker is
+     * launched according to the cluster mode.
+     *
+     * @param newWorkerIds worker id to use for each port
+     * @param reassignExecutors assignments to launch, keyed by port
+     * @return map from worker id to port for every worker whose topology files were present
+     * @throws IOException if a directory cannot be created or a launch fails
+     */
+    protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
+        Map<String, Integer> launched = new HashMap<>();
+        Map conf = supervisorData.getConf();
+        String clusterMode = ConfigUtils.clusterMode(conf);
+
+        for (Map.Entry<Integer, LocalAssignment> slot : reassignExecutors.entrySet()) {
+            Integer port = slot.getKey();
+            LocalAssignment assignment = slot.getValue();
+            String workerId = newWorkerIds.get(port);
+            String stormId = assignment.get_topology_id();
+            WorkerResources resources = assignment.get_resources();
+
+            // Guard: the topology's required files must exist before a worker can be launched.
+            if (!SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
+                LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
+                        supervisorData.getSupervisorId(), port, workerId);
+                continue;
+            }
+
+            String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
+            String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
+            FileUtils.forceMkdir(new File(pidsPath));
+            FileUtils.forceMkdir(new File(hbPath));
+
+            if (clusterMode.endsWith("distributed")) {
+                launchDistributeWorker(stormId, port, workerId, resources);
+            } else if (clusterMode.endsWith("local")) {
+                launchLocalWorker(stormId, port, workerId, resources);
+            }
+            launched.put(workerId, port);
+            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
+                    workerId);
+        }
+        return launched;
+    }
+
+    /**
+     * Writes the worker's log metadata for this topology and port. The metadata holds the submitter
+     * user, the worker id, and the distinct log groups and log users gathered from
+     * {@code LOGS_GROUPS}/{@code TOPOLOGY_GROUPS} and {@code LOGS_USERS}/{@code TOPOLOGY_USERS}.
+     *
+     * @param stormconf topology configuration
+     * @param user topology submitter
+     * @param workerId id of the worker
+     * @param stormId topology id
+     * @param port worker port
+     * @param conf supervisor configuration
+     * @throws IOException if the metadata file cannot be written
+     */
+    protected void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, int port, Map conf) throws IOException {
+        Map data = new HashMap();
+        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        data.put("worker-id", workerId);
+        data.put(Config.LOGS_GROUPS, unionOfConfiguredLists(stormconf, Config.LOGS_GROUPS, Config.TOPOLOGY_GROUPS).toArray());
+        data.put(Config.LOGS_USERS, unionOfConfiguredLists(stormconf, Config.LOGS_USERS, Config.TOPOLOGY_USERS).toArray());
+        writeLogMetadataToYamlFile(stormId, port, data, conf);
+    }
+
+    /** Collects the distinct entries of the list-valued config entries {@code keys}; absent keys are skipped. */
+    private static Set<String> unionOfConfiguredLists(Map config, String... keys) {
+        Set<String> union = new HashSet<>();
+        for (String key : keys) {
+            Object entries = config.get(key);
+            if (entries != null) {
+                union.addAll((List<String>) entries);
+            }
+        }
+        return union;
+    }
+
+    /**
+     * Dumps {@code data} as YAML into the log metadata file for this topology and port, creating the
+     * parent directory if it does not exist. Run-worker-as-user needs that directory to have special
+     * permissions or it is insecure. In that mode it is created with {@code FileUtils.forceMkdir} and
+     * then prepared by {@code SupervisorUtils.setupStormCodeDir}.
+     *
+     * @param stormId topology id
+     * @param port worker port
+     * @param data metadata to write
+     * @param conf supervisor configuration
+     * @throws IOException if the directory or file cannot be created or written
+     */
+    protected void writeLogMetadataToYamlFile(String stormId, int port, Map data, Map conf) throws IOException {
+        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port);
+        File parent = file.getParentFile();
+        if (!Utils.checkFileExists(file.getParent())) {
+            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                FileUtils.forceMkdir(parent);
+                SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), parent.getCanonicalPath());
+            } else {
+                // mkdirs (not mkdir) so missing intermediate directories are created as well.
+                parent.mkdirs();
+            }
+        }
+        // try-with-resources closes (and so flushes) the writer; it used to be leaked.
+        try (FileWriter writer = new FileWriter(file)) {
+            Yaml yaml = new Yaml();
+            yaml.dump(data, writer);
+        }
+    }
+
+    /**
+     * Creates the "artifacts" symlink for a worker by calling
+     * {@code Utils.createSymlink(workerDir, topoDir, "artifacts", port)}, where {@code topoDir} is the
+     * topology's worker-artifacts root. Presumably the link lives in the worker directory and points at
+     * {@code topoDir/port}; confirm against {@code Utils.createSymlink}. Does nothing if the worker
+     * directory does not exist.
+     *
+     * @param conf supervisor configuration
+     * @param stormId topology id
+     * @param port worker port
+     * @param workerId id of the worker
+     * @throws IOException if the link cannot be created
+     */
+    protected void createArtifactsLink(Map conf, String stormId, int port, String workerId) throws IOException {
+        String workerDir = ConfigUtils.workerRoot(conf, workerId);
+        String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
+        if (!Utils.checkFileExists(workerDir)) {
+            return;
+        }
+        Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port));
+    }
+
+    /**
+     * Creates symlinks in the worker's launch directory to the topology's resources directory and to every
+     * blob listed in {@link Config#TOPOLOGY_BLOBSTORE_MAP}. Each blob is linked under its configured
+     * "localname" when present, otherwise under its key.
+     *
+     * @param conf supervisor configuration
+     * @param stormId topology id
+     * @param workerId id of the worker
+     * @throws IOException if a link cannot be created
+     */
+    protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException {
+        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+
+        List<String> blobFileNames = new ArrayList<>();
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
+                Map<String, Object> blobInfo = blob.getValue();
+                boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
+                blobFileNames.add(hasLocalName ? (String) blobInfo.get("localname") : blob.getKey());
+            }
+        }
+
+        List<String> resourceFileNames = new ArrayList<>(blobFileNames.size() + 1);
+        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
+        resourceFileNames.addAll(blobFileNames);
+        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
+
+        Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR);
+        for (String blobFileName : blobFileNames) {
+            Utils.createSymlink(workerRoot, stormRoot, blobFileName, blobFileName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
new file mode 100644
index 0000000..d6dc45e
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@ -0,0 +1,592 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+    private EventManager syncSupEventManager;
+    private EventManager syncProcessManager;
+
+    private IStormClusterState stormClusterState;
+
+    private LocalState localState;
+
+    private SyncProcessEvent syncProcesses;
+    private SupervisorData supervisorData;
+
+    public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent 
syncProcesses, EventManager syncSupEventManager,
+            EventManager syncProcessManager) {
+
+        this.syncProcesses = syncProcesses;
+        this.syncSupEventManager = syncSupEventManager;
+        this.syncProcessManager = syncProcessManager;
+        this.stormClusterState = supervisorData.getStormClusterState();
+        this.localState = supervisorData.getLocalState();
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map conf = supervisorData.getConf();
+            Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager);
+            List<String> stormIds = 
stormClusterState.assignments(syncCallback);
+            Map<String, Map<String, Object>> assignmentsSnapshot =
+                    getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions(), syncCallback);
+            Map<String, List<ProfileRequest>> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
+
+            Set<String> allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
+            Map<String, String> stormcodeMap = 
readStormCodeLocations(assignmentsSnapshot);
+            Map<Integer, LocalAssignment> existingAssignment = 
localState.getLocalAssignmentsMap();
+            if (existingAssignment == null){
+                existingAssignment = new HashMap<>();
+            }
+
+            Map<Integer, LocalAssignment> allAssignment =
+                    readAssignments(assignmentsSnapshot, existingAssignment, 
supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
+
+            Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
+            Set<String> assignedStormIds = new HashSet<>();
+
+            for (Map.Entry<Integer, LocalAssignment> entry : 
allAssignment.entrySet()) {
+                if 
(supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
+                    newAssignment.put(entry.getKey(), entry.getValue());
+                    assignedStormIds.add(entry.getValue().get_topology_id());
+                }
+            }
+            Set<String> srashStormIds = verifyDownloadedFiles(conf, 
supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
+            Set<String> downloadedStormIds = new HashSet<>();
+            downloadedStormIds.addAll(allDownloadedTopologyIds);
+            downloadedStormIds.removeAll(srashStormIds);
+
+            LOG.debug("Synchronizing supervisor");
+            LOG.debug("Storm code map: {}", stormcodeMap);
+            LOG.debug("All assignment: {}", allAssignment);
+            LOG.debug("New assignment: {}", newAssignment);
+            LOG.debug("Assigned Storm Ids {}", assignedStormIds);
+            LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
+            LOG.debug("Checked Downloaded Ids {}", srashStormIds);
+            LOG.debug("Downloaded Ids {}", downloadedStormIds);
+            LOG.debug("Storm Ids Profiler Actions {}", 
stormIdToProfilerActions);
+            // download code first
+            // This might take awhile
+            // - should this be done separately from usual monitoring?
+            // should we only download when topology is assigned to this 
supervisor?
+            for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) {
+                String stormId = entry.getKey();
+                if (!downloadedStormIds.contains(stormId) && 
assignedStormIds.contains(stormId)) {
+                    LOG.info("Downloading code for storm id {}.", stormId);
+                    try {
+                        downloadStormCode(conf, stormId, entry.getValue(), 
supervisorData.getLocalizer());
+                    } catch (Exception e) {
+                        if 
(Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                            LOG.warn("Nimbus leader was not available.", e);
+                        } else if 
(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
+                            LOG.warn("There was a connection problem with 
nimbus.", e);
+                        } else {
+                            throw e;
+                        }
+                    }
+                    LOG.info("Finished downloading code for storm id {}", 
stormId);
+                }
+            }
+
+            LOG.debug("Writing new assignment {}", newAssignment);
+
+            Set<Integer> killWorkers = new HashSet<>();
+            killWorkers.addAll(existingAssignment.keySet());
+            killWorkers.removeAll(newAssignment.keySet());
+            for (Integer port : killWorkers) {
+                supervisorData.getiSupervisor().killedWorker(port);
+            }
+
+            supervisorData.getiSupervisor().assigned(newAssignment.keySet());
+            localState.setLocalAssignmentsMap(newAssignment);
+            supervisorData.setAssignmentVersions(assignmentsSnapshot);
+            
supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);
+
+            Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>();
+            for (Map.Entry<Integer, LocalAssignment> entry : 
newAssignment.entrySet()) {
+                convertNewAssignment.put(entry.getKey().longValue(), 
entry.getValue());
+            }
+            supervisorData.setCurrAssignment(convertNewAssignment);
+            // remove any downloaded code that's no longer assigned or active
+            // important that this happens after setting the local assignment 
so that
+            // synchronize-supervisor doesn't try to launch workers for which 
the
+            // resources don't exist
+            if (Utils.isOnWindows()) {
+                shutdownDisallowedWorkers();
+            }
+            for (String stormId : allDownloadedTopologyIds) {
+                if (!stormcodeMap.containsKey(stormId)) {
+                    LOG.info("Removing code for storm id {}.", stormId);
+                    rmTopoFiles(conf, stormId, supervisorData.getLocalizer(), 
true);
+                }
+            }
+            syncProcessManager.add(syncProcesses);
+        } catch (Exception e) {
+            LOG.error("Failed to Sync Supervisor", e);
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    protected Map<String, Map<String, Object>> 
getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> 
stormIds,
+            Map<String, Map<String, Object>> localAssignmentVersion, Runnable 
callback) throws Exception {
+        Map<String, Map<String, Object>> updateAssignmentVersion = new 
HashMap<>();
+        for (String stormId : stormIds) {
+            Integer recordedVersion = -1;
+            Integer version = stormClusterState.assignmentVersion(stormId, 
callback);
+            if (localAssignmentVersion.containsKey(stormId) && 
localAssignmentVersion.get(stormId) != null) {
+                recordedVersion = (Integer) 
localAssignmentVersion.get(stormId).get(IStateStorage.VERSION);
+            }
+            if (version == null) {
+                // ignore
+            } else if (version == recordedVersion) {
+                updateAssignmentVersion.put(stormId, 
localAssignmentVersion.get(stormId));
+            } else {
+                Map<String, Object> assignmentVersion = (Map<String, Object>) 
stormClusterState.assignmentInfoWithVersion(stormId, callback);
+                updateAssignmentVersion.put(stormId, assignmentVersion);
+            }
+        }
+        return updateAssignmentVersion;
+    }
+
+    protected Map<String, List<ProfileRequest>> 
getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) 
throws Exception {
+        Map<String, List<ProfileRequest>> ret = new HashMap<String, 
List<ProfileRequest>>();
+        for (String stormId : stormIds) {
+            List<ProfileRequest> profileRequests = 
stormClusterState.getTopologyProfileRequests(stormId);
+            ret.put(stormId, profileRequests);
+        }
+        return ret;
+    }
+
+    protected Map<String, String> readStormCodeLocations(Map<String, 
Map<String, Object>> assignmentsSnapshot) {
+        Map<String, String> stormcodeMap = new HashMap<>();
+        for (Map.Entry<String, Map<String, Object>> entry : 
assignmentsSnapshot.entrySet()) {
+            Assignment assignment = (Assignment) 
(entry.getValue().get(IStateStorage.DATA));
+            if (assignment != null) {
+                stormcodeMap.put(entry.getKey(), 
assignment.get_master_code_dir());
+            }
+        }
+        return stormcodeMap;
+    }
+
+    /**
+     * Remove a reference to a blob when its no longer needed.
+     * 
+     * @param localizer
+     * @param stormId
+     * @param conf
+     */
+    protected void removeBlobReferences(Localizer localizer, String stormId, 
Map conf) throws Exception {
+        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
+                String key = entry.getKey();
+                Map<String, Object> blobInfo = entry.getValue();
+                localizer.removeBlobReference(key, user, topoName, 
SupervisorUtils.isShouldUncompressBlob(blobInfo));
+            }
+        }
+    }
+
+    protected void rmTopoFiles(Map conf, String stormId, Localizer localizer, 
boolean isrmBlobRefs) throws IOException {
+        String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        try {
+            if (isrmBlobRefs) {
+                removeBlobReferences(localizer, stormId, conf);
+            }
+            if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                SupervisorUtils.rmrAsUser(conf, stormId, path);
+            } else {
+                Utils.forceDelete(ConfigUtils.supervisorStormDistRoot(conf, 
stormId));
+            }
+        } catch (Exception e) {
+            LOG.info("Exception removing: {} ", stormId, e);
+        }
+    }
+
+    /**
+     * Check for the files exists to avoid supervisor crashing Also makes sure 
there is no necessity for locking"
+     * 
+     * @param conf
+     * @param localizer
+     * @param assignedStormIds
+     * @param allDownloadedTopologyIds
+     * @return
+     */
+    protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, 
Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
+            throws IOException {
+        Set<String> srashStormIds = new HashSet<>();
+        for (String stormId : allDownloadedTopologyIds) {
+            if (assignedStormIds.contains(stormId)) {
+                if (!SupervisorUtils.checkTopoFilesExist(conf, stormId)) {
+                    LOG.debug("Files not present in topology directory");
+                    rmTopoFiles(conf, stormId, localizer, false);
+                    srashStormIds.add(stormId);
+                }
+            }
+        }
+        return srashStormIds;
+    }
+
+    /**
+     * download code ; two cluster mode: local and distributed
+     *
+     * @param conf
+     * @param stormId
+     * @param masterCodeDir
+     * @throws IOException
+     */
+    private void downloadStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
+        String clusterMode = ConfigUtils.clusterMode(conf);
+
+        if (clusterMode.endsWith("distributed")) {
+            downloadDistributeStormCode(conf, stormId, masterCodeDir, 
localizer);
+        } else if (clusterMode.endsWith("local")) {
+            downloadLocalStormCode(conf, stormId, masterCodeDir, localizer);
+        }
+    }
+
+    private void downloadLocalStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
+
+        String tmproot = ConfigUtils.supervisorTmpDir(conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, 
null);
+        try {
+            FileUtils.forceMkdir(new File(tmproot));
+            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
+            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
+            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
+            String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
+            blobStore.readBlobTo(stormCodeKey, new FileOutputStream(codePath), 
null);
+            blobStore.readBlobTo(stormConfKey, new FileOutputStream(confPath), 
null);
+        } finally {
+            blobStore.shutdown();
+        }
+
+        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
+        SupervisorUtils.setupStormCodeDir(conf, 
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
+        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
+
+        String resourcesJar = resourcesJar();
+
+        URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
+
+        String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + 
ConfigUtils.RESOURCES_SUBDIR;
+
+        if (resourcesJar != null) {
+            LOG.info("Extracting resources from jar at {} to {}", 
resourcesJar, targetDir);
+            Utils.extractDirFromJar(resourcesJar, 
ConfigUtils.RESOURCES_SUBDIR, stormroot);
+        } else if (url != null) {
+
+            LOG.info("Copying resources at {} to {} ", url.toString(), 
targetDir);
+            if (url.getProtocol() == "jar") {
+                JarURLConnection urlConnection = (JarURLConnection) 
url.openConnection();
+                
Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), 
ConfigUtils.RESOURCES_SUBDIR, stormroot);
+            } else {
+                FileUtils.copyDirectory(new File(url.getFile()), (new 
File(targetDir)));
+            }
+        }
+    }
+
+    /**
+     * Downloading to permanent location is atomic
+     * 
+     * @param conf
+     * @param stormId
+     * @param masterCodeDir
+     * @param localizer
+     * @throws Exception
+     */
+    private void downloadDistributeStormCode(Map conf, String stormId, String 
masterCodeDir, Localizer localizer) throws Exception {
+
+        String tmproot = ConfigUtils.supervisorTmpDir(conf) + 
Utils.FILE_PATH_SEPARATOR + Utils.uuid();
+        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+        ClientBlobStore blobStore = 
Utils.getClientBlobStoreForSupervisor(conf);
+
+        if (Utils.isOnWindows()) {
+            if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+                throw new RuntimeException("ERROR: Windows doesn't implement 
setting the correct permissions");
+            }
+        } else {
+            Utils.restrictPermissions(tmproot);
+        }
+        FileUtils.forceMkdir(new File(tmproot));
+        String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
+        String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
+        String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
+        String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
+        String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
+        String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
+        Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
+        Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
+        Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
+        blobStore.shutdown();
+        Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, 
tmproot);
+        downloadBlobsForTopology(conf, confPath, localizer, tmproot);
+        if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
+            LOG.info("Successfully downloaded blob resources for storm-id {}", 
stormId);
+            FileUtils.forceMkdir(new File(stormroot));
+            Files.move(new File(tmproot).toPath(), new 
File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
+            SupervisorUtils.setupStormCodeDir(conf, 
ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
+        } else {
+            LOG.info("Failed to download blob resources for storm-id ", 
stormId);
+            Utils.forceDelete(tmproot);
+        }
+    }
+
+    /**
+     * Assert if all blobs are downloaded for the given topology
+     * 
+     * @param stormconfPath
+     * @param targetDir
+     * @return
+     */
+    protected boolean IsDownloadBlobsForTopologySucceed(String stormconfPath, 
String targetDir) throws IOException {
+        Map stormConf = 
Utils.fromCompressedJsonConf(FileUtils.readFileToByteArray(new 
File(stormconfPath)));
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        List<String> blobFileNames = new ArrayList<>();
+        if (blobstoreMap != null) {
+            for (Map.Entry<String, Map<String, Object>> entry : 
blobstoreMap.entrySet()) {
+                String key = entry.getKey();
+                Map<String, Object> blobInfo = entry.getValue();
+                String ret = null;
+                if (blobInfo != null && blobInfo.containsKey("localname")) {
+                    ret = (String) blobInfo.get("localname");
+                } else {
+                    ret = key;
+                }
+                blobFileNames.add(ret);
+            }
+        }
+        for (String string : blobFileNames) {
+            if (!Utils.checkFileExists(string))
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Download all blobs listed in the topology configuration for a given 
topology.
+     * 
+     * @param conf
+     * @param stormconfPath
+     * @param localizer
+     * @param tmpRoot
+     */
+    protected void downloadBlobsForTopology(Map conf, String stormconfPath, 
Localizer localizer, String tmpRoot) throws IOException {
+        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, 
stormconfPath);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
+        File userDir = localizer.getLocalUserFileCacheDir(user);
+        List<LocalResource> localResourceList = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        if (localResourceList.size() > 0) {
+            if (!userDir.exists()) {
+                FileUtils.forceMkdir(userDir);
+            }
+            try {
+                List<LocalizedResource> localizedResources = 
localizer.getBlobs(localResourceList, user, topoName, userDir);
+                setupBlobPermission(conf, user, userDir.toString());
+                for (LocalizedResource localizedResource : localizedResources) 
{
+                    File rsrcFilePath = new 
File(localizedResource.getFilePath());
+                    String keyName = rsrcFilePath.getName();
+                    String blobSymlinkTargetName = new 
File(localizedResource.getCurrentSymlinkPath()).getName();
+
+                    String symlinkName = null;
+                    if (blobstoreMap != null) {
+                        Map<String, Object> blobInfo = 
blobstoreMap.get(keyName);
+                        if (blobInfo != null && 
blobInfo.containsKey("localname")) {
+                            symlinkName = (String) blobInfo.get("localname");
+                        } else {
+                            symlinkName = keyName;
+                        }
+                    }
+                    Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), 
symlinkName, blobSymlinkTargetName);
+                }
+            } catch (AuthorizationException authExp) {
+                LOG.error("AuthorizationException error {}", authExp);
+            } catch (KeyNotFoundException knf) {
+                LOG.error("KeyNotFoundException error {}", knf);
+            }
+        }
+    }
+
+    protected void setupBlobPermission(Map conf, String user, String path) 
throws IOException {
+        if (Utils.getBoolean(Config.SUPERVISOR_RUN_WORKER_AS_USER, false)) {
+            String logPrefix = "setup blob permissions for " + path;
+            SupervisorUtils.workerLauncherAndWait(conf, user, 
Arrays.asList("blob", path), null, logPrefix);
+        }
+
+    }
+
+    private String resourcesJar() throws IOException {
+
+        String path = Utils.currentClasspath();
+        if (path == null) {
+            return null;
+        }
+        String[] paths = path.split(File.pathSeparator);
+        List<String> jarPaths = new ArrayList<String>();
+        for (String s : paths) {
+            if (s.endsWith(".jar")) {
+                jarPaths.add(s);
+            }
+        }
+
+        List<String> rtn = new ArrayList<String>();
+        int size = jarPaths.size();
+        for (int i = 0; i < size; i++) {
+            if (Utils.zipDoesContainDir(jarPaths.get(i), 
ConfigUtils.RESOURCES_SUBDIR)) {
+                rtn.add(jarPaths.get(i));
+            }
+        }
+        if (rtn.size() == 0)
+            return null;
+
+        return rtn.get(0);
+    }
+
+    protected Map<Integer, LocalAssignment> readAssignments(Map<String, 
Map<String, Object>> assignmentsSnapshot,
+            Map<Integer, LocalAssignment> existingAssignment, String 
assignmentId, AtomicInteger retries) {
+        try {
+            Map<Integer, LocalAssignment> portLA = new HashMap<Integer, 
LocalAssignment>();
+            for (Map.Entry<String, Map<String, Object>> assignEntry : 
assignmentsSnapshot.entrySet()) {
+                String stormId = assignEntry.getKey();
+                Assignment assignment = (Assignment) 
assignEntry.getValue().get(IStateStorage.DATA);
+
+                Map<Integer, LocalAssignment> portTasks = 
readMyExecutors(stormId, assignmentId, assignment);
+
+                for (Map.Entry<Integer, LocalAssignment> entry : 
portTasks.entrySet()) {
+
+                    Integer port = entry.getKey();
+
+                    LocalAssignment la = entry.getValue();
+
+                    if (!portLA.containsKey(port)) {
+                        portLA.put(port, la);
+                    } else {
+                        throw new RuntimeException("Should not have multiple 
topologys assigned to one port");
+                    }
+                }
+            }
+            retries.set(0);
+            return portLA;
+        } catch (RuntimeException e) {
+            if (retries.get() > 2) {
+                throw e;
+            } else {
+                retries.addAndGet(1);
+            }
+            LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get());
+            return existingAssignment;
+        }
+    }
+
+    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, 
String assignmentId, Assignment assignment) {
+        Map<Integer, LocalAssignment> portTasks = new HashMap<>();
+        Map<Long, WorkerResources> slotsResources = new HashMap<>();
+        Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = 
assignment.get_worker_resources();
+        if (nodeInfoWorkerResourcesMap != null) {
+            for (Map.Entry<NodeInfo, WorkerResources> entry : 
nodeInfoWorkerResourcesMap.entrySet()) {
+                if (entry.getKey().get_node().equals(assignmentId)) {
+                    Set<Long> ports = entry.getKey().get_port();
+                    for (Long port : ports) {
+                        slotsResources.put(port, entry.getValue());
+                    }
+                }
+            }
+        }
+        Map<List<Long>, NodeInfo> executorNodePort = 
assignment.get_executor_node_port();
+        if (executorNodePort != null) {
+            for (Map.Entry<List<Long>, NodeInfo> entry : 
executorNodePort.entrySet()) {
+                if (entry.getValue().get_node().equals(assignmentId)) {
+                    for (Long port : entry.getValue().get_port()) {
+                        LocalAssignment localAssignment = portTasks.get(port);
+                        if (localAssignment == null) {
+                            List<ExecutorInfo> executors = new 
ArrayList<ExecutorInfo>();
+                            localAssignment = new LocalAssignment(stormId, 
executors);
+                            if (slotsResources.containsKey(port)) {
+                                
localAssignment.set_resources(slotsResources.get(port));
+                            }
+                            portTasks.put(port.intValue(), localAssignment);
+                        }
+                        List<ExecutorInfo> executorInfoList = 
localAssignment.get_executors();
+                        executorInfoList.add(new 
ExecutorInfo(entry.getKey().get(0).intValue(), 
entry.getKey().get(entry.getKey().size() - 1).intValue()));
+                    }
+                }
+            }
+        }
+        return portTasks;
+    }
+
+    // I konw it's not a good idea to create SyncProcessEvent, but I only hope 
SyncProcessEvent is responsible for start/shutdown
+    //workers, and SyncSupervisorEvent is responsible for download/remove 
topologys' binary.
+    protected void shutdownDisallowedWorkers() throws Exception{
+        Map conf = supervisorData.getConf();
+        LocalState localState = supervisorData.getLocalState();
+        Map<Integer, LocalAssignment> assignedExecutors = 
localState.getLocalAssignmentsMap();
+        if (assignedExecutors == null) {
+            assignedExecutors = new HashMap<>();
+        }
+        int now = Time.currentTimeSecs();
+        SyncProcessEvent syncProcesses = new SyncProcessEvent(supervisorData);
+        Map<String, StateHeartbeat> workerIdHbstate = 
syncProcesses.getLocalWorkerStats(assignedExecutors, now);
+        LOG.debug("Allocated workers ", assignedExecutors);
+        for (Map.Entry<String, StateHeartbeat> entry : 
workerIdHbstate.entrySet()){
+            String workerId = entry.getKey();
+            StateHeartbeat stateHeartbeat = entry.getValue();
+            if (stateHeartbeat.getState() == State.disallowed){
+                syncProcesses.shutWorker(supervisorData, workerId);
+                LOG.debug("{}'s state disallowed, so shutdown this worker");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
new file mode 100644
index 0000000..90dccae
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.NimbusLeaderNotFoundException;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * downloads all blobs listed in the topology configuration for all topologies 
assigned to this supervisor, and creates version files with a suffix. The
+ * Runnable is intended to be run periodically by a timer, created elsewhere.
+ */
+public class UpdateBlobs implements Runnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(UpdateBlobs.class);
+
+    private SupervisorData supervisorData;
+
+    public UpdateBlobs(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map conf = supervisorData.getConf();
+            Set<String> downloadedStormIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
+            ConcurrentHashMap<Long, LocalAssignment> newAssignment = 
supervisorData.getCurrAssignment();
+            Set<String> assignedStormIds = new HashSet<>();
+            for (LocalAssignment localAssignment : newAssignment.values()) {
+                assignedStormIds.add(localAssignment.get_topology_id());
+            }
+            for (String stormId : downloadedStormIds) {
+                if (assignedStormIds.contains(stormId)) {
+                    String stormRoot = 
ConfigUtils.supervisorStormDistRoot(conf, stormId);
+                    LOG.debug("Checking Blob updates for storm topology id {} 
With target_dir: {}", stormId, stormRoot);
+                    updateBlobsForTopology(conf, stormId, 
supervisorData.getLocalizer());
+                }
+            }
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, 
e)) {
+                LOG.error("Network error while updating blobs, will retry 
again later", e);
+            } else if 
(Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
+                LOG.error("Nimbus unavailable to update blobs, will retry 
again later", e);
+            } else {
+                throw Utils.wrapInRuntime(e);
+            }
+        }
+    }
+
+    /**
+     * Update each blob listed in the topology configuration if the latest 
version of the blob has not been downloaded.
+     * 
+     * @param conf
+     * @param stormId
+     * @param localizer
+     * @throws IOException
+     */
+    private void updateBlobsForTopology(Map conf, String stormId, Localizer 
localizer) throws IOException {
+        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+        List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
+        try {
+            localizer.updateBlobs(localresources, user);
+        } catch (AuthorizationException authExp) {
+            LOG.error("AuthorizationException error", authExp);
+        } catch (KeyNotFoundException knf) {
+            LOG.error("KeyNotFoundException error", knf);
+        }
+    }
+}

Reply via email to