http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java new file mode 100644 index 0000000..9df7ec1 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.daemon.metrics.reporters.PreparableReporter; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; +import org.apache.storm.event.EventManagerImp; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.messaging.IContext; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class Supervisor { + private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); + + //TODO: to be removed after porting worker.clj. 
localSyncProcess is intended to start local supervisor + private SyncProcessEvent localSyncProcess; + + public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) { + this.localSyncProcess = localSyncProcess; + } + + + /** + * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary + * + * @param conf + * @param sharedContext + * @param iSupervisor + * @return + * @throws Exception + */ + public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { + SupervisorManger supervisorManger = null; + try { + LOG.info("Starting Supervisor with conf {}", conf); + iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); + String path = ConfigUtils.supervisorTmpDir(conf); + FileUtils.cleanDirectory(new File(path)); + + final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); + Localizer localizer = supervisorData.getLocalizer(); + + SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); + hb.run(); + // should synchronize supervisor so it doesn't launch anything after being down (optimization) + Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); + supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); + + Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); + for (String stormId : downdedStormId) { + SupervisorUtils.addBlobReferences(localizer, stormId, conf); + } + // do this after adding the references so we don't try to clean things being used + localizer.startCleaner(); + + EventManagerImp syncSupEventManager = new EventManagerImp(false); + EventManagerImp syncProcessManager = new EventManagerImp(false); + + SyncProcessEvent syncProcessEvent = null; + if (ConfigUtils.isLocalMode(conf)){ + localSyncProcess.init(supervisorData); + syncProcessEvent = localSyncProcess; 
+ }else{ + syncProcessEvent = new SyncProcessEvent(supervisorData); + } + + SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager); + UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData); + RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData); + + if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) { + StormTimer eventTimer = supervisorData.getEventTimer(); + // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up + // to date even if callbacks don't all work exactly right + eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager)); + + eventTimer.scheduleRecurring(0, Utils.getInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)), + new EventManagerPushCallback(syncProcessEvent, syncProcessManager)); + + // Blob update thread. Starts with 30 seconds delay, every 30 seconds + supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager)); + + // supervisor health check + eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData)); + + // Launch a thread that Runs profiler commands . 
Starts with 30 seconds delay, every 30 seconds + eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager)); + } + LOG.info("Starting supervisor with id {} at host {}.", supervisorData.getSupervisorId(), supervisorData.getHostName() ); + supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager); + } catch (Throwable t) { + if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) { + throw t; + } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) { + throw t; + } else { + LOG.error("Error on initialization of server supervisor: {}", t); + Utils.exitProcess(13, "Error on initialization"); + } + } + return supervisorManger; + } + + /** + * start distribute supervisor + */ + private void distributeLaunch() { + LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); + SupervisorManger supervisorManager; + try { + Map<Object, Object> conf = Utils.readStormConfig(); + if (ConfigUtils.isLocalMode(conf)) { + throw new IllegalArgumentException("Cannot start server in local mode!"); + } + ISupervisor iSupervisor = new StandaloneSupervisor(); + supervisorManager = mkSupervisor(conf, null, iSupervisor); + if (supervisorManager != null) + Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager); + registerWorkerNumGauge("drpc:num-execute-http-requests", conf); + startMetricsReporters(conf); + } catch (Exception e) { + LOG.error("Failed to start supervisor\n", e); + System.exit(1); + } + } + + // To be removed + private void registerWorkerNumGauge(String name, final Map conf) { + MetricRegistry metricRegistry = new MetricRegistry(); + metricRegistry.remove(name); + metricRegistry.register(name, new Gauge<Integer>() { + @Override + public Integer getValue() { + Collection<String> pids = SupervisorUtils.myWorkerIds(conf); + return pids.size(); + } + }); + } + + // To be removed + private void startMetricsReporters(Map conf) 
{ + List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf); + for (PreparableReporter reporter : preparableReporters) { + reporter.prepare(new MetricRegistry(), conf); + reporter.start(); + } + LOG.info("Started statistics report plugin..."); + } + + /** + * Supervisor daemon entry point + * + * @param args + */ + public static void main(String[] args) { + Utils.setupDefaultUncaughtExceptionHandler(); + Supervisor instance = new Supervisor(); + instance.distributeLaunch(); + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java index 9eec253..039fe30 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorData.java @@ -105,10 +105,9 @@ public class SupervisorData { List<ACL> acls = null; if (Utils.isZkAuthenticationConfiguredStormServer(conf)) { - acls = new ArrayList<>(); - acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); - acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); + acls = SupervisorUtils.supervisorZkAcls(); } + try { this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java deleted file mode 100644 index fd31631..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; -import org.apache.commons.io.FileUtils; -import org.apache.storm.Config; -import org.apache.storm.StormTimer; -import org.apache.storm.command.HealthCheck; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.apache.storm.daemon.metrics.reporters.PreparableReporter; -import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; -import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; -import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; -import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; -import org.apache.storm.event.EventManagerImp; -import org.apache.storm.localizer.Localizer; -import org.apache.storm.messaging.IContext; -import org.apache.storm.scheduler.ISupervisor; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.VersionInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.InterruptedIOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class SupervisorServer { - private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class); - - /** - * in local state, supervisor stores who its current assignments are another thread launches events to restart any dead processes if necessary - * - * @param conf - * @param sharedContext - * @param iSupervisor - * @return - * 
@throws Exception - */ - private SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception { - SupervisorManger supervisorManger = null; - try { - LOG.info("Starting Supervisor with conf {}", conf); - iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf)); - String path = ConfigUtils.supervisorTmpDir(conf); - FileUtils.cleanDirectory(new File(path)); - - final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor); - Localizer localizer = supervisorData.getLocalizer(); - - SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData); - hb.run(); - // should synchronize supervisor so it doesn't launch anything after being down (optimization) - Integer heartbeatFrequency = (Integer) conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS); - supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb); - - Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf); - for (String stormId : downdedStormId) { - SupervisorUtils.addBlobReferences(localizer, stormId, conf); - } - // do this after adding the references so we don't try to clean things being used - localizer.startCleaner(); - - EventManagerImp syncSupEventManager = new EventManagerImp(false); - EventManagerImp syncProcessManager = new EventManagerImp(false); - SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorData); - SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager); - UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData); - RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData); - - if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) { - StormTimer eventTimer = supervisorData.getEventTimer(); - // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up - // to date even if callbacks don't all 
work exactly right - eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager)); - - eventTimer.scheduleRecurring(0, (Integer) conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS), - new EventManagerPushCallback(syncProcessEvent, syncProcessManager)); - - // Blob update thread. Starts with 30 seconds delay, every 30 seconds - supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager)); - - // supervisor health check - eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData)); - - // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds - eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager)); - } - supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager); - } catch (Throwable t) { - if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)) { - throw t; - } else if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) { - throw t; - } else { - LOG.error("Error on initialization of server supervisor"); - Utils.exitProcess(13, "Error on initialization"); - } - } - return supervisorManger; - } - - /** - * start local supervisor - */ - public void localLaunch() { - LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); - SupervisorManger supervisorManager; - try { - Map<Object, Object> conf = Utils.readStormConfig(); - if (!ConfigUtils.isLocalMode(conf)) { - throw new IllegalArgumentException("Cannot start server in distribute mode!"); - } - ISupervisor iSupervisor = new StandaloneSupervisor(); - supervisorManager = mkSupervisor(conf, null, iSupervisor); - if (supervisorManager != null) - Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager); - } catch (Exception e) { - LOG.error("Failed to start supervisor\n", e); - System.exit(1); - } 
- } - - /** - * start distribute supervisor - */ - private void distributeLaunch() { - LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion()); - SupervisorManger supervisorManager; - try { - Map<Object, Object> conf = Utils.readStormConfig(); - if (ConfigUtils.isLocalMode(conf)) { - throw new IllegalArgumentException("Cannot start server in local mode!"); - } - ISupervisor iSupervisor = new StandaloneSupervisor(); - supervisorManager = mkSupervisor(conf, null, iSupervisor); - if (supervisorManager != null) - Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager); - registerWorkerNumGauge("drpc:num-execute-http-requests", conf); - startMetricsReporters(conf); - } catch (Exception e) { - LOG.error("Failed to start supervisor\n", e); - System.exit(1); - } - } - - // To be removed - private void registerWorkerNumGauge(String name, final Map conf) { - MetricRegistry metricRegistry = new MetricRegistry(); - metricRegistry.remove(name); - metricRegistry.register(name, new Gauge<Integer>() { - @Override - public Integer getValue() { - Collection<String> pids = Utils.readDirContents(ConfigUtils.workerRoot(conf)); - return pids.size(); - } - }); - } - - // To be removed - private void startMetricsReporters(Map conf) { - List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf); - for (PreparableReporter reporter : preparableReporters) { - reporter.prepare(new MetricRegistry(), conf); - reporter.start(); - } - LOG.info("Started statistics report plugin..."); - } - - /** - * supervisor daemon enter entrance - * - * @param args - */ - public static void main(String[] args) { - Utils.setupDefaultUncaughtExceptionHandler(); - SupervisorServer instance = new SupervisorServer(); - instance.distributeLaunch(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- 
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index ffdb839..9d0b343 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -20,10 +20,14 @@ package org.apache.storm.daemon.supervisor; import org.apache.commons.lang.StringUtils; import org.apache.curator.utils.PathUtils; import org.apache.storm.Config; +import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.localizer.LocalResource; import org.apache.storm.localizer.Localizer; import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,13 +40,24 @@ public class SupervisorUtils { private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + private static final SupervisorUtils INSTANCE = new SupervisorUtils(); + private static SupervisorUtils _instance = INSTANCE; + + public static void setInstance(SupervisorUtils u) { + _instance = u; + } + + public static void resetInstance() { + _instance = INSTANCE; + } + public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix, final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { if (StringUtils.isBlank(user)) { throw new IllegalArgumentException("User cannot be blank when calling workerLauncher."); } String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); - String stormHome = System.getProperty("storm.home"); + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); String wl; if (StringUtils.isNotBlank(wlinitial)) { wl = wlinitial; @@ -165,9 +180,94 @@ 
public class SupervisorUtils { return false; if (!Utils.checkFileExists(stormconfpath)) return false; - if (!ConfigUtils.isLocalMode(conf) && !Utils.checkFileExists(stormjarpath)) - return false; - return true; + if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath)) + return true; + return false; + } + + public static Collection<String> myWorkerIds(Map conf){ + return Utils.readDirContents(ConfigUtils.workerRoot(conf)); + } + + /** + * Returns map from worker id to heartbeat + * + * @param conf + * @return + * @throws Exception + */ + public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception { + return _instance.readWorkerHeartbeatsImpl(conf); + } + + public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception { + Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); + + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + + for (String workerId : workerIds) { + LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId); + // ATTENTION: whb can be null + workerHeartbeats.put(workerId, whb); + } + return workerHeartbeats; + } + + + /** + * get worker heartbeat by workerId + * + * @param conf + * @param workerId + * @return + * @throws IOException + */ + public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) { + return _instance.readWorkerHeartbeatImpl(conf, workerId); + } + + public LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) { + try { + LocalState localState = ConfigUtils.workerState(conf, workerId); + return localState.getWorkerHeartBeat(); + } catch (Exception e) { + LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e); + return null; + } + } + + public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) { + return _instance.isWorkerHbTimedOutImpl(now, whb, conf); + } + + public boolean isWorkerHbTimedOutImpl(int now, 
LSWorkerHeartbeat whb, Map conf) { + boolean result = false; + if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) { + result = true; + } + return result; + } + + public static String javaCmd(String cmd) { + return _instance.javaCmdImpl(cmd); + } + + public String javaCmdImpl(String cmd) { + String ret = null; + String javaHome = System.getenv().get("JAVA_HOME"); + if (StringUtils.isNotBlank(javaHome)) { + ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd; + } else { + ret = cmd; + } + return ret; + } + + public static List<ACL> supervisorZkAcls() { + List<ACL> acls = new ArrayList<>(); + acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); + acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); + return acls; } } http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java index af454b9..4ef6d1c 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java @@ -17,14 +17,10 @@ */ package org.apache.storm.daemon.supervisor; -import clojure.lang.IFn; -import clojure.lang.RT; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.storm.Config; -import org.apache.storm.ProcessSimulator; -import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.container.cgroup.CgroupManager; import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.LocalAssignment; @@ -33,6 
+29,7 @@ import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -52,9 +49,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); - private final LocalState localState; - - private IStormClusterState stormClusterState; + private LocalState localState; private SupervisorData supervisorData; @@ -80,15 +75,21 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { } } + public SyncProcessEvent(){ + + } + public SyncProcessEvent(SupervisorData supervisorData) { + init(supervisorData); + } + //TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java + public void init(SupervisorData supervisorData){ this.supervisorData = supervisorData; - this.localState = supervisorData.getLocalState(); - - this.stormClusterState = supervisorData.getStormClusterState(); } + /** * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new @@ -101,12 +102,13 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { try { Map conf = supervisorData.getConf(); Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); + if (assignedExecutors == null) { assignedExecutors = new HashMap<>(); } int now = Time.currentTimeSecs(); - Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(assignedExecutors, now); + Map<String, StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); Set<String> keeperWorkerIds = new HashSet<>(); Set<Integer> keepPorts = new HashSet<>(); @@ -171,16 +173,17 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { } } - Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) { + protected Map<Integer, LocalAssignment> getReassignExecutors(Map<Integer, LocalAssignment> assignExecutors, Set<Integer> keepPorts) { Map<Integer, LocalAssignment> reassignExecutors = new HashMap<>(); + reassignExecutors.putAll(assignExecutors); for (Integer port : keepPorts) { - if (assignExecutors.containsKey(port)) { - reassignExecutors.put(port, assignExecutors.get(port)); - } + reassignExecutors.remove(port); } return reassignExecutors; } + + /** * Returns map from worker id to worker heartbeat. 
if the heartbeat is nil, then the worker is dead * @@ -188,11 +191,11 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { * @return * @throws Exception */ - public Map<String, StateHeartbeat> getLocalWorkerStats(Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception { + public Map<String, StateHeartbeat> getLocalWorkerStats(SupervisorData supervisorData, Map<Integer, LocalAssignment> assignedExecutors, int now) throws Exception { Map<String, StateHeartbeat> workerIdHbstate = new HashMap<>(); Map conf = supervisorData.getConf(); LocalState localState = supervisorData.getLocalState(); - Map<String, LSWorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf); + Map<String, LSWorkerHeartbeat> idToHeartbeat = SupervisorUtils.readWorkerHeartbeats(conf); Map<String, Integer> approvedWorkers = localState.getApprovedWorkers(); Set<String> approvedIds = new HashSet<>(); if (approvedWorkers != null) { @@ -209,12 +212,12 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { } else if (supervisorData.getDeadWorkers().contains(workerId)) { LOG.info("Worker Process {}as died", workerId); state = State.timedOut; - } else if ((now - whb.get_time_secs()) > (Integer) (conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) { + } else if (SupervisorUtils.isWorkerHbTimedOut(now, whb, conf)) { state = State.timedOut; } else { state = State.valid; } - LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb.toString(), now); + LOG.debug("Worker:{} state:{} WorkerHeartbeat:{} at supervisor time-secs {}", workerId, state, whb, now); workerIdHbstate.put(workerId, new StateHeartbeat(state, whb)); } return workerIdHbstate; @@ -222,7 +225,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { protected boolean matchesAssignment(LSWorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedExecutors) { LocalAssignment localAssignment = 
assignedExecutors.get(whb.get_port()); - if (localAssignment == null || localAssignment.get_topology_id() != whb.get_topology_id()) { + if (localAssignment == null || !localAssignment.get_topology_id().equals(whb.get_topology_id())) { return false; } List<ExecutorInfo> executorInfos = new ArrayList<>(); @@ -230,61 +233,34 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { // remove SYSTEM_EXECUTOR_ID executorInfos.remove(new ExecutorInfo(-1, -1)); List<ExecutorInfo> localExecuorInfos = localAssignment.get_executors(); - if (executorInfos != localExecuorInfos) - return false; - return true; - } - - /** - * Returns map from worr id to heartbeat - * - * @param conf - * @return - * @throws Exception - */ - protected Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception { - Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); - Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); - - for (String workerId : workerIds) { - LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId); - // ATTENTION: whb can be null - workerHeartbeats.put(workerId, whb); - } - return workerHeartbeats; - } + if (localExecuorInfos.size() != executorInfos.size()) + return false; - /** - * get worker heartbeat by workerId - * - * @param conf - * @param workerId - * @return - * @throws IOException - */ - protected LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) { - try { - LocalState localState = ConfigUtils.workerState(conf, workerId); - return localState.getWorkerHeartBeat(); - } catch (Exception e) { - LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e); - return null; + for (ExecutorInfo executorInfo : localExecuorInfos){ + if (!localExecuorInfos.contains(executorInfo)) + return false; } + return true; } /** * launch a worker in local mode. But it may exist question??? 
*/ - protected void launchLocalWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException { + protected void launchLocalWorker(SupervisorData supervisorData, String stormId, Long port, String workerId, WorkerResources resources) throws IOException { // port this function after porting worker to java } protected String getWorkerClassPath(String stormJar, Map stormConf) { List<String> topoClasspath = new ArrayList<>(); Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH); - if (object != null) { + + if (object instanceof List) { topoClasspath.addAll((List<String>) object); + } else if (object instanceof String){ + topoClasspath.add((String)object); + }else { + //ignore } String classPath = Utils.workerClasspath(); String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar)); @@ -300,54 +276,46 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { * @param port * @param memOnheap */ - public List<String> substituteChildopts(Object value, String workerId, String stormId, Integer port, int memOnheap) { + public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) { List<String> rets = new ArrayList<>(); if (value instanceof String) { String string = (String) value; - string.replace("%ID%", String.valueOf(port)); - string.replace("%WORKER-ID%", workerId); - string.replace("%TOPOLOGY-ID%", stormId); - string.replace("%WORKER-PORT%", String.valueOf(port)); - string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + string = string.replace("%ID%", String.valueOf(port)); + string = string.replace("%WORKER-ID%", workerId); + string = string.replace("%TOPOLOGY-ID%", stormId); + string = string.replace("%WORKER-PORT%", String.valueOf(port)); + string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); String[] strings = string.split("\\s+"); rets.addAll(Arrays.asList(strings)); } else if (value instanceof List) { - List<String> 
strings = (List<String>) value; - for (String str : strings) { - str.replace("%ID%", String.valueOf(port)); - str.replace("%WORKER-ID%", workerId); - str.replace("%TOPOLOGY-ID%", stormId); - str.replace("%WORKER-PORT%", String.valueOf(port)); - str.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + List<Object> objects = (List<Object>) value; + for (Object object : objects) { + String str = (String)object; + str = str.replace("%ID%", String.valueOf(port)); + str = str.replace("%WORKER-ID%", workerId); + str = str.replace("%TOPOLOGY-ID%", stormId); + str = str.replace("%WORKER-PORT%", String.valueOf(port)); + str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap)); rets.add(str); } } return rets; } - private String jvmCmd(String cmd) { - String ret = null; - String javaHome = System.getProperty("JAVA_HOME"); - if (StringUtils.isNotBlank(javaHome)) { - ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd; - } else { - ret = cmd; - } - return ret; - } + /** * launch a worker in distributed mode - * + * supervisorId for testing * @throws IOException */ - protected void launchDistributeWorker(String stormId, Integer port, String workerId, WorkerResources resources) throws IOException { + protected void launchDistributeWorker(Map conf, String supervisorId, String assignmentId, String stormId, Long port, String workerId, + WorkerResources resources, CgroupManager cgroupManager, ConcurrentHashSet deadWorkers) throws IOException { - Map conf = supervisorData.getConf(); Boolean runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); - String stormHome = System.getProperty("storm.home"); - String stormOptions = System.getProperty("storm.options"); - String stormConfFile = System.getProperty("storm.conf.file"); + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); + String stormConfFile = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); String stormLogDir = ConfigUtils.getLogDir(); String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); @@ -384,7 +352,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { if (resources.get_mem_on_heap() > 0) { memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); } else { - memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB)); + //set the default heap memory size for supervisor-test + memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768); } int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); @@ -425,16 +394,16 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { List<String> workerProfilerChildopts = null; if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) { workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap); + }else { + workerProfilerChildopts = new ArrayList<>(); } - Map<String, String> environment = new HashMap<String, String>(); - Map<String, String> topEnvironment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); - if (topEnvironment != null) { - environment.putAll(topEnvironment); - environment.put("LD_LIBRARY_PATH", jlp); - } else { - environment.put("LD_LIBRARY_PATH", jlp); + Map<String, String> topEnvironment = new HashMap<String, String>(); + Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + if (environment != null) { + topEnvironment.putAll(environment); } + topEnvironment.put("LD_LIBRARY_PATH", jlp); String log4jConfigurationFile = null; if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) { @@ -444,10 +413,8 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { } log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR 
+ "worker.xml"; - StringBuilder commandSB = new StringBuilder(); - List<String> commandList = new ArrayList<>(); - commandList.add(jvmCmd("java")); + commandList.add(SupervisorUtils.javaCmd("java")); commandList.add("-cp"); commandList.add(workerClassPath); commandList.addAll(topoWorkerLogwriterChildopts); @@ -462,7 +429,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); commandList.add("org.apache.storm.LogWriter"); - commandList.add(jvmCmd("java")); + commandList.add(SupervisorUtils.javaCmd("java")); commandList.add("-server"); commandList.addAll(workerChildopts); commandList.addAll(topWorkerChildopts); @@ -476,7 +443,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { commandList.add("-Dstorm.options=" + stormOptions); commandList.add("-Dstorm.log.dir=" + stormLogDir); commandList.add("-Dlogging.sensitivity=" + loggingSensitivity); - commandList.add(" -Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); commandList.add("-Dstorm.id=" + stormId); commandList.add("-Dworker.id=" + workerId); @@ -485,7 +452,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { commandList.add(workerClassPath); commandList.add("org.apache.storm.daemon.worker"); commandList.add(stormId); - commandList.add(supervisorData.getAssignmentId()); + commandList.add(assignmentId); commandList.add(String.valueOf(port)); commandList.add(workerId); @@ -497,27 +464,29 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { Map<String, Number> map = new HashMap<>(); map.put("cpu", cpuValue); map.put("memory", memoryValue); - supervisorData.getResourceIsolationManager().reserveResourcesForWorker(workerId, map); - commandList = 
supervisorData.getResourceIsolationManager().getLaunchCommand(workerId, commandList); + cgroupManager.reserveResourcesForWorker(workerId, map); + commandList = cgroupManager.getLaunchCommand(workerId, commandList); } - LOG.info("Launching worker with command: ", Utils.shellCmd(commandList)); + LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList)); writeLogMetadata(stormConf, user, workerId, stormId, port, conf); ConfigUtils.setWorkerUserWSE(conf, workerId, user); createArtifactsLink(conf, stormId, port, workerId); String logPrefix = "Worker Process " + workerId; String workerDir = ConfigUtils.workerRoot(conf, workerId); - supervisorData.getDeadWorkers().remove(workerId); + + if (deadWorkers != null) + deadWorkers.remove(workerId); createBlobstoreLinks(conf, stormId, workerId); ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix, workerId); if (runWorkerAsUser) { - List<String> stringList = new ArrayList<>(); - stringList.add("worker"); - stringList.add(workerDir); - stringList.add(Utils.writeScript(workerDir, commandList, topEnvironment)); - SupervisorUtils.workerLauncher(conf, user, stringList, null, logPrefix, processExitCallback, new File(workerDir)); + List<String> args = new ArrayList<>(); + args.add("worker"); + args.add(workerDir); + args.add(Utils.writeScript(workerDir, commandList, topEnvironment)); + SupervisorUtils.workerLauncher(conf, user, args, null, logPrefix, processExitCallback, new File(workerDir)); } else { Utils.launchProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir)); } @@ -536,6 +505,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { Map<String, Integer> newValidWorkerIds = new HashMap<>(); Map conf = supervisorData.getConf(); + String supervisorId = supervisorData.getSupervisorId(); String clusterMode = ConfigUtils.clusterMode(conf); for (Map.Entry<Integer, LocalAssignment> entry : reassignExecutors.entrySet()) { @@ -550,17 +520,20 
@@ public class SyncProcessEvent extends ShutdownWork implements Runnable { String pidsPath = ConfigUtils.workerPidsRoot(conf, workerId); String hbPath = ConfigUtils.workerHeartbeatsRoot(conf, workerId); + LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port, + workerId); + FileUtils.forceMkdir(new File(pidsPath)); FileUtils.forceMkdir(new File(hbPath)); if (clusterMode.endsWith("distributed")) { - launchDistributeWorker(stormId, port, workerId, resources); + launchDistributeWorker(conf, supervisorId, supervisorData.getAssignmentId(), stormId, port.longValue(), workerId, resources, + supervisorData.getResourceIsolationManager(), supervisorData.getDeadWorkers()); } else if (clusterMode.endsWith("local")) { - launchLocalWorker(stormId, port, workerId, resources); + launchLocalWorker(supervisorData, stormId, port.longValue(), workerId, resources); } newValidWorkerIds.put(workerId, port); - LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port, - workerId); + } else { LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", assignment, supervisorData.getSupervisorId(), port, workerId); @@ -570,26 +543,39 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { return newValidWorkerIds; } - protected void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, int port, Map conf) throws IOException { + public void writeLogMetadata(Map stormconf, String user, String workerId, String stormId, Long port, Map conf) throws IOException { Map data = new HashMap(); data.put(Config.TOPOLOGY_SUBMITTER_USER, user); data.put("worker-id", workerId); Set<String> logsGroups = new HashSet<>(); + //for supervisor-test if (stormconf.get(Config.LOGS_GROUPS) != null) { - 
logsGroups.addAll((List<String>) stormconf.get(Config.LOGS_GROUPS)); + List<String> groups = (List<String>) stormconf.get(Config.LOGS_GROUPS); + for (String group : groups){ + logsGroups.add(group); + } } if (stormconf.get(Config.TOPOLOGY_GROUPS) != null) { - logsGroups.addAll((List<String>) stormconf.get(Config.TOPOLOGY_GROUPS)); + List<String> topGroups = (List<String>) stormconf.get(Config.TOPOLOGY_GROUPS); + for (String group : topGroups){ + logsGroups.add(group); + } } data.put(Config.LOGS_GROUPS, logsGroups.toArray()); Set<String> logsUsers = new HashSet<>(); if (stormconf.get(Config.LOGS_USERS) != null) { - logsUsers.addAll((List<String>) stormconf.get(Config.LOGS_USERS)); + List<String> logUsers = (List<String>) stormconf.get(Config.LOGS_USERS); + for (String logUser : logUsers){ + logsUsers.add(logUser); + } } if (stormconf.get(Config.TOPOLOGY_USERS) != null) { - logsUsers.addAll((List<String>) stormconf.get(Config.TOPOLOGY_USERS)); + List<String> topUsers = (List<String>) stormconf.get(Config.TOPOLOGY_USERS); + for (String logUser : topUsers){ + logsUsers.add(logUser); + } } data.put(Config.LOGS_USERS, logsUsers.toArray()); writeLogMetadataToYamlFile(stormId, port, data, conf); @@ -604,19 +590,25 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { * @param conf * @throws IOException */ - protected void writeLogMetadataToYamlFile(String stormId, int port, Map data, Map conf) throws IOException { - File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port); + public void writeLogMetadataToYamlFile(String stormId, Long port, Map data, Map conf) throws IOException { + File file = ConfigUtils.getLogMetaDataFile(conf, stormId, port.intValue()); + if (!Utils.checkFileExists(file.getParent())) { if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { FileUtils.forceMkdir(file.getParentFile()); SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), 
file.getParentFile().getCanonicalPath()); } else { - file.getParentFile().mkdir(); + file.getParentFile().mkdirs(); } } FileWriter writer = new FileWriter(file); Yaml yaml = new Yaml(); - yaml.dump(data, writer); + try { + yaml.dump(data, writer); + }finally { + writer.close(); + } + } /** @@ -627,7 +619,7 @@ public class SyncProcessEvent extends ShutdownWork implements Runnable { * @param port * @param workerId */ - protected void createArtifactsLink(Map conf, String stormId, int port, String workerId) throws IOException { + protected void createArtifactsLink(Map conf, String stormId, Long port, String workerId) throws IOException { String workerDir = ConfigUtils.workerRoot(conf, workerId); String topoDir = ConfigUtils.workerArtifactsRoot(conf, stormId); if (Utils.checkFileExists(workerDir)) { http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java index d6dc45e..2de9203 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java @@ -88,6 +88,7 @@ public class SyncSupervisorEvent implements Runnable { Map<Integer, LocalAssignment> allAssignment = readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); Set<String> assignedStormIds = new HashSet<>(); @@ -97,6 +98,7 @@ public class SyncSupervisorEvent implements Runnable { assignedStormIds.add(entry.getValue().get_topology_id()); } } + Set<String> srashStormIds = verifyDownloadedFiles(conf, supervisorData.getLocalizer(), assignedStormIds, 
allDownloadedTopologyIds); Set<String> downloadedStormIds = new HashSet<>(); downloadedStormIds.addAll(allDownloadedTopologyIds); @@ -312,6 +314,7 @@ public class SyncSupervisorEvent implements Runnable { } FileUtils.moveDirectory(new File(tmproot), new File(stormroot)); + SupervisorUtils.setupStormCodeDir(conf, ConfigUtils.readSupervisorStormConf(conf, stormId), stormroot); ClassLoader classloader = Thread.currentThread().getContextClassLoader(); @@ -350,7 +353,7 @@ public class SyncSupervisorEvent implements Runnable { String tmproot = ConfigUtils.supervisorTmpDir(conf) + Utils.FILE_PATH_SEPARATOR + Utils.uuid(); String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); ClientBlobStore blobStore = Utils.getClientBlobStoreForSupervisor(conf); - + FileUtils.forceMkdir(new File(tmproot)); if (Utils.isOnWindows()) { if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { throw new RuntimeException("ERROR: Windows doesn't implement setting the correct permissions"); @@ -358,7 +361,6 @@ public class SyncSupervisorEvent implements Runnable { } else { Utils.restrictPermissions(tmproot); } - FileUtils.forceMkdir(new File(tmproot)); String stormJarKey = ConfigUtils.masterStormJarKey(stormId); String stormCodeKey = ConfigUtils.masterStormCodeKey(stormId); String stormConfKey = ConfigUtils.masterStormConfKey(stormId); @@ -549,7 +551,7 @@ public class SyncSupervisorEvent implements Runnable { for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) { if (entry.getValue().get_node().equals(assignmentId)) { for (Long port : entry.getValue().get_port()) { - LocalAssignment localAssignment = portTasks.get(port); + LocalAssignment localAssignment = portTasks.get(port.intValue()); if (localAssignment == null) { List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>(); localAssignment = new LocalAssignment(stormId, executors); @@ -577,8 +579,7 @@ public class SyncSupervisorEvent implements Runnable { assignedExecutors = new 
HashMap<>(); } int now = Time.currentTimeSecs(); - SyncProcessEvent syncProcesses = new SyncProcessEvent(supervisorData); - Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(assignedExecutors, now); + Map<String, StateHeartbeat> workerIdHbstate = syncProcesses.getLocalWorkerStats(supervisorData, assignedExecutors, now); LOG.debug("Allocated workers ", assignedExecutors); for (Map.Entry<String, StateHeartbeat> entry : workerIdHbstate.entrySet()){ String workerId = entry.getKey(); http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java index 2d73327..91044cc 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java @@ -41,7 +41,6 @@ public class RunProfilerActions implements Runnable { private Map conf; private IStormClusterState stormClusterState; private String hostName; - private String stormHome; private String profileCmd; @@ -79,7 +78,6 @@ public class RunProfilerActions implements Runnable { this.conf = supervisorData.getConf(); this.stormClusterState = supervisorData.getStormClusterState(); this.hostName = supervisorData.getHostName(); - this.stormHome = System.getProperty("storm.home"); this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); this.supervisorData = supervisorData; } http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java index d41ca87..e158dbc 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -22,6 +22,7 @@ import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.daemon.supervisor.SupervisorData; import org.apache.storm.generated.SupervisorInfo; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; import java.util.ArrayList; import java.util.HashMap; @@ -53,13 +54,16 @@ public class SupervisorHeartbeat implements Runnable { List<Long> usedPorts = new ArrayList<>(); usedPorts.addAll(supervisorData.getCurrAssignment().keySet()); supervisorInfo.set_used_ports(usedPorts); + List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); List<Long> portList = new ArrayList<>(); - Object metas = supervisorData.getiSupervisor().getMetadata(); - if (metas != null) { - for (Integer port : (List<Integer>) metas) { - portList.add(port.longValue()); + if (metaDatas != null){ + for (Object data : metaDatas){ + Integer port = Utils.getInt(data); + if (port != null) + portList.add(port.longValue()); } } + supervisorInfo.set_meta(portList); supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META)); supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java new file mode 100644 index 0000000..d33dc9c 
--- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedSupervisorUtils.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.testing.staticmocking; + +import org.apache.storm.daemon.supervisor.SupervisorUtils; + +public class MockedSupervisorUtils implements AutoCloseable { + + public MockedSupervisorUtils(SupervisorUtils inst) { + SupervisorUtils.setInstance(inst); + } + + @Override + public void close() throws Exception { + SupervisorUtils.resetInstance(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 1ba3de7..4e3dbb4 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -215,7 +215,7 @@ public class Utils { try { T ret = (T) c.newInstance(); TDeserializer des = getDes(); - des.deserialize((TBase)ret, b, offset, length); + des.deserialize((TBase) ret, b, offset, length); return ret; } catch (Exception e) 
{ throw new RuntimeException(e); @@ -1700,7 +1700,7 @@ public class Utils { if(map == null) { return null; } - return findOne(pred, (Set<T>)map.entrySet()); + return findOne(pred, (Set<T>) map.entrySet()); } public static String localHostname () throws UnknownHostException { http://git-wip-us.apache.org/repos/asf/storm/blob/19fcafbd/storm-core/test/clj/org/apache/storm/logviewer_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj index 4889c8e..d06c11c 100644 --- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj +++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj @@ -15,8 +15,7 @@ ;; limitations under the License. (ns org.apache.storm.logviewer-test (:use [org.apache.storm config util]) - (:require [org.apache.storm.daemon [logviewer :as logviewer] - [supervisor :as supervisor]]) + (:require [org.apache.storm.daemon [logviewer :as logviewer]]) (:require [conjure.core]) (:use [clojure test]) (:use [conjure core]) @@ -24,7 +23,10 @@ [org.apache.storm.ui helpers]) (:import [org.apache.storm.daemon DirectoryCleaner] [org.apache.storm.utils Utils Time] - [org.apache.storm.utils.staticmocking UtilsInstaller]) + [org.apache.storm.utils.staticmocking UtilsInstaller] + [org.apache.storm.daemon.supervisor SupervisorUtils] + [org.apache.storm.testing.staticmocking MockedSupervisorUtils] + [org.apache.storm.generated LSWorkerHeartbeat]) (:import [java.nio.file Files Path DirectoryStream]) (:import [java.nio.file Files]) (:import [java.nio.file.attribute FileAttribute]) @@ -236,25 +238,33 @@ mock-metaFile (mk-mock-File {:name "worker.yaml" :type :file}) exp-id "id12345" - expected {exp-id port1-dir}] - (stubbing [supervisor/read-worker-heartbeats nil - logviewer/get-metadata-file-for-wroker-logdir mock-metaFile - logviewer/get-worker-id-from-metadata-file exp-id] - (is (= expected 
(logviewer/identify-worker-log-dirs [port1-dir]))))))) + expected {exp-id port1-dir} + supervisor-util (Mockito/mock SupervisorUtils)] + (with-open [_ (MockedSupervisorUtils. supervisor-util)] + (stubbing [logviewer/get-metadata-file-for-wroker-logdir mock-metaFile + logviewer/get-worker-id-from-metadata-file exp-id] + (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn nil)) + (is (= expected (logviewer/identify-worker-log-dirs [port1-dir])))))))) + + (deftest test-get-dead-worker-dirs (testing "removes any files of workers that are still alive" (let [conf {SUPERVISOR-WORKER-TIMEOUT-SECS 5} - id->hb {"42" {:time-secs 1}} + hb (let[lwb (LSWorkerHeartbeat.)] + (.set_time_secs lwb (int 1)) lwb) + id->hb {"42" hb} now-secs 2 unexpected-dir (mk-mock-File {:name "dir1" :type :directory}) expected-dir (mk-mock-File {:name "dir2" :type :directory}) - log-dirs #{unexpected-dir expected-dir}] + log-dirs #{unexpected-dir expected-dir} + supervisor-util (Mockito/mock SupervisorUtils)] + (with-open [_ (MockedSupervisorUtils. supervisor-util)] (stubbing [logviewer/identify-worker-log-dirs {"42" unexpected-dir, - "007" expected-dir} - supervisor/read-worker-heartbeats id->hb] + "007" expected-dir}] + (. (Mockito/when (.readWorkerHeartbeatsImpl supervisor-util (Mockito/any))) (thenReturn id->hb)) (is (= #{expected-dir} - (logviewer/get-dead-worker-dirs conf now-secs log-dirs))))))) + (logviewer/get-dead-worker-dirs conf now-secs log-dirs)))))))) (deftest test-cleanup-fn (testing "cleanup function forceDeletes files of dead workers"
