http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java index 6b294f2,0000000..04467c2 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java @@@ -1,221 -1,0 +1,214 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.supervisor.timer; + ++import com.google.common.collect.Lists; +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { + private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + + private Map conf; + private IStormClusterState stormClusterState; + private String hostName; + + private String profileCmd; + + private SupervisorData supervisorData; + + private class ActionExitCallback implements Utils.ExitCodeCallable { + private String stormId; + private ProfileRequest profileRequest; + private String logPrefix; ++ private boolean stop; + - public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { ++ public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix, boolean stop) { + this.stormId = stormId; + this.profileRequest = profileRequest; + this.logPrefix = logPrefix; ++ this.stop = stop; + } + + @Override + public Object call() throws Exception { + return null; + } + + @Override + public Object call(int exitCode) { + LOG.info("{} profile-action exited for {}", logPrefix, exitCode); + try { - stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); ++ if (stop) ++ stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); + } catch (Exception e) { + LOG.warn("failed delete profileRequest: " + profileRequest); + } + return null; + } + } + + public RunProfilerActions(SupervisorData supervisorData) { + this.conf = supervisorData.getConf(); + this.stormClusterState = supervisorData.getStormClusterState(); + this.hostName = supervisorData.getHostName(); + this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); + this.supervisorData = supervisorData; + } + + @Override + public void run() { - Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get(); ++ Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfilerActions().get(); + try { + for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) { + String stormId = entry.getKey(); + List<ProfileRequest> requests = entry.getValue(); + if (requests != null) { + for (ProfileRequest profileRequest : requests) { + if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { - boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false; ++ boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp(); + Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); + String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port)); + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + + String user = null; + if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) { + user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)); + } + Map<String, String> env = null; + if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) { + env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + } else { + env = new HashMap<String, String>(); + } + + String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()); + StringBuilder stringBuilder = new StringBuilder(); - FileReader reader = null; - BufferedReader br = null; - try { - reader = new FileReader(str); - br = new BufferedReader(reader); ++ ++ try (FileReader reader = new FileReader(str); ++ BufferedReader br = new BufferedReader(reader)) { + int c; + while ((c = br.read()) >= 0) { + stringBuilder.append(c); + } - } catch (IOException e) { - if (reader != null) - reader.close(); - if (br != null) - br.close(); + } + String workerPid = stringBuilder.toString().trim(); + ProfileAction profileAction = profileRequest.get_action(); + String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " "; + + // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted + // The profiler plugin script validates if JVM is recording before starting another recording. - String command = mkCommand(profileAction, stop, workerPid, targetDir); - List<String> listCommand = new ArrayList<>(); - if (command != null) { - listCommand.addAll(Arrays.asList(command.split(" "))); - } ++ List<String> command = mkCommand(profileAction, stop, workerPid, targetDir); + try { - ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix); - launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix); ++ ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix, stop); ++ launchProfilerActionForWorker(user, targetDir, command, env, actionExitCallback, logPrefix); + } catch (IOException e) { + LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); + } catch (RuntimeException e) { + LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); + } + } + } + } + } + } catch (Exception e) { + LOG.error("Error running profiler actions, will retry again later"); + } + } + + private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment, + final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException { + File targetFile = new File(targetDir); + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + LOG.info("Running as user:{} command:{}", user, commands); + String containerFile = Utils.containerFilePath(targetDir); + if (Utils.checkFileExists(containerFile)) { + SupervisorUtils.rmrAsUser(conf, containerFile, containerFile); + } + String scriptFile = Utils.scriptFilePath(targetDir); + if (Utils.checkFileExists(scriptFile)) { + SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile); + } + String script = Utils.writeScript(targetDir, commands, environment); + List<String> args = new ArrayList<>(); + args.add("profiler"); + args.add(targetDir); + args.add(script); + SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile); + } else { + Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile); + } + } + - private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { ++ private List<String> mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { + if (action == ProfileAction.JMAP_DUMP) { + return jmapDumpCmd(workerPid, targetDir); + } else if (action == ProfileAction.JSTACK_DUMP) { + return jstackDumpCmd(workerPid, targetDir); + } else if (action == ProfileAction.JPROFILE_DUMP) { + return jprofileDump(workerPid, targetDir); + } else if (action == ProfileAction.JVM_RESTART) { + return jprofileJvmRestart(workerPid); + } else if (!stop && action == ProfileAction.JPROFILE_STOP) { + return jprofileStart(workerPid); + } else if (stop && action == ProfileAction.JPROFILE_STOP) { + return jprofileStop(workerPid, targetDir); + } - return null; ++ return Lists.newArrayList(); + } + - private String jmapDumpCmd(String pid, String targetDir) { - return profileCmd + " " + pid + " jmap " + targetDir; ++ private List<String> jmapDumpCmd(String pid, String targetDir) { ++ return Lists.newArrayList(profileCmd, pid, "jmap", targetDir); + } + - private String jstackDumpCmd(String pid, String targetDir) { - return profileCmd + " " + pid + " jstack " + targetDir; ++ private List<String> jstackDumpCmd(String pid, String targetDir) { ++ return Lists.newArrayList(profileCmd, pid, "jstack", targetDir); + } + - private String jprofileStart(String pid) { - return profileCmd + " " + pid + " start"; ++ private List<String> jprofileStart(String pid) { ++ return Lists.newArrayList(profileCmd, pid, "start"); + } + - private String jprofileStop(String pid, String targetDir) { - return profileCmd + " " + pid + " stop " + targetDir; ++ private List<String> jprofileStop(String pid, String targetDir) { ++ return Lists.newArrayList(profileCmd, pid, "stop", targetDir); + } + - private String jprofileDump(String pid, String targetDir) { - return profileCmd + " " + pid + " dump " + targetDir; ++ private List<String> jprofileDump(String pid, String targetDir) { ++ return Lists.newArrayList(profileCmd, pid, "dump", targetDir); + } + - private String jprofileJvmRestart(String pid) { - return profileCmd + " " + pid + " kill"; ++ private List<String> jprofileJvmRestart(String pid) { ++ return Lists.newArrayList(profileCmd, pid, "kill"); + } + +}
http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java index 5e7b6d3,0000000..3ce8f5d mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@@ -1,62 -1,0 +1,52 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.command.HealthCheck; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorHealthCheck implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); + + private SupervisorData supervisorData; + + public SupervisorHealthCheck(SupervisorData supervisorData) { + this.supervisorData = supervisorData; + } + + @Override + public void run() { + Map conf = supervisorData.getConf(); + IWorkerManager workerManager = supervisorData.getWorkerManager(); + int healthCode = HealthCheck.healthCheck(conf); - Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + if (healthCode != 0) { - for (String workerId : workerIds) { - try { - workerManager.shutdownWorker(supervisorData.getSupervisorId(), workerId, supervisorData.getWorkerThreadPids()); - boolean success = workerManager.cleanupWorker(workerId); - if (success){ - supervisorData.getDeadWorkers().remove(workerId); - } - } catch (Exception e) { - throw Utils.wrapInRuntime(e); - } - } ++ SupervisorUtils.shutdownAllWorkers(conf, supervisorData.getSupervisorId(), supervisorData.getWorkerThreadPids(), supervisorData.getDeadWorkers(), ++ workerManager); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java index 9529b1a,0000000..05ed82b mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java @@@ -1,408 -1,0 +1,401 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class DefaultWorkerManager implements IWorkerManager { + + private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); + + private Map conf; + private CgroupManager resourceIsolationManager; + private boolean runWorkerAsUser; + + @Override + public void prepareWorker(Map conf, Localizer localizer) { + this.conf = conf; + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { + try { + this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); + this.resourceIsolationManager.prepare(conf); + LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } + } else { + this.resourceIsolationManager = null; + } + this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); + } + + @Override - public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, ++ public void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, + Utils.ExitCodeCallable workerExitCallback) { + try { + + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); + String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); + String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + + String stormLogDir = ConfigUtils.getLogDir(); + String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + + String stormLog4j2ConfDir; + if (StringUtils.isNotBlank(stormLogConfDir)) { + if (Utils.isAbsolutePath(stormLogConfDir)) { + stormLog4j2ConfDir = stormLogConfDir; + } else { + stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; + } + } else { + stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; + } + + String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + + String jlp = jlp(stormRoot, conf); + + String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + + String workerClassPath = getWorkerClassPath(stormJar, stormConf); + + Object topGcOptsObject = stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS); + List<String> topGcOpts = new ArrayList<>(); + if (topGcOptsObject instanceof String) { + topGcOpts.add((String) topGcOptsObject); + } else if (topGcOptsObject instanceof List) { + topGcOpts.addAll((List<String>) topGcOptsObject); + } + + int memOnheap = 0; + if (resources.get_mem_on_heap() > 0) { + memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); + } else { + // set the default heap memory size for supervisor-test + memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768); + } + + int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); + + int cpu = (int) Math.ceil(resources.get_cpu()); + + List<String> gcOpts = null; + + if (topGcOpts.size() > 0) { + gcOpts = substituteChildopts(topGcOpts, workerId, stormId, port, memOnheap); + } else { + gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap); + } + + Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS); + List<String> topoWorkerLogwriterChildopts = new ArrayList<>(); + if (topoWorkerLogwriterObject instanceof String) { + topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject); + } else if (topoWorkerLogwriterObject instanceof List) { + topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject); + } + + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + + String logfileName = "worker.log"; + + String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf); + + String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY); + if (loggingSensitivity == null) { + loggingSensitivity = "S3"; + } + + List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); + + List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap); + + List<String> workerProfilerChildopts = null; + if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) { + workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap); + } else { + workerProfilerChildopts = new ArrayList<>(); + } + + Map<String, String> topEnvironment = new HashMap<String, String>(); + Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + if (environment != null) { + topEnvironment.putAll(environment); + } + topEnvironment.put("LD_LIBRARY_PATH", jlp); + + String log4jConfigurationFile = null; + if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) { + log4jConfigurationFile = "file:///" + stormLog4j2ConfDir; + } else { + log4jConfigurationFile = stormLog4j2ConfDir; + } + log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml"; + + List<String> commandList = new ArrayList<>(); + commandList.add(SupervisorUtils.javaCmd("java")); + commandList.add("-cp"); + commandList.add(workerClassPath); + commandList.addAll(topoWorkerLogwriterChildopts); + commandList.add("-Dlogfile.name=" + logfileName); + commandList.add("-Dstorm.home=" + stormHome); + commandList.add("-Dworkers.artifacts=" + workersArtifacets); + commandList.add("-Dstorm.id=" + stormId); + commandList.add("-Dworker.id=" + workerId); + commandList.add("-Dworker.port=" + port); + commandList.add("-Dstorm.log.dir=" + stormLogDir); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); + commandList.add("org.apache.storm.LogWriter"); + + commandList.add(SupervisorUtils.javaCmd("java")); + commandList.add("-server"); + commandList.addAll(workerChildopts); + commandList.addAll(topWorkerChildopts); + commandList.addAll(gcOpts); + commandList.addAll(workerProfilerChildopts); + commandList.add("-Djava.library.path=" + jlp); + commandList.add("-Dlogfile.name=" + logfileName); + commandList.add("-Dstorm.home=" + stormHome); + commandList.add("-Dworkers.artifacts=" + workersArtifacets); + commandList.add("-Dstorm.conf.file=" + stormConfFile); + commandList.add("-Dstorm.options=" + stormOptions); + commandList.add("-Dstorm.log.dir=" + stormLogDir); + commandList.add("-Djava.io.tmpdir=" + workerTmpDir); + commandList.add("-Dlogging.sensitivity=" + loggingSensitivity); + commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile); + commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); + commandList.add("-Dstorm.id=" + stormId); + commandList.add("-Dworker.id=" + workerId); + commandList.add("-Dworker.port=" + port); + commandList.add("-cp"); + commandList.add(workerClassPath); + commandList.add("org.apache.storm.daemon.worker"); + commandList.add(stormId); + commandList.add(assignmentId); + commandList.add(String.valueOf(port)); + commandList.add(workerId); + + // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB)))) + if (resourceIsolationManager != null) { + int cGroupMem = (int) (Math.ceil((double) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB))); + int memoryValue = memoffheap + memOnheap + cGroupMem; + int cpuValue = cpu; + Map<String, Number> map = new HashMap<>(); + map.put("cpu", cpuValue); + map.put("memory", memoryValue); + resourceIsolationManager.reserveResourcesForWorker(workerId, map); + commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList); + } + + LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList)); + + String logPrefix = "Worker Process " + workerId; + String workerDir = ConfigUtils.workerRoot(conf, workerId); + + if (runWorkerAsUser) { + List<String> args = new ArrayList<>(); + args.add("worker"); + args.add(workerDir); + args.add(Utils.writeScript(workerDir, commandList, topEnvironment)); + List<String> commandPrefix = null; + if (resourceIsolationManager != null) + commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId); + SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir)); + } else { + Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir)); + } + } catch (IOException e) { + throw Utils.wrapInRuntime(e); + } - return null; + } + + @Override - public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) { ++ public void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) { + try { + LOG.info("Shutting down {}:{}", supervisorId, workerId); + Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); + Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)); + String user = ConfigUtils.getWorkerUser(conf, workerId); + String threadPid = workerThreadPids.get(workerId); + if (StringUtils.isNotBlank(threadPid)) { + ProcessSimulator.killProcess(threadPid); + } + + for (String pid : pids) { + if (runWorkerAsUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("15"); + String logPrefix = "kill -15 " + pid; + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.killProcessWithSigTerm(pid); + } + } + + if (pids.size() > 0) { + LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); + Time.sleepSecs(shutdownSleepSecs); + } + + for (String pid : pids) { + if (runWorkerAsUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("9"); + String logPrefix = "kill -9 " + pid; + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.forceKillProcess(pid); + } + String path = ConfigUtils.workerPidPath(conf, workerId, pid); + if (runWorkerAsUser) { + SupervisorUtils.rmrAsUser(conf, workerId, path); + } else { + try { + LOG.debug("Removing path {}", path); + new File(path).delete(); + } catch (Exception e) { + // on windows, the supervisor may still holds the lock on the worker directory + // ignore + } + } + } + LOG.info("Shut down {}:{}", supervisorId, workerId); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } - return null; + } + + @Override + public boolean cleanupWorker(String workerId) { + try { + //clean up for resource isolation if enabled + if (resourceIsolationManager != null) { + resourceIsolationManager.releaseResourcesForWorker(workerId); + } + //Always make sure to clean up everything else before worker directory + //is removed since that is what is going to trigger the retry for cleanup + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + if (Utils.checkFileExists(workerRoot)) { + if (runWorkerAsUser) { + SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); + } else { + Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); + } + ConfigUtils.removeWorkerUserWSE(conf, workerId); + } + return true; + } catch (IOException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } catch (RuntimeException e) { + LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e); + } + return false; + } + - @Override - public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) { - return null; - } - + protected String jlp(String stormRoot, Map conf) { + String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR; + String os = System.getProperty("os.name").replaceAll("\\s+", "_"); + String arch = System.getProperty("os.arch"); + String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch; + String ret = archResourceRoot + Utils.CLASS_PATH_SEPARATOR + resourceRoot + Utils.CLASS_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH); + return ret; + } + + protected String getWorkerClassPath(String stormJar, Map stormConf) { + List<String> topoClasspath = new ArrayList<>(); + Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH); + + if (object instanceof List) { + topoClasspath.addAll((List<String>) object); + } else if (object instanceof String) { + topoClasspath.add((String) object); - } else { - LOG.error("topology specific classpath is invaild"); + } ++ LOG.debug("topology specific classpath is {}", object); ++ + String classPath = Utils.workerClasspath(); + String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar)); + return Utils.addToClasspath(classAddPath, topoClasspath); + } + ++ private static String substituteChildOptsInternal(String string, String workerId, String stormId, Long port, int memOnheap) { ++ if (StringUtils.isNotBlank(string)){ ++ string = string.replace("%ID%", String.valueOf(port)); ++ string = string.replace("%WORKER-ID%", workerId); ++ string = string.replace("%TOPOLOGY-ID%", stormId); ++ string = string.replace("%WORKER-PORT%", String.valueOf(port)); ++ string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); ++ } ++ return string; ++ } ++ + /** + * "Generates runtime childopts by replacing keys with topology-id, worker-id, port, mem-onheap" + * + * @param value + * @param workerId + * @param stormId + * @param port + * @param memOnheap + */ + public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) { + List<String> rets = new ArrayList<>(); + if (value instanceof String) { - String string = (String) value; ++ String string = substituteChildOptsInternal((String) value, workerId, stormId, port, memOnheap); + if (StringUtils.isNotBlank(string)){ - string = string.replace("%ID%", String.valueOf(port)); - string = string.replace("%WORKER-ID%", workerId); - string = string.replace("%TOPOLOGY-ID%", stormId); - string = string.replace("%WORKER-PORT%", String.valueOf(port)); - string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + String[] strings = string.split("\\s+"); + rets.addAll(Arrays.asList(strings)); + } - + } else if (value instanceof List) { + List<Object> objects = (List<Object>) value; + for (Object object : objects) { - String str = (String) object; ++ String str = substituteChildOptsInternal((String) object, workerId, stormId, port, memOnheap); + if (StringUtils.isNotBlank(str)){ - str = str.replace("%ID%", String.valueOf(port)); - str = str.replace("%WORKER-ID%", workerId); - str = str.replace("%TOPOLOGY-ID%", stormId); - str = str.replace("%WORKER-PORT%", String.valueOf(port)); - str = str.replace("%HEAP-MEM%", String.valueOf(memOnheap)); + rets.add(str); + } + } + } + return rets; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/dba69b52/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java index 3b0912a,0000000..e62b9d8 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/IWorkerManager.java @@@ -1,38 -1,0 +1,35 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.Utils; + +import java.util.List; +import java.util.Map; + +public interface IWorkerManager { - public void prepareWorker(Map conf, Localizer localizer); ++ void prepareWorker(Map conf, Localizer localizer); + - IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, ++ void launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, + Utils.ExitCodeCallable workerExitCallback); ++ void shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids); + - IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids); - - IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources); - - public boolean cleanupWorker(String workerId); ++ boolean cleanupWorker(String workerId); +}
