add the plugin to use for manager worker
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1e47352 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1e47352 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1e47352 Branch: refs/heads/master Commit: a1e473526b5d9074ae1f9ff98162ddc78e426a73 Parents: cc95d4f Author: xiaojian.fxj <[email protected]> Authored: Mon Mar 14 16:54:36 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Mon Mar 14 18:55:57 2016 +0800 ---------------------------------------------------------------------- conf/defaults.yaml | 4 + .../org/apache/storm/command/kill_workers.clj | 11 +- .../apache/storm/daemon/local_supervisor.clj | 16 +- storm-core/src/clj/org/apache/storm/testing.clj | 16 +- storm-core/src/jvm/org/apache/storm/Config.java | 7 + .../storm/daemon/supervisor/DaemonCommon.java | 22 - .../daemon/supervisor/StandaloneSupervisor.java | 1 - .../storm/daemon/supervisor/Supervisor.java | 14 +- .../storm/daemon/supervisor/SupervisorData.java | 24 +- .../daemon/supervisor/SupervisorManager.java | 103 +++++ .../daemon/supervisor/SupervisorManger.java | 97 ----- .../daemon/supervisor/SupervisorUtils.java | 105 +---- .../daemon/supervisor/SyncProcessEvent.java | 274 +------------ .../daemon/supervisor/SyncSupervisorEvent.java | 16 +- .../supervisor/timer/RunProfilerActions.java | 2 +- .../supervisor/timer/SupervisorHealthCheck.java | 8 +- .../workermanager/DefaultWorkerManager.java | 397 +++++++++++++++++++ .../workermanager/IWorkerManager.java | 38 ++ .../supervisor/workermanager/IWorkerResult.java | 21 + .../clj/org/apache/storm/supervisor_test.clj | 84 ++-- 20 files changed, 706 insertions(+), 554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 9817161..da25ef8 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -289,6 +289,10 @@ storm.daemon.metrics.reporter.plugins: storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager" storm.resource.isolation.plugin.enable: false + +# Default plugin to use for manager worker +storm.supervisor.worker.manager.plugin: org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager + # Configs for CGroup support storm.cgroup.hierarchy.dir: "/cgroup/storm_resources" storm.cgroup.resources: http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/command/kill_workers.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj index aadc9fd..08de3ed 100644 --- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj +++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj @@ -28,6 +28,13 @@ conf (assoc conf STORM-LOCAL-DIR (. (File. (conf STORM-LOCAL-DIR)) getCanonicalPath)) isupervisor (StandaloneSupervisor.) supervisor-data (SupervisorData. conf nil isupervisor) - ids (SupervisorUtils/supervisorWorkerIds conf)] + worker-manager (.getWorkerManager supervisor-data) + ids (SupervisorUtils/supervisorWorkerIds conf) + supervisor-id (.getSupervisorId supervisor-data) + worker-pids (.getWorkerThreadPids supervisor-data) + dead-workers (.getDeadWorkers supervisor-data)] (doseq [id ids] - (SupervisorUtils/shutWorker supervisor-data id)))) + (.shutdownWorker worker-manager supervisor-id id worker-pids) + (if (.cleanupWorker worker-manager id) + (.remove dead-workers id)) + ))) http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj index c8ae2d6..b28ae08 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@ -36,17 +36,21 @@ (ProcessSimulator/registerProcess pid worker) (.put (.getWorkerThreadPids supervisorData) workerId pid) )) - -(defn shutdown-local-worker [supervisorData workerId] - (log-message "shutdown-local-worker") - (SupervisorUtils/shutWorker supervisorData workerId)) +(defn shutdown-local-worker [supervisorData worker-manager workerId] + (log-message "shutdown-local-worker") + (let [supervisor-id (.getSupervisorId supervisorData) + worker-pids (.getWorkerThreadPids supervisorData) + dead-workers (.getDeadWorkers supervisorData)] + (.shutdownWorker worker-manager supervisor-id workerId worker-pids) + (if (.cleanupWorker worker-manager workerId) + (.remove dead-workers workerId)))) (defn local-process [] "Create a local process event" (proxy [SyncProcessEvent] [] - (launchWorker [supervisorData stormId port workerId resources] + (launchLocalWorker [supervisorData stormId port workerId resources] (launch-local-worker supervisorData stormId port workerId resources)) - (shutWorker [supervisorData workerId] (shutdown-local-worker supervisorData workerId)))) + (shutWorker [supervisorData worker-manager workerId] (shutdown-local-worker supervisorData worker-manager workerId)))) (defserverfn mk-local-supervisor [conf shared-context isupervisor] http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 7804747..5000fd3 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -25,7 +25,7 @@ [org.apache.storm.utils] [org.apache.storm.zookeeper Zookeeper] [org.apache.storm ProcessSimulator] - [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManger SupervisorUtils]) + [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManager SupervisorUtils SupervisorManager]) (:import [java.io File]) (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) @@ -137,7 +137,8 @@ supervisor-conf (merge (:daemon-conf cluster-map) conf {STORM-LOCAL-DIR tmp-dir - SUPERVISOR-SLOTS-PORTS port-ids}) + SUPERVISOR-SLOTS-PORTS port-ids + STORM-SUPERVISOR-WORKER-MANAGER-PLUGIN "org.apache.storm.daemon.supervisor.workermanager.DefaultWorkerManager"}) id-fn (if id id (Utils/uuid)) isupervisor (proxy [StandaloneSupervisor] [] (generateSupervisorId [] id-fn)) @@ -282,7 +283,7 @@ ([timeout-ms apredicate] (while-timeout timeout-ms (not (apredicate)) (Time/sleep 100)))) -(defn is-supervisor-waiting [^SupervisorManger supervisor] +(defn is-supervisor-waiting [^SupervisorManager supervisor] (.isWaiting supervisor)) (defn wait-until-cluster-waiting @@ -415,15 +416,18 @@ (defn mk-capture-shutdown-fn [capture-atom] - (fn [supervisorData workerId] + (fn [supervisorData worker-manager workerId] (let [conf (.getConf supervisorData) supervisor-id (.getSupervisorId supervisorData) port (find-worker-port conf workerId) + worker-pids (.getWorkerThreadPids supervisorData) + dead-workers (.getDeadWorkers supervisorData) existing (get @capture-atom [supervisor-id port] 0)] (log-message "mk-capture-shutdown-fn") (swap! capture-atom assoc [supervisor-id port] (inc existing)) - (SupervisorUtils/shutWorker supervisorData workerId)))) - + (.shutdownWorker worker-manager supervisor-id workerId worker-pids) + (if (.cleanupWorker worker-manager workerId) + (.remove dead-workers workerId))))) (defmacro capture-changed-workers [& body] `(let [launch-captured# (atom {}) http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 6ea8b0f..103e585 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -18,6 +18,7 @@ package org.apache.storm; import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; @@ -2212,6 +2213,12 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin"; /** + * The plugin to be used for manager worker + */ + @isImplementationOfClass(implementsClass = IWorkerManager.class) + public static final Object STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN = "storm.supervisor.worker.manager.plugin"; + + /** * CGroup Setting below */ http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java deleted file mode 100644 index 3b7a18e..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor; - -public interface DaemonCommon { - boolean isWaiting(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java index 4947c6f..a1fa798 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java @@ -57,7 +57,6 @@ public class StandaloneSupervisor implements ISupervisor { } @Override - // @return is vector which need be converted to be int public Object getMetadata() { Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS); return ports; http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java index 6124aef..1dd44a9 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java @@ -61,8 +61,8 @@ public class Supervisor { * @return * @throws Exception */ - public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { - SupervisorManger supervisorManger = null; + public SupervisorManager mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { + SupervisorManager supervisorManager = null; try { LOG.info("Starting Supervisor with conf {}", conf); iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); @@ -78,8 +78,8 @@ public class Supervisor { Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); - Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); - for (String stormId : downdedStormId) { + Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); + for (String stormId : downloadedStormIds) { SupervisorUtils.addBlobReferences(localizer, stormId, conf); } // do this after adding the references so we don't try to clean things being used @@ -119,7 +119,7 @@ public class Supervisor { eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager)); } LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName()); - supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager); + supervisorManager = new SupervisorManager(supervisorData, syncSupEventManager, syncProcessManager); } catch (Throwable t) { if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) { throw t; @@ -130,7 +130,7 @@ public class Supervisor { Utils.exitProcess(13, "Error on initialization"); } } - return supervisorManger; + return supervisorManager; } /** @@ -138,7 +138,7 @@ public class Supervisor { */ private void launch(ISupervisor iSupervisor) { LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); - SupervisorManger supervisorManager; + SupervisorManager supervisorManager; try { Map<Object, Object> conf = Utils.readStormConfig(); if (ConfigUtils.isLocalMode(conf)) { http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java index 8c17edc..213457d 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java @@ -23,7 +23,7 @@ import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.localizer.Localizer; @@ -73,8 +73,8 @@ public class SupervisorData { private AtomicInteger syncRetry; private final Object downloadLock = new Object(); private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions; - private CgroupManager resourceIsolationManager; private ConcurrentHashSet<String> deadWorkers; + private final IWorkerManager workerManager; public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) { this.conf = conf; @@ -124,17 +124,8 @@ public class SupervisorData { this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>()); this.syncRetry = new AtomicInteger(0); this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>()); - if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { - try { - this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); - this.resourceIsolationManager.prepare(conf); - LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); - } catch (IOException e) { - throw Utils.wrapInRuntime(e); - } - } else { - this.resourceIsolationManager = null; - } + this.workerManager = Utils.newInstance((String) conf.get(Config.STORM_SUPERVISOR_WORKER_MANAGER_PLUGIN)); + this.workerManager.prepareWorker(conf, localizer); } public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() { @@ -233,12 +224,11 @@ public class SupervisorData { this.assignmentVersions.set(assignmentVersions); } - public CgroupManager getResourceIsolationManager() { - return resourceIsolationManager; - } - public ConcurrentHashSet getDeadWorkers() { return deadWorkers; } + public IWorkerManager getWorkerManager() { + return workerManager; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java new file mode 100644 index 0000000..d593d3c --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManager.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManager implements SupervisorDaemon, DaemonCommon, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorManager.class); + private final EventManager eventManager; + private final EventManager processesEventManager; + private SupervisorData supervisorData; + + public SupervisorManager(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { + this.eventManager = eventManager; + this.supervisorData = supervisorData; + this.processesEventManager = processesEventManager; + } + + public void shutdown() { + LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); + supervisorData.setActive(false); + try { + supervisorData.getHeartbeatTimer().close(); + supervisorData.getEventTimer().close(); + supervisorData.getBlobUpdateTimer().close(); + eventManager.close(); + processesEventManager.close(); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + supervisorData.getStormClusterState().disconnect(); + } + + @Override + public void shutdownAllWorkers() { + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf()); + IWorkerManager workerManager = supervisorData.getWorkerManager(); + try { + for (String workerId : workerIds) { + workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); + boolean success = workerManager.cleanupWorker(workerId); + if (success){ + supervisorData.getDeadWorkers().remove(workerId); + } + } + } catch (Exception e) { + LOG.error("shutWorker failed"); + throw Utils.wrapInRuntime(e); + } + } + + @Override + public Map getConf() { + return supervisorData.getConf(); + } + + @Override + public String getId() { + return supervisorData.getSupervisorId(); + } + + @Override + public boolean isWaiting() { + if (!supervisorData.isActive()) { + return true; + } + + if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting() + && processesEventManager.waiting()) { + return true; + } + return false; + } + + public void run() { + shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java deleted file mode 100644 index 26f0aae..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor; - -import org.apache.storm.event.EventManager; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Map; - -public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class); - private final EventManager eventManager; - private final EventManager processesEventManager; - private SupervisorData supervisorData; - - public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { - this.eventManager = eventManager; - this.supervisorData = supervisorData; - this.processesEventManager = processesEventManager; - } - - public void shutdown() { - LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); - supervisorData.setActive(false); - try { - supervisorData.getHeartbeatTimer().close(); - supervisorData.getEventTimer().close(); - supervisorData.getBlobUpdateTimer().close(); - eventManager.close(); - processesEventManager.close(); - } catch (Exception e) { - throw Utils.wrapInRuntime(e); - } - supervisorData.getStormClusterState().disconnect(); - } - - @Override - public void shutdownAllWorkers() { - - Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf()); - try { - for (String workerId : workerIds) { - SupervisorUtils.shutWorker(supervisorData, workerId); - } - } catch (Exception e) { - LOG.error("shutWorker failed"); - throw Utils.wrapInRuntime(e); - } - } - - @Override - public Map getConf() { - return supervisorData.getConf(); - } - - @Override - public String getId() { - return supervisorData.getSupervisorId(); - } - - @Override - public boolean isWaiting() { - if (!supervisorData.isActive()) { - return true; - } - - if (supervisorData.getHeartbeatTimer().isTimerWaiting() && supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting() - && processesEventManager.waiting()) { - return true; - } - return false; - } - - public void run() { - shutdown(); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index ae3422e..bb2525a 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -50,10 +50,10 @@ public class SupervisorUtils { _instance = INSTANCE; } - public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix, - final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { + public static Process processLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { if (StringUtils.isBlank(user)) { - throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); + throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); } String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); @@ -71,10 +71,10 @@ public class SupervisorUtils { return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); } - public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix) + public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix) throws IOException { int ret = 0; - Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null); + Process process = processLauncher(conf, user, args, environment, logPreFix, null, null); if (StringUtils.isNotBlank(logPreFix)) Utils.readAndLogStream(logPreFix, process.getInputStream()); try { @@ -92,7 +92,7 @@ public class SupervisorUtils { List<String> commands = new ArrayList<>(); commands.add("code-dir"); commands.add(dir); - workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); } } @@ -102,7 +102,7 @@ public class SupervisorUtils { List<String> commands = new ArrayList<>(); commands.add("rmr"); commands.add(path); - SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix); + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix); if (Utils.checkFileExists(path)) { throw new RuntimeException(path + " was not deleted."); } @@ -116,11 +116,11 @@ public class SupervisorUtils { * @return */ public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) { - return new Boolean((String) blobInfo.get("uncompress")); + return Utils.getBoolean(blobInfo.get("uncompress"), false); } /** - * Remove a reference to a blob when its no longer needed + * Returns a list of LocalResources based on the blobstore-map passed in * * @param blobstoreMap * @return @@ -186,7 +186,7 @@ public class SupervisorUtils { } /** - * Returns map from worr id to heartbeat + * map from worker id to heartbeat * * @param conf * @return @@ -265,89 +265,4 @@ public class SupervisorUtils { acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); return acls; } - - public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException { - LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId); - Map conf = supervisorData.getConf(); - Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); - Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); - Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); - String user = ConfigUtils.getWorkerUser(conf, workerId); - String threadPid = supervisorData.getWorkerThreadPids().get(workerId); - if (StringUtils.isNotBlank(threadPid)) { - ProcessSimulator.killProcess(threadPid); - } - - for (String pid : pids) { - if (asUser) { - List<String> commands = new ArrayList<>(); - commands.add("signal"); - commands.add(pid); - commands.add("15"); - String logPrefix = "kill -15 " + pid; - SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); - } else { - Utils.killProcessWithSigTerm(pid); - } - } - - if (pids.size() > 0) { - LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); - Time.sleepSecs(shutdownSleepSecs); - } - - for (String pid : pids) { - if (asUser) { - List<String> commands = new ArrayList<>(); - commands.add("signal"); - commands.add(pid); - commands.add("9"); - String logPrefix = "kill -9 " + pid; - SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); - } else { - Utils.forceKillProcess(pid); - } - String path = ConfigUtils.workerPidPath(conf, workerId, pid); - if (asUser) { - SupervisorUtils.rmrAsUser(conf, workerId, path); - } else { - try { - LOG.debug("Removing path {}", path); - new File(path).delete(); - } catch (Exception e) { - // on windows, the supervisor may still holds the lock on the worker directory - // ignore - } - } - } - tryCleanupWorker(conf, supervisorData, workerId); - LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId); - - } - - public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) { - try { - String workerRoot = ConfigUtils.workerRoot(conf, workerId); - if (Utils.checkFileExists(workerRoot)) { - if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); - } else { - Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); - Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); - Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId)); - Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); - } - ConfigUtils.removeWorkerUserWSE(conf, workerId); - supervisorData.getDeadWorkers().remove(workerId); - } - if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){ - supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId); - } - } catch (IOException e) { - LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); - } catch (RuntimeException e) { - LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); - } - } - } http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java index 068c442..41fa01d 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java @@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.storm.Config; import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.LocalAssignment; @@ -88,13 +89,6 @@ public class SyncProcessEvent implements Runnable { this.localState = supervisorData.getLocalState(); } - - /** - * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - - * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new - * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait - * for workers launch - */ @Override public void run() { LOG.debug("Syncing processes"); @@ -132,7 +126,7 @@ public class SyncProcessEvent implements Runnable { if (stateHeartbeat.getState() != State.VALID) { LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now, stateHeartbeat.getState(), stateHeartbeat.getHeartbeat()); - shutWorker(supervisorData, entry.getKey()); + shutWorker(supervisorData, supervisorData.getWorkerManager(), entry.getKey()); } } // start new workers @@ -244,261 +238,24 @@ public class SyncProcessEvent implements Runnable { /** * launch a worker in local mode. */ - protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException { + protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException { // port this function after porting worker to java } - protected String getWorkerClassPath(String stormJar, Map stormConf) { - List<String> topoClasspath = new ArrayList<>(); - Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH); - - if (object instanceof List) { - topoClasspath.addAll((List<String>) object); - } else if (object instanceof String){ - topoClasspath.add((String)object); - }else { - //ignore - } - String classPath = Utils.workerClasspath(); - String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar)); - return Utils.addToClasspath(classAddPath, topoClasspath); - } - - /** - * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" - * - * @param value - * @param workerId - * @param stormId - * @param port - * @param memOnheap - */ - public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) { - List<String> rets = new ArrayList<>(); - if (value instanceof String) { - String string = (String) value; - string = string.replace("%ID%", String.valueOf(port)); - string = string.replace("%WORKER-ID%", workerId); - string = string.replace("%TOPOLOGY-ID%", stormId); - string = string.replace("%WORKER-PORT%", String.valueOf(port)); - string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); - String[] strings = string.split("\\s+"); - rets.addAll(Arrays.asList(strings)); - } else if (value instanceof List) { - List<Object> objects = (List<Object>) value; - for (Object object : objects) { - String str = (String)object; - str = str.replace("%ID%", String.valueOf(port)); - str = str.replace("%WORKER-ID%", workerId); - str = str.replace("%TOPOLOGY-ID%", stormId); - str = str.replace("%WORKER-PORT%", String.valueOf(port)); - str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap)); - rets.add(str); - } - } - return rets; - } - - - - /** - * launch a worker in distributed mode - * supervisorId for testing - * @throws IOException - */ - protected void launchWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId, - WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException { - - Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); - String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); - String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); - String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); - String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); - - String stormLogDir = ConfigUtils.getLogDir(); - String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); - - String stormLog4j2ConfDir; - if (StringUtils.isNotBlank(stormLogConfDir)) { - if (Utils.isAbsolutePath(stormLogConfDir)) { - stormLog4j2ConfDir = stormLogConfDir; - } else { - stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; - } - } else { - stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; - } - - String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - - String jlp = jlp(stormRoot, conf); - - String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); - + protected void launchDistributedWorker(IWorkerManager workerManager, Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId, + WorkerResources resources, ConcurrentHashSet deadWorkers) throws IOException { Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - - String workerClassPath = getWorkerClassPath(stormJar, stormConf); - - Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS); - List<String> topGcOpts = new ArrayList<>(); - if (topGcOptsObject instanceof String) { - topGcOpts.add((String) topGcOptsObject); - } else if (topGcOptsObject instanceof List) { - topGcOpts.addAll((List<String>) topGcOptsObject); - } - - int memOnheap = 0; - if (resources.get_mem_on_heap() > 0) { - memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); - } else { - //set the default heap memory size for supervisor-test - memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768); - } - - int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); - - int cpu = (int) Math.ceil(resources.get_cpu()); - - List<String> gcOpts = null; - - if (topGcOpts != null) { - gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap); - } else { - gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap); - } - - Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); - List<String> topoWorkerLogwriterChildopts = new ArrayList<>(); - if (topoWorkerLogwriterObject instanceof String) { - topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject); - } else if (topoWorkerLogwriterObject instanceof List) { - topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject); - } - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - - String logfileName = "worker.log"; - - String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf); - - String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY); - if (loggingSensitivity == null) { - loggingSensitivity = "S3"; - } - - List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); - - List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); - - List<String> workerProfilerChildopts = null; - if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) { - workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap); - }else { - workerProfilerChildopts = new ArrayList<>(); - } - - Map<String, String> topEnvironment = new HashMap<String, String>(); - Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); - if (environment != null) { - topEnvironment.putAll(environment); - } - topEnvironment.put("LD_LIBRARY_PATH", jlp); - - String log4jConfigurationFile = null; - if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) { - log4jConfigurationFile = "file:///" + stormLog4j2ConfDir; - } else { - log4jConfigurationFile = stormLog4j2ConfDir; - } - log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml"; - - List<String> commandList = new ArrayList<>(); - commandList.add(SupervisorUtils.javaCmd("java")); - commandList.add("-cp"); - commandList.add(workerClassPath); - commandList.addAll(topoWorkerLogwriterChildopts); - commandList.add("-Dlogfile.name=" + logfileName); - commandList.add("-Dstorm.home=" + stormHome); - commandList.add("-Dworkers.artifacts=" + workersArtifacets); - commandList.add("-Dstorm.id=" + stormId); - commandList.add("-Dworker.id=" + workerId); - commandList.add("-Dworker.port=" + port); - commandList.add("-Dstorm.log.dir=" + stormLogDir); - commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); - commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); - commandList.add("org.apache.storm.LogWriter"); - - commandList.add(SupervisorUtils.javaCmd("java")); - commandList.add("-server"); - commandList.addAll(workerChildopts); - commandList.addAll(topWorkerChildopts); - commandList.addAll(gcOpts); - commandList.addAll(workerProfilerChildopts); - commandList.add("-Djava.library.path=" + jlp); - commandList.add("-Dlogfile.name=" + logfileName); - commandList.add("-Dstorm.home=" + stormHome); - commandList.add("-Dworkers.artifacts=" + workersArtifacets); - commandList.add("-Dstorm.conf.file=" + stormConfFile); - commandList.add("-Dstorm.options=" + stormOptions); - commandList.add("-Dstorm.log.dir=" + stormLogDir); - commandList.add("-Djava.io.tmpdir=" + workerTmpDir); - commandList.add("-Dlogging.sensitivity=" + loggingSensitivity); - commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); - commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); - commandList.add("-Dstorm.id=" + stormId); - commandList.add("-Dworker.id=" + workerId); - commandList.add("-Dworker.port=" + port); - commandList.add("-cp"); - commandList.add(workerClassPath); - commandList.add("org.apache.storm.daemon.worker"); - commandList.add(stormId); - commandList.add(assignmentId); - commandList.add(String.valueOf(port)); - commandList.add(workerId); - - // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB)))) - if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { - int cgRoupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB))); - int memoryValue = memoffheap + memOnheap + cgRoupMem; - int cpuValue = cpu; - Map<String, Number> map = new HashMap<>(); - map.put("cpu", cpuValue); - map.put("memory", memoryValue); - cgroupManager.reserveResourcesForWorker(workerId, map); - commandList = cgroupManager.getLaunchCommand(workerId, commandList); - } - - LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList)); writeLogMetadata(stormConf, user, workerId, stormId, port, conf); ConfigUtils.setWorkerUserWSE(conf, workerId, user); createArtifactsLink(conf, stormId, port, workerId); String logPrefix = "Worker Process " + workerId; - String workerDir = ConfigUtils.workerRoot(conf, workerId); - if (deadWorkers != null) deadWorkers.remove(workerId); createBlobstoreLinks(conf, stormId, workerId); - ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId); - if (runWorkerAsUser) { - List<String> args = new ArrayList<>(); - args.add("worker"); - args.add(workerDir); - args.add(Utils.writeScript(workerDir, commandList, topEnvironment)); - SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir)); - } else { - Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir)); - } - } - - protected String jlp(String stormRoot, Map conf) { - String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; - String os = System.getProperty("os.name").replaceAll("\\s+", "_"); - String arch = System.getProperty("os.arch"); - String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch; - String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH); - return ret; + workerManager.launchWorker(supervisorId, assignmentId, stormId, port, workerId, resources, processExitCallback); } protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException { @@ -528,10 +285,9 @@ public class SyncProcessEvent implements Runnable { FileUtils.forceMkdir(new File(hbPath)); if (clusterMode.endsWith("distributed")) { - launchWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, - supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers()); + launchDistributedWorker(supervisorData.getWorkerManager(), conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, supervisorData.getDeadWorkers()); } else if (clusterMode.endsWith("local")) { - launchWorker(supervisorData, stormId, port.longValue(), workerId, resources); + launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources); } newValidWorkerIds.put(workerId, port); @@ -559,9 +315,7 @@ public class SyncProcessEvent implements Runnable { } if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) { List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS); - for (String group : topGroups){ - logsGroups.add(group); - } + logsGroups.addAll(topGroups); } data.put(Config.LOGS_GROUPS, logsGroups.toArray()); @@ -609,7 +363,6 @@ public class SyncProcessEvent implements Runnable { }finally { writer.close(); } - } /** @@ -665,8 +418,11 @@ public class SyncProcessEvent implements Runnable { } } - //for supervisor-test - public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{ - SupervisorUtils.shutWorker(supervisorData, workerId); + public void shutWorker(SupervisorData supervisorData, IWorkerManager workerManager, String workerId) throws IOException, InterruptedException{ + workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); + boolean success = workerManager.cleanupWorker(workerId); + if (success){ + supervisorData.getDeadWorkers().remove(workerId); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java index 4c08014..47cf440 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java @@ -109,6 +109,7 @@ public class SyncSupervisorEvent implements Runnable { LOG.debug("Checked Downloaded Ids {}", srashStormIds); LOG.debug("Downloaded Ids {}", downloadedStormIds); LOG.debug("Storm Ids Profiler Actions {}", stormIdToProfilerActions); + // download code first // This might take awhile // - should this be done separately from usual monitoring? @@ -204,12 +205,12 @@ public class SyncSupervisorEvent implements Runnable { List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors(); List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors(); if (newExecutors.size() != existExecutors.size()) { - syncProcesses.shutWorker(supervisorData, vaildPortToWorkerIds.get(port)); + syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port)); continue; } for (ExecutorInfo executorInfo : newExecutors) { if (!existExecutors.contains(executorInfo)) { - syncProcesses.shutWorker(supervisorData, vaildPortToWorkerIds.get(port)); + syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port)); break; } } @@ -353,7 +354,12 @@ public class SyncSupervisorEvent implements Runnable { } finally { blobStore.shutdown(); } - FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); + try { + FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); + }catch (Exception e){ + //igonre + } + SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot); ClassLoader classloader = Thread.currentThread().getContextClassLoader(); @@ -503,7 +509,7 @@ public class SyncSupervisorEvent implements Runnable { protected void setupBlobPermission(Map conf, String user, String path) throws IOException { if (Utils.getBoolean(Config.SUPERVISOR_RUN_WORKER_AS_USER, false)) { String logPrefix = "setup blob permissions for " + path; - SupervisorUtils.workerLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix); + SupervisorUtils.processLauncherAndWait(conf, user, Arrays.asList("blob", path), null, logPrefix); } } @@ -623,7 +629,7 @@ public class SyncSupervisorEvent implements Runnable { String workerId = entry.getKey(); StateHeartbeat stateHeartbeat = entry.getValue(); if (stateHeartbeat.getState() == State.DISALLOWED) { - syncProcesses.shutWorker(supervisorData, workerId); + syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), workerId); LOG.debug("{}'s state disallowed, so shutdown this worker"); } } http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java index d39a679..ec29855 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java @@ -171,7 +171,7 @@ public class RunProfilerActions implements Runnable { newCommands.add("profiler"); newCommands.add(targetDir); newCommands.add(script); - SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile); + SupervisorUtils.processLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile); } else { Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile); } http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java index f6b3ed6..5e7b6d3 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@ -21,6 +21,7 @@ package org.apache.storm.daemon.supervisor.timer; import org.apache.storm.command.HealthCheck; import org.apache.storm.daemon.supervisor.SupervisorData; import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +42,17 @@ public class SupervisorHealthCheck implements Runnable { @Override public void run() { Map conf = supervisorData.getConf(); + IWorkerManager workerManager = supervisorData.getWorkerManager(); int healthCode = HealthCheck.healthCheck(conf); Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); if (healthCode != 0) { for (String workerId : workerIds) { try { - SupervisorUtils.shutWorker(supervisorData, workerId); + workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); + boolean success = workerManager.cleanupWorker(workerId); + if (success){ + supervisorData.getDeadWorkers().remove(workerId); + } } catch (Exception e) { throw Utils.wrapInRuntime(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java new file mode 100644 index 0000000..b19fd89 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class DefaultWorkerManager implements IWorkerManager { + + private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); + + private Map conf; + private CgroupManager resourceIsolationManager; + private boolean runWorkerAsUser; + + @Override + public void prepareWorker(Map conf, Localizer localizer) { + this.conf = conf; + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { + try { + this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); + this.resourceIsolationManager.prepare(conf); + LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + } else { + this.resourceIsolationManager = null; + } + this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); + } + + @Override + public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, + Utils.ExitCodeCallable workerExitCallback) { + try { + + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); + String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); + String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + + String stormLogDir = ConfigUtils.getLogDir(); + String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + + String stormLog4j2ConfDir; + if (StringUtils.isNotBlank(stormLogConfDir)) { + if (Utils.isAbsolutePath(stormLogConfDir)) { + stormLog4j2ConfDir = stormLogConfDir; + } else { + stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; + } + } else { + stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; + } + + String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + + String jlp = jlp(stormRoot, conf); + + String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + + String workerClassPath = getWorkerClassPath(stormJar, stormConf); + + Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS); + List<String> topGcOpts = new ArrayList<>(); + if (topGcOptsObject instanceof String) { + topGcOpts.add((String) topGcOptsObject); + } else if (topGcOptsObject instanceof List) { + topGcOpts.addAll((List<String>) topGcOptsObject); + } + + int memOnheap = 0; + if (resources.get_mem_on_heap() > 0) { + memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); + } else { + // set the default heap memory size for supervisor-test + memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768); + } + + int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); + + int cpu = (int) Math.ceil(resources.get_cpu()); + + List<String> gcOpts = null; + + if (topGcOpts.size() > 0) { + gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap); + } else { + gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap); + } + + Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); + List<String> topoWorkerLogwriterChildopts = new ArrayList<>(); + if (topoWorkerLogwriterObject instanceof String) { + topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject); + } else if (topoWorkerLogwriterObject instanceof List) { + topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject); + } + + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + + String logfileName = "worker.log"; + + String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf); + + String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY); + if (loggingSensitivity == null) { + loggingSensitivity = "S3"; + } + + List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); + + List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); + + List<String> workerProfilerChildopts = null; + if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) { + workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap); + } else { + workerProfilerChildopts = new ArrayList<>(); + } + + Map<String, String> topEnvironment = new HashMap<String, String>(); + Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + if (environment != null) { + topEnvironment.putAll(environment); + } + topEnvironment.put("LD_LIBRARY_PATH", jlp); + + String log4jConfigurationFile = null; + if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) { + log4jConfigurationFile = "file:///" + stormLog4j2ConfDir; + } else { + log4jConfigurationFile = stormLog4j2ConfDir; + } + log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml"; + + List<String> commandList = new ArrayList<>(); + commandList.add(SupervisorUtils.javaCmd("java")); + commandList.add("-cp"); + commandList.add(workerClassPath); + commandList.addAll(topoWorkerLogwriterChildopts); + commandList.add("-Dlogfile.name=" + logfileName); + commandList.add("-Dstorm.home=" + stormHome); + commandList.add("-Dworkers.artifacts=" + workersArtifacets); + commandList.add("-Dstorm.id=" + stormId); + commandList.add("-Dworker.id=" + workerId); + commandList.add("-Dworker.port=" + port); + commandList.add("-Dstorm.log.dir=" + stormLogDir); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); + commandList.add("org.apache.storm.LogWriter"); + + commandList.add(SupervisorUtils.javaCmd("java")); + commandList.add("-server"); + commandList.addAll(workerChildopts); + commandList.addAll(topWorkerChildopts); + commandList.addAll(gcOpts); + commandList.addAll(workerProfilerChildopts); + commandList.add("-Djava.library.path=" + jlp); + commandList.add("-Dlogfile.name=" + logfileName); + commandList.add("-Dstorm.home=" + stormHome); + commandList.add("-Dworkers.artifacts=" + workersArtifacets); + commandList.add("-Dstorm.conf.file=" + stormConfFile); + commandList.add("-Dstorm.options=" + stormOptions); + commandList.add("-Dstorm.log.dir=" + stormLogDir); + commandList.add("-Djava.io.tmpdir=" + workerTmpDir); + commandList.add("-Dlogging.sensitivity=" + loggingSensitivity); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); + commandList.add("-Dstorm.id=" + stormId); + commandList.add("-Dworker.id=" + workerId); + commandList.add("-Dworker.port=" + port); + commandList.add("-cp"); + commandList.add(workerClassPath); + commandList.add("org.apache.storm.daemon.worker"); + commandList.add(stormId); + commandList.add(assignmentId); + commandList.add(String.valueOf(port)); + commandList.add(workerId); + + // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB)))) + if (resourceIsolationManager != null) { + int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB))); + int memoryValue = memoffheap + memOnheap + cGroupMem; + int cpuValue = cpu; + Map<String, Number> map = new HashMap<>(); + map.put("cpu", cpuValue); + map.put("memory", memoryValue); + resourceIsolationManager.reserveResourcesForWorker(workerId, map); + commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList); + } + + LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList)); + + String logPrefix = "Worker Process " + workerId; + String workerDir = ConfigUtils.workerRoot(conf, workerId); + + if (runWorkerAsUser) { + List<String> args = new ArrayList<>(); + args.add("worker"); + args.add(workerDir); + args.add(Utils.writeScript(workerDir, commandList, topEnvironment)); + SupervisorUtils.processLauncher(conf, user, args, null, logPrefix, workerExitCallback, new File(workerDir)); + } else { + Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir)); + } + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + return null; + } + + @Override + public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) { + try { + LOG.info("Shutting down {}:{}", supervisorId, workerId); + Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); + Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); + String user = ConfigUtils.getWorkerUser(conf, workerId); + String threadPid = workerThreadPids.get(workerId); + if (StringUtils.isNotBlank(threadPid)) { + ProcessSimulator.killProcess(threadPid); + } + + for (String pid : pids) { + if (runWorkerAsUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("15"); + String logPrefix = "kill -15 " + pid; + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.killProcessWithSigTerm(pid); + } + } + + if (pids.size() > 0) { + LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); + Time.sleepSecs(shutdownSleepSecs); + } + + for (String pid : pids) { + if (runWorkerAsUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("9"); + String logPrefix = "kill -9 " + pid; + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.forceKillProcess(pid); + } + String path = ConfigUtils.workerPidPath(conf, workerId, pid); + if (runWorkerAsUser) { + SupervisorUtils.rmrAsUser(conf, workerId, path); + } else { + try { + LOG.debug("Removing path {}", path); + new File(path).delete(); + } catch (Exception e) { + // on windows, the supervisor may still holds the lock on the worker directory + // ignore + } + } + } + LOG.info("Shut down {}:{}", supervisorId, workerId); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + return null; + } + + @Override + public boolean cleanupWorker(String workerId) { + try { + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + if (Utils.checkFileExists(workerRoot)) { + if (runWorkerAsUser) { + SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); + } else { + Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); + } + ConfigUtils.removeWorkerUserWSE(conf, workerId); + } + if (resourceIsolationManager != null) { + resourceIsolationManager.releaseResourcesForWorker(workerId); + } + return true; + } catch (IOException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } catch (RuntimeException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } + return false; + } + + @Override + public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) { + return null; + } + + protected String jlp(String stormRoot, Map conf) { + String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; + String os = System.getProperty("os.name").replaceAll("\\s+", "_"); + String arch = System.getProperty("os.arch"); + String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch; + String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH); + return ret; + } + + protected String getWorkerClassPath(String stormJar, Map stormConf) { + List<String> topoClasspath = new ArrayList<>(); + Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH); + + if (object instanceof List) { + topoClasspath.addAll((List<String>) object); + } else if (object instanceof String) { + topoClasspath.add((String) object); + } else { + LOG.error("topology specific classpath is invaild"); + } + String classPath = Utils.workerClasspath(); + String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar)); + return Utils.addToClasspath(classAddPath, topoClasspath); + } + + /** + * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" + * + * @param value + * @param workerId + * @param stormId + * @param port + * @param memOnheap + */ + public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) { + List<String> rets = new ArrayList<>(); + if (value instanceof String) { + String string = (String) value; + string = string.replace("%ID%", String.valueOf(port)); + string = string.replace("%WORKER-ID%", workerId); + string = string.replace("%TOPOLOGY-ID%", stormId); + string = string.replace("%WORKER-PORT%", String.valueOf(port)); + string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + String[] strings = string.split("\\s+"); + rets.addAll(Arrays.asList(strings)); + } else if (value instanceof List) { + List<Object> objects = (List<Object>) value; + for (Object object : objects) { + String str = (String) object; + str = str.replace("%ID%", String.valueOf(port)); + str = str.replace("%WORKER-ID%", workerId); + str = str.replace("%TOPOLOGY-ID%", stormId); + str = str.replace("%WORKER-PORT%", String.valueOf(port)); + str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + rets.add(str); + } + } + return rets; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java new file mode 100644 index 0000000..3b0912a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.Utils; + +import java.util.List; +import java.util.Map; + +public interface IWorkerManager { + public void prepareWorker(Map conf, Localizer localizer); + + IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, + Utils.ExitCodeCallable workerExitCallback); + + IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids); + + IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources); + + public boolean cleanupWorker(String workerId); +} http://git-wip-us.apache.org/repos/asf/storm/blob/a1e47352/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java new file mode 100644 index 0000000..8bf5b14 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerResult.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor.workermanager; + +public interface IWorkerResult { +}
