http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java new file mode 100644 index 0000000..d76b950 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java @@ -0,0 +1,549 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Writer; +import java.lang.ProcessBuilder.Redirect; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +/** + * Represents a container that a worker will run in. + */ +public abstract class Container implements Killable { + private static final Logger LOG = LoggerFactory.getLogger(Container.class); + public static enum ContainerType { + LAUNCH(false, false), + RECOVER_FULL(true, false), + RECOVER_PARTIAL(true, true); + + private final boolean _recovery; + private final boolean _onlyKillable; + + ContainerType(boolean recovery, boolean onlyKillable) { + _recovery = recovery; + _onlyKillable = onlyKillable; + } + + public boolean isRecovery() { + return _recovery; + } + + public void assertFull() { + if (_onlyKillable) { + throw new IllegalStateException("Container is only Killable."); + } + } + + public boolean isOnlyKillable() { + return _onlyKillable; + } + } + + protected final Map<String, Object> _conf; + protected final Map<String, Object> _topoConf; //Not set if RECOVER_PARTIAL + protected String _workerId; + protected final String _topologyId; //Not set if RECOVER_PARTIAL + protected final String _supervisorId; + protected final int _port; //Not set if 
RECOVER_PARTIAL + protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL + protected final AdvancedFSOps _ops; + protected final ResourceIsolationInterface _resourceIsolationManager; + protected ContainerType _type; + + /** + * Create a new Container. + * @param type the type of container being made. + * @param conf the supervisor config + * @param supervisorId the ID of the supervisor this is a part of. + * @param port the port the container is on. Should be <= 0 if only a partial recovery + * @param assignment the assignment for this container. Should be null if only a partial recovery. + * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used. + * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. + * @param topoConf the config of the topology (mostly for testing) if null + * and not a partial recovery the real conf is read. + * @param ops file system operations (mostly for testing) if null a new one is made + * @throws IOException on any error. 
+ */ + protected Container(ContainerType type, Map<String, Object> conf, String supervisorId, + int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException { + assert(type != null); + assert(conf != null); + assert(supervisorId != null); + + if (ops == null) { + ops = AdvancedFSOps.make(conf); + } + + _workerId = workerId; + _type = type; + _port = port; + _ops = ops; + _conf = conf; + _supervisorId = supervisorId; + _resourceIsolationManager = resourceIsolationManager; + _assignment = assignment; + + if (_type.isOnlyKillable()) { + assert(_assignment == null); + assert(_port <= 0); + assert(_workerId != null); + _topologyId = null; + _topoConf = null; + } else { + assert(assignment != null); + assert(port > 0); + _topologyId = assignment.get_topology_id(); + if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) { + LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment, + _supervisorId, _port, _workerId); + throw new ContainerRecoveryException("Missing required topology files..."); + } + if (topoConf == null) { + _topoConf = readTopoConf(); + } else { + //For testing... 
+ _topoConf = topoConf; + } + } + } + + @Override + public String toString() { + return "topo:" + _topologyId + " worker:" + _workerId; + } + + protected Map<String, Object> readTopoConf() throws IOException { + assert(_topologyId != null); + return ConfigUtils.readSupervisorStormConf(_conf, _topologyId); + } + + /** + * Kill a given process + * @param pid the id of the process to kill + * @throws IOException + */ + protected void kill(long pid) throws IOException { + Utils.killProcessWithSigTerm(String.valueOf(pid)); + } + + /** + * Kill a given process + * @param pid the id of the process to kill + * @throws IOException + */ + protected void forceKill(long pid) throws IOException { + Utils.forceKillProcess(String.valueOf(pid)); + } + + @Override + public void kill() throws IOException { + LOG.info("Killing {}:{}", _supervisorId, _workerId); + Set<Long> pids = getAllPids(); + + for (Long pid : pids) { + kill(pid); + } + } + + @Override + public void forceKill() throws IOException { + LOG.info("Force Killing {}:{}", _supervisorId, _workerId); + Set<Long> pids = getAllPids(); + + for (Long pid : pids) { + forceKill(pid); + } + } + + /** + * Read the Heartbeat for the current container. + * @return the Heartbeat + * @throws IOException on any error + */ + public LSWorkerHeartbeat readHeartbeat() throws IOException { + LocalState localState = ConfigUtils.workerState(_conf, _workerId); + LSWorkerHeartbeat hb = localState.getWorkerHeartBeat(); + LOG.trace("{}: Reading heartbeat {}", _workerId, hb); + return hb; + } + + /** + * Is a process alive and running? 
+ * @param pid the PID of the running process + * @param user the user that is expected to own that process + * @return true if it is, else false + * @throws IOException on any error + */ + protected boolean isProcessAlive(long pid, String user) throws IOException { + if (Utils.IS_ON_WINDOWS) { + return isWindowsProcessAlive(pid, user); + } + return isPosixProcessAlive(pid, user); + } + + private boolean isWindowsProcessAlive(long pid, String user) throws IOException { + boolean ret = false; + ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v"); + pb.redirectError(Redirect.INHERIT); + Process p = pb.start(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String read; + while ((read = in.readLine()) != null) { + if (read.contains("User Name:")) { //Check for : in case someone called their user "User Name" + //This line contains the user name for the pid we're looking up + //Example line: "User Name: exampleDomain\exampleUser" + List<String> userNameLineSplitOnWhitespace = Arrays.asList(read.split(":")); + if(userNameLineSplitOnWhitespace.size() == 2){ + List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\")); + String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0); + if(user.equals(processUser)){ + ret = true; + } else { + LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user); + } + } else { + LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. 
Line was {}", read); + } + break; + } + } + } + return ret; + } + + private boolean isPosixProcessAlive(long pid, String user) throws IOException { + boolean ret = false; + ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid)); + pb.redirectError(Redirect.INHERIT); + Process p = pb.start(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String first = in.readLine(); + assert("USER".equals(first)); + String processUser; + while ((processUser = in.readLine()) != null) { + if (user.equals(processUser)) { + ret = true; + break; + } else { + LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user); + } + } + } + return ret; + } + + @Override + public boolean areAllProcessesDead() throws IOException { + Set<Long> pids = getAllPids(); + String user = getWorkerUser(); + + boolean allDead = true; + for (Long pid: pids) { + if (!isProcessAlive(pid, user)) { + LOG.debug("{}: PID {} is dead", _workerId, pid); + } else { + allDead = false; + break; + } + } + return allDead; + } + + @Override + public void cleanUp() throws IOException { + cleanUpForRestart(); + } + + /** + * Setup the container to run. 
By default this creates the needed directories/links in the + * local file system + * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and + * placed in the appropriate locations + * @throws IOException on any error + */ + protected void setup() throws IOException { + _type.assertFull(); + if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) { + LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment, + _supervisorId, _port, _workerId); + throw new IllegalStateException("Not all needed files are here!!!!"); + } + LOG.info("Setting up {}:{}", _supervisorId, _workerId); + + _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId))); + _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId))); + _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId))); + + File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); + if (!_ops.fileExists(workerArtifacts)) { + _ops.forceMkdir(workerArtifacts); + _ops.setupStormCodeDir(_topoConf, workerArtifacts); + } + + String user = getWorkerUser(); + writeLogMetadata(user); + saveWorkerUser(user); + createArtifactsLink(); + createBlobstoreLinks(); + } + + /** + * Write out the file used by the log viewer to allow/reject log access + * @param user the user this is going to run as + * @throws IOException on any error + */ + @SuppressWarnings("unchecked") + protected void writeLogMetadata(String user) throws IOException { + _type.assertFull(); + Map<String, Object> data = new HashMap<>(); + data.put(Config.TOPOLOGY_SUBMITTER_USER, user); + data.put("worker-id", _workerId); + + Set<String> logsGroups = new HashSet<>(); + if (_topoConf.get(Config.LOGS_GROUPS) != null) { + List<String> groups = (List<String>) _topoConf.get(Config.LOGS_GROUPS); + for (String group : groups){ + logsGroups.add(group); + } + } + if 
(_topoConf.get(Config.TOPOLOGY_GROUPS) != null) { + List<String> topGroups = (List<String>) _topoConf.get(Config.TOPOLOGY_GROUPS); + logsGroups.addAll(topGroups); + } + data.put(Config.LOGS_GROUPS, logsGroups.toArray()); + + Set<String> logsUsers = new HashSet<>(); + if (_topoConf.get(Config.LOGS_USERS) != null) { + List<String> logUsers = (List<String>) _topoConf.get(Config.LOGS_USERS); + for (String logUser : logUsers){ + logsUsers.add(logUser); + } + } + if (_topoConf.get(Config.TOPOLOGY_USERS) != null) { + List<String> topUsers = (List<String>) _topoConf.get(Config.TOPOLOGY_USERS); + for (String logUser : topUsers){ + logsUsers.add(logUser); + } + } + data.put(Config.LOGS_USERS, logsUsers.toArray()); + + File file = ConfigUtils.getLogMetaDataFile(_conf, _topologyId, _port); + + Yaml yaml = new Yaml(); + try (Writer writer = _ops.getWriter(file)) { + yaml.dump(data, writer); + } + } + + /** + * Create symlink from the containers directory/artifacts to the artifacts directory + * @throws IOException on any error + */ + protected void createArtifactsLink() throws IOException { + _type.assertFull(); + File workerDir = new File(ConfigUtils.workerRoot(_conf, _workerId)); + File topoDir = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); + if (_ops.fileExists(workerDir)) { + LOG.debug("Creating symlinks for worker-id: {} topology-id: {} to its port artifacts directory", _workerId, _topologyId); + _ops.createSymlink(new File(workerDir, "artifacts"), topoDir); + } + } + + /** + * Create symlinks for each of the blobs from the container's directory to + * corresponding links in the storm dist directory. + * @throws IOException on any error. 
+ */ + protected void createBlobstoreLinks() throws IOException { + _type.assertFull(); + String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId); + String workerRoot = ConfigUtils.workerRoot(_conf, _workerId); + + @SuppressWarnings("unchecked") + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) _topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + List<String> blobFileNames = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet()) { + String key = entry.getKey(); + Map<String, Object> blobInfo = entry.getValue(); + String ret = null; + if (blobInfo != null && blobInfo.containsKey("localname")) { + ret = (String) blobInfo.get("localname"); + } else { + ret = key; + } + blobFileNames.add(ret); + } + } + List<String> resourceFileNames = new ArrayList<>(); + resourceFileNames.add(ConfigUtils.RESOURCES_SUBDIR); + resourceFileNames.addAll(blobFileNames); + LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames); + _ops.createSymlink(new File(workerRoot, ConfigUtils.RESOURCES_SUBDIR), + new File(stormRoot, ConfigUtils.RESOURCES_SUBDIR)); + for (String fileName : blobFileNames) { + _ops.createSymlink(new File(workerRoot, fileName), + new File(stormRoot, fileName)); + } + } + + /** + * @return all of the pids that are a part of this container. + */ + protected Set<Long> getAllPids() throws IOException { + Set<Long> ret = new HashSet<>(); + for (String listing: Utils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) { + ret.add(Long.valueOf(listing)); + } + + if (_resourceIsolationManager != null) { + Set<Long> morePids = _resourceIsolationManager.getRunningPIDs(_workerId); + assert(morePids != null); + ret.addAll(morePids); + } + + return ret; + } + + /** + * @return the user that some operations should be done as. 
+ * @throws IOException on any error + */ + protected String getWorkerUser() throws IOException { + LOG.info("GET worker-user for {}", _workerId); + File file = new File(ConfigUtils.workerUserFile(_conf, _workerId)); + + if (_ops.fileExists(file)) { + return _ops.slurpString(file).trim(); + } else if (_topoConf != null) { + return (String) _topoConf.get(Config.TOPOLOGY_SUBMITTER_USER); + } + if (ConfigUtils.isLocalMode(_conf)) { + return System.getProperty("user.name"); + } else { + File f = new File(ConfigUtils.workerArtifactsRoot(_conf)); + if (f.exists()) { + return Files.getOwner(f.toPath()).getName(); + } + throw new IllegalStateException("Could not recover the user for " + _workerId); + } + } + + protected void saveWorkerUser(String user) throws IOException { + _type.assertFull(); + LOG.info("SET worker-user {} {}", _workerId, user); + _ops.dump(new File(ConfigUtils.workerUserFile(_conf, _workerId)), user); + } + + protected void deleteSavedWorkerUser() throws IOException { + LOG.info("REMOVE worker-user {}", _workerId); + _ops.deleteIfExists(new File(ConfigUtils.workerUserFile(_conf, _workerId))); + } + + /** + * Clean up the container partly preparing for restart. + * By default delete all of the temp directories we are going + * to get a new worker_id anyways. 
+ * POST CONDITION: the workerId will be set to null + * @throws IOException on any error + */ + public void cleanUpForRestart() throws IOException { + LOG.info("Cleaning up {}:{}", _supervisorId, _workerId); + Set<Long> pids = getAllPids(); + String user = getWorkerUser(); + + for (Long pid : pids) { + File path = new File(ConfigUtils.workerPidPath(_conf, _workerId, pid)); + _ops.deleteIfExists(path, user, _workerId); + } + + //clean up for resource isolation if enabled + if (_resourceIsolationManager != null) { + _resourceIsolationManager.releaseResourcesForWorker(_workerId); + } + + //Always make sure to clean up everything else before worker directory + //is removed since that is what is going to trigger the retry for cleanup + _ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)), user, _workerId); + _ops.deleteIfExists(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)), user, _workerId); + _ops.deleteIfExists(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)), user, _workerId); + _ops.deleteIfExists(new File(ConfigUtils.workerRoot(_conf, _workerId)), user, _workerId); + deleteSavedWorkerUser(); + _workerId = null; + } + + /** + * Launch the process for the first time + * PREREQUISITE: setup has run and passed + * @throws IOException on any error + */ + public abstract void launch() throws IOException; + + /** + * Restart the processes in this container + * PREREQUISITE: cleanUpForRestart has run and passed + * @throws IOException on any error + */ + public abstract void relaunch() throws IOException; + + /** + * @return true if the main process exited, else false. This is just best effort return false if unknown. + */ + public abstract boolean didMainProcessExit(); + + /** + * Run a profiling request + * @param request the request to run + * @param stop is this a stop request? 
+ * @return true if it succeeded, else false + * @throws IOException on any error + * @throws InterruptedException if running the command is interrupted. + */ + public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException; + + /** + * @return the id of the container or null if there is no worker id right now. + */ + public String getWorkerId() { + return _workerId; + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java new file mode 100644 index 0000000..55f167c --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerLauncher.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import java.io.IOException; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.messaging.IContext; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Launches containers + */ +public abstract class ContainerLauncher { + private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncher.class); + + /** + * Factory to create the right container launcher + * for the config and the environment. + * @param conf the config + * @param supervisorId the ID of the supervisor + * @param sharedContext Used in local mode to let workers talk together without netty + * @return the proper container launcher + * @throws IOException on any error + */ + public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, IContext sharedContext) throws IOException { + if (ConfigUtils.isLocalMode(conf)) { + return new LocalContainerLauncher(conf, supervisorId, sharedContext); + } + + ResourceIsolationInterface resourceIsolationManager = null; + if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { + resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); + resourceIsolationManager.prepare(conf); + LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); + } + + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + return new RunAsUserContainerLauncher(conf, supervisorId, resourceIsolationManager); + } + return new BasicContainerLauncher(conf, supervisorId, resourceIsolationManager); + } + + protected ContainerLauncher() { + //Empty + } + + /** + * 
Launch a container in a given slot + * @param port the port to run this on + * @param assignment what to launch + * @param state the current state of the supervisor + * @return The container that can be used to manager the processes. + * @throws IOException on any error + */ + public abstract Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException; + + /** + * Recover a container for a running process + * @param port the port the assignment is running on + * @param assignment the assignment that was launched + * @param state the current state of the supervisor + * @return The container that can be used to manage the processes. + * @throws IOException on any error + * @throws ContainerRecoveryException if the Container could not be recovered + */ + public abstract Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException, ContainerRecoveryException; + + /** + * Try to recover a container using just the worker ID. + * The result is really only useful for killing the container + * and so is returning a Killable. Even if a Container is returned + * do not case the result to Container because only the Killable APIs + * are guaranteed to work. + * @param workerId the id of the worker to use + * @param localState the state of the running supervisor + * @return a Killable that can be used to kill the underlying container. 
+ * @throws IOException on any error + * @throws ContainerRecoveryException if the Container could not be recovered + */ + public abstract Killable recoverContainer(String workerId, LocalState localState) throws IOException, ContainerRecoveryException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java new file mode 100644 index 0000000..7ab6e67 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +/** + * Could not recover the container. 
+ */ +public class ContainerRecoveryException extends RuntimeException { + + public ContainerRecoveryException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java new file mode 100644 index 0000000..082f205 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ExitCodeCallback.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +/** + * A callback that can accept an integer. + */ +public interface ExitCodeCallback { + + /** + * The process finished + * @param exitCode the exit code of the finished process. 
+ */ + public void call(int exitCode); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java new file mode 100644 index 0000000..8d6d8e0 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Killable.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import java.io.IOException; + +public interface Killable { + + /** + * Kill the processes in this container nicely. + * kill -15 equivalent + * @throws IOException on any error + */ + public void kill() throws IOException; + + /** + * Kill the processes in this container violently. 
+ * kill -9 equivalent + * @throws IOException on any error + */ + public void forceKill() throws IOException; + + /** + * @return true if all of the processes are dead, else false + * @throws IOException on any error + */ + public boolean areAllProcessesDead() throws IOException; + + /** + * Clean up the container. It is not coming back. + * by default do the same thing as when restarting. + * @throws IOException on any error + */ + public void cleanUp() throws IOException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java new file mode 100644 index 0000000..9b196d4 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainer.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import java.io.IOException; +import java.util.Map; + +import org.apache.storm.ProcessSimulator; +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.messaging.IContext; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import clojure.java.api.Clojure; +import clojure.lang.IFn; + +public class LocalContainer extends Container { + private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class); + private volatile boolean _isAlive = false; + private final IContext _sharedContext; + + public LocalContainer(Map<String, Object> conf, String supervisorId, int port, LocalAssignment assignment, IContext sharedContext) throws IOException { + super(ContainerType.LAUNCH, conf, supervisorId, port, assignment, null, null, null, null); + _sharedContext = sharedContext; + _workerId = Utils.uuid(); + } + + @Override + public void launch() throws IOException { + //TODO when worker goes to java, just call it directly (not through clojure) + IFn mkWorker = Clojure.var("org.apache.storm.daemon.worker", "mk-worker"); + + Shutdownable worker = (Shutdownable) mkWorker.invoke(_conf, _sharedContext, _topologyId, _supervisorId, _port, _workerId); + saveWorkerUser(System.getProperty("user.name")); + ProcessSimulator.registerProcess(_workerId, worker); + _isAlive = true; + } + + @Override + public void kill() throws IOException { + ProcessSimulator.killProcess(_workerId); + _isAlive = false; + //Make sure the worker is down before we try to shoot any child processes + super.kill(); + } + + @Override + public boolean areAllProcessesDead() throws IOException { + return !_isAlive && super.areAllProcessesDead(); + } + + @Override + public void relaunch() throws IOException { + LOG.warn("NOOP relaunch in local mode..."); + } + + @Override + public boolean 
didMainProcessExit() { + //In local mode the main process should never exit on it's own + return false; + } + + @Override + public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException { + throw new RuntimeException("Profiling requests are not supported in local mode"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java new file mode 100644 index 0000000..c25bc49 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import java.io.IOException; +import java.util.Map; + +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.messaging.IContext; +import org.apache.storm.utils.LocalState; + +/** + * Launch Containers in local mode. 
+ */ +public class LocalContainerLauncher extends ContainerLauncher { + private final Map<String, Object> _conf; + private final String _supervisorId; + private final IContext _sharedContext; + + public LocalContainerLauncher(Map<String, Object> conf, String supervisorId, IContext sharedContext) { + _conf = conf; + _supervisorId = supervisorId; + _sharedContext = sharedContext; + } + + @Override + public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { + LocalContainer ret = new LocalContainer(_conf, _supervisorId, port, assignment, _sharedContext); + ret.setup(); + ret.launch(); + return ret; + } + + @Override + public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { + //We are in the same process we cannot recover anything + throw new ContainerRecoveryException("Local Mode Recovery is not supported"); + } + + @Override + public Killable recoverContainer(String workerId, LocalState localState) throws IOException { + //We are in the same process we cannot recover anything + throw new ContainerRecoveryException("Local Mode Recovery is not supported"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java new file mode 100644 index 0000000..27f18b6 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.storm.daemon.supervisor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.storm.Config;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.supervisor.Slot.MachineState;
import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
import org.apache.storm.event.EventManager;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.ILocalizer;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically reads assignments from the cluster state (ZooKeeper) and pushes
 * them, along with profiler requests, into the per-port {@link Slot} state
 * machines this supervisor owns.
 */
public class ReadClusterState implements Runnable, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);

    private final Map<String, Object> superConf;
    private final IStormClusterState stormClusterState;
    private final EventManager syncSupEventManager;
    // Cache of the last assignment (plus version) seen per topology, so an
    // unchanged version can skip a full re-read from the cluster state.
    private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
    private final Map<Integer, Slot> slots = new HashMap<>();
    private final AtomicInteger readRetry = new AtomicInteger(0);
    private final String assignmentId;
    private final ISupervisor iSuper;
    private final ILocalizer localizer;
    private final ContainerLauncher launcher;
    private final String host;
    private final LocalState localState;
    private final IStormClusterState clusterState;
    private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;

    /**
     * Create the reader, recover/clean up any workers left over from a
     * previous supervisor instance, and start one Slot per configured port.
     * @param supervisor the supervisor daemon this reader belongs to
     * @throws Exception on any error while recovering or starting slots
     */
    public ReadClusterState(Supervisor supervisor) throws Exception {
        this.superConf = supervisor.getConf();
        this.stormClusterState = supervisor.getStormClusterState();
        this.syncSupEventManager = supervisor.getEventManger();
        this.assignmentVersions = new AtomicReference<>(new HashMap<>());
        this.assignmentId = supervisor.getAssignmentId();
        this.iSuper = supervisor.getiSupervisor();
        this.localizer = supervisor.getAsyncLocalizer();
        this.host = supervisor.getHostName();
        this.localState = supervisor.getLocalState();
        this.clusterState = supervisor.getStormClusterState();
        this.cachedAssignments = supervisor.getCurrAssignment();

        this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());

        @SuppressWarnings("unchecked")
        List<Number> ports = (List<Number>) superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
        for (Number port : ports) {
            slots.put(port.intValue(), mkSlot(port.intValue()));
        }

        try {
            // Any worker id that no recovered slot claimed belongs to a dead
            // worker from a previous run; shoot it.
            Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
            for (Slot slot : slots.values()) {
                String workerId = slot.getWorkerId();
                if (workerId != null) {
                    workers.remove(workerId);
                }
            }
            if (!workers.isEmpty()) {
                supervisor.killWorkers(workers, launcher);
            }
        } catch (Exception e) {
            // best effort cleanup; the supervisor can still run
            LOG.warn("Error trying to clean up old workers", e);
        }

        //All the slots/assignments should be recovered now, so we can clean up anything that we don't expect to be here
        try {
            localizer.cleanupUnusedTopologies();
        } catch (Exception e) {
            LOG.warn("Error trying to clean up old topologies", e);
        }

        for (Slot slot : slots.values()) {
            slot.start();
        }
    }

    /** Build a Slot state machine for the given port. */
    private Slot mkSlot(int port) throws Exception {
        return new Slot(localizer, superConf, launcher, host, port,
                localState, clusterState, iSuper, cachedAssignments);
    }

    /**
     * One synchronization pass: read assignments and profiler requests for
     * this node and push them to the corresponding slots.
     */
    @Override
    public synchronized void run() {
        try {
            Runnable syncCallback = new EventManagerPushCallback(this, syncSupEventManager);
            List<String> stormIds = stormClusterState.assignments(syncCallback);
            Map<String, VersionedData<Assignment>> assignmentsSnapshot =
                    getAssignmentsSnapshot(stormClusterState, stormIds, assignmentVersions.get(), syncCallback);

            Map<Integer, LocalAssignment> allAssignments =
                    readAssignments(assignmentsSnapshot);
            if (allAssignments == null) {
                //Something odd happened try again later
                return;
            }
            Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds);

            HashSet<Integer> assignedPorts = new HashSet<>();
            LOG.debug("Synchronizing supervisor");
            LOG.debug("All assignment: {}", allAssignments);
            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);
            for (Integer port : allAssignments.keySet()) {
                if (iSuper.confirmAssigned(port)) {
                    assignedPorts.add(port);
                }
            }
            HashSet<Integer> allPorts = new HashSet<>(assignedPorts);
            allPorts.addAll(slots.keySet());

            // Group this node's profiler requests by port.
            Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>();
            for (Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) {
                String topoId = entry.getKey();
                if (entry.getValue() != null) {
                    for (ProfileRequest req : entry.getValue()) {
                        NodeInfo ni = req.get_nodeInfo();
                        if (host.equals(ni.get_node())) {
                            // BUG FIX: the map key is Integer, so look up with an
                            // int.  Probing with the boxed Long always missed,
                            // which made concurrent requests for the same port
                            // overwrite each other.
                            int port = ni.get_port().iterator().next().intValue();
                            Set<TopoProfileAction> actions = filtered.get(port);
                            if (actions == null) {
                                actions = new HashSet<>();
                                filtered.put(port, actions);
                            }
                            actions.add(new TopoProfileAction(topoId, req));
                        }
                    }
                }
            }

            for (Integer port : allPorts) {
                Slot slot = slots.get(port);
                if (slot == null) {
                    slot = mkSlot(port);
                    slots.put(port, slot);
                    slot.start();
                }
                slot.setNewAssignment(allAssignments.get(port));
                slot.addProfilerActions(filtered.get(port));
            }

        } catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetch the (possibly cached) versioned assignment for each topology.
     * A topology whose stored version matches the locally cached one is not
     * re-read from the cluster state.
     * @throws Exception on any error talking to the cluster state
     */
    protected Map<String, VersionedData<Assignment>> getAssignmentsSnapshot(IStormClusterState stormClusterState, List<String> topoIds,
            Map<String, VersionedData<Assignment>> localAssignmentVersion, Runnable callback) throws Exception {
        Map<String, VersionedData<Assignment>> updateAssignmentVersion = new HashMap<>();
        for (String topoId : topoIds) {
            Integer recordedVersion = -1;
            Integer version = stormClusterState.assignmentVersion(topoId, callback);
            VersionedData<Assignment> locAssignment = localAssignmentVersion.get(topoId);
            if (locAssignment != null) {
                recordedVersion = locAssignment.getVersion();
            }
            if (version == null) {
                // ignore
            } else if (version.equals(recordedVersion)) {
                // BUG FIX: compare boxed Integers with equals().  Reference
                // equality (==) only works for values in the Integer cache
                // (-128..127), so versions above 127 always re-fetched.
                updateAssignmentVersion.put(topoId, locAssignment);
            } else {
                VersionedData<Assignment> assignmentVersion = stormClusterState.assignmentInfoWithVersion(topoId, callback);
                updateAssignmentVersion.put(topoId, assignmentVersion);
            }
        }
        return updateAssignmentVersion;
    }

    /**
     * Read the outstanding profiler requests for each topology.
     * @throws Exception on any error talking to the cluster state
     */
    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
        Map<String, List<ProfileRequest>> ret = new HashMap<>();
        for (String stormId : stormIds) {
            List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId);
            ret.put(stormId, profileRequests);
        }
        return ret;
    }

    /**
     * Flatten the snapshot into a port -> assignment map for this node.
     * @return the map, or null if a transient error occurred and the caller
     *         should retry on the next pass (gives up after 3 retries)
     */
    protected Map<Integer, LocalAssignment> readAssignments(Map<String, VersionedData<Assignment>> assignmentsSnapshot) {
        try {
            Map<Integer, LocalAssignment> portLA = new HashMap<>();
            for (Map.Entry<String, VersionedData<Assignment>> assignEntry : assignmentsSnapshot.entrySet()) {
                String topoId = assignEntry.getKey();
                Assignment assignment = assignEntry.getValue().getData();

                Map<Integer, LocalAssignment> portTasks = readMyExecutors(topoId, assignmentId, assignment);

                for (Map.Entry<Integer, LocalAssignment> entry : portTasks.entrySet()) {
                    Integer port = entry.getKey();
                    LocalAssignment la = entry.getValue();
                    if (!portLA.containsKey(port)) {
                        portLA.put(port, la);
                    } else {
                        throw new RuntimeException("Should not have multiple topologies assigned to one port "
                                + port + " " + la + " " + portLA);
                    }
                }
            }
            readRetry.set(0);
            return portLA;
        } catch (RuntimeException e) {
            if (readRetry.get() > 2) {
                throw e;
            } else {
                readRetry.incrementAndGet();
            }
            LOG.warn("{} : retrying {} of 3", e.getMessage(), readRetry.get());
            return null;
        }
    }

    /**
     * Extract, from one topology's assignment, the executors scheduled on this
     * node, grouped by port, with the per-slot resources attached when known.
     */
    protected Map<Integer, LocalAssignment> readMyExecutors(String stormId, String assignmentId, Assignment assignment) {
        Map<Integer, LocalAssignment> portTasks = new HashMap<>();
        Map<Long, WorkerResources> slotsResources = new HashMap<>();
        Map<NodeInfo, WorkerResources> nodeInfoWorkerResourcesMap = assignment.get_worker_resources();
        if (nodeInfoWorkerResourcesMap != null) {
            for (Map.Entry<NodeInfo, WorkerResources> entry : nodeInfoWorkerResourcesMap.entrySet()) {
                if (entry.getKey().get_node().equals(assignmentId)) {
                    Set<Long> ports = entry.getKey().get_port();
                    for (Long port : ports) {
                        slotsResources.put(port, entry.getValue());
                    }
                }
            }
        }
        Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
        if (executorNodePort != null) {
            for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
                if (entry.getValue().get_node().equals(assignmentId)) {
                    for (Long port : entry.getValue().get_port()) {
                        LocalAssignment localAssignment = portTasks.get(port.intValue());
                        if (localAssignment == null) {
                            List<ExecutorInfo> executors = new ArrayList<>();
                            localAssignment = new LocalAssignment(stormId, executors);
                            if (slotsResources.containsKey(port)) {
                                localAssignment.set_resources(slotsResources.get(port));
                            }
                            portTasks.put(port.intValue(), localAssignment);
                        }
                        List<ExecutorInfo> executorInfoList = localAssignment.get_executors();
                        // An executor is serialized as [startTask, ..., endTask];
                        // only the first and last entries matter here.
                        executorInfoList.add(new ExecutorInfo(entry.getKey().get(0).intValue(), entry.getKey().get(entry.getKey().size() - 1).intValue()));
                    }
                }
            }
        }
        return portTasks;
    }

    /**
     * Ask every slot to drop its assignment and wait (bounded) for each to
     * reach the EMPTY state.
     */
    public synchronized void shutdownAllWorkers() {
        for (Slot slot : slots.values()) {
            slot.setNewAssignment(null);
        }

        for (Slot slot : slots.values()) {
            try {
                int count = 0;
                while (slot.getMachineState() != MachineState.EMPTY) {
                    if (count > 10) {
                        LOG.warn("DONE waiting for {} to finish {}", slot, slot.getMachineState());
                        break;
                    }
                    if (Time.isSimulating()) {
                        Time.advanceTime(1000);
                        Thread.sleep(100);
                    } else {
                        Time.sleep(100);
                    }
                    count++;
                }
            } catch (Exception e) {
                LOG.error("Error trying to shutdown workers in {}", slot, e);
            }
        }
    }

    @Override
    public void close() {
        for (Slot slot : slots.values()) {
            try {
                slot.close();
            } catch (Exception e) {
                LOG.error("Error trying to shutdown {}", slot, e);
            }
        }
    }

}
package org.apache.storm.daemon.supervisor;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A container whose worker process runs as the topology's user rather than
 * as the supervisor's user.  All privileged operations (signals, file
 * removal, launching) go through the setuid process launcher.
 */
public class RunAsUserContainer extends BasicContainer {
    private static final Logger LOG = LoggerFactory.getLogger(RunAsUserContainer.class);

    public RunAsUserContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
            String workerId) throws IOException {
        this(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, null, null, null);
    }

    RunAsUserContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port,
            LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState,
            String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
        super(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, topoConf, ops,
                profileCmd);
        // run-as-user relies on the setuid launcher, which is POSIX-only
        if (Utils.isOnWindows()) {
            throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet");
        }
    }

    /**
     * Deliver a signal to the given pid as the worker's user via the
     * privileged launcher.
     */
    private void signal(long pid, int signal) throws IOException {
        List<String> args = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal));
        String asUser = getWorkerUser();
        String logPrefix = String.format("kill -%d %d", signal, pid);
        SupervisorUtils.processLauncherAndWait(_conf, asUser, args, null, logPrefix);
    }

    @Override
    protected void kill(long pid) throws IOException {
        signal(pid, 15); // SIGTERM: ask the process to shut down gracefully
    }

    @Override
    protected void forceKill(long pid) throws IOException {
        signal(pid, 9); // SIGKILL: cannot be caught or ignored
    }

    /** Remove a path as the worker's user, if it exists. */
    private void removeAsUserIfPresent(String path) throws IOException {
        if (Utils.checkFileExists(path)) {
            SupervisorUtils.rmrAsUser(_conf, path, path);
        }
    }

    @Override
    protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws IOException, InterruptedException {
        String asUser = this.getWorkerUser();
        String dir = targetDir.getAbsolutePath();
        LOG.info("Running as user: {} command: {}", asUser, command);
        // clear out any stale container/script artifacts before writing new ones
        removeAsUserIfPresent(Utils.containerFilePath(dir));
        removeAsUserIfPresent(Utils.scriptFilePath(dir));
        String script = Utils.writeScript(dir, command, env);
        List<String> launcherArgs = Arrays.asList("profiler", dir, script);
        int exitCode = SupervisorUtils.processLauncherAndWait(_conf, asUser, launcherArgs, env, logPrefix);
        return exitCode == 0;
    }

    @Override
    protected void launchWorkerProcess(List<String> command, Map<String, String> env,
            String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException {
        String dir = targetDir.getAbsolutePath();
        String asUser = this.getWorkerUser();
        List<String> launcherArgs = Arrays.asList("worker", dir, Utils.writeScript(dir, command, env));
        List<String> commandPrefix = null;
        if (_resourceIsolationManager != null) {
            // e.g. cgroup wrapping of the worker process
            commandPrefix = _resourceIsolationManager.getLaunchCommandPrefix(_workerId);
        }
        SupervisorUtils.processLauncher(_conf, asUser, commandPrefix, launcherArgs, null, logPrefix, processExitCallback, targetDir);
    }
}
+ */ +package org.apache.storm.daemon.supervisor; + +import java.io.IOException; +import java.util.Map; + +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.daemon.supervisor.Container.ContainerType; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.utils.LocalState; + +public class RunAsUserContainerLauncher extends ContainerLauncher { + private final Map<String, Object> _conf; + private final String _supervisorId; + protected final ResourceIsolationInterface _resourceIsolationManager; + + public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException { + _conf = conf; + _supervisorId = supervisorId; + _resourceIsolationManager = resourceIsolationManager; + } + + @Override + public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { + Container container = new RunAsUserContainer(ContainerType.LAUNCH, _conf, _supervisorId, port, assignment, + _resourceIsolationManager, state, null, null, null, null); + container.setup(); + container.launch(); + return container; + } + + @Override + public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { + return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, port, assignment, + _resourceIsolationManager, state, null, null, null, null); + } + + @Override + public Killable recoverContainer(String workerId, LocalState localState) throws IOException { + return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, -1, null, + _resourceIsolationManager, localState, workerId, null, null, null); + } + +}