xxxx
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/465a4b89 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/465a4b89 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/465a4b89 Branch: refs/heads/master Commit: 465a4b89521a4ac15b81969009133bdfa12d0655 Parents: 42bacde Author: xiaojian.fxj <[email protected]> Authored: Thu Mar 10 20:12:18 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Thu Mar 10 20:12:18 2016 +0800 ---------------------------------------------------------------------- .../org/apache/storm/command/kill_workers.clj | 5 +- .../apache/storm/daemon/local_supervisor.clj | 5 +- .../storm/daemon/supervisor/ShutdownWork.java | 7 +- .../daemon/supervisor/StandaloneSupervisor.java | 2 - .../apache/storm/daemon/supervisor/State.java | 2 +- .../storm/daemon/supervisor/Supervisor.java | 9 +- .../storm/daemon/supervisor/SupervisorData.java | 112 ++++--------------- .../daemon/supervisor/SupervisorManger.java | 5 +- .../daemon/supervisor/SupervisorUtils.java | 101 +++++++++++++++-- .../daemon/supervisor/SyncProcessEvent.java | 33 +++--- .../daemon/supervisor/SyncSupervisorEvent.java | 17 ++- .../supervisor/timer/RunProfilerActions.java | 2 +- .../supervisor/timer/SupervisorHealthCheck.java | 4 +- .../supervisor/timer/SupervisorHeartbeat.java | 14 +-- .../daemon/supervisor/timer/UpdateBlobs.java | 5 +- 15 files changed, 168 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/clj/org/apache/storm/command/kill_workers.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj index a7de176..4ddc993 100644 --- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj +++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj @@ -28,7 +28,6 @@ conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath)) isupervisor (StandaloneSupervisor.) supervisor-data (SupervisorData. conf nil isupervisor) - ids (SupervisorUtils/myWorkerIds conf) - shut-workers (ShutdownWork.)] + ids (SupervisorUtils/supervisorWorkerIds conf)] (doseq [id ids] - (.shutWorker shut-workers supervisor-data id)))) + (SupervisorUtils/shutWorker supervisor-data id)))) http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj index 3dfed6f..70c280a 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@ -14,7 +14,7 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.daemon.local-supervisor - (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor] + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor SupervisorUtils] [org.apache.storm.utils Utils ConfigUtils] [org.apache.storm ProcessSimulator]) (:use [org.apache.storm.daemon common] @@ -38,9 +38,8 @@ )) (defn shutdown-local-worker [supervisorData workerId] - (let [shut-workers (ShutdownWork.)] (log-message "shutdown-local-worker") - (.shutWorker shut-workers supervisorData workerId))) + (SupervisorUtils/shutWorker supervisorData workerId)) (defn local-process [] "Create a local process event" http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java index 5018ce1..ec69980 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java @@ -26,7 +26,6 @@ import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.util.*; @@ -42,7 +41,7 @@ public class ShutdownWork implements Shutdownable { Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); String user = ConfigUtils.getWorkerUser(conf, workerId); - String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId); + String threadPid = supervisorData.getWorkerThreadPids().get(workerId); if (StringUtils.isNotBlank(threadPid)) { ProcessSimulator.killProcess(threadPid); } @@ -53,7 +52,7 @@ public class ShutdownWork implements Shutdownable { commands.add("signal"); commands.add(pid); commands.add("15"); - String logPrefix = "kill - 15 " + pid; + String logPrefix = "kill -15 " + pid; SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); } else { Utils.killProcessWithSigTerm(pid); @@ -71,7 +70,7 @@ public class ShutdownWork implements Shutdownable { commands.add("signal"); commands.add(pid); commands.add("9"); - String logPrefix = "kill - 9 " + pid; + String logPrefix = "kill -9 " + pid; SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); } else { Utils.forceKillProcess(pid); http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java index c13df8b..d4ce623 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java @@ -28,9 +28,7 @@ import java.util.Map; import java.util.UUID; public class StandaloneSupervisor implements ISupervisor { - private String supervisorId; - private Map conf; @Override http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java index 1913c91..28dffd7 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java @@ -18,5 +18,5 @@ package org.apache.storm.daemon.supervisor; public enum State { - valid, disallowed, notStarted, timedOut; + VALID, DISALLOWED, NOT_STARTED, TIMED_OUT; } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java index 2c7810d..847b38d 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java @@ -140,7 +140,7 @@ public class Supervisor { /** * start distribute supervisor */ - private void launch() { + private void launch(ISupervisor iSupervisor) { LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); SupervisorManger supervisorManager; try { @@ -148,11 +148,10 @@ public class Supervisor { if (ConfigUtils.isLocalMode(conf)) { throw new IllegalArgumentException("Cannot start server in local mode!"); } - ISupervisor iSupervisor = new StandaloneSupervisor(); supervisorManager = mkSupervisor(conf, null, iSupervisor); if (supervisorManager != null) Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager); - registerWorkerNumGauge("drpc:num-execute-http-requests", conf); + registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf); startMetricsReporters(conf); } catch (Exception e) { LOG.error("Failed to start supervisor\n", e); @@ -167,7 +166,7 @@ public class Supervisor { metricRegistry.register(name, new Gauge<Integer>() { @Override public Integer getValue() { - Collection<String> pids = SupervisorUtils.myWorkerIds(conf); + Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf); return pids.size(); } }); @@ -191,6 +190,6 @@ public class Supervisor { public static void main(String[] args) { Utils.setupDefaultUncaughtExceptionHandler(); Supervisor instance = new Supervisor(); - instance.launch(); + instance.launch(new StandaloneSupervisor()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java index 039fe30..be39b4e 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java @@ -42,23 +42,25 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; public class SupervisorData { private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class); - private Map conf; - private IContext sharedContext; + private final Map conf; + private final IContext sharedContext; private volatile boolean active; private ISupervisor iSupervisor; private Utils.UptimeComputer upTime; private String stormVersion; - private ConcurrentHashMap<String, String> workerThreadPidsAtom; // for local mode + private ConcurrentHashMap<String, String> workerThreadPids; // for local mode private IStormClusterState stormClusterState; @@ -71,7 +73,7 @@ public class SupervisorData { private String hostName; // used for reporting used ports when heartbeating - private ConcurrentHashMap<Long, LocalAssignment> currAssignment; + private AtomicReference<Map<Long, LocalAssignment>> currAssignment; private StormTimer heartbeatTimer; @@ -81,13 +83,13 @@ public class SupervisorData { private Localizer localizer; - private ConcurrentHashMap<String, Map<String, Object>> assignmentVersions; + private AtomicReference<Map<String, Map<String, Object>>> assignmentVersions; private AtomicInteger syncRetry; private final Object downloadLock = new Object(); - private ConcurrentHashMap<String, List<ProfileRequest>> stormIdToProfileActions; + private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions; private CgroupManager resourceIsolationManager; @@ -100,7 +102,7 @@ public class SupervisorData { this.active = true; this.upTime = Utils.makeUptimeComputer(); this.stormVersion = VersionInfo.getVersion(); - this.workerThreadPidsAtom = new ConcurrentHashMap<String, String>(); + this.workerThreadPids = new ConcurrentHashMap<String, String>(); this.deadWorkers = new ConcurrentHashSet(); List<ACL> acls = null; @@ -130,7 +132,7 @@ public class SupervisorData { throw Utils.wrapInRuntime(e); } - this.currAssignment = new ConcurrentHashMap<>(); + this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>()); this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); @@ -138,9 +140,9 @@ public class SupervisorData { this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler()); - this.assignmentVersions = new ConcurrentHashMap<>(); + this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>()); this.syncRetry = new AtomicInteger(0); - this.stormIdToProfileActions = new ConcurrentHashMap<>(); + this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>()); if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { try { this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); @@ -154,31 +156,22 @@ public class SupervisorData { } } - public ConcurrentHashMap<String, List<ProfileRequest>> getStormIdToProfileActions() { + public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() { return stormIdToProfileActions; } public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) { - this.stormIdToProfileActions.clear(); - this.stormIdToProfileActions.putAll(stormIdToProfileActions); + this.stormIdToProfileActions.set(stormIdToProfileActions); } public Map getConf() { return conf; } - public void setConf(Map conf) { - this.conf = conf; - } - public IContext getSharedContext() { return sharedContext; } - public void setSharedContext(IContext sharedContext) { - this.sharedContext = sharedContext; - } - public boolean isActive() { return active; } @@ -191,107 +184,58 @@ public class SupervisorData { return iSupervisor; } - public void setiSupervisor(ISupervisor iSupervisor) { - this.iSupervisor = iSupervisor; - } - public Utils.UptimeComputer getUpTime() { return upTime; } - public void setUpTime(Utils.UptimeComputer upTime) { - this.upTime = upTime; - } - public String getStormVersion() { return stormVersion; } - public void setStormVersion(String stormVersion) { - this.stormVersion = stormVersion; - } - - public ConcurrentHashMap<String, String> getWorkerThreadPidsAtom() { - return workerThreadPidsAtom; - } - - public void setWorkerThreadPidsAtom(ConcurrentHashMap<String, String> workerThreadPidsAtom) { - this.workerThreadPidsAtom = workerThreadPidsAtom; + public ConcurrentHashMap<String, String> getWorkerThreadPids() { + return workerThreadPids; } public IStormClusterState getStormClusterState() { return stormClusterState; } - public void setStormClusterState(IStormClusterState stormClusterState) { - this.stormClusterState = stormClusterState; - } - public LocalState getLocalState() { return localState; } - public void setLocalState(LocalState localState) { - this.localState = localState; - } - public String getSupervisorId() { return supervisorId; } - public void setSupervisorId(String supervisorId) { - this.supervisorId = supervisorId; - } - public String getAssignmentId() { return assignmentId; } - public void setAssignmentId(String assignmentId) { - this.assignmentId = assignmentId; - } - public String getHostName() { return hostName; } - public void setHostName(String hostName) { - this.hostName = hostName; - } - - public ConcurrentHashMap<Long, LocalAssignment> getCurrAssignment() { + public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() { return currAssignment; } public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) { - this.currAssignment.clear(); - this.currAssignment.putAll(currAssignment); + this.currAssignment.set(currAssignment); } public StormTimer getHeartbeatTimer() { return heartbeatTimer; } - public void setHeartbeatTimer(StormTimer heartbeatTimer) { - this.heartbeatTimer = heartbeatTimer; - } - public StormTimer getEventTimer() { return eventTimer; } - public void setEventTimer(StormTimer eventTimer) { - this.eventTimer = eventTimer; - } - public StormTimer getBlobUpdateTimer() { return blobUpdateTimer; } - public void setBlobUpdateTimer(StormTimer blobUpdateTimer) { - this.blobUpdateTimer = blobUpdateTimer; - } - public Localizer getLocalizer() { return localizer; } @@ -304,36 +248,20 @@ public class SupervisorData { return syncRetry; } - public void setSyncRetry(AtomicInteger syncRetry) { - this.syncRetry = syncRetry; - } - - public ConcurrentHashMap<String, Map<String, Object>> getAssignmentVersions() { + public AtomicReference<Map<String, Map<String, Object>>> getAssignmentVersions() { return assignmentVersions; } public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) { - this.assignmentVersions.clear(); - this.assignmentVersions.putAll(assignmentVersions); + this.assignmentVersions.set(assignmentVersions); } public CgroupManager getResourceIsolationManager() { return resourceIsolationManager; } - public void setResourceIsolationManager(CgroupManager resourceIsolationManager) { - this.resourceIsolationManager = resourceIsolationManager; - } - - public Object getDownloadLock() { - return downloadLock; - } - public ConcurrentHashSet getDeadWorkers() { return deadWorkers; } - public void setDeadWorkers(ConcurrentHashSet deadWorkers) { - this.deadWorkers = deadWorkers; - } } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java index acc2cb8..6578529 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Map; -public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable { +public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable { private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class); @@ -41,7 +41,6 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, this.processesEventManager = processesEventManager; } - @Override public void shutdown() { LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); supervisorData.setActive(false); @@ -63,7 +62,7 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf()); try { for (String workerId : workerIds) { - shutWorker(supervisorData, workerId); + SupervisorUtils.shutWorker(supervisorData, workerId); } } catch (Exception e) { LOG.error("shutWorker failed"); http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index 9d0b343..dd2a538 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -20,11 +20,13 @@ package org.apache.storm.daemon.supervisor; import org.apache.commons.lang.StringUtils; import org.apache.curator.utils.PathUtils; import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.localizer.LocalResource; import org.apache.storm.localizer.Localizer; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -68,6 +70,7 @@ public class SupervisorUtils { commands.add(wl); commands.add(user); commands.addAll(args); + LOG.info("Running as user: {} command: {}", user, commands); return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); } @@ -115,7 +118,7 @@ public class SupervisorUtils { * @param blobInfo * @return */ - public static Boolean isShouldUncompressBlob(Map<String, Object> blobInfo) { + public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) { return new Boolean((String) blobInfo.get("uncompress")); } @@ -129,7 +132,7 @@ public class SupervisorUtils { List<LocalResource> localResourceList = new ArrayList<>(); if (blobstoreMap != null) { for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) { - LocalResource localResource = new LocalResource(map.getKey(), isShouldUncompressBlob(map.getValue())); + LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue())); localResourceList.add(localResource); } } @@ -169,7 +172,7 @@ public class SupervisorUtils { return Utils.readDirContents(workerRoot); } - public static boolean checkTopoFilesExist(Map conf, String stormId) throws IOException { + public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException { String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot); String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot); @@ -185,10 +188,6 @@ public class SupervisorUtils { return false; } - public static Collection<String> myWorkerIds(Map conf){ - return Utils.readDirContents(ConfigUtils.workerRoot(conf)); - } - /** * Returns map from worr id to heartbeat * @@ -263,11 +262,95 @@ public class SupervisorUtils { return ret; } - public static List<ACL> supervisorZkAcls() { - List<ACL> acls = new ArrayList<>(); + public final static List<ACL> supervisorZkAcls() { + final List<ACL> acls = new ArrayList<>(); acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); return acls; } + public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException { + LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId); + Map conf = supervisorData.getConf(); + Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); + Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); + Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); + String user = ConfigUtils.getWorkerUser(conf, workerId); + String threadPid = supervisorData.getWorkerThreadPids().get(workerId); + if (StringUtils.isNotBlank(threadPid)) { + ProcessSimulator.killProcess(threadPid); + } + + for (String pid : pids) { + if (asUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("15"); + String logPrefix = "kill -15 " + pid; + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.killProcessWithSigTerm(pid); + } + } + + if (pids.size() > 0) { + LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); + Time.sleepSecs(shutdownSleepSecs); + } + + for (String pid : pids) { + if (asUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("9"); + String logPrefix = "kill -9 " + pid; + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.forceKillProcess(pid); + } + String path = ConfigUtils.workerPidPath(conf, workerId, pid); + if (asUser) { + SupervisorUtils.rmrAsUser(conf, workerId, path); + } else { + try { + LOG.debug("Removing path {}", path); + new File(path).delete(); + } catch (Exception e) { + // on windows, the supervisor may still holds the lock on the worker directory + // ignore + } + } + } + tryCleanupWorker(conf, supervisorData, workerId); + LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId); + + } + + public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) { + try { + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + if (Utils.checkFileExists(workerRoot)) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); + } else { + Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); + } + ConfigUtils.removeWorkerUserWSE(conf, workerId); + supervisorData.getDeadWorkers().remove(workerId); + } + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){ + supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId); + } + } catch (IOException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } catch (RuntimeException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java index 172d223..cf26896 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java @@ -45,7 +45,7 @@ import java.util.*; * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers * launch */ -public class SyncProcessEvent extends ShutdownWork implements Runnable { +public class SyncProcessEvent implements Runnable { private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); @@ -53,6 +53,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { private SupervisorData supervisorData; + public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + private class ProcessExitCallback implements Utils.ExitCodeCallable { private final String logPrefix; private final String workerId; @@ -113,7 +115,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { Set<Integer> keepPorts = new HashSet<>(); for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat.getState() == State.valid) { + if (stateHeartbeat.getState() == State.VALID) { keeperWorkerIds.add(entry.getKey()); keepPorts.add(stateHeartbeat.getHeartbeat().get_port()); } @@ -129,7 +131,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat.getState() != State.valid) { + if (stateHeartbeat.getState() != State.VALID) { LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now, stateHeartbeat.getState(), stateHeartbeat.getHeartbeat()); shutWorker(supervisorData, entry.getKey()); @@ -180,9 +182,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { } return reassignExecutors; } - - - + /** * Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead * @@ -205,16 +205,16 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { LSWorkerHeartbeat whb = entry.getValue(); State state; if (whb == null) { - state = State.notStarted; + state = State.NOT_STARTED; } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) { - state = State.disallowed; + state = State.DISALLOWED; } else if (supervisorData.getDeadWorkers().contains(workerId)) { - LOG.info("Worker Process {}as died", workerId); - state = State.timedOut; + LOG.info("Worker Process {} has died", workerId); + state = State.TIMED_OUT; } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) { - state = State.timedOut; + state = State.TIMED_OUT; } else { - state = State.valid; + state = State.VALID; } LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now); workerIdHbstate.put(workerId, new StateHeartbeat(state, whb)); @@ -230,7 +230,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { List<ExecutorInfo> executorInfos = new ArrayList<>(); executorInfos.addAll(whb.get_executors()); // remove SYSTEM_EXECUTOR_ID - executorInfos.remove(new ExecutorInfo(-1, -1)); + executorInfos.remove(SYSTEM_EXECUTOR_INFO); List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors(); if (localExecuorInfos.size() != executorInfos.size()) @@ -518,7 +518,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { WorkerResources resources = assignment.get_resources(); // This condition checks for required files exist before launching the worker - if (SupervisorUtils.checkTopoFilesExist(conf, stormId)) { + if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId); String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId); @@ -666,4 +666,9 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { Utils.createSymlink(workerRoot, stormRoot, fileName, fileName); } } + + //for supervisor-test + public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{ + SupervisorUtils.shutWorker(supervisorData, workerId); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java index 29aad12..e96395f 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java @@ -75,7 +75,7 @@ public class SyncSupervisorEvent implements Runnable { Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager); List<String> stormIds = stormClusterState.assignments(syncCallback); Map<String, Map<String, Object>> assignmentsSnapshot = - getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions(), syncCallback); + getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); Map<String, List<ProfileRequest>> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); Set<String> allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); @@ -191,7 +191,7 @@ public class SyncSupervisorEvent implements Runnable { for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()) { String workerId = entry.getKey(); StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat != null && stateHeartbeat.getState() == State.valid) { + if (stateHeartbeat != null && stateHeartbeat.getState() == State.VALID) { vaildPortToWorkerIds.put(stateHeartbeat.getHeartbeat().get_port(), workerId); } } @@ -277,7 +277,7 @@ public class SyncSupervisorEvent implements Runnable { for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { String key = entry.getKey(); Map<String, Object> blobInfo = entry.getValue(); - localizer.removeBlobReference(key, user, topoName, SupervisorUtils.isShouldUncompressBlob(blobInfo)); + localizer.removeBlobReference(key, user, topoName, SupervisorUtils.shouldUncompressBlob(blobInfo)); } } } @@ -312,7 +312,7 @@ public class SyncSupervisorEvent implements Runnable { Set<String> srashStormIds = new HashSet<>(); for (String stormId : allDownloadedTopologyIds) { if (assignedStormIds.contains(stormId)) { - if (!SupervisorUtils.checkTopoFilesExist(conf, stormId)) { + if (!SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { LOG.debug("Files not present in topology directory"); rmTopoFiles(conf, stormId, localizer, false); srashStormIds.add(stormId); @@ -357,7 +357,12 @@ public class SyncSupervisorEvent implements Runnable { blobStore.shutdown(); } - FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); + try { + FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); + }catch (Exception e){ + ; + } + SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot); ClassLoader classloader = Thread.currentThread().getContextClassLoader(); @@ -627,7 +632,7 @@ public class SyncSupervisorEvent implements Runnable { for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()){ String workerId = entry.getKey(); StateHeartbeat stateHeartbeat = entry.getValue(); - if (stateHeartbeat.getState() == State.disallowed){ + if (stateHeartbeat.getState() == State.DISALLOWED){ syncProcesses.shutWorker(supervisorData, workerId); LOG.debug("{}'s state disallowed, so shutdown this worker"); } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java index 91044cc..d39a679 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java @@ -84,7 +84,7 @@ public class RunProfilerActions implements Runnable { @Override public void run() { - Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions(); + Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get(); try { for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) { String stormId = entry.getKey(); http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java index 36ee6b6..49f48ef 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Map; -public class SupervisorHealthCheck extends ShutdownWork implements Runnable { +public class SupervisorHealthCheck implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); @@ -47,7 +47,7 @@ public class SupervisorHealthCheck extends ShutdownWork implements Runnable { if (healthCode != 0) { for (String workerId : workerIds) { try { - shutWorker(supervisorData, workerId); + SupervisorUtils.shutWorker(supervisorData, workerId); } catch (Exception e) { throw Utils.wrapInRuntime(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java index e158dbc..4137e94 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -31,12 +31,10 @@ import java.util.Map; public class SupervisorHeartbeat implements Runnable { - private IStormClusterState stormClusterState; - private String supervisorId; - private Map conf; - private SupervisorInfo supervisorInfo; - - private SupervisorData supervisorData; + private final IStormClusterState stormClusterState; + private final String supervisorId; + private final Map conf; + private final SupervisorData supervisorData; public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { this.stormClusterState = supervisorData.getStormClusterState(); @@ -46,13 +44,13 @@ public class SupervisorHeartbeat implements Runnable { } private SupervisorInfo update(Map conf, SupervisorData supervisorData) { - supervisorInfo = new SupervisorInfo(); + SupervisorInfo supervisorInfo = new SupervisorInfo(); supervisorInfo.set_time_secs(Time.currentTimeSecs()); supervisorInfo.set_hostname(supervisorData.getHostName()); supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); List<Long> usedPorts = new ArrayList<>(); - usedPorts.addAll(supervisorData.getCurrAssignment().keySet()); + usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet()); supervisorInfo.set_used_ports(usedPorts); List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); List<Long> portList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/465a4b89/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java index 623afa5..ebb1d5f 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; /** * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The @@ -58,9 +59,9 @@ public class UpdateBlobs implements Runnable { try { Map conf = supervisorData.getConf(); Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); - ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment(); + AtomicReference<Map<Long, LocalAssignment>> newAssignment = supervisorData.getCurrAssignment(); Set<String> assignedStormIds = new HashSet<>(); - for (LocalAssignment localAssignment : newAssignment.values()) { + for (LocalAssignment localAssignment : newAssignment.get().values()) { assignedStormIds.add(localAssignment.get_topology_id()); } for (String stormId : downloadedStormIds) {
