Update supervisor based on comments from revans2 and longdafeng

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dba69b52
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dba69b52
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dba69b52

Branch: refs/heads/master
Commit: dba69b528860b29fed435c85ee1f76b09f982105
Parents: d46ed8f 8be5417
Author: xiaojian.fxj <[email protected]>
Authored: Thu Mar 31 09:22:13 2016 +0800
Committer: xiaojian.fxj <[email protected]>
Committed: Thu Mar 31 13:24:15 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      | 33 ++++++----
 CHANGELOG.md                                    |  1 +
 external/flux/.gitignore                        | 15 -----
 .../storm/daemon/supervisor/SupervisorData.java | 46 +++++++-------
 .../daemon/supervisor/SupervisorManager.java    | 19 ++----
 .../daemon/supervisor/SupervisorUtils.java      | 27 +++++++--
 .../daemon/supervisor/SyncProcessEvent.java     |  2 +-
 .../daemon/supervisor/SyncSupervisorEvent.java  | 41 +++++++------
 .../supervisor/timer/RunProfilerActions.java    | 63 +++++++++-----------
 .../supervisor/timer/SupervisorHealthCheck.java | 14 +----
 .../workermanager/DefaultWorkerManager.java     | 41 ++++++-------
 .../workermanager/IWorkerManager.java           | 11 ++--
 .../supervisor/workermanager/IWorkerResult.java | 21 -------
 13 files changed, 148 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
index 213457d,0000000..da4102c
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java
@@@ -1,234 -1,0 +1,234 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.storm.Config;
 +import org.apache.storm.StormTimer;
 +import org.apache.storm.cluster.ClusterStateContext;
 +import org.apache.storm.cluster.ClusterUtils;
 +import org.apache.storm.cluster.DaemonType;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 +import org.apache.storm.generated.LocalAssignment;
 +import org.apache.storm.generated.ProfileRequest;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.messaging.IContext;
 +import org.apache.storm.scheduler.ISupervisor;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.utils.VersionInfo;
 +import org.apache.zookeeper.data.ACL;
 +import org.eclipse.jetty.util.ConcurrentHashSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.net.UnknownHostException;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +public class SupervisorData {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorData.class);
 +
 +    private final Map conf;
 +    private final IContext sharedContext;
 +    private volatile boolean active;
-     private ISupervisor iSupervisor;
-     private Utils.UptimeComputer upTime;
-     private String stormVersion;
-     private ConcurrentHashMap<String, String> workerThreadPids; // for local 
mode
-     private IStormClusterState stormClusterState;
-     private LocalState localState;
-     private String supervisorId;
-     private String assignmentId;
-     private String hostName;
++    private final ISupervisor iSupervisor;
++    private final Utils.UptimeComputer upTime;
++    private final String stormVersion;
++    private final ConcurrentHashMap<String, String> workerThreadPids; // for 
local mode
++    private final IStormClusterState stormClusterState;
++    private final LocalState localState;
++    private final String supervisorId;
++    private final String assignmentId;
++    private final String hostName;
 +    // used for reporting used ports when heartbeating
-     private AtomicReference<Map<Long, LocalAssignment>> currAssignment;
-     private StormTimer heartbeatTimer;
-     private StormTimer eventTimer;
-     private StormTimer blobUpdateTimer;
-     private Localizer localizer;
-     private AtomicReference<Map<String, Map<String, Object>>> 
assignmentVersions;
-     private AtomicInteger syncRetry;
++    private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
++    private final StormTimer heartbeatTimer;
++    private final StormTimer eventTimer;
++    private final StormTimer blobUpdateTimer;
++    private final Localizer localizer;
++    private final AtomicReference<Map<String, Map<String, Object>>> 
assignmentVersions;
++    private final AtomicInteger syncRetry;
 +    private final Object downloadLock = new Object();
-     private AtomicReference<Map<String, List<ProfileRequest>>> 
stormIdToProfileActions;
-     private ConcurrentHashSet<String> deadWorkers;
++    private final AtomicReference<Map<String, List<ProfileRequest>>> 
stormIdToProfilerActions;
++    private final ConcurrentHashSet<String> deadWorkers;
 +    private final IWorkerManager workerManager;
 +
 +    public SupervisorData(Map conf, IContext sharedContext, ISupervisor 
iSupervisor) {
 +        this.conf = conf;
 +        this.sharedContext = sharedContext;
 +        this.iSupervisor = iSupervisor;
 +        this.active = true;
 +        this.upTime = Utils.makeUptimeComputer();
 +        this.stormVersion = VersionInfo.getVersion();
 +        this.workerThreadPids = new ConcurrentHashMap<String, String>();
 +        this.deadWorkers = new ConcurrentHashSet();
 +
 +        List<ACL> acls = null;
 +        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
 +            acls = SupervisorUtils.supervisorZkAcls();
 +        }
 +
 +        try {
 +            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, 
acls, new ClusterStateContext(DaemonType.SUPERVISOR));
 +        } catch (Exception e) {
 +            LOG.error("supervisor can't create stormClusterState");
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +        try {
 +            this.localState = ConfigUtils.supervisorState(conf);
 +            this.localizer = Utils.createLocalizer(conf, 
ConfigUtils.supervisorLocalDir(conf));
 +        } catch (IOException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +        this.supervisorId = iSupervisor.getSupervisorId();
 +        this.assignmentId = iSupervisor.getAssignmentId();
 +
 +        try {
 +            this.hostName = Utils.hostname(conf);
 +        } catch (UnknownHostException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +        this.currAssignment = new AtomicReference<Map<Long, 
LocalAssignment>>(new HashMap<Long,LocalAssignment>());
 +
 +        this.heartbeatTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
 +
 +        this.eventTimer = new StormTimer(null, new 
DefaultUncaughtExceptionHandler());
 +
 +        this.blobUpdateTimer = new StormTimer("blob-update-timer", new 
DefaultUncaughtExceptionHandler());
 +
 +        this.assignmentVersions = new AtomicReference<Map<String, Map<String, 
Object>>>(new HashMap<String, Map<String, Object>>());
 +        this.syncRetry = new AtomicInteger(0);
-         this.stormIdToProfileActions = new AtomicReference<Map<String, 
List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
++        this.stormIdToProfilerActions = new AtomicReference<Map<String, 
List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>());
 +        this.workerManager =  Utils.newInstance((String) 
conf.get(Config.STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN));
 +        this.workerManager.prepareWorker(conf, localizer);
 +    }
 +
-     public AtomicReference<Map<String, List<ProfileRequest>>> 
getStormIdToProfileActions() {
-         return stormIdToProfileActions;
++    public AtomicReference<Map<String, List<ProfileRequest>>> 
getStormIdToProfilerActions() {
++        return stormIdToProfilerActions;
 +    }
 +
-     public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> 
stormIdToProfileActions) {
-         this.stormIdToProfileActions.set(stormIdToProfileActions);
++    public void setStormIdToProfilerActions(Map<String, List<ProfileRequest>> 
stormIdToProfilerActions) {
++        this.stormIdToProfilerActions.set(stormIdToProfilerActions);
 +    }
 +
 +    public Map getConf() {
 +        return conf;
 +    }
 +
 +    public IContext getSharedContext() {
 +        return sharedContext;
 +    }
 +
 +    public boolean isActive() {
 +        return active;
 +    }
 +
 +    public void setActive(boolean active) {
 +        this.active = active;
 +    }
 +
 +    public ISupervisor getiSupervisor() {
 +        return iSupervisor;
 +    }
 +
 +    public Utils.UptimeComputer getUpTime() {
 +        return upTime;
 +    }
 +
 +    public String getStormVersion() {
 +        return stormVersion;
 +    }
 +
 +    public ConcurrentHashMap<String, String> getWorkerThreadPids() {
 +        return workerThreadPids;
 +    }
 +
 +    public IStormClusterState getStormClusterState() {
 +        return stormClusterState;
 +    }
 +
 +    public LocalState getLocalState() {
 +        return localState;
 +    }
 +
 +    public String getSupervisorId() {
 +        return supervisorId;
 +    }
 +
 +    public String getAssignmentId() {
 +        return assignmentId;
 +    }
 +
 +    public String getHostName() {
 +        return hostName;
 +    }
 +
 +    public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() {
 +        return currAssignment;
 +    }
 +
 +    public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
 +        this.currAssignment.set(currAssignment);
 +    }
 +
 +    public StormTimer getHeartbeatTimer() {
 +        return heartbeatTimer;
 +    }
 +
 +    public StormTimer getEventTimer() {
 +        return eventTimer;
 +    }
 +
 +    public StormTimer getBlobUpdateTimer() {
 +        return blobUpdateTimer;
 +    }
 +
 +    public Localizer getLocalizer() {
 +        return localizer;
 +    }
 +
 +    public AtomicInteger getSyncRetry() {
 +        return syncRetry;
 +    }
 +
 +    public AtomicReference<Map<String, Map<String, Object>>> 
getAssignmentVersions() {
 +        return assignmentVersions;
 +    }
 +
 +    public void setAssignmentVersions(Map<String, Map<String, Object>> 
assignmentVersions) {
 +        this.assignmentVersions.set(assignmentVersions);
 +    }
 +
 +    public ConcurrentHashSet getDeadWorkers() {
 +        return deadWorkers;
 +    }
 +
 +    public IWorkerManager getWorkerManager() {
 +        return workerManager;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
index d593d3c,0000000..70363fa
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java
@@@ -1,103 -1,0 +1,92 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.storm.daemon.DaemonCommon;
 +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 +import org.apache.storm.event.EventManager;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.Collection;
 +import java.util.Map;
 +
 +public class SupervisorManager implements SupervisorDaemon, DaemonCommon, 
Runnable {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorManager.class);
 +    private final EventManager eventManager;
 +    private final EventManager processesEventManager;
-     private SupervisorData supervisorData;
++    private final SupervisorData supervisorData;
 +
 +    public SupervisorManager(SupervisorData supervisorData, EventManager 
eventManager, EventManager processesEventManager) {
 +        this.eventManager = eventManager;
 +        this.supervisorData = supervisorData;
 +        this.processesEventManager = processesEventManager;
 +    }
 +
 +    public void shutdown() {
-         LOG.info("Shutting down supervisor{}", 
supervisorData.getSupervisorId());
++        LOG.info("Shutting down supervisor {}", 
supervisorData.getSupervisorId());
 +        supervisorData.setActive(false);
 +        try {
 +            supervisorData.getHeartbeatTimer().close();
 +            supervisorData.getEventTimer().close();
 +            supervisorData.getBlobUpdateTimer().close();
 +            eventManager.close();
 +            processesEventManager.close();
 +        } catch (Exception e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +        supervisorData.getStormClusterState().disconnect();
 +    }
 +
 +    @Override
 +    public void shutdownAllWorkers() {
-         Collection<String> workerIds = 
SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
 +        IWorkerManager workerManager = supervisorData.getWorkerManager();
-         try {
-             for (String workerId : workerIds) {
-                 
workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, 
supervisorData.getWorkerThreadPids());
-                 boolean success = workerManager.cleanupWorker(workerId);
-                 if (success){
-                     supervisorData.getDeadWorkers().remove(workerId);
-                 }
-             }
-         } catch (Exception e) {
-             LOG.error("shutWorker failed");
-             throw Utils.wrapInRuntime(e);
-         }
++        SupervisorUtils.shutdownAllWorkers(supervisorData.getConf(), 
supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(),
++                supervisorData.getDeadWorkers(), workerManager);
 +    }
 +
 +    @Override
 +    public Map getConf() {
 +        return supervisorData.getConf();
 +    }
 +
 +    @Override
 +    public String getId() {
 +        return supervisorData.getSupervisorId();
 +    }
 +
 +    @Override
 +    public boolean isWaiting() {
 +        if (!supervisorData.isActive()) {
 +            return true;
 +        }
 +
 +        if (supervisorData.getHeartbeatTimer().isTimerWaiting() && 
supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
 +                && processesEventManager.waiting()) {
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    public void run() {
 +        shutdown();
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index a567956,0000000..33a8525
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@@ -1,271 -1,0 +1,286 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.ProcessSimulator;
++import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.localizer.LocalResource;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.ZooDefs;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.URLDecoder;
 +import java.util.*;
++import java.util.concurrent.ConcurrentHashMap;
 +
 +public class SupervisorUtils {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
 +
 +    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
 +    private static SupervisorUtils _instance = INSTANCE;
 +    public static void setInstance(SupervisorUtils u) {
 +        _instance = u;
 +    }
 +    public static void resetInstance() {
 +        _instance = INSTANCE;
 +    }
 +
 +    public static Process processLauncher(Map conf, String user, List<String> 
commandPrefix, List<String> args, Map<String, String> environment, final String 
logPreFix,
 +                                          final Utils.ExitCodeCallable 
exitCodeCallback, File dir) throws IOException {
 +        if (StringUtils.isBlank(user)) {
 +            throw new IllegalArgumentException("User cannot be blank when 
calling processLauncher.");
 +        }
 +        String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
 +        String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
 +        String wl;
 +        if (StringUtils.isNotBlank(wlinitial)) {
 +            wl = wlinitial;
 +        } else {
 +            wl = stormHome + "/bin/worker-launcher";
 +        }
 +        List<String> commands = new ArrayList<>();
 +        if (commandPrefix != null){
 +            commands.addAll(commandPrefix);
 +        }
 +        commands.add(wl);
 +        commands.add(user);
 +        commands.addAll(args);
 +        LOG.info("Running as user: {} command: {}", user, commands);
 +        return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
 +    }
 +
 +    public static int processLauncherAndWait(Map conf, String user, 
List<String> args, final Map<String, String> environment, final String 
logPreFix)
 +            throws IOException {
 +        int ret = 0;
 +        Process process = processLauncher(conf, user, null, args, 
environment, logPreFix, null, null);
 +        if (StringUtils.isNotBlank(logPreFix))
 +            Utils.readAndLogStream(logPreFix, process.getInputStream());
 +        try {
 +            process.waitFor();
 +        } catch (InterruptedException e) {
 +            LOG.info("{} interrupted.", logPreFix);
 +        }
 +        ret = process.exitValue();
 +        return ret;
 +    }
 +
 +    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) 
throws IOException {
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), 
false)) {
 +            String logPrefix = "setup conf for " + dir;
 +            List<String> commands = new ArrayList<>();
 +            commands.add("code-dir");
 +            commands.add(dir);
 +            processLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
 +        }
 +    }
 +
 +    public static void rmrAsUser(Map conf, String id, String path) throws 
IOException {
 +        String user = Utils.getFileOwner(path);
 +        String logPreFix = "rmr " + id;
 +        List<String> commands = new ArrayList<>();
 +        commands.add("rmr");
 +        commands.add(path);
 +        SupervisorUtils.processLauncherAndWait(conf, user, commands, null, 
logPreFix);
 +        if (Utils.checkFileExists(path)) {
 +            throw new RuntimeException(path + " was not deleted.");
 +        }
 +    }
 +
 +    /**
 +     * Given the blob information returns the value of the uncompress field, 
handling it either being a string or a boolean value, or if it's not specified 
then
 +     * returns false
 +     * 
 +     * @param blobInfo
 +     * @return
 +     */
 +    public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
 +        return Utils.getBoolean(blobInfo.get("uncompress"), false);
 +    }
 +
 +    /**
 +     * Returns a list of LocalResources based on the blobstore-map passed in
 +     * 
 +     * @param blobstoreMap
 +     * @return
 +     */
 +    public static List<LocalResource> 
blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
 +        List<LocalResource> localResourceList = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> map : 
blobstoreMap.entrySet()) {
 +                LocalResource localResource = new LocalResource(map.getKey(), 
shouldUncompressBlob(map.getValue()));
 +                localResourceList.add(localResource);
 +            }
 +        }
 +        return localResourceList;
 +    }
 +
 +    /**
 +     * For each of the downloaded topologies, adds references to the blobs 
that the topologies are using. This is used to reconstruct the cache on restart.
 +     * 
 +     * @param localizer
 +     * @param stormId
 +     * @param conf
 +     */
 +    public static void addBlobReferences(Localizer localizer, String stormId, 
Map conf) throws IOException {
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, 
Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
 +        List<LocalResource> localresources = 
SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
 +        if (blobstoreMap != null) {
 +            localizer.addReferences(localresources, user, topoName);
 +        }
 +    }
 +
 +    public static Set<String> readDownLoadedStormIds(Map conf) throws 
IOException {
 +        Set<String> stormIds = new HashSet<>();
 +        String path = ConfigUtils.supervisorStormDistRoot(conf);
 +        Collection<String> rets = Utils.readDirContents(path);
 +        for (String ret : rets) {
 +            stormIds.add(URLDecoder.decode(ret));
 +        }
 +        return stormIds;
 +    }
 +
 +    public static Collection<String> supervisorWorkerIds(Map conf) {
 +        String workerRoot = ConfigUtils.workerRoot(conf);
 +        return Utils.readDirContents(workerRoot);
 +    }
 +
 +    public static boolean doRequiredTopoFilesExist(Map conf, String stormId) 
throws IOException {
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
 +        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
 +        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
 +        if (!Utils.checkFileExists(stormroot))
 +            return false;
 +        if (!Utils.checkFileExists(stormcodepath))
 +            return false;
 +        if (!Utils.checkFileExists(stormconfpath))
 +            return false;
 +        if (ConfigUtils.isLocalMode(conf) || 
Utils.checkFileExists(stormjarpath))
 +            return true;
 +        return false;
 +    }
 +
 +    /**
 +     * map from worker id to heartbeat
 +     *
 +     * @param conf
 +     * @return
 +     * @throws Exception
 +     */
 +    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map 
conf) throws Exception {
 +        return _instance.readWorkerHeartbeatsImpl(conf);
 +    }
 +
 +    public  Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) 
throws Exception {
 +        Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 +
 +        Collection<String> workerIds = 
SupervisorUtils.supervisorWorkerIds(conf);
 +
 +        for (String workerId : workerIds) {
 +            LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId);
 +            // ATTENTION: whb can be null
 +            workerHeartbeats.put(workerId, whb);
 +        }
 +        return workerHeartbeats;
 +    }
 +
 +
 +    /**
 +     * get worker heartbeat by workerId
 +     *
 +     * @param conf
 +     * @param workerId
 +     * @return
 +     * @throws IOException
 +     */
 +    public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String 
workerId) {
 +        return _instance.readWorkerHeartbeatImpl(conf, workerId);
 +    }
 +
 +    public  LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String 
workerId) {
 +        try {
 +            LocalState localState = ConfigUtils.workerState(conf, workerId);
 +            return localState.getWorkerHeartBeat();
 +        } catch (Exception e) {
 +            LOG.warn("Failed to read local heartbeat for workerId : 
{},Ignoring exception.", workerId, e);
 +            return null;
 +        }
 +    }
 +
 +    public static boolean  isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, 
Map conf) {
 +        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
 +    }
 +
 +    public  boolean  isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, 
Map conf) {
-         boolean result = false;
-         if ((now - whb.get_time_secs()) > 
Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {
-             result = true;
-         }
-         return result;
++        return (now - whb.get_time_secs()) > 
Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
 +    }
 +
 +    public static String javaCmd(String cmd) {
 +        return _instance.javaCmdImpl(cmd);
 +    }
 +
 +    public String javaCmdImpl(String cmd) {
 +        String ret = null;
 +        String javaHome = System.getenv().get("JAVA_HOME");
 +        if (StringUtils.isNotBlank(javaHome)) {
 +            ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + 
Utils.FILE_PATH_SEPARATOR + cmd;
 +        } else {
 +            ret = cmd;
 +        }
 +        return ret;
 +    }
 +    
-     public final static List<ACL> supervisorZkAcls() {
++    public static List<ACL> supervisorZkAcls() {
 +        final List<ACL> acls = new ArrayList<>();
 +        acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
 +        acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), 
ZooDefs.Ids.ANYONE_ID_UNSAFE));
 +        return acls;
 +    }
++
++    public static void shutdownAllWorkers(Map conf, String supervisorId, 
Map<String, String> workerThreadPids, Set<String> deadWorkers,
++            IWorkerManager workerManager) {
++        Collection<String> workerIds = 
SupervisorUtils.supervisorWorkerIds(conf);
++        try {
++            for (String workerId : workerIds) {
++                workerManager.shutdownWorker(supervisorId, workerId, 
workerThreadPids);
++                boolean success = workerManager.cleanupWorker(workerId);
++                if (success) {
++                    deadWorkers.remove(workerId);
++                }
++            }
++        } catch (Exception e) {
++            LOG.error("shutWorker failed");
++            throw Utils.wrapInRuntime(e);
++        }
++    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index fb4e7ab,0000000..38b79e1
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@@ -1,427 -1,0 +1,427 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.container.cgroup.CgroupManager;
 +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
 +import org.apache.storm.generated.ExecutorInfo;
 +import org.apache.storm.generated.LSWorkerHeartbeat;
 +import org.apache.storm.generated.LocalAssignment;
 +import org.apache.storm.generated.WorkerResources;
 +import org.apache.storm.utils.ConfigUtils;
 +import org.apache.storm.utils.LocalState;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.eclipse.jetty.util.ConcurrentHashSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.yaml.snakeyaml.Yaml;
 +
 +import java.io.File;
 +import java.io.FileWriter;
 +import java.io.IOException;
 +import java.util.*;
 +
 +/**
 + * 1. to kill are those in allocated that are dead or disallowed
 + * 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr heartbeat dir,
 + *    rmdir pid dir, rmdir id dir (catch exception and log)
 + * 3. of the rest, figure out what assignments aren't yet satisfied
 + * 4. generate new worker ids, write new "approved workers" to LS
 + * 5. create local dir for worker id
 + * 6. launch new workers (give worker-id, port, and supervisor-id)
 + * 7. wait for workers launch
 + */
 +public class SyncProcessEvent implements Runnable {
 +
 +    private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
 +
 +    private  LocalState localState;
 +    private  SupervisorData supervisorData;
 +    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
 +
 +    private class ProcessExitCallback implements Utils.ExitCodeCallable {
 +        private final String logPrefix;
 +        private final String workerId;
 +
 +        public ProcessExitCallback(String logPrefix, String workerId) {
 +            this.logPrefix = logPrefix;
 +            this.workerId = workerId;
 +        }
 +
 +        @Override
 +        public Object call() throws Exception {
 +            return null;
 +        }
 +
 +        @Override
 +        public Object call(int exitCode) {
 +            LOG.info("{} exited with code: {}", logPrefix, exitCode);
 +            supervisorData.getDeadWorkers().add(workerId);
 +            return null;
 +        }
 +    }
 +
 +    public SyncProcessEvent(){
 +
 +    }
 +    public SyncProcessEvent(SupervisorData supervisorData) {
 +        init(supervisorData);
 +    }
 +    
 +    public void init(SupervisorData supervisorData){
 +        this.supervisorData = supervisorData;
 +        this.localState = supervisorData.getLocalState();
 +    }
 +
 +    @Override
 +    public void run() {
 +        LOG.debug("Syncing processes");
 +        try {
 +            Map conf = supervisorData.getConf();
 +            Map<Integer, LocalAssignment> assignedExecutors = 
localState.getLocalAssignmentsMap();
 +
 +            if (assignedExecutors == null) {
 +                assignedExecutors = new HashMap<>();
 +            }
 +            int now = Time.currentTimeSecs();
 +
 +            Map<String, StateHeartbeat> localWorkerStats = 
getLocalWorkerStats(supervisorData, assignedExecutors, now);
 +
 +            Set<String> keeperWorkerIds = new HashSet<>();
 +            Set<Integer> keepPorts = new HashSet<>();
 +            for (Map.Entry<String, StateHeartbeat> entry : 
localWorkerStats.entrySet()) {
 +                StateHeartbeat stateHeartbeat = entry.getValue();
 +                if (stateHeartbeat.getState() == State.VALID) {
 +                    keeperWorkerIds.add(entry.getKey());
 +                    keepPorts.add(stateHeartbeat.getHeartbeat().get_port());
 +                }
 +            }
 +            Map<Integer, LocalAssignment> reassignExecutors = 
getReassignExecutors(assignedExecutors, keepPorts);
 +            Map<Integer, String> newWorkerIds = new HashMap<>();
 +            for (Integer port : reassignExecutors.keySet()) {
 +                newWorkerIds.put(port, Utils.uuid());
 +            }
-             LOG.debug("Syncing processes");
 +            LOG.debug("Assigned executors: {}", assignedExecutors);
 +            LOG.debug("Allocated: {}", localWorkerStats);
 +
 +            for (Map.Entry<String, StateHeartbeat> entry : 
localWorkerStats.entrySet()) {
 +                StateHeartbeat stateHeartbeat = entry.getValue();
 +                if (stateHeartbeat.getState() != State.VALID) {
 +                    LOG.info("Shutting down and clearing state for id {}, 
Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now,
 +                            stateHeartbeat.getState(), 
stateHeartbeat.getHeartbeat());
 +                    killWorker(supervisorData, 
supervisorData.getWorkerManager(), entry.getKey());
 +                }
 +            }
 +            // start new workers
 +            Map<String, Integer> newWorkerPortToIds = 
startNewWorkers(newWorkerIds, reassignExecutors);
 +
 +            Map<String, Integer> allWorkerPortToIds = new HashMap<>();
 +            Map<String, Integer> approvedWorkers = 
localState.getApprovedWorkers();
 +            for (String keeper : keeperWorkerIds) {
 +                allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper));
 +            }
 +            allWorkerPortToIds.putAll(newWorkerPortToIds);
 +            localState.setApprovedWorkers(allWorkerPortToIds);
 +            waitForWorkersLaunch(conf, newWorkerPortToIds.keySet());
 +
 +        } catch (Exception e) {
 +            LOG.error("Failed Sync Process", e);
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +    }
 +
 +    /**
 +     * Wait until each given worker has written a heartbeat or the timeout has elapsed.
 +     * The timeout is counted from the call of this method and is shared by all workers; it is not restarted
 +     * for each worker.
 +     *
 +     * @param conf configuration; Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS is read as the timeout in seconds
 +     * @param workerIds ids of the workers to wait for
 +     * @throws Exception if the worker state cannot be read or the sleep fails
 +     */
 +    protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception {
 +        int startTime = Time.currentTimeSecs();
 +        // the configured value may be an Integer or a Long depending on how the conf was loaded,
 +        // so go through Number instead of a direct (int) cast
 +        int timeOut = ((Number) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS)).intValue();
 +        for (String workerId : workerIds) {
 +            LocalState localState = ConfigUtils.workerState(conf, workerId);
 +            while (true) {
 +                LSWorkerHeartbeat hb = localState.getWorkerHeartBeat();
 +                if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut) {
 +                    break;
 +                }
 +                LOG.info("{} still hasn't started", workerId);
 +                Time.sleep(500);
 +            }
 +            if (localState.getWorkerHeartBeat() == null) {
 +                LOG.info("Worker {} failed to start", workerId);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Compute the assignments that still need a worker.
 +     *
 +     * @param assignExecutors all assignments keyed by port; not modified
 +     * @param keepPorts ports to leave out of the result
 +     * @return a new map holding the entries of assignExecutors whose port is not in keepPorts
 +     */
 +    protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) {
 +        Map<Integer, LocalAssignment> remaining = new HashMap<>(assignExecutors);
 +        remaining.keySet().removeAll(keepPorts);
 +        return remaining;
 +    }
 +    
 +    /**
 +     * Classify every worker for which SupervisorUtils.readWorkerHeartbeats returns an entry.
 +     * 
 +     * @param supervisorData supplies the conf, the local state holding the approved workers and the dead-worker set
 +     * @param assignedExecutors current assignments keyed by port
 +     * @param now supervisor time in seconds, used for the heartbeat timeout check
 +     * @return map from worker id to its state together with its heartbeat; the heartbeat is null when the state is
 +     *         NOT_STARTED
 +     * @throws Exception
 +     */
 +    public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception {
 +        Map conf = supervisorData.getConf();
 +        LocalState localState = supervisorData.getLocalState();
 +        Map<String, LSWorkerHeartbeat> heartbeats = SupervisorUtils.readWorkerHeartbeats(conf);
 +        Map<String, Integer> approvedWorkers = localState.getApprovedWorkers();
 +        Set<String> approvedIds = new HashSet<>();
 +        if (approvedWorkers != null) {
 +            approvedIds.addAll(approvedWorkers.keySet());
 +        }
 +
 +        Map<String, StateHeartbeat> stats = new HashMap<>();
 +        for (Map.Entry<String, LSWorkerHeartbeat> hbEntry : heartbeats.entrySet()) {
 +            String workerId = hbEntry.getKey();
 +            LSWorkerHeartbeat whb = hbEntry.getValue();
 +            State state;
 +            if (whb == null) {
 +                // no heartbeat written yet
 +                state = State.NOT_STARTED;
 +            } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) {
 +                // not approved, or running something other than what is assigned to its port
 +                state = State.DISALLOWED;
 +            } else if (supervisorData.getDeadWorkers().contains(workerId)) {
 +                LOG.info("Worker Process {} has died", workerId);
 +                state = State.TIMED_OUT;
 +            } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) {
 +                state = State.TIMED_OUT;
 +            } else {
 +                state = State.VALID;
 +            }
 +            LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now);
 +            stats.put(workerId, new StateHeartbeat(state, whb));
 +        }
 +        return stats;
 +    }
 +
 +    /**
 +     * Check whether a worker heartbeat agrees with the assignment on the heartbeat's port.
 +     * They agree when an assignment exists for that port, the topology ids are equal, and the executors of the
 +     * heartbeat (with one occurrence of SYSTEM_EXECUTOR_INFO removed) have the same size as the assigned
 +     * executors and contain every one of them.
 +     *
 +     * @param whb heartbeat of the worker
 +     * @param assignedExecutors current assignments keyed by port
 +     * @return true if the heartbeat matches the assignment, false otherwise
 +     */
 +    protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) {
 +        LocalAssignment localAssignment = assignedExecutors.get(whb.get_port());
 +        if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) {
 +            return false;
 +        }
 +        List<ExecutorInfo> executorInfos = new ArrayList<>();
 +        executorInfos.addAll(whb.get_executors());
 +        // remove SYSTEM_EXECUTOR_ID
 +        executorInfos.remove(SYSTEM_EXECUTOR_INFO);
 +        List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors();
 +
 +        if (localExecuorInfos.size() != executorInfos.size()) {
 +            return false;
 +        }
 +
 +        // every assigned executor has to be reported by the heartbeat; the lookup must go against the
 +        // heartbeat's list, checking the assigned list against itself is always true
 +        for (ExecutorInfo executorInfo : localExecuorInfos) {
 +            if (!executorInfos.contains(executorInfo)) {
 +                return false;
 +            }
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * launch a worker in local mode.
 +     */
 +    protected void launchLocalWorker(SupervisorData supervisorData, String 
stormId, Long port, String workerId, WorkerResources resources) throws 
IOException {
 +        // port this function after porting worker to java
 +    }
 +
 +    /**
 +     * Prepare the files of a worker (log metadata, worker user, artifacts link, blob links) and launch it through
 +     * the worker manager. The worker id is removed from deadWorkers, if given, before the launch.
 +     */
 +    protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId,
 +                                           WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException {
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        writeLogMetadata(stormConf, user, workerId, stormId, port, conf);
 +        ConfigUtils.setWorkerUserWSE(conf, workerId, user);
 +        createArtifactsLink(conf, stormId, port, workerId);
 +
 +        if (deadWorkers != null) {
 +            deadWorkers.remove(workerId);
 +        }
 +        createBlobstoreLinks(conf, stormId, workerId);
 +        // the callback logs the exit code and records the worker as dead when the process exits
 +        ProcessExitCallback onExit = new ProcessExitCallback("Worker Process " + workerId, workerId);
 +        workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, onExit);
 +    }
 +
 +    /**
 +     * Launch a worker for every assignment in reassignExecutors whose topology files are present.
 +     *
 +     * @param newWorkerIds worker id to use for each port
 +     * @param reassignExecutors assignments, keyed by port, that need a new worker
 +     * @return map from the id of each handled worker to its port; ports whose topology files are missing are left out
 +     * @throws IOException if a worker directory cannot be created or the launch fails
 +     */
 +    protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException {
 +
 +        Map<String, Integer> startedWorkers = new HashMap<>();
 +        Map conf = supervisorData.getConf();
 +        String supervisorId = supervisorData.getSupervisorId();
 +        String clusterMode = ConfigUtils.clusterMode(conf);
 +
 +        for (Map.Entry<Integer, LocalAssignment> portAssignment : reassignExecutors.entrySet()) {
 +            Integer port = portAssignment.getKey();
 +            LocalAssignment assignment = portAssignment.getValue();
 +            String workerId = newWorkerIds.get(port);
 +            String stormId = assignment.get_topology_id();
 +            WorkerResources resources = assignment.get_resources();
 +
 +            // This condition checks for required files exist before launching the worker
 +            if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) {
 +                LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment,
 +                        supervisorData.getSupervisorId(), port, workerId);
 +                continue;
 +            }
 +
 +            String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId);
 +            String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId);
 +
 +            LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port,
 +                    workerId);
 +
 +            FileUtils.forceMkdir(new File(pidsPath));
 +            FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId)));
 +            FileUtils.forceMkdir(new File(hbPath));
 +
 +            if (clusterMode.endsWith("distributed")) {
 +                launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId,
 +                        resources, supervisorData.getDeadWorkers());
 +            } else if (clusterMode.endsWith("local")) {
 +                launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources);
 +            }
 +            startedWorkers.put(workerId, port);
 +        }
 +        return startedWorkers;
 +    }
 +
 +    /**
 +     * Collect the log metadata of a worker (submitter user, worker id, groups and users taken from the topology
 +     * conf) and write it with writeLogMetadataToYamlFile.
 +     *
 +     * @param stormconf topology configuration the groups and users are read from
 +     * @param user stored under Config.TOPOLOGY_SUBMITTER_USER
 +     * @param workerId stored under "worker-id"
 +     * @param stormId topology id, selects the metadata file
 +     * @param port worker port, selects the metadata file
 +     * @param conf supervisor configuration
 +     * @throws IOException if the metadata file cannot be written
 +     */
 +    public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException {
 +        Map data = new HashMap();
 +        data.put(Config.TOPOLOGY_SUBMITTER_USER, user);
 +        data.put("worker-id", workerId);
 +
 +        // groups: LOGS_GROUPS and TOPOLOGY_GROUPS merged without duplicates
 +        Set<String> logsGroups = new HashSet<>();
 +        //for supervisor-test
 +        List<String> confLogsGroups = (List<String>) stormconf.get(Config.LOGS_GROUPS);
 +        if (confLogsGroups != null) {
 +            for (String group : confLogsGroups) {
 +                logsGroups.add(group);
 +            }
 +        }
 +        List<String> confTopoGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS);
 +        if (confTopoGroups != null) {
 +            logsGroups.addAll(confTopoGroups);
 +        }
 +        data.put(Config.LOGS_GROUPS, logsGroups.toArray());
 +
 +        // users: LOGS_USERS and TOPOLOGY_USERS merged without duplicates
 +        Set<String> logsUsers = new HashSet<>();
 +        List<String> confLogsUsers = (List<String>) stormconf.get(Config.LOGS_USERS);
 +        if (confLogsUsers != null) {
 +            for (String logUser : confLogsUsers) {
 +                logsUsers.add(logUser);
 +            }
 +        }
 +        List<String> confTopoUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS);
 +        if (confTopoUsers != null) {
 +            for (String logUser : confTopoUsers) {
 +                logsUsers.add(logUser);
 +            }
 +        }
 +        data.put(Config.LOGS_USERS, logsUsers.toArray());
 +        writeLogMetadataToYamlFile(stormId, port, data, conf);
 +    }
 +
 +    /**
 +     * Dump the log metadata as YAML into the log metadata file of the given topology and port, creating the
 +     * parent directory when it does not exist.
 +     * run worker as user needs the directory to have special permissions or it is insecure
 +     * 
 +     * @param stormId topology id
 +     * @param port worker port
 +     * @param data metadata to dump
 +     * @param conf supervisor configuration
 +     * @throws IOException if the directory or the file cannot be written
 +     */
 +    public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException {
 +        File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue());
 +
 +        if (!Utils.checkFileExists(file.getParent())) {
 +            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                FileUtils.forceMkdir(file.getParentFile());
 +                SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath());
 +            } else {
 +                file.getParentFile().mkdirs();
 +            }
 +        }
 +        Yaml yaml = new Yaml();
 +        // try-with-resources: the writer is closed on every path, nothing can throw between open and try
 +        try (FileWriter writer = new FileWriter(file)) {
 +            yaml.dump(data, writer);
 +        }
 +    }
 +
 +    /**
 +     * Create a symlink from the worker directory to its port artifacts directory. Nothing is done when the worker
 +     * directory does not exist.
 +     * 
 +     * @param conf
 +     * @param stormId
 +     * @param port
 +     * @param workerId
 +     */
 +    protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException {
 +        String workerDir = ConfigUtils.workerRoot(conf, workerId);
 +        String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId);
 +        if (!Utils.checkFileExists(workerDir)) {
 +            return;
 +        }
++        LOG.debug("Creating symlinks for worker-id: {} storm-id: {} to its port artifacts directory", workerId, stormId);
 +        Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port));
 +    }
 +
 +    /**
 +     * Create symlinks in the worker launch directory for the resources directory and for every blob listed under
 +     * Config.TOPOLOGY_BLOBSTORE_MAP. A blob is linked under its "localname" when the blob info has one, otherwise
 +     * under its key.
 +     * 
 +     * @param conf
 +     * @param stormId
 +     * @param workerId
 +     * @throws IOException
 +     */
 +    protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException {
 +        String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        String workerRoot = ConfigUtils.workerRoot(conf, workerId);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        List<String> blobFileNames = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
 +                Map<String, Object> blobInfo = blob.getValue();
 +                boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
 +                blobFileNames.add(hasLocalName ? (String) blobInfo.get("localname") : blob.getKey());
 +            }
 +        }
 +        // only used for the log line below: resources dir first, then the blobs
 +        List<String> resourceFileNames = new ArrayList<>();
 +        resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR);
 +        resourceFileNames.addAll(blobFileNames);
 +        LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames);
 +        Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR);
 +        for (String blobFileName : blobFileNames) {
 +            Utils.createSymlink(workerRoot, stormRoot, blobFileName, blobFileName);
 +        }
 +    }
 +
 +    /**
 +     * Shut a worker down and clean it up; the worker id is removed from the dead-worker set only when the
 +     * cleanup reports success.
 +     */
 +    public void killWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{
 +        workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids());
 +        if (workerManager.cleanupWorker(workerId)) {
 +            supervisorData.getDeadWorkers().remove(workerId);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index b53db06,0000000..128d229
mode 100644,000000..100644
--- 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ 
b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@@ -1,626 -1,0 +1,633 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon.supervisor;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.blobstore.BlobStore;
 +import org.apache.storm.blobstore.ClientBlobStore;
 +import org.apache.storm.cluster.IStateStorage;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.event.EventManager;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.localizer.LocalResource;
 +import org.apache.storm.localizer.LocalizedResource;
 +import org.apache.storm.localizer.Localizer;
 +import org.apache.storm.utils.*;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.net.JarURLConnection;
 +import java.net.URL;
 +import java.nio.file.Files;
 +import java.nio.file.StandardCopyOption;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +public class SyncSupervisorEvent implements Runnable {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
 +
 +    private EventManager syncSupEventManager;
 +    private EventManager syncProcessManager;
 +    private IStormClusterState stormClusterState;
 +    private LocalState localState;
 +    private SyncProcessEvent syncProcesses;
 +    private SupervisorData supervisorData;
 +
 +    public SyncSupervisorEvent(SupervisorData supervisorData, 
SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
 +            EventManager syncProcessManager) {
 +
 +        this.syncProcesses = syncProcesses;
 +        this.syncSupEventManager = syncSupEventManager;
 +        this.syncProcessManager = syncProcessManager;
 +        this.stormClusterState = supervisorData.getStormClusterState();
 +        this.localState = supervisorData.getLocalState();
 +        this.supervisorData = supervisorData;
 +    }
 +
 +    @Override
 +    public void run() {
 +        try {
 +            Map conf = supervisorData.getConf();
 +            Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager);
 +            List<String> stormIds = 
stormClusterState.assignments(syncCallback);
 +            Map<String, Map<String, Object>> assignmentsSnapshot =
 +                    getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions().get(), syncCallback);
 +            Map<String, List<ProfileRequest>> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
 +
 +            Set<String> allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
 +            Map<String, String> stormcodeMap = 
readStormCodeLocations(assignmentsSnapshot);
 +            Map<Integer, LocalAssignment> existingAssignment = 
localState.getLocalAssignmentsMap();
 +            if (existingAssignment == null) {
 +                existingAssignment = new HashMap<>();
 +            }
 +
 +            Map<Integer, LocalAssignment> allAssignment =
 +                    readAssignments(assignmentsSnapshot, existingAssignment, 
supervisorData.getAssignmentId(), supervisorData.getSyncRetry());
 +
 +            Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
 +            Set<String> assignedStormIds = new HashSet<>();
 +
 +            for (Map.Entry<Integer, LocalAssignment> entry : 
allAssignment.entrySet()) {
 +                if 
(supervisorData.getiSupervisor().confirmAssigned(entry.getKey())) {
 +                    newAssignment.put(entry.getKey(), entry.getValue());
 +                    assignedStormIds.add(entry.getValue().get_topology_id());
 +                }
 +            }
 +
-             Set<String> srashStormIds = verifyDownloadedFiles(conf, 
supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
++            Set<String> crashedStormIds = verifyDownloadedFiles(conf, 
supervisorData.getLocalizer(), assignedStormIds, allDownloadedTopologyIds);
 +            Set<String> downloadedStormIds = new HashSet<>();
 +            downloadedStormIds.addAll(allDownloadedTopologyIds);
-             downloadedStormIds.removeAll(srashStormIds);
++            downloadedStormIds.removeAll(crashedStormIds);
 +
 +            LOG.debug("Synchronizing supervisor");
 +            LOG.debug("Storm code map: {}", stormcodeMap);
 +            LOG.debug("All assignment: {}", allAssignment);
 +            LOG.debug("New assignment: {}", newAssignment);
 +            LOG.debug("Assigned Storm Ids {}", assignedStormIds);
 +            LOG.debug("All Downloaded Ids {}", allDownloadedTopologyIds);
-             LOG.debug("Checked Downloaded Ids {}", srashStormIds);
++            LOG.debug("Checked Downloaded Ids {}", crashedStormIds);
 +            LOG.debug("Downloaded Ids {}", downloadedStormIds);
 +            LOG.debug("Storm Ids Profiler Actions {}", 
stormIdToProfilerActions);
 +
 +            // download code first
 +            // This might take awhile
 +            // - should this be done separately from usual monitoring?
 +            // should we only download when topology is assigned to this 
supervisor?
 +            for (Map.Entry<String, String> entry : stormcodeMap.entrySet()) {
 +                String stormId = entry.getKey();
 +                if (!downloadedStormIds.contains(stormId) && 
assignedStormIds.contains(stormId)) {
 +                    LOG.info("Downloading code for storm id {}.", stormId);
 +                    try {
 +                        downloadStormCode(conf, stormId, entry.getValue(), 
supervisorData.getLocalizer());
 +                    } catch (Exception e) {
 +                        if 
(Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
 +                            LOG.warn("Nimbus leader was not available.", e);
 +                        } else if 
(Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
 +                            LOG.warn("There was a connection problem with 
nimbus.", e);
 +                        } else {
 +                            throw e;
 +                        }
 +                    }
 +                    LOG.info("Finished downloading code for storm id {}", 
stormId);
 +                }
 +            }
 +
 +            LOG.debug("Writing new assignment {}", newAssignment);
 +
 +            Set<Integer> killWorkers = new HashSet<>();
 +            killWorkers.addAll(existingAssignment.keySet());
 +            killWorkers.removeAll(newAssignment.keySet());
 +            for (Integer port : killWorkers) {
 +                supervisorData.getiSupervisor().killedWorker(port);
 +            }
 +
 +            killExistingWorkersWithChangeInComponents(supervisorData, 
existingAssignment, newAssignment);
 +
 +            supervisorData.getiSupervisor().assigned(newAssignment.keySet());
 +            localState.setLocalAssignmentsMap(newAssignment);
 +            supervisorData.setAssignmentVersions(assignmentsSnapshot);
-             
supervisorData.setStormIdToProfileActions(stormIdToProfilerActions);
++            
supervisorData.setStormIdToProfilerActions(stormIdToProfilerActions);
 +
 +            Map<Long, LocalAssignment> convertNewAssignment = new HashMap<>();
 +            for (Map.Entry<Integer, LocalAssignment> entry : 
newAssignment.entrySet()) {
 +                convertNewAssignment.put(entry.getKey().longValue(), 
entry.getValue());
 +            }
 +            supervisorData.setCurrAssignment(convertNewAssignment);
 +            // remove any downloaded code that's no longer assigned or active
 +            // important that this happens after setting the local assignment 
so that
 +            // synchronize-supervisor doesn't try to launch workers for which 
the
 +            // resources don't exist
 +            if (Utils.isOnWindows()) {
 +                shutdownDisallowedWorkers();
 +            }
 +            for (String stormId : allDownloadedTopologyIds) {
 +                if (!stormcodeMap.containsKey(stormId)) {
 +                    LOG.info("Removing code for storm id {}.", stormId);
 +                    rmTopoFiles(conf, stormId, supervisorData.getLocalizer(), 
true);
 +                }
 +            }
 +            syncProcessManager.add(syncProcesses);
 +        } catch (Exception e) {
 +            LOG.error("Failed to Sync Supervisor", e);
 +            throw new RuntimeException(e);
 +        }
 +
 +    }
 +
 +    /**
 +     * Kill the VALID worker of every port that is assigned both before and after the sync but whose set of
 +     * executors changed. Ports without a VALID worker are skipped.
 +     *
 +     * @param supervisorData supplies the worker manager and the data needed to classify the local workers
 +     * @param existingAssignment assignments stored locally before this sync, keyed by port
 +     * @param newAssignment assignments confirmed in this sync, keyed by port
 +     * @throws Exception if the worker states cannot be read or a worker cannot be killed
 +     */
 +    private void killExistingWorkersWithChangeInComponents(SupervisorData supervisorData, Map<Integer, LocalAssignment> existingAssignment,
 +            Map<Integer, LocalAssignment> newAssignment) throws Exception {
-         LocalState localState = supervisorData.getLocalState();
-         Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
-         if (assignedExecutors == null) {
-             assignedExecutors = new HashMap<>();
-         }
 +        int now = Time.currentTimeSecs();
-         Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
++        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, existingAssignment, now);
 +        Map<Integer, String> validPortToWorkerIds = new HashMap<>();
 +        for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
 +            String workerId = entry.getKey();
 +            StateHeartbeat stateHeartbeat = entry.getValue();
 +            if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) {
 +                validPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId);
 +            }
 +        }
 +
 +        // ports present in both the old and the new assignment
 +        Map<Integer, LocalAssignment> intersectAssignment = new HashMap<>();
 +        for (Map.Entry<Integer, LocalAssignment> entry : newAssignment.entrySet()) {
 +            Integer port = entry.getKey();
 +            if (existingAssignment.containsKey(port)) {
 +                intersectAssignment.put(port, entry.getValue());
 +            }
 +        }
 +
 +        for (Integer port : intersectAssignment.keySet()) {
 +            List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors();
 +            List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors();
 +            Set<ExecutorInfo> setExitExecutors = new HashSet<>(existExecutors);
 +            Set<ExecutorInfo>  setNewExecutors = new HashSet<>(newExecutors);
 +            if (!setExitExecutors.equals(setNewExecutors)){
 +                String workerId = validPortToWorkerIds.get(port);
 +                if (workerId == null) {
 +                    // no VALID worker holds this port, so there is no worker id to hand to killWorker
 +                    continue;
 +                }
 +                syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Collect the assignment data (with version) for every given topology. When the version reported
 +     * by the cluster state equals the version stored in {@code localAssignmentVersion}, the stored
 +     * entry is reused; otherwise the assignment is fetched again. Topologies for which no version is
 +     * reported are left out of the result.
 +     *
 +     * @param stormClusterState cluster state to query
 +     * @param stormIds topology ids to look up
 +     * @param localAssignmentVersion previously fetched entries, keyed by topology id
 +     * @param callback callback passed to both cluster-state calls
 +     * @return topology id to assignment-with-version map
 +     * @throws Exception propagated from the cluster-state calls
 +     */
 +    protected Map<String, Map<String, Object>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> stormIds,
 +            Map<String, Map<String, Object>> localAssignmentVersion, Runnable callback) throws Exception {
 +        Map<String, Map<String, Object>> updateAssignmentVersion = new HashMap<>();
 +        for (String stormId : stormIds) {
 +            Integer version = stormClusterState.assignmentVersion(stormId, callback);
 +            if (version == null) {
 +                // no version reported for this topology: ignore it
 +                continue;
 +            }
 +            Map<String, Object> recorded = localAssignmentVersion.get(stormId);
 +            Object recordedVersion = (recorded == null) ? null : recorded.get(IStateStorage.VERSION);
 +            // Compare by value: both sides are boxed, so == would compare object identity.
 +            if (version.equals(recordedVersion)) {
 +                updateAssignmentVersion.put(stormId, recorded);
 +            } else {
 +                Map<String, Object> assignmentVersion = (Map<String, Object>) stormClusterState.assignmentInfoWithVersion(stormId, callback);
 +                updateAssignmentVersion.put(stormId, assignmentVersion);
 +            }
 +        }
 +        return updateAssignmentVersion;
 +    }
 +
 +    /**
 +     * Look up the profile requests of every given topology.
 +     *
 +     * @param stormClusterState cluster state to query
 +     * @param stormIds topology ids to look up
 +     * @return topology id to the profile requests returned for it
 +     */
 +    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
 +        Map<String, List<ProfileRequest>> requestsByTopology = new HashMap<String, List<ProfileRequest>>();
 +        for (String topologyId : stormIds) {
 +            requestsByTopology.put(topologyId, stormClusterState.getTopologyProfileRequests(topologyId));
 +        }
 +        return requestsByTopology;
 +    }
 +
 +    /**
 +     * Map each topology in the snapshot to the master code dir of its assignment. Entries without
 +     * assignment data are skipped.
 +     *
 +     * @param assignmentsSnapshot topology id to assignment-with-version map
 +     * @return topology id to master code dir
 +     */
 +    protected Map<String, String> readStormCodeLocations(Map<String, Map<String, Object>> assignmentsSnapshot) {
 +        Map<String, String> codeDirByTopology = new HashMap<>();
 +        for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
 +            Assignment assignment = (Assignment) snapshotEntry.getValue().get(IStateStorage.DATA);
 +            if (assignment == null) {
 +                continue;
 +            }
 +            codeDirByTopology.put(snapshotEntry.getKey(), assignment.get_master_code_dir());
 +        }
 +        return codeDirByTopology;
 +    }
 +
 +    /**
 +     * Remove the references to a topology's blobs when they are no longer needed.
 +     *
 +     * @param localizer localizer that holds the blob references
 +     * @param stormId topology whose stored configuration lists the blobs
 +     * @param conf supervisor configuration
 +     * @throws Exception propagated from reading the topology configuration or from the localizer
 +     */
 +    protected void removeBlobReferences(Localizer localizer, String stormId, Map conf) throws Exception {
 +        Map topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
 +        Map<String, Map<String, Object>> blobs = (Map<String, Map<String, Object>>) topologyConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
 +        if (blobs == null) {
 +            return;
 +        }
 +        for (Map.Entry<String, Map<String, Object>> blob : blobs.entrySet()) {
 +            boolean uncompress = SupervisorUtils.shouldUncompressBlob(blob.getValue());
 +            localizer.removeBlobReference(blob.getKey(), user, topoName, uncompress);
 +        }
 +    }
 +
 +    /**
 +     * Delete the local files of a topology, optionally dropping its blob references first. Failures
 +     * while removing are logged and not rethrown.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology whose files are removed
 +     * @param localizer localizer used to drop the blob references
 +     * @param isrmBlobRefs whether the blob references are removed before the files
 +     * @throws IOException if the topology's dist root cannot be determined
 +     */
 +    protected void rmTopoFiles(Map conf, String stormId, Localizer localizer, boolean isrmBlobRefs) throws IOException {
 +        String path = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        try {
 +            if (isrmBlobRefs) {
 +                removeBlobReferences(localizer, stormId, conf);
 +            }
 +            if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                SupervisorUtils.rmrAsUser(conf, stormId, path);
 +            } else {
 +                // reuse the dist root computed above instead of looking it up a second time
 +                Utils.forceDelete(path);
 +            }
 +        } catch (Exception e) {
 +            LOG.info("Exception removing: {} ", stormId, e);
 +        }
 +    }
 +
 +    /**
 +     * Check that the required files of every assigned, already downloaded topology still exist, to
 +     * avoid the supervisor crashing; this also avoids the need for locking. Topologies with missing
 +     * files have their local files removed and are reported back.
 +     *
 +     * @param conf supervisor configuration
 +     * @param localizer localizer passed on when removing topology files
 +     * @param assignedStormIds topologies currently assigned
 +     * @param allDownloadedTopologyIds topologies that have been downloaded
 +     * @return ids of the topologies whose files were found missing
 +     */
 +    protected Set<String> verifyDownloadedFiles(Map conf, Localizer localizer, Set<String> assignedStormIds, Set<String> allDownloadedTopologyIds)
 +            throws IOException {
 +        Set<String> brokenTopologyIds = new HashSet<>();
 +        for (String topologyId : allDownloadedTopologyIds) {
 +            boolean assigned = assignedStormIds.contains(topologyId);
 +            if (assigned && !SupervisorUtils.doRequiredTopoFilesExist(conf, topologyId)) {
 +                LOG.debug("Files not present in topology directory");
 +                rmTopoFiles(conf, topologyId, localizer, false);
 +                brokenTopologyIds.add(topologyId);
 +            }
 +        }
 +        return brokenTopologyIds;
 +    }
 +
 +    /**
 +     * Download the topology code, dispatching on the cluster mode: "distributed" or "local". Any
 +     * other mode downloads nothing.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology to download
 +     * @param masterCodeDir code dir on the master
 +     * @param localizer localizer handed to the mode-specific download
 +     * @throws Exception propagated from the mode-specific download
 +     */
 +    private void downloadStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
 +        String mode = ConfigUtils.clusterMode(conf);
 +        if (mode.endsWith("distributed")) {
 +            downloadDistributeStormCode(conf, stormId, masterCodeDir, localizer);
 +            return;
 +        }
 +        if (mode.endsWith("local")) {
 +            downloadLocalStormCode(conf, stormId, masterCodeDir, localizer);
 +        }
 +    }
 +
 +    /**
 +     * Local-mode download: read the topology's code and conf blobs into a temporary directory, move
 +     * that directory to the topology's dist root, then fill the resources directory either from a
 +     * jar on the classpath or from the resources found through the context class loader.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology to download
 +     * @param masterCodeDir code dir used to open the nimbus blob store
 +     * @param localizer not used by this method
 +     * @throws Exception propagated from the blob store or from the file operations
 +     */
 +    private void downloadLocalStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
 +
 +        String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        BlobStore blobStore = Utils.getNimbusBlobStore(conf, masterCodeDir, null);
 +        try {
 +            FileUtils.forceMkdir(new File(tmproot));
 +            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
 +            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
 +            String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
 +            String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
-             blobStore.readBlobTo(stormCodeKey, new FileOutputStream(codePath), null);
-             blobStore.readBlobTo(stormConfKey, new FileOutputStream(confPath), null);
++            // try-with-resources closes each stream exactly once, even when reading the blob fails
++            try (FileOutputStream codeOutStream = new FileOutputStream(codePath)) {
++                blobStore.readBlobTo(stormCodeKey, codeOutStream, null);
++            }
++            try (FileOutputStream confOutStream = new FileOutputStream(confPath)) {
++                blobStore.readBlobTo(stormConfKey, confOutStream, null);
++            }
 +        } finally {
 +            blobStore.shutdown();
 +        }
 +        FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
 +        SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
 +        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
 +
 +        String resourcesJar = resourcesJar();
 +
 +        URL url = classloader.getResource(ConfigUtils.RESOURCES_SUBDIR);
 +
 +        String targetDir = stormroot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
 +
 +        if (resourcesJar != null) {
 +            LOG.info("Extracting resources from jar at {} to {}", resourcesJar, targetDir);
 +            Utils.extractDirFromJar(resourcesJar, ConfigUtils.RESOURCES_SUBDIR, stormroot);
 +        } else if (url != null) {
 +
 +            LOG.info("Copying resources at {} to {} ", url.toString(), targetDir);
 +            // compare the protocol by value; == on strings only tests object identity
 +            if ("jar".equals(url.getProtocol())) {
 +                JarURLConnection urlConnection = (JarURLConnection) url.openConnection();
 +                Utils.extractDirFromJar(urlConnection.getJarFileURL().getFile(), ConfigUtils.RESOURCES_SUBDIR, stormroot);
 +            } else {
 +                FileUtils.copyDirectory(new File(url.getFile()), (new File(targetDir)));
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Distributed-mode download: fetch the topology's jar, code and conf into a temporary directory,
 +     * extract the jar's resources and download the topology's blobs there, then move the temporary
 +     * directory to the topology's dist root. Downloading to the permanent location is atomic. When
 +     * the blobs are not all present, the temporary directory is deleted instead.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormId topology to download
 +     * @param masterCodeDir not used by this method
 +     * @param localizer localizer used to download the topology's blobs
 +     * @throws Exception propagated from the blob store or from the file operations
 +     */
 +    private void downloadDistributeStormCode(Map conf, String stormId, String masterCodeDir, Localizer localizer) throws Exception {
 +
 +        String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid();
 +        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
 +        String jarPath = ConfigUtils.supervisorStormJarPath(tmproot);
 +        String codePath = ConfigUtils.supervisorStormCodePath(tmproot);
 +        String confPath = ConfigUtils.supervisorStormConfPath(tmproot);
 +        ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf);
 +        try {
 +            FileUtils.forceMkdir(new File(tmproot));
 +            if (Utils.isOnWindows()) {
 +                if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +                    throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions");
 +                }
 +            } else {
 +                Utils.restrictPermissions(tmproot);
 +            }
 +            String stormJarKey = ConfigUtils.masterStormJarKey(stormId);
 +            String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId);
 +            String stormConfKey = ConfigUtils.masterStormConfKey(stormId);
 +            Utils.downloadResourcesAsSupervisor(stormJarKey, jarPath, blobStore);
 +            Utils.downloadResourcesAsSupervisor(stormCodeKey, codePath, blobStore);
 +            Utils.downloadResourcesAsSupervisor(stormConfKey, confPath, blobStore);
 +        } finally {
 +            // shut the client down even when the setup or one of the downloads throws
 +            blobStore.shutdown();
 +        }
 +        Utils.extractDirFromJar(jarPath, ConfigUtils.RESOURCES_SUBDIR, tmproot);
 +        downloadBlobsForTopology(conf, confPath, localizer, tmproot);
-         if (IsDownloadBlobsForTopologySucceed(confPath, tmproot)) {
++        if (didDownloadBlobsForTopologySucceed(confPath, tmproot)) {
 +            LOG.info("Successfully downloaded blob resources for storm-id {}", stormId);
-             FileUtils.forceMkdir(new File(stormroot));
-             Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
-             SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot);
++            if (Utils.isOnWindows()) {
++                // Files/move with non-empty directory doesn't work well on Windows
++                FileUtils.moveDirectory(new File(tmproot), new File(stormroot));
++            } else {
++                FileUtils.forceMkdir(new File(stormroot));
++                Files.move(new File(tmproot).toPath(), new File(stormroot).toPath(), StandardCopyOption.ATOMIC_MOVE);
++            }
 +        } else {
 +            LOG.info("Failed to download blob resources for storm-id {}", stormId);
 +            Utils.forceDelete(tmproot);
 +        }
 +    }
 +
 +    /**
 +     * Check whether all blobs listed in the topology configuration are present. A blob is looked up
 +     * under its "localname" when the blob info has one, otherwise under its key.
 +     *
 +     * @param stormconfPath path of the compressed topology configuration to read
 +     * @param targetDir NOTE(review): not used by this method; the names are checked exactly as
 +     *        listed - confirm whether they should be resolved against this directory
 +     * @return true when every listed blob name exists, false as soon as one does not
 +     */
-     protected boolean IsDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException {
++    protected boolean didDownloadBlobsForTopologySucceed(String stormconfPath, String targetDir) throws IOException {
 +        byte[] rawConf = FileUtils.readFileToByteArray(new File(stormconfPath));
 +        Map stormConf = Utils.fromCompressedJsonConf(rawConf);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        List<String> expectedNames = new ArrayList<>();
 +        if (blobstoreMap != null) {
 +            for (Map.Entry<String, Map<String, Object>> blob : blobstoreMap.entrySet()) {
 +                Map<String, Object> blobInfo = blob.getValue();
 +                boolean hasLocalName = blobInfo != null && blobInfo.containsKey("localname");
 +                expectedNames.add(hasLocalName ? (String) blobInfo.get("localname") : blob.getKey());
 +            }
 +        }
 +        for (String name : expectedNames) {
 +            if (!Utils.checkFileExists(name)) {
 +                return false;
 +            }
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Download all blobs listed in the topology configuration for a given topology and create a
 +     * symlink to each of them in {@code tmpRoot}. Authorization and key-not-found failures are logged
 +     * and not rethrown.
 +     *
 +     * @param conf supervisor configuration
 +     * @param stormconfPath path of the topology configuration to read
 +     * @param localizer localizer used to fetch the blobs
 +     * @param tmpRoot directory the symlinks are created in
 +     * @throws IOException declared by the signature; not caught here
 +     */
 +    protected void downloadBlobsForTopology(Map conf, String stormconfPath, Localizer localizer, String tmpRoot) throws IOException {
 +        Map stormConf = ConfigUtils.readSupervisorStormConfGivenPath(conf, stormconfPath);
 +        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
 +        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
 +        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
 +        File userDir = localizer.getLocalUserFileCacheDir(user);
 +        List<LocalResource> localResourceList = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
 +        if (localResourceList.isEmpty()) {
 +            return;
 +        }
 +        if (!userDir.exists()) {
 +            FileUtils.forceMkdir(userDir);
 +        }
 +        try {
 +            List<LocalizedResource> localizedResources = localizer.getBlobs(localResourceList, user, topoName, userDir);
 +            setupBlobPermission(conf, user, userDir.toString());
 +            for (LocalizedResource localizedResource : localizedResources) {
 +                File rsrcFilePath = new File(localizedResource.getFilePath());
 +                String keyName = rsrcFilePath.getName();
 +                String blobSymlinkTargetName = new File(localizedResource.getCurrentSymlinkPath()).getName();
 +
 +                // the symlink is named after the blob's "localname" when it has one, else after its key
 +                String symlinkName = null;
 +                if (blobstoreMap != null) {
 +                    Map<String, Object> blobInfo = blobstoreMap.get(keyName);
 +                    if (blobInfo != null && blobInfo.containsKey("localname")) {
 +                        symlinkName = (String) blobInfo.get("localname");
 +                    } else {
 +                        symlinkName = keyName;
 +                    }
 +                }
 +                Utils.createSymlink(tmpRoot, rsrcFilePath.getParent(), symlinkName, blobSymlinkTargetName);
 +            }
 +        } catch (AuthorizationException authExp) {
 +            // a trailing Throwable is logged as the exception, so the message needs no placeholder
 +            LOG.error("AuthorizationException error", authExp);
 +        } catch (KeyNotFoundException knf) {
 +            LOG.error("KeyNotFoundException error", knf);
 +        }
 +    }
 +
 +    /**
 +     * When workers are configured to run as the submitting user, run the process launcher's "blob"
 +     * command on the given path for that user.
 +     *
 +     * @param conf supervisor configuration
 +     * @param user user the launcher is run for
 +     * @param path path passed to the launcher's "blob" command
 +     * @throws IOException propagated from the process launcher
 +     */
 +    protected void setupBlobPermission(Map conf, String user, String path) throws IOException {
 +        // read the setting from conf; passing the key constant itself would not look anything up
 +        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
 +            String logPrefix = "setup blob permissions for " + path;
 +            SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix);
 +        }
 +
 +    }
 +
 +    /**
 +     * Find the first jar on the current classpath that contains the resources sub-directory.
 +     *
 +     * @return path of the first matching jar, or null when there is no classpath or no jar matches
 +     * @throws IOException propagated from inspecting a jar
 +     */
 +    private String resourcesJar() throws IOException {
 +
 +        String path = Utils.currentClasspath();
 +        if (path == null) {
 +            return null;
 +        }
 +        for (String classpathEntry : path.split(File.pathSeparator)) {
 +            // stop at the first match: only the first jar is ever returned, so later jars need not be opened
 +            if (classpathEntry.endsWith(".jar") && Utils.zipDoesContainDir(classpathEntry, ConfigUtils.RESOURCES_SUBDIR)) {
 +                return classpathEntry;
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Build the port to local-assignment map for this supervisor from the assignment snapshot. A
 +     * RuntimeException while reading (including two topologies claiming the same port) makes the
 +     * method return {@code existingAssignment} and bump {@code retries}; once {@code retries} is
 +     * above 2 the exception is rethrown. A successful read resets {@code retries} to 0.
 +     *
 +     * @param assignmentsSnapshot topology id to assignment-with-version map
 +     * @param existingAssignment assignment returned when reading fails
 +     * @param assignmentId id of this node as it appears in the assignments
 +     * @param retries counter of consecutive failed reads
 +     * @return port to local assignment
 +     */
 +    protected Map<Integer, LocalAssignment> readAssignments(Map<String, Map<String, Object>> assignmentsSnapshot,
 +            Map<Integer, LocalAssignment> existingAssignment, String assignmentId, AtomicInteger retries) {
 +        try {
 +            Map<Integer, LocalAssignment> assignmentByPort = new HashMap<Integer, LocalAssignment>();
 +            for (Map.Entry<String, Map<String, Object>> snapshotEntry : assignmentsSnapshot.entrySet()) {
 +                Assignment assignment = (Assignment) snapshotEntry.getValue().get(IStateStorage.DATA);
 +                Map<Integer, LocalAssignment> mine = readMyExecutors(snapshotEntry.getKey(), assignmentId, assignment);
 +                for (Map.Entry<Integer, LocalAssignment> portEntry : mine.entrySet()) {
 +                    Integer port = portEntry.getKey();
 +                    if (assignmentByPort.containsKey(port)) {
 +                        throw new RuntimeException("Should not have multiple topologys assigned to one port");
 +                    }
 +                    assignmentByPort.put(port, portEntry.getValue());
 +                }
 +            }
 +            retries.set(0);
 +            return assignmentByPort;
 +        } catch (RuntimeException e) {
 +            if (retries.get() > 2) {
 +                throw e;
 +            }
 +            retries.addAndGet(1);
 +            LOG.warn("{} : retrying {} of 3", e.getMessage(), retries.get());
 +            return existingAssignment;
 +        }
 +    }
 +
 +    /**
 +     * Extract from a topology's assignment the executors placed on this node, grouped by port. Each
 +     * port's local assignment also receives the worker resources recorded for that port, if any.
 +     *
 +     * @param stormId topology the assignment belongs to
 +     * @param assignmentId id of this node as it appears in the assignment
 +     * @param assignment assignment to read
 +     * @return port to local assignment for this node
 +     */
 +    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
 +        Map<Integer, LocalAssignment> assignmentByPort = new HashMap<>();
 +
 +        // worker resources recorded for this node's ports
 +        Map<Long, WorkerResources> resourcesByPort = new HashMap<>();
 +        Map<NodeInfo, WorkerResources> workerResources = assignment.get_worker_resources();
 +        if (workerResources != null) {
 +            for (Map.Entry<NodeInfo, WorkerResources> resourceEntry : workerResources.entrySet()) {
 +                NodeInfo node = resourceEntry.getKey();
 +                if (!node.get_node().equals(assignmentId)) {
 +                    continue;
 +                }
 +                for (Long port : node.get_port()) {
 +                    resourcesByPort.put(port, resourceEntry.getValue());
 +                }
 +            }
 +        }
 +
 +        Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
 +        if (executorNodePort == null) {
 +            return assignmentByPort;
 +        }
 +        for (Map.Entry<List<Long>, NodeInfo> executorEntry : executorNodePort.entrySet()) {
 +            NodeInfo node = executorEntry.getValue();
 +            if (!node.get_node().equals(assignmentId)) {
 +                continue;
 +            }
 +            List<Long> executor = executorEntry.getKey();
 +            for (Long port : node.get_port()) {
 +                LocalAssignment localAssignment = assignmentByPort.get(port.intValue());
 +                if (localAssignment == null) {
 +                    // first executor seen on this port: start its local assignment
 +                    localAssignment = new LocalAssignment(stormId, new ArrayList<ExecutorInfo>());
 +                    if (resourcesByPort.containsKey(port)) {
 +                        localAssignment.set_resources(resourcesByPort.get(port));
 +                    }
 +                    assignmentByPort.put(port.intValue(), localAssignment);
 +                }
 +                // an executor is identified by the first and last element of its key list
 +                int first = executor.get(0).intValue();
 +                int last = executor.get(executor.size() - 1).intValue();
 +                localAssignment.get_executors().add(new ExecutorInfo(first, last));
 +            }
 +        }
 +        return assignmentByPort;
 +    }
 +
 +    // I know it's not a good idea to create SyncProcessEvent, but I only hope SyncProcessEvent is responsible for
 +    // starting/shutting down workers, and SyncSupervisorEvent is responsible for downloading/removing topologies' binaries.
 +    /**
 +     * Kill every local worker whose heartbeat state is DISALLOWED with respect to the assignments
 +     * stored in the supervisor's local state.
 +     *
 +     * @throws Exception propagated from reading the local state or from killing a worker
 +     */
 +    protected void shutdownDisallowedWorkers() throws Exception {
 +        LocalState localState = supervisorData.getLocalState();
 +        Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap();
 +        if (assignedExecutors == null) {
 +            assignedExecutors = new HashMap<>();
 +        }
 +        int now = Time.currentTimeSecs();
 +        Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now);
 +        LOG.debug("Allocated workers {}", assignedExecutors);
 +        for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) {
 +            String workerId = entry.getKey();
 +            StateHeartbeat stateHeartbeat = entry.getValue();
 +            // entries may carry no heartbeat state; the sibling kill method guards against that too
 +            if (stateHeartbeat != null && stateHeartbeat.getState() == State.DISALLOWED) {
 +                syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), workerId);
 +                LOG.debug("{}'s state disallowed, so shutdown this worker", workerId);
 +            }
 +        }
 +    }
 +}

Reply via email to