Merge branch 'master' into supervisor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/65ce9d2e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/65ce9d2e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/65ce9d2e Branch: refs/heads/master Commit: 65ce9d2e03be5f5c4defa8342bfbefe9f59adcf9 Parents: 184dc4a 81fb727 Author: xiaojian.fxj <[email protected]> Authored: Thu Mar 10 22:57:01 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Thu Mar 10 22:57:01 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 6 + conf/log4j2.xml | 2 +- .../travis/print-errors-from-test-reports.py | 4 + .../org/apache/storm/starter/ManualDRPC.java | 53 ++- .../src/clj/org/apache/storm/LocalDRPC.clj | 56 --- .../org/apache/storm/command/kill_workers.clj | 2 +- .../src/clj/org/apache/storm/daemon/common.clj | 13 +- .../src/clj/org/apache/storm/daemon/drpc.clj | 221 +----------- .../apache/storm/daemon/local_supervisor.clj | 2 +- .../clj/org/apache/storm/daemon/logviewer.clj | 27 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 155 ++++---- storm-core/src/clj/org/apache/storm/testing.clj | 5 +- .../clj/org/apache/storm/trident/testing.clj | 2 - storm-core/src/clj/org/apache/storm/ui/core.clj | 81 +++-- .../src/clj/org/apache/storm/ui/helpers.clj | 10 +- .../src/jvm/org/apache/storm/LocalDRPC.java | 72 ++++ .../jvm/org/apache/storm/daemon/DrpcServer.java | 357 +++++++++++++++++++ .../storm/daemon/supervisor/ShutdownWork.java | 124 ------- .../daemon/supervisor/StandaloneSupervisor.java | 1 - .../storm/daemon/supervisor/Supervisor.java | 35 +- .../storm/daemon/supervisor/SupervisorData.java | 18 - .../daemon/supervisor/SupervisorManger.java | 3 - .../daemon/supervisor/SupervisorUtils.java | 3 - .../daemon/supervisor/SyncProcessEvent.java | 4 +- .../daemon/supervisor/SyncSupervisorEvent.java | 24 +- .../supervisor/timer/SupervisorHealthCheck.java | 1 - 
.../daemon/supervisor/timer/UpdateBlobs.java | 1 - .../storm/metric/StormMetricsRegistry.java | 86 +++++ .../auth/AbstractSaslClientCallbackHandler.java | 76 ++++ .../auth/AbstractSaslServerCallbackHandler.java | 94 +++++ .../auth/digest/ClientCallbackHandler.java | 60 +--- .../auth/digest/ServerCallbackHandler.java | 61 +--- .../auth/plain/PlainClientCallbackHandler.java | 31 ++ .../auth/plain/PlainSaslTransportPlugin.java | 71 ++++ .../auth/plain/PlainServerCallbackHandler.java | 55 +++ .../security/auth/plain/SaslPlainServer.java | 158 ++++++++ .../test/clj/org/apache/storm/drpc_test.clj | 27 +- .../storm/security/auth/drpc_auth_test.clj | 5 +- 38 files changed, 1237 insertions(+), 769 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/command/kill_workers.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/command/kill_workers.clj index 4ddc993,4e713f9..aadc9fd --- a/storm-core/src/clj/org/apache/storm/command/kill_workers.clj +++ b/storm-core/src/clj/org/apache/storm/command/kill_workers.clj @@@ -14,10 -14,11 +14,10 @@@ ;; See the License for the specific language governing permissions and ;; limitations under the License. 
(ns org.apache.storm.command.kill-workers - (:import [java.io File]) + (:import [java.io File] - [org.apache.storm.daemon.supervisor SupervisorUtils StandaloneSupervisor SupervisorData ShutdownWork]) ++ [org.apache.storm.daemon.supervisor SupervisorUtils StandaloneSupervisor SupervisorData]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm util config]) - (:require [org.apache.storm.daemon - [supervisor :as supervisor]]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj index 2361817,0000000..c8ae2d6 mode 100644,000000..100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj @@@ -1,60 -1,0 +1,60 @@@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.daemon.local-supervisor - (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor SupervisorUtils] ++ (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] + [org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) + worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] + (ConfigUtils/setWorkerUserWSE conf workerId "") + (ProcessSimulator/registerProcess pid worker) + (.put (.getWorkerThreadPids supervisorData) workerId pid) + )) + +(defn shutdown-local-worker [supervisorData workerId] + (log-message "shutdown-local-worker") + (SupervisorUtils/shutWorker supervisorData workerId)) + +(defn local-process [] + "Create a local process event" + (proxy [SyncProcessEvent] [] + (launchWorker [supervisorData stormId port workerId resources] + (launch-local-worker supervisorData stormId port workerId resources)) + (shutWorker [supervisorData workerId] (shutdown-local-worker supervisorData workerId)))) + + +(defserverfn mk-local-supervisor [conf shared-context isupervisor] + (log-message "Starting local Supervisor with conf " conf) + (if (not (ConfigUtils/isLocalMode conf)) + (throw + (IllegalArgumentException. 
"Cannot start server in distrubuted mode!"))) + (let [local-process (local-process) + supervisor-server (Supervisor.)] + (.setLocalSyncProcess supervisor-server local-process) + (.mkSupervisor supervisor-server conf shared-context isupervisor))) http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/logviewer.clj index 38ac3ee,ed8d980..a9a3447 --- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj @@@ -21,7 -21,7 +21,8 @@@ (:use [org.apache.storm config util log]) (:use [org.apache.storm.ui helpers]) (:import [org.apache.storm StormTimer] - [org.apache.storm.daemon.supervisor SupervisorUtils]) ++ [org.apache.storm.daemon.supervisor SupervisorUtils] + [org.apache.storm.metric StormMetricsRegistry]) (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils]) (:import [org.slf4j LoggerFactory]) (:import [java.util Arrays ArrayList HashSet]) http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/testing.clj index 1bd54f7,66fc051..dd1f2df --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@@ -24,8 -24,7 +24,8 @@@ (:import [org.apache.commons.io FileUtils] [org.apache.storm.utils] [org.apache.storm.zookeeper Zookeeper] - [org.apache.storm ProcessSimulator]) + [org.apache.storm ProcessSimulator] - [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData ShutdownWork SupervisorManger]) ++ [org.apache.storm.daemon.supervisor StandaloneSupervisor SupervisorData SupervisorManger SupervisorUtils]) (:import [java.io File]) (:import [java.util HashMap ArrayList]) (:import 
[java.util.concurrent.atomic AtomicInteger]) @@@ -412,15 -406,14 +412,14 @@@ (defn mk-capture-shutdown-fn [capture-atom] - (let [shut-down (ShutdownWork.)] - (let [existing-fn supervisor/shutdown-worker] - (fn [supervisor worker-id] - (let [conf (:conf supervisor) - supervisor-id (:supervisor-id supervisor) - port (find-worker-port conf worker-id) + (fn [supervisorData workerId] + (let [conf (.getConf supervisorData) + supervisor-id (.getSupervisorId supervisorData) + port (find-worker-port conf workerId) existing (get @capture-atom [supervisor-id port] 0)] + (log-message "mk-capture-shutdown-fn") (swap! capture-atom assoc [supervisor-id port] (inc existing)) - (.shutWorker shut-down supervisorData workerId))))) - (existing-fn supervisor worker-id))))) ++ (SupervisorUtils/shutWorker supervisorData workerId)))) (defmacro capture-changed-workers [& body] http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java index d4ce623,0000000..4947c6f mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java @@@ -1,85 -1,0 +1,84 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; - import java.util.UUID; + +public class StandaloneSupervisor implements ISupervisor { + private String supervisorId; + private Map conf; + + @Override + public void prepare(Map stormConf, String schedulerLocalDir) { + try { + LocalState localState = new LocalState(schedulerLocalDir); + String supervisorId = localState.getSupervisorId(); + if (supervisorId == null) { + supervisorId = generateSupervisorId(); + localState.setSupervisorId(supervisorId); + } + this.conf = stormConf; + this.supervisorId = supervisorId; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getSupervisorId() { + return supervisorId; + } + + @Override + public String getAssignmentId() { + return supervisorId; + } + + @Override + // @return is vector which need be converted to be int + public Object getMetadata() { + Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS); + return ports; + } + + @Override + public boolean confirmAssigned(int port) { + return true; + } + + @Override + public void killedWorker(int port) { + + } + + @Override + public void assigned(Collection<Integer> ports) { + + } + + public String generateSupervisorId(){ + return Utils.uuid(); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java index 847b38d,0000000..6124aef mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java @@@ -1,195 -1,0 +1,178 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + - import com.codahale.metrics.Gauge; - import com.codahale.metrics.MetricRegistry; +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; - import org.apache.storm.daemon.metrics.MetricsUtils; - import org.apache.storm.daemon.metrics.reporters.PreparableReporter; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; ++import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; - import java.util.List; +import java.util.Map; +import java.util.Set; ++import java.util.concurrent.Callable; + +public class Supervisor { + private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); + - //TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor ++ // TODO: to be removed after porting worker.clj. 
localSyncProcess is intended to start local supervisor + private SyncProcessEvent localSyncProcess; + + public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { + this.localSyncProcess = localSyncProcess; + } + - + /** + * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary + * + * @param conf + * @param sharedContext + * @param iSupervisor + * @return + * @throws Exception + */ + public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { + SupervisorManger supervisorManger = null; + try { + LOG.info("Starting Supervisor with conf {}", conf); + iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); + String path = ConfigUtils.supervisorTmpDir(conf); + FileUtils.cleanDirectory(new File(path)); + + final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); + Localizer localizer = supervisorData.getLocalizer(); + + SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); + hb.run(); + // should synchronize supervisor so it doesn't launch anything after being down (optimization) + Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); + supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); + + Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); + for (String stormId : downdedStormId) { + SupervisorUtils.addBlobReferences(localizer, stormId, conf); + } + // do this after adding the references so we don't try to clean things being used + localizer.startCleaner(); + + EventManagerImp syncSupEventManager = new EventManagerImp(false); + EventManagerImp syncProcessManager = new EventManagerImp(false); + + SyncProcessEvent syncProcessEvent = null; - if (ConfigUtils.isLocalMode(conf)){ ++ if (ConfigUtils.isLocalMode(conf)) { + localSyncProcess.init(supervisorData); 
+ syncProcessEvent = localSyncProcess; - }else{ ++ } else { + syncProcessEvent = new SyncProcessEvent(supervisorData); + } + + SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager); + UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData); + RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData); + + if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) { + StormTimer eventTimer = supervisorData.getEventTimer(); + // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up + // to date even if callbacks don't all work exactly right + eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager)); + + eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)), + new EventManagerPushCallback(syncProcessEvent, syncProcessManager)); + + // Blob update thread. Starts with 30 seconds delay, every 30 seconds + supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager)); + + // supervisor health check + eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData)); + + // Launch a thread that Runs profiler commands . 
Starts with 30 seconds delay, every 30 seconds + eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager)); + } - LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName() ); ++ LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName()); + supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager); + } catch (Throwable t) { + if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) { + throw t; + } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) { + throw t; + } else { + LOG.error("Error on initialization of server supervisor: {}", t); + Utils.exitProcess(13, "Error on initialization"); + } + } + return supervisorManger; + } + + /** + * start distribute supervisor + */ + private void launch(ISupervisor iSupervisor) { + LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); + SupervisorManger supervisorManager; + try { + Map<Object, Object> conf = Utils.readStormConfig(); + if (ConfigUtils.isLocalMode(conf)) { + throw new IllegalArgumentException("Cannot start server in local mode!"); + } + supervisorManager = mkSupervisor(conf, null, iSupervisor); + if (supervisorManager != null) + Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager); + registerWorkerNumGauge("supervisor:num-slots-used-gauge", conf); - startMetricsReporters(conf); ++ StormMetricsRegistry.startMetricsReporters(conf); + } catch (Exception e) { + LOG.error("Failed to start supervisor\n", e); + System.exit(1); + } + } + - // To be removed + private void registerWorkerNumGauge(String name, final Map conf) { - MetricRegistry metricRegistry = new MetricRegistry(); - metricRegistry.remove(name); - metricRegistry.register(name, new Gauge<Integer>() { ++ StormMetricsRegistry.registerGauge("supervisor:num-slots-used-gauge", 
new Callable<Integer>() { + @Override - public Integer getValue() { ++ public Integer call() throws Exception { + Collection<String> pids = SupervisorUtils.supervisorWorkerIds(conf); + return pids.size(); + } + }); + } + - // To be removed - private void startMetricsReporters(Map conf) { - List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf); - for (PreparableReporter reporter : preparableReporters) { - reporter.prepare(new MetricRegistry(), conf); - reporter.start(); - } - LOG.info("Started statistics report plugin..."); - } - + /** + * supervisor daemon enter entrance + * + * @param args + */ + public static void main(String[] args) { + Utils.setupDefaultUncaughtExceptionHandler(); + Supervisor instance = new Supervisor(); + instance.launch(new StandaloneSupervisor()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java index be39b4e,0000000..be79847 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java @@@ -1,267 -1,0 +1,249 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; - import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class SupervisorData { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class); + + private final Map conf; + private final IContext sharedContext; + private volatile boolean active; + private ISupervisor iSupervisor; + private 
Utils.UptimeComputer upTime; + private String stormVersion; - + private ConcurrentHashMap<String, String> workerThreadPids; // for local mode - + private IStormClusterState stormClusterState; - + private LocalState localState; - + private String supervisorId; - + private String assignmentId; - + private String hostName; - + // used for reporting used ports when heartbeating + private AtomicReference<Map<Long, LocalAssignment>> currAssignment; - + private StormTimer heartbeatTimer; - + private StormTimer eventTimer; - + private StormTimer blobUpdateTimer; - + private Localizer localizer; - + private AtomicReference<Map<String, Map<String, Object>>> assignmentVersions; - + private AtomicInteger syncRetry; - + private final Object downloadLock = new Object(); - + private AtomicReference<Map<String, List<ProfileRequest>>> stormIdToProfileActions; - + private CgroupManager resourceIsolationManager; - + private ConcurrentHashSet<String> deadWorkers; + + public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) { + this.conf = conf; + this.sharedContext = sharedContext; + this.iSupervisor = iSupervisor; + this.active = true; + this.upTime = Utils.makeUptimeComputer(); + this.stormVersion = VersionInfo.getVersion(); + this.workerThreadPids = new ConcurrentHashMap<String, String>(); + this.deadWorkers = new ConcurrentHashSet(); + + List<ACL> acls = null; + if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { + acls = SupervisorUtils.supervisorZkAcls(); + } + + try { + this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR)); + } catch (Exception e) { + LOG.error("supervisor can't create stormClusterState"); + throw Utils.wrapInRuntime(e); + } + + try { + this.localState = ConfigUtils.supervisorState(conf); + this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf)); + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + this.supervisorId = 
iSupervisor.getSupervisorId(); + this.assignmentId = iSupervisor.getAssignmentId(); + + try { + this.hostName = Utils.hostname(conf); + } catch (UnknownHostException e) { + throw Utils.wrapInRuntime(e); + } + + this.currAssignment = new AtomicReference<Map<Long, LocalAssignment>>(new HashMap<Long,LocalAssignment>()); + + this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); + + this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler()); + + this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler()); + + this.assignmentVersions = new AtomicReference<Map<String, Map<String, Object>>>(new HashMap<String, Map<String, Object>>()); + this.syncRetry = new AtomicInteger(0); + this.stormIdToProfileActions = new AtomicReference<Map<String, List<ProfileRequest>>>(new HashMap<String, List<ProfileRequest>>()); + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { + try { + this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); + this.resourceIsolationManager.prepare(conf); + LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + } else { + this.resourceIsolationManager = null; + } + } + + public AtomicReference<Map<String, List<ProfileRequest>>> getStormIdToProfileActions() { + return stormIdToProfileActions; + } + + public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) { + this.stormIdToProfileActions.set(stormIdToProfileActions); + } + + public Map getConf() { + return conf; + } + + public IContext getSharedContext() { + return sharedContext; + } + + public boolean isActive() { + return active; + } + + public void setActive(boolean active) { + this.active = active; + } + + public ISupervisor 
getiSupervisor() { + return iSupervisor; + } + + public Utils.UptimeComputer getUpTime() { + return upTime; + } + + public String getStormVersion() { + return stormVersion; + } + + public ConcurrentHashMap<String, String> getWorkerThreadPids() { + return workerThreadPids; + } + + public IStormClusterState getStormClusterState() { + return stormClusterState; + } + + public LocalState getLocalState() { + return localState; + } + + public String getSupervisorId() { + return supervisorId; + } + + public String getAssignmentId() { + return assignmentId; + } + + public String getHostName() { + return hostName; + } + + public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() { + return currAssignment; + } + + public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) { + this.currAssignment.set(currAssignment); + } + + public StormTimer getHeartbeatTimer() { + return heartbeatTimer; + } + + public StormTimer getEventTimer() { + return eventTimer; + } + + public StormTimer getBlobUpdateTimer() { + return blobUpdateTimer; + } + + public Localizer getLocalizer() { + return localizer; + } + + public void setLocalizer(Localizer localizer) { + this.localizer = localizer; + } + + public AtomicInteger getSyncRetry() { + return syncRetry; + } + + public AtomicReference<Map<String, Map<String, Object>>> getAssignmentVersions() { + return assignmentVersions; + } + + public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) { + this.assignmentVersions.set(assignmentVersions); + } + + public CgroupManager getResourceIsolationManager() { + return resourceIsolationManager; + } + + public ConcurrentHashSet getDeadWorkers() { + return deadWorkers; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---------------------------------------------------------------------- diff --cc 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java index 6578529,0000000..26f0aae mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java @@@ -1,100 -1,0 +1,97 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.event.EventManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorManger implements SupervisorDaemon, DaemonCommon, Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class); - + private final EventManager eventManager; - + private final EventManager processesEventManager; - + private SupervisorData supervisorData; + + public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) { + this.eventManager = eventManager; + this.supervisorData = supervisorData; + this.processesEventManager = processesEventManager; + } + + public void shutdown() { + LOG.info("Shutting down supervisor{}", supervisorData.getSupervisorId()); + supervisorData.setActive(false); + try { + supervisorData.getHeartbeatTimer().close(); + supervisorData.getEventTimer().close(); + supervisorData.getBlobUpdateTimer().close(); + eventManager.close(); + processesEventManager.close(); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + supervisorData.getStormClusterState().disconnect(); + } + + @Override + public void shutdownAllWorkers() { + + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf()); + try { + for (String workerId : workerIds) { + SupervisorUtils.shutWorker(supervisorData, workerId); + } + } catch (Exception e) { + LOG.error("shutWorker failed"); + throw Utils.wrapInRuntime(e); + } + } + + @Override + public Map getConf() { + return supervisorData.getConf(); + } + + @Override + public String getId() { + return supervisorData.getSupervisorId(); + } + + @Override + public boolean isWaiting() { + if (!supervisorData.isActive()) { + return true; + } + + if (supervisorData.getHeartbeatTimer().isTimerWaiting() && 
supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting() + && processesEventManager.waiting()) { + return true; + } + return false; + } + + public void run() { + shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index dd2a538,0000000..ae3422e mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@@ -1,356 -1,0 +1,353 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; - import org.apache.curator.utils.PathUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + + private static final SupervisorUtils INSTANCE = new SupervisorUtils(); + private static SupervisorUtils _instance = INSTANCE; - + public static void setInstance(SupervisorUtils u) { + _instance = u; + } - + public static void resetInstance() { + _instance = INSTANCE; + } + + public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { + if (StringUtils.isBlank(user)) { + throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); + } + String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String wl; + if (StringUtils.isNotBlank(wlinitial)) { + wl = wlinitial; + } else { + wl = stormHome + "/bin/worker-launcher"; + } + List<String> commands = new ArrayList<>(); + commands.add(wl); + commands.add(user); + commands.addAll(args); + LOG.info("Running as user: {} command: {}", user, commands); + return 
Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); + } + + public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix) + throws IOException { + int ret = 0; + Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null); + if (StringUtils.isNotBlank(logPreFix)) + Utils.readAndLogStream(logPreFix, process.getInputStream()); + try { + process.waitFor(); + } catch (InterruptedException e) { + LOG.info("{} interrupted.", logPreFix); + } + ret = process.exitValue(); + return ret; + } + + public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + String logPrefix = "setup conf for " + dir; + List<String> commands = new ArrayList<>(); + commands.add("code-dir"); + commands.add(dir); + workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + } + } + + public static void rmrAsUser(Map conf, String id, String path) throws IOException { + String user = Utils.getFileOwner(path); + String logPreFix = "rmr " + id; + List<String> commands = new ArrayList<>(); + commands.add("rmr"); + commands.add(path); + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix); + if (Utils.checkFileExists(path)) { + throw new RuntimeException(path + " was not deleted."); + } + } + + /** + * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then + * returns false + * + * @param blobInfo + * @return + */ + public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) { + return new Boolean((String) blobInfo.get("uncompress")); + } + + /** + * Remove a reference to a blob when its no longer needed + * + * @param blobstoreMap + * @return + */ + public static 
List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) { + List<LocalResource> localResourceList = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) { + LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue())); + localResourceList.add(localResource); + } + } + return localResourceList; + } + + /** + * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart. + * + * @param localizer + * @param stormId + * @param conf + */ + public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException { + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); + if (blobstoreMap != null) { + localizer.addReferences(localresources, user, topoName); + } + } + + public static Set<String> readDownLoadedStormIds(Map conf) throws IOException { + Set<String> stormIds = new HashSet<>(); + String path = ConfigUtils.supervisorStormDistRoot(conf); + Collection<String> rets = Utils.readDirContents(path); + for (String ret : rets) { + stormIds.add(URLDecoder.decode(ret)); + } + return stormIds; + } + + public static Collection<String> supervisorWorkerIds(Map conf) { + String workerRoot = ConfigUtils.workerRoot(conf); + return Utils.readDirContents(workerRoot); + } + + public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException { + String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + 
String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot); + String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot); + String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot); + if (!Utils.checkFileExists(stormroot)) + return false; + if (!Utils.checkFileExists(stormcodepath)) + return false; + if (!Utils.checkFileExists(stormconfpath)) + return false; + if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath)) + return true; + return false; + } + + /** + * Returns map from worr id to heartbeat + * + * @param conf + * @return + * @throws Exception + */ + public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception { + return _instance.readWorkerHeartbeatsImpl(conf); + } + + public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception { + Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); + + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + + for (String workerId : workerIds) { + LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId); + // ATTENTION: whb can be null + workerHeartbeats.put(workerId, whb); + } + return workerHeartbeats; + } + + + /** + * get worker heartbeat by workerId + * + * @param conf + * @param workerId + * @return + * @throws IOException + */ + public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) { + return _instance.readWorkerHeartbeatImpl(conf, workerId); + } + + public LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) { + try { + LocalState localState = ConfigUtils.workerState(conf, workerId); + return localState.getWorkerHeartBeat(); + } catch (Exception e) { + LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e); + return null; + } + } + + public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) { + return _instance.isWorkerHbTimedOutImpl(now, whb, conf); + } + + 
public boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) { + boolean result = false; + if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) { + result = true; + } + return result; + } + + public static String javaCmd(String cmd) { + return _instance.javaCmdImpl(cmd); + } + + public String javaCmdImpl(String cmd) { + String ret = null; + String javaHome = System.getenv().get("JAVA_HOME"); + if (StringUtils.isNotBlank(javaHome)) { + ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd; + } else { + ret = cmd; + } + return ret; + } + + public final static List<ACL> supervisorZkAcls() { + final List<ACL> acls = new ArrayList<>(); + acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); + acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); + return acls; + } + + public static void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException { + LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId); + Map conf = supervisorData.getConf(); + Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); + Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); + Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); + String user = ConfigUtils.getWorkerUser(conf, workerId); + String threadPid = supervisorData.getWorkerThreadPids().get(workerId); + if (StringUtils.isNotBlank(threadPid)) { + ProcessSimulator.killProcess(threadPid); + } + + for (String pid : pids) { + if (asUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("15"); + String logPrefix = "kill -15 " + pid; + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.killProcessWithSigTerm(pid); + } + } + + if 
(pids.size() > 0) { + LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); + Time.sleepSecs(shutdownSleepSecs); + } + + for (String pid : pids) { + if (asUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("9"); + String logPrefix = "kill -9 " + pid; + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.forceKillProcess(pid); + } + String path = ConfigUtils.workerPidPath(conf, workerId, pid); + if (asUser) { + SupervisorUtils.rmrAsUser(conf, workerId, path); + } else { + try { + LOG.debug("Removing path {}", path); + new File(path).delete(); + } catch (Exception e) { + // on windows, the supervisor may still holds the lock on the worker directory + // ignore + } + } + } + tryCleanupWorker(conf, supervisorData, workerId); + LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId); + + } + + public static void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) { + try { + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + if (Utils.checkFileExists(workerRoot)) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); + } else { + Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); + } + ConfigUtils.removeWorkerUserWSE(conf, workerId); + supervisorData.getDeadWorkers().remove(workerId); + } + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)){ + supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId); + } + } catch (IOException e) { + LOG.warn("Failed to cleanup worker {}. 
Will retry later", workerId, e); + } catch (RuntimeException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/65ce9d2e/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java index cf26896,0000000..068c442 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java @@@ -1,674 -1,0 +1,672 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + + private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + + private LocalState localState; - - private SupervisorData supervisorData; - ++ private SupervisorData supervisorData; + public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + + private class ProcessExitCallback implements Utils.ExitCodeCallable { + private final String logPrefix; + private final String workerId; + + public ProcessExitCallback(String logPrefix, String workerId) { + this.logPrefix = logPrefix; + this.workerId = workerId; + } + + @Override + public Object call() throws Exception { + return null; + } + + @Override + public Object call(int exitCode) { + LOG.info("{} exited with code: {}", logPrefix, exitCode); + supervisorData.getDeadWorkers().add(workerId); + return null; + } + } + + public SyncProcessEvent(){ + + } + public SyncProcessEvent(SupervisorData supervisorData) { + init(supervisorData); + } + + //TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java + public void init(SupervisorData supervisorData){ + this.supervisorData = supervisorData; + this.localState = supervisorData.getLocalState(); + } + + + /** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. 
wait + * for workers launch + */ + @Override + public void run() { + LOG.debug("Syncing processes"); + try { + Map conf = supervisorData.getConf(); + Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); + + if (assignedExecutors == null) { + assignedExecutors = new HashMap<>(); + } + int now = Time.currentTimeSecs(); + + Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); + + Set<String> keeperWorkerIds = new HashSet<>(); + Set<Integer> keepPorts = new HashSet<>(); + for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { + StateHeartbeat stateHeartbeat = entry.getValue(); + if (stateHeartbeat.getState() == State.VALID) { + keeperWorkerIds.add(entry.getKey()); + keepPorts.add(stateHeartbeat.getHeartbeat().get_port()); + } + } + Map<Integer, LocalAssignment> reassignExecutors = getReassignExecutors(assignedExecutors, keepPorts); + Map<Integer, String> newWorkerIds = new HashMap<>(); + for (Integer port : reassignExecutors.keySet()) { + newWorkerIds.put(port, Utils.uuid()); + } + LOG.debug("Syncing processes"); + LOG.debug("Assigned executors: {}", assignedExecutors); + LOG.debug("Allocated: {}", localWorkerStats); + + for (Map.Entry<String, StateHeartbeat> entry : localWorkerStats.entrySet()) { + StateHeartbeat stateHeartbeat = entry.getValue(); + if (stateHeartbeat.getState() != State.VALID) { + LOG.info("Shutting down and clearing state for id {}, Current supervisor time: {}, State: {}, Heartbeat: {}", entry.getKey(), now, + stateHeartbeat.getState(), stateHeartbeat.getHeartbeat()); + shutWorker(supervisorData, entry.getKey()); + } + } + // start new workers + Map<String, Integer> newWorkerPortToIds = startNewWorkers(newWorkerIds, reassignExecutors); + + Map<String, Integer> allWorkerPortToIds = new HashMap<>(); + Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); + for (String keeper : keeperWorkerIds) { + 
allWorkerPortToIds.put(keeper, approvedWorkers.get(keeper)); + } + allWorkerPortToIds.putAll(newWorkerPortToIds); + localState.setApprovedWorkers(allWorkerPortToIds); + waitForWorkersLaunch(conf, newWorkerPortToIds.keySet()); + + } catch (Exception e) { + LOG.error("Failed Sync Process", e); + throw Utils.wrapInRuntime(e); + } + + } + + protected void waitForWorkersLaunch(Map conf, Set<String> workerIds) throws Exception { + int startTime = Time.currentTimeSecs(); + int timeOut = (int) conf.get(Config.NIMBUS_SUPERVISOR_TIMEOUT_SECS); + for (String workerId : workerIds) { + LocalState localState = ConfigUtils.workerState(conf, workerId); + while (true) { + LSWorkerHeartbeat hb = localState.getWorkerHeartBeat(); + if (hb != null || (Time.currentTimeSecs() - startTime) > timeOut) + break; + LOG.info("{} still hasn't started", workerId); + Time.sleep(500); + } + if (localState.getWorkerHeartBeat() == null) { + LOG.info("Worker {} failed to start", workerId); + } + } + } + + protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) { + Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>(); + reassignExecutors.putAll(assignExecutors); + for (Integer port : keepPorts) { + reassignExecutors.remove(port); + } + return reassignExecutors; + } + + /** + * Returns map from worker id to worker heartbeat. 
if the heartbeat is nil, then the worker is dead + * + * @param assignedExecutors + * @return + * @throws Exception + */ + public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception { + Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>(); + Map conf = supervisorData.getConf(); + LocalState localState = supervisorData.getLocalState(); + Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf); + Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); + Set<String> approvedIds = new HashSet<>(); + if (approvedWorkers != null) { + approvedIds.addAll(approvedWorkers.keySet()); + } + for (Map.Entry<String, LSWorkerHeartbeat> entry : idToHeartbeat.entrySet()) { + String workerId = entry.getKey(); + LSWorkerHeartbeat whb = entry.getValue(); + State state; + if (whb == null) { + state = State.NOT_STARTED; + } else if (!approvedIds.contains(workerId) || !matchesAssignment(whb, assignedExecutors)) { + state = State.DISALLOWED; + } else if (supervisorData.getDeadWorkers().contains(workerId)) { + LOG.info("Worker Process {} has died", workerId); + state = State.TIMED_OUT; + } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) { + state = State.TIMED_OUT; + } else { + state = State.VALID; + } + LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now); + workerIdHbstate.put(workerId, new StateHeartbeat(state, whb)); + } + return workerIdHbstate; + } + + protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) { + LocalAssignment localAssignment = assignedExecutors.get(whb.get_port()); + if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) { + return false; + } + List<ExecutorInfo> executorInfos = new ArrayList<>(); + executorInfos.addAll(whb.get_executors()); + // remove 
SYSTEM_EXECUTOR_ID + executorInfos.remove(SYSTEM_EXECUTOR_INFO); + List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors(); + + if (localExecuorInfos.size() != executorInfos.size()) + return false; + + for (ExecutorInfo executorInfo : localExecuorInfos){ + if (!localExecuorInfos.contains(executorInfo)) + return false; + } + return true; + } + + /** + * launch a worker in local mode. + */ + protected void launchWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException { + // port this function after porting worker to java + } + + protected String getWorkerClassPath(String stormJar, Map stormConf) { + List<String> topoClasspath = new ArrayList<>(); + Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH); + + if (object instanceof List) { + topoClasspath.addAll((List<String>) object); + } else if (object instanceof String){ + topoClasspath.add((String)object); + }else { + //ignore + } + String classPath = Utils.workerClasspath(); + String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar)); + return Utils.addToClasspath(classAddPath, topoClasspath); + } + + /** + * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" + * + * @param value + * @param workerId + * @param stormId + * @param port + * @param memOnheap + */ + public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) { + List<String> rets = new ArrayList<>(); + if (value instanceof String) { + String string = (String) value; + string = string.replace("%ID%", String.valueOf(port)); + string = string.replace("%WORKER-ID%", workerId); + string = string.replace("%TOPOLOGY-ID%", stormId); + string = string.replace("%WORKER-PORT%", String.valueOf(port)); + string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + String[] strings = string.split("\\s+"); + rets.addAll(Arrays.asList(strings)); + } else if 
(value instanceof List) { + List<Object> objects = (List<Object>) value; + for (Object object : objects) { + String str = (String)object; + str = str.replace("%ID%", String.valueOf(port)); + str = str.replace("%WORKER-ID%", workerId); + str = str.replace("%TOPOLOGY-ID%", stormId); + str = str.replace("%WORKER-PORT%", String.valueOf(port)); + str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + rets.add(str); + } + } + return rets; + } + + + + /** + * launch a worker in distributed mode + * supervisorId for testing + * @throws IOException + */ + protected void launchWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId, + WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException { + + Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); + String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); + String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + + String stormLogDir = ConfigUtils.getLogDir(); + String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + + String stormLog4j2ConfDir; + if (StringUtils.isNotBlank(stormLogConfDir)) { + if (Utils.isAbsolutePath(stormLogConfDir)) { + stormLog4j2ConfDir = stormLogConfDir; + } else { + stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; + } + } else { + stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; + } + + String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + + String jlp = jlp(stormRoot, conf); + + String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + + String workerClassPath = 
getWorkerClassPath(stormJar, stormConf); + + Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS); + List<String> topGcOpts = new ArrayList<>(); + if (topGcOptsObject instanceof String) { + topGcOpts.add((String) topGcOptsObject); + } else if (topGcOptsObject instanceof List) { + topGcOpts.addAll((List<String>) topGcOptsObject); + } + + int memOnheap = 0; + if (resources.get_mem_on_heap() > 0) { + memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); + } else { + //set the default heap memory size for supervisor-test + memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768); + } + + int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); + + int cpu = (int) Math.ceil(resources.get_cpu()); + + List<String> gcOpts = null; + + if (topGcOpts != null) { + gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap); + } else { + gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap); + } + + Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); + List<String> topoWorkerLogwriterChildopts = new ArrayList<>(); + if (topoWorkerLogwriterObject instanceof String) { + topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject); + } else if (topoWorkerLogwriterObject instanceof List) { + topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject); + } + + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + + String logfileName = "worker.log"; + + String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf); + + String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY); + if (loggingSensitivity == null) { + loggingSensitivity = "S3"; + } + + List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); + + List<String> topWorkerChildopts = 
substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); + + List<String> workerProfilerChildopts = null; + if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) { + workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap); + }else { + workerProfilerChildopts = new ArrayList<>(); + } + + Map<String, String> topEnvironment = new HashMap<String, String>(); + Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + if (environment != null) { + topEnvironment.putAll(environment); + } + topEnvironment.put("LD_LIBRARY_PATH", jlp); + + String log4jConfigurationFile = null; + if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) { + log4jConfigurationFile = "file:///" + stormLog4j2ConfDir; + } else { + log4jConfigurationFile = stormLog4j2ConfDir; + } + log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml"; + + List<String> commandList = new ArrayList<>(); + commandList.add(SupervisorUtils.javaCmd("java")); + commandList.add("-cp"); + commandList.add(workerClassPath); + commandList.addAll(topoWorkerLogwriterChildopts); + commandList.add("-Dlogfile.name=" + logfileName); + commandList.add("-Dstorm.home=" + stormHome); + commandList.add("-Dworkers.artifacts=" + workersArtifacets); + commandList.add("-Dstorm.id=" + stormId); + commandList.add("-Dworker.id=" + workerId); + commandList.add("-Dworker.port=" + port); + commandList.add("-Dstorm.log.dir=" + stormLogDir); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); + commandList.add("org.apache.storm.LogWriter"); + + commandList.add(SupervisorUtils.javaCmd("java")); + commandList.add("-server"); + commandList.addAll(workerChildopts); + 
commandList.addAll(topWorkerChildopts); + commandList.addAll(gcOpts); + commandList.addAll(workerProfilerChildopts); + commandList.add("-Djava.library.path=" + jlp); + commandList.add("-Dlogfile.name=" + logfileName); + commandList.add("-Dstorm.home=" + stormHome); + commandList.add("-Dworkers.artifacts=" + workersArtifacets); + commandList.add("-Dstorm.conf.file=" + stormConfFile); + commandList.add("-Dstorm.options=" + stormOptions); + commandList.add("-Dstorm.log.dir=" + stormLogDir); + commandList.add("-Djava.io.tmpdir=" + workerTmpDir); + commandList.add("-Dlogging.sensitivity=" + loggingSensitivity); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); + commandList.add("-Dstorm.id=" + stormId); + commandList.add("-Dworker.id=" + workerId); + commandList.add("-Dworker.port=" + port); + commandList.add("-cp"); + commandList.add(workerClassPath); + commandList.add("org.apache.storm.daemon.worker"); + commandList.add(stormId); + commandList.add(assignmentId); + commandList.add(String.valueOf(port)); + commandList.add(workerId); + + // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB)))) + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { + int cgRoupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB))); + int memoryValue = memoffheap + memOnheap + cgRoupMem; + int cpuValue = cpu; + Map<String, Number> map = new HashMap<>(); + map.put("cpu", cpuValue); + map.put("memory", memoryValue); + cgroupManager.reserveResourcesForWorker(workerId, map); + commandList = cgroupManager.getLaunchCommand(workerId, commandList); + } + + LOG.info("Launching worker with command: {}. 
", Utils.shellCmd(commandList)); + writeLogMetadata(stormConf, user, workerId, stormId, port, conf); + ConfigUtils.setWorkerUserWSE(conf, workerId, user); + createArtifactsLink(conf, stormId, port, workerId); + + String logPrefix = "Worker Process " + workerId; + String workerDir = ConfigUtils.workerRoot(conf, workerId); + + if (deadWorkers != null) + deadWorkers.remove(workerId); + createBlobstoreLinks(conf, stormId, workerId); + + ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId); + if (runWorkerAsUser) { + List<String> args = new ArrayList<>(); + args.add("worker"); + args.add(workerDir); + args.add(Utils.writeScript(workerDir, commandList, topEnvironment)); + SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir)); + } else { + Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir)); + } + } + + protected String jlp(String stormRoot, Map conf) { + String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; + String os = System.getProperty("os.name").replaceAll("\\s+", "_"); + String arch = System.getProperty("os.arch"); + String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch; + String ret = archResourceRoot + Utils.FILE_PATH_SEPARATOR + resourceRoot + Utils.FILE_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH); + return ret; + } + + protected Map<String, Integer> startNewWorkers(Map<Integer, String> newWorkerIds, Map<Integer, LocalAssignment> reassignExecutors) throws IOException { + + Map<String, Integer> newValidWorkerIds = new HashMap<>(); + Map conf = supervisorData.getConf(); + String supervisorId = supervisorData.getSupervisorId(); + String clusterMode = ConfigUtils.clusterMode(conf); + + for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) { + Integer port = entry.getKey(); + LocalAssignment assignment = entry.getValue(); + String 
workerId = newWorkerIds.get(port); + String stormId = assignment.get_topology_id(); + WorkerResources resources = assignment.get_resources(); + + // This condition checks for required files exist before launching the worker + if (SupervisorUtils.doRequiredTopoFilesExist(conf, stormId)) { + String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId); + String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId); + + LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port, + workerId); + + FileUtils.forceMkdir(new File(pidsPath)); + FileUtils.forceMkdir(new File(ConfigUtils.workerTmpRoot(conf, workerId))); + FileUtils.forceMkdir(new File(hbPath)); + + if (clusterMode.endsWith("distributed")) { + launchWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, + supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers()); + } else if (clusterMode.endsWith("local")) { + launchWorker(supervisorData, stormId, port.longValue(), workerId, resources); + } + newValidWorkerIds.put(workerId, port); + + } else { + LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment, + supervisorData.getSupervisorId(), port, workerId); + } + + } + return newValidWorkerIds; + } + + public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException { + Map data = new HashMap(); + data.put(Config.TOPOLOGY_SUBMITTER_USER, user); + data.put("worker-id", workerId); + + Set<String> logsGroups = new HashSet<>(); + //for supervisor-test + if (stormconf.get(Config.LOGS_GROUPS) != null) { + List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS); + for (String group : groups){ + logsGroups.add(group); + } + } + if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) { + List<String> 
topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS); + for (String group : topGroups){ + logsGroups.add(group); + } + } + data.put(Config.LOGS_GROUPS, logsGroups.toArray()); + + Set<String> logsUsers = new HashSet<>(); + if (stormconf.get(Config.LOGS_USERS) != null) { + List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS); + for (String logUser : logUsers){ + logsUsers.add(logUser); + } + } + if (stormconf.get(Config.TOPOLOGY_USERS) != null) { + List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS); + for (String logUser : topUsers){ + logsUsers.add(logUser); + } + } + data.put(Config.LOGS_USERS, logsUsers.toArray()); + writeLogMetadataToYamlFile(stormId, port, data, conf); + } + + /** + * Writes the log metadata to a YAML file. When workers run as a separate user, the parent directory must be created with special permissions; otherwise it would be insecure. + * + * @param stormId + * @param port + * @param data + * @param conf + * @throws IOException + */ + public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException { + File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue()); + + if (!Utils.checkFileExists(file.getParent())) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + FileUtils.forceMkdir(file.getParentFile()); + SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), file.getParentFile().getCanonicalPath()); + } else { + file.getParentFile().mkdirs(); + } + } + FileWriter writer = new FileWriter(file); + Yaml yaml = new Yaml(); + try { + yaml.dump(data, writer); + }finally { + writer.close(); + } + + } + + /** + * Create a symlink from worker directory to its port artifacts directory + * + * @param conf + * @param stormId + * @param port + * @param workerId + */ + protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException { + String workerDir = ConfigUtils.workerRoot(conf, workerId); + String topoDir = 
ConfigUtils.workerArtifactsRoot(conf, stormId); + if (Utils.checkFileExists(workerDir)) { + Utils.createSymlink(workerDir, topoDir, "artifacts", String.valueOf(port)); + } + } + + /** + * Create symlinks in worker launch directory for all blobs + * + * @param conf + * @param stormId + * @param workerId + * @throws IOException + */ + protected void createBlobstoreLinks(Map conf, String stormId, String workerId) throws IOException { + String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + List<String> blobFileNames = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { + String key = entry.getKey(); + Map<String, Object> blobInfo = entry.getValue(); + String ret = null; + if (blobInfo != null && blobInfo.containsKey("localname")) { + ret = (String) blobInfo.get("localname"); + } else { + ret = key; + } + blobFileNames.add(ret); + } + } + List<String> resourceFileNames = new ArrayList<>(); + resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR); + resourceFileNames.addAll(blobFileNames); + LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", workerId, stormId, resourceFileNames.size(), resourceFileNames); + Utils.createSymlink(workerRoot, stormRoot, ConfigUtils.RESOURCES_SUBDIR); + for (String fileName : blobFileNames) { + Utils.createSymlink(workerRoot, stormRoot, fileName, fileName); + } + } + + //for supervisor-test + public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException{ + SupervisorUtils.shutWorker(supervisorData, workerId); + } +}
