http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java index 10d81e2..b4d6442 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java @@ -8,7 +8,7 @@ * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,26 +33,23 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.generated.LSWorkerHeartbeat; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.ProfileRequest; -import org.apache.storm.generated.WorkerMetricPoint; import org.apache.storm.generated.WorkerMetricList; +import org.apache.storm.generated.WorkerMetricPoint; import org.apache.storm.generated.WorkerMetrics; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.metricstore.MetricException; import org.apache.storm.metricstore.WorkerMetricsProcessor; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.LocalState; -import org.apache.storm.utils.NimbusClient; import org.apache.storm.utils.ServerConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; @@ -66,50 +63,6 @@ public abstract class Container implements Killable { private static final String SYSTEM_COMPONENT_ID = "System"; private static final String INVALID_EXECUTOR_ID = "-1"; private static final String INVALID_STREAM_ID = "None"; - - public static enum ContainerType { - LAUNCH(false, false), - RECOVER_FULL(true, false), - RECOVER_PARTIAL(true, true); - - private final boolean _recovery; - private final boolean _onlyKillable; - - ContainerType(boolean recovery, boolean onlyKillable) { - _recovery = recovery; - _onlyKillable = onlyKillable; - } - - public boolean isRecovery() { - return _recovery; - } - - public void assertFull() { - if (_onlyKillable) { - throw new IllegalStateException("Container is only Killable."); - } - } - - public boolean isOnlyKillable() { - return _onlyKillable; - } - } - - private static class TopoAndMemory { - public final String topoId; - public final long memory; - - public TopoAndMemory(String id, long mem) { - topoId = id; - memory = mem; - } - - @Override - public String toString() { - return "{TOPO: " + topoId + " at " + memory + " MB}"; - } - } - private static final ConcurrentHashMap<Integer, TopoAndMemory> _usedMemory = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Integer, TopoAndMemory> _reservedMemory = @@ -139,10 +92,9 @@ public abstract class Container implements Killable { return ret; }); } - + protected final Map<String, Object> _conf; protected final Map<String, Object> _topoConf; //Not set if RECOVER_PARTIAL - protected String _workerId; protected final String _topologyId; //Not set if RECOVER_PARTIAL protected final String _supervisorId; protected final int _supervisorPort; @@ -150,38 +102,39 @@ public abstract class Container implements Killable { protected final LocalAssignment _assignment; //Not set if RECOVER_PARTIAL protected final AdvancedFSOps _ops; protected final ResourceIsolationInterface _resourceIsolationManager; - protected ContainerType _type; protected final boolean _symlinksDisabled; + protected String _workerId; + protected ContainerType _type; private long lastMetricProcessTime = 0L; - /** * Create a new Container. - * @param type the type of container being made. - * @param conf the supervisor config - * @param supervisorId the ID of the supervisor this is a part of. - * @param supervisorPort the thrift server port of the supervisor this is a part of. - * @param port the port the container is on. Should be <= 0 if only a partial recovery - * @param assignment the assignment for this container. Should be null if only a partial recovery. + * + * @param type the type of container being made. + * @param conf the supervisor config + * @param supervisorId the ID of the supervisor this is a part of. + * @param supervisorPort the thrift server port of the supervisor this is a part of. + * @param port the port the container is on. Should be <= 0 if only a partial recovery + * @param assignment the assignment for this container. Should be null if only a partial recovery. * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used. - * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. - * @param topoConf the config of the topology (mostly for testing) if null - * and not a partial recovery the real conf is read. - * @param ops file system operations (mostly for testing) if null a new one is made + * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. + * @param topoConf the config of the topology (mostly for testing) if null and not a partial recovery the real conf is + * read. + * @param ops file system operations (mostly for testing) if null a new one is made * @throws IOException on any error. */ protected Container(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, - int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, - String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException { - assert(type != null); - assert(conf != null); - assert(supervisorId != null); - - _symlinksDisabled = (boolean)conf.getOrDefault(Config.DISABLE_SYMLINKS, false); - + int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException { + assert (type != null); + assert (conf != null); + assert (supervisorId != null); + + _symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false); + if (ops == null) { ops = AdvancedFSOps.make(conf); } - + _workerId = workerId; _type = type; _port = port; @@ -191,20 +144,22 @@ public abstract class Container implements Killable { _supervisorPort = supervisorPort; _resourceIsolationManager = resourceIsolationManager; _assignment = assignment; - + if (_type.isOnlyKillable()) { - assert(_assignment == null); - assert(_port <= 0); - assert(_workerId != null); + assert (_assignment == null); + assert (_port <= 0); + assert (_workerId != null); _topologyId = null; _topoConf = null; } else { - assert(assignment != null); - assert(port > 0); + assert (assignment != null); + assert (port > 0); _topologyId = assignment.get_topology_id(); if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) { - LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment, - _supervisorId, _port, _workerId); + LOG.info( + "Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", + _assignment, + _supervisorId, _port, _workerId); throw new ContainerRecoveryException("Missing required topology files..."); } if (topoConf == null) { @@ -215,35 +170,37 @@ public abstract class Container implements Killable { } } } - + @Override public String toString() { return "topo:" + _topologyId + " worker:" + _workerId; } - + protected Map<String, Object> readTopoConf() throws IOException { - assert(_topologyId != null); + assert (_topologyId != null); return ConfigUtils.readSupervisorStormConf(_conf, _topologyId); } - + /** * Kill a given process. + * * @param pid the id of the process to kill * @throws IOException */ protected void kill(long pid) throws IOException { ServerUtils.killProcessWithSigTerm(String.valueOf(pid)); } - + /** * Kill a given process. + * * @param pid the id of the process to kill * @throws IOException */ protected void forceKill(long pid) throws IOException { ServerUtils.forceKillProcess(String.valueOf(pid)); } - + @Override public void kill() throws IOException { LOG.info("Killing {}:{}", _supervisorId, _workerId); @@ -253,20 +210,22 @@ public abstract class Container implements Killable { kill(pid); } } - + @Override public void forceKill() throws IOException { LOG.info("Force Killing {}:{}", _supervisorId, _workerId); Set<Long> pids = getAllPids(); - + for (Long pid : pids) { forceKill(pid); } } - + /** * Read the Heartbeat for the current container. + * * @return the Heartbeat + * * @throws IOException on any error */ public LSWorkerHeartbeat readHeartbeat() throws IOException { @@ -278,9 +237,11 @@ public abstract class Container implements Killable { /** * Is a process alive and running?. - * @param pid the PID of the running process + * + * @param pid the PID of the running process * @param user the user that is expected to own that process * @return true if it is, else false + * * @throws IOException on any error */ protected boolean isProcessAlive(long pid, String user) throws IOException { @@ -289,7 +250,7 @@ public abstract class Container implements Killable { } return isPosixProcessAlive(pid, user); } - + private boolean isWindowsProcessAlive(long pid, String user) throws IOException { boolean ret = false; ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v"); @@ -302,16 +263,17 @@ public abstract class Container implements Killable { //This line contains the user name for the pid we're looking up //Example line: "User Name: exampleDomain\exampleUser" List<String> userNameLineSplitOnWhitespace = Arrays.asList(read.split(":")); - if(userNameLineSplitOnWhitespace.size() == 2){ + if (userNameLineSplitOnWhitespace.size() == 2) { List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\")); String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0); - if(user.equals(processUser)){ + if (user.equals(processUser)) { ret = true; } else { LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user); } } else { - LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", read); + LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", + read); } break; } @@ -319,7 +281,7 @@ public abstract class Container implements Killable { } return ret; } - + private boolean isPosixProcessAlive(long pid, String user) throws IOException { boolean ret = false; ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid)); @@ -327,7 +289,7 @@ public abstract class Container implements Killable { Process p = pb.start(); try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) { String first = in.readLine(); - assert("USER".equals(first)); + assert ("USER".equals(first)); String processUser; while ((processUser = in.readLine()) != null) { if (user.equals(processUser)) { @@ -340,14 +302,14 @@ public abstract class Container implements Killable { } return ret; } - + @Override public boolean areAllProcessesDead() throws IOException { Set<Long> pids = getAllPids(); String user = getWorkerUser(); - + boolean allDead = true; - for (Long pid: pids) { + for (Long pid : pids) { if (!isProcessAlive(pid, user)) { LOG.debug("{}: PID {} is dead", _workerId, pid); } else { @@ -364,42 +326,43 @@ public abstract class Container implements Killable { _reservedMemory.remove(_port); cleanUpForRestart(); } - + /** - * Setup the container to run. By default this creates the needed directories/links in the - * local file system - * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and - * placed in the appropriate locations + * Setup the container to run. By default this creates the needed directories/links in the local file system PREREQUISITE: All needed + * blobs and topology, jars/configs have been downloaded and placed in the appropriate locations + * * @throws IOException on any error */ protected void setup() throws IOException { _type.assertFull(); if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) { - LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment, - _supervisorId, _port, _workerId); + LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", + _assignment, + _supervisorId, _port, _workerId); throw new IllegalStateException("Not all needed files are here!!!!"); - } + } LOG.info("Setting up {}:{}", _supervisorId, _workerId); _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId))); _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId))); _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId))); - + File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port)); if (!_ops.fileExists(workerArtifacts)) { _ops.forceMkdir(workerArtifacts); _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts); } - + String user = getWorkerUser(); writeLogMetadata(user); saveWorkerUser(user); createArtifactsLink(); createBlobstoreLinks(); } - + /** * Write out the file used by the log viewer to allow/reject log access. + * * @param user the user this is going to run as * @throws IOException on any error */ @@ -413,7 +376,7 @@ public abstract class Container implements Killable { Set<String> logsGroups = new HashSet<>(); if (_topoConf.get(DaemonConfig.LOGS_GROUPS) != null) { List<String> groups = (List<String>) _topoConf.get(DaemonConfig.LOGS_GROUPS); - for (String group : groups){ + for (String group : groups) { logsGroups.add(group); } } @@ -426,13 +389,13 @@ public abstract class Container implements Killable { Set<String> logsUsers = new HashSet<>(); if (_topoConf.get(DaemonConfig.LOGS_USERS) != null) { List<String> logUsers = (List<String>) _topoConf.get(DaemonConfig.LOGS_USERS); - for (String logUser : logUsers){ + for (String logUser : logUsers) { logsUsers.add(logUser); } } if (_topoConf.get(Config.TOPOLOGY_USERS) != null) { List<String> topUsers = (List<String>) _topoConf.get(Config.TOPOLOGY_USERS); - for (String logUser : topUsers){ + for (String logUser : topUsers) { logsUsers.add(logUser); } } @@ -445,9 +408,10 @@ public abstract class Container implements Killable { yaml.dump(data, writer); } } - + /** * Create symlink from the containers directory/artifacts to the artifacts directory. + * * @throws IOException on any error */ protected void createArtifactsLink() throws IOException { @@ -461,17 +425,17 @@ public abstract class Container implements Killable { } } } - + /** - * Create symlinks for each of the blobs from the container's directory to - * corresponding links in the storm dist directory. + * Create symlinks for each of the blobs from the container's directory to corresponding links in the storm dist directory. + * * @throws IOException on any error. */ protected void createBlobstoreLinks() throws IOException { _type.assertFull(); String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId); String workerRoot = ConfigUtils.workerRoot(_conf, _workerId); - + @SuppressWarnings("unchecked") Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) _topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); List<String> blobFileNames = new ArrayList<>(); @@ -496,41 +460,44 @@ public abstract class Container implements Killable { resourceFileNames.addAll(blobFileNames); if (!_symlinksDisabled) { - LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), resourceFileNames); + LOG.info("Creating symlinks for worker-id: {} storm-id: {} for files({}): {}", _workerId, _topologyId, resourceFileNames.size(), + resourceFileNames); if (targetResourcesDir.exists()) { - _ops.createSymlink(new File(workerRoot, ServerConfigUtils.RESOURCES_SUBDIR), targetResourcesDir ); + _ops.createSymlink(new File(workerRoot, ServerConfigUtils.RESOURCES_SUBDIR), targetResourcesDir); } else { - LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}." , _workerId, _topologyId, targetResourcesDir.toString() ); + LOG.info("Topology jar for worker-id: {} storm-id: {} does not contain re sources directory {}.", _workerId, _topologyId, + targetResourcesDir.toString()); } for (String fileName : blobFileNames) { _ops.createSymlink(new File(workerRoot, fileName), - new File(stormRoot, fileName)); + new File(stormRoot, fileName)); } } else if (blobFileNames.size() > 0) { LOG.warn("Symlinks are disabled, no symlinks created for blobs {}", blobFileNames); } } - + /** * @return all of the pids that are a part of this container. */ protected Set<Long> getAllPids() throws IOException { Set<Long> ret = new HashSet<>(); - for (String listing: ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) { + for (String listing : ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(_conf, _workerId))) { ret.add(Long.valueOf(listing)); } - + if (_resourceIsolationManager != null) { Set<Long> morePids = _resourceIsolationManager.getRunningPids(_workerId); - assert(morePids != null); + assert (morePids != null); ret.addAll(morePids); } - + return ret; } - - /** + + /** * @return the user that some operations should be done as. + * * @throws IOException on any error */ protected String getWorkerUser() throws IOException { @@ -552,40 +519,39 @@ public abstract class Container implements Killable { throw new IllegalStateException("Could not recover the user for " + _workerId); } } - + protected void saveWorkerUser(String user) throws IOException { _type.assertFull(); LOG.info("SET worker-user {} {}", _workerId, user); _ops.dump(new File(ConfigUtils.workerUserFile(_conf, _workerId)), user); } - + protected void deleteSavedWorkerUser() throws IOException { LOG.info("REMOVE worker-user {}", _workerId); _ops.deleteIfExists(new File(ConfigUtils.workerUserFile(_conf, _workerId))); } - + /** - * Clean up the container partly preparing for restart. - * By default delete all of the temp directories we are going - * to get a new worker_id anyways. - * POST CONDITION: the workerId will be set to null + * Clean up the container partly preparing for restart. By default delete all of the temp directories we are going to get a new + * worker_id anyways. POST CONDITION: the workerId will be set to null + * * @throws IOException on any error */ public void cleanUpForRestart() throws IOException { LOG.info("Cleaning up {}:{}", _supervisorId, _workerId); Set<Long> pids = getAllPids(); String user = getWorkerUser(); - + for (Long pid : pids) { File path = new File(ConfigUtils.workerPidPath(_conf, _workerId, pid)); _ops.deleteIfExists(path, user, _workerId); } - + //clean up for resource isolation if enabled if (_resourceIsolationManager != null) { _resourceIsolationManager.releaseResourcesForWorker(_workerId); } - + //Always make sure to clean up everything else before worker directory //is removed since that is what is going to trigger the retry for cleanup _ops.deleteIfExists(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)), user, _workerId); @@ -597,8 +563,9 @@ public abstract class Container implements Killable { } /** - * Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean - * that it just went over the limit. + * Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean that it just went over the + * limit. + * * @throws IOException on any error */ public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException { @@ -674,16 +641,14 @@ public abstract class Container implements Killable { } /** - * Launch the process for the first time. - * PREREQUISITE: setup has run and passed + * Launch the process for the first time. PREREQUISITE: setup has run and passed * * @throws IOException on any error */ public abstract void launch() throws IOException; /** - * Restart the processes in this container. - * PREREQUISITE: cleanUpForRestart has run and passed + * Restart the processes in this container. PREREQUISITE: cleanUpForRestart has run and passed * * @throws IOException on any error */ @@ -698,9 +663,10 @@ public abstract class Container implements Killable { * Run a profiling request. * * @param request the request to run - * @param stop is this a stop request? + * @param stop is this a stop request? * @return true if it succeeded, else false - * @throws IOException on any error + * + * @throws IOException on any error * @throws InterruptedException if running the command is interrupted. */ public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException; @@ -731,7 +697,7 @@ public abstract class Container implements Killable { long timestamp = System.currentTimeMillis(); double value = _usedMemory.get(_port).memory; WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID, - INVALID_EXECUTOR_ID, INVALID_STREAM_ID); + INVALID_EXECUTOR_ID, INVALID_STREAM_ID); WorkerMetricList metricList = new WorkerMetricList(); metricList.add_to_metrics(workerMetric); @@ -751,4 +717,47 @@ public abstract class Container implements Killable { this.lastMetricProcessTime = System.currentTimeMillis(); } } + + public static enum ContainerType { + LAUNCH(false, false), + RECOVER_FULL(true, false), + RECOVER_PARTIAL(true, true); + + private final boolean _recovery; + private final boolean _onlyKillable; + + ContainerType(boolean recovery, boolean onlyKillable) { + _recovery = recovery; + _onlyKillable = onlyKillable; + } + + public boolean isRecovery() { + return _recovery; + } + + public void assertFull() { + if (_onlyKillable) { + throw new IllegalStateException("Container is only Killable."); + } + } + + public boolean isOnlyKillable() { + return _onlyKillable; + } + } + + private static class TopoAndMemory { + public final String topoId; + public final long memory; + + public TopoAndMemory(String id, long mem) { + topoId = id; + memory = mem; + } + + @Override + public String toString() { + return "{TOPO: " + topoId + " at " + memory + " MB}"; + } + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java index 527b321..2f32e38 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java @@ -1,33 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.IOException; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.messaging.IContext; import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +31,13 @@ import org.slf4j.LoggerFactory; */ public abstract class ContainerLauncher { private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncher.class); - + + protected ContainerLauncher() { + //Empty + } + /** - * Factory to create the right container launcher + * Factory to create the right container launcher * for the config and the environment. * @param conf the config * @param supervisorId the ID of the supervisor @@ -49,16 +47,17 @@ public abstract class ContainerLauncher { * @throws IOException on any error */ public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, int supervisorPort, - IContext sharedContext) throws IOException { + IContext sharedContext) throws IOException { if (ConfigUtils.isLocalMode(conf)) { return new LocalContainerLauncher(conf, supervisorId, supervisorPort, sharedContext); } - + ResourceIsolationInterface resourceIsolationManager = null; if (ObjectReader.getBoolean(conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { resourceIsolationManager = ReflectionUtils.newInstance((String) conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN)); resourceIsolationManager.prepare(conf); - LOG.info("Using resource isolation plugin {} {}", conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); + LOG.info("Using resource isolation plugin {} {}", conf.get(DaemonConfig.STORM_RESOURCE_ISOLATION_PLUGIN), + resourceIsolationManager); } if (ObjectReader.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { @@ -66,10 +65,6 @@ public abstract class ContainerLauncher { } return new BasicContainerLauncher(conf, supervisorId, supervisorPort, resourceIsolationManager); } - - protected ContainerLauncher() { - //Empty - } /** * Launch a container in a given slot @@ -80,7 +75,7 @@ public abstract class ContainerLauncher { * @throws IOException on any error */ public abstract Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException; - + /** * Recover a container for a running process * @param port the port the assignment is running on @@ -90,8 +85,9 @@ public abstract class ContainerLauncher { * @throws IOException on any error * @throws ContainerRecoveryException if the Container could not be recovered */ - public abstract Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException, ContainerRecoveryException; - + public abstract Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException, + ContainerRecoveryException; + /** * Try to recover a container using just the worker ID. * The result is really only useful for killing the container http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java index 7ab6e67..491ffdc 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerRecoveryException.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; /** http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java index 8785f86..7b7ff1b 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/DefaultUncaughtExceptionHandler.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import org.apache.storm.utils.Utils; @@ -23,6 +18,7 @@ import org.slf4j.LoggerFactory; public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultUncaughtExceptionHandler.class); + @Override public void uncaughtException(Thread t, Throwable e) { LOG.error("Error when processing event", e); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java index 79df800..09a7bfb 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import org.apache.storm.event.EventManager; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java index 8d6d8e0..420f277 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Killable.java @@ -1,46 +1,41 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.IOException; public interface Killable { - + /** * Kill the processes in this container nicely. * kill -15 equivalent * @throws IOException on any error */ public void kill() throws IOException; - + /** * Kill the processes in this container violently. * kill -9 equivalent * @throws IOException on any error */ public void forceKill() throws IOException; - + /** * @return true if all of the processes are dead, else false * @throws IOException on any error */ public boolean areAllProcessesDead() throws IOException; - + /** * Clean up the container. It is not coming back. * by default do the same thing as when restarting. http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java index 4afaffe..e972feb 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.IOException; import java.util.Map; - import org.apache.storm.ProcessSimulator; import org.apache.storm.daemon.worker.Worker; import org.apache.storm.generated.LocalAssignment; @@ -31,11 +25,11 @@ import org.slf4j.LoggerFactory; public class LocalContainer extends Container { private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class); - private volatile boolean _isAlive = false; private final IContext _sharedContext; - + private volatile boolean _isAlive = false; + public LocalContainer(Map<String, Object> conf, String supervisorId, int supervisorPort, int port, - LocalAssignment assignment, IContext sharedContext) throws IOException { + LocalAssignment assignment, IContext sharedContext) throws IOException { super(ContainerType.LAUNCH, conf, supervisorId, supervisorPort, port, assignment, null, null, null, null); _sharedContext = sharedContext; _workerId = Utils.uuid(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java index 1c43128..c2ff66f 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.IOException; import java.util.Map; - import org.apache.storm.generated.LocalAssignment; import org.apache.storm.messaging.IContext; import org.apache.storm.utils.LocalState; @@ -34,7 +28,7 @@ public class LocalContainerLauncher extends ContainerLauncher { private final IContext _sharedContext; public LocalContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort, - IContext sharedContext) { + IContext sharedContext) { _conf = conf; _supervisorId = supervisorId; _supervisorPort = supervisorPort; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java index 7dc9b0b..a0d35d9 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java @@ -25,8 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This allows you to submit a Runnable with a key. If the previous submission for that key has not yet run, - * it will be replaced with the latest one. + * This allows you to submit a Runnable with a key. If the previous submission for that key has not yet run, it will be replaced with the + * latest one. */ public class OnlyLatestExecutor<K> { private static final Logger LOG = LoggerFactory.getLogger(OnlyLatestExecutor.class); @@ -40,8 +40,9 @@ public class OnlyLatestExecutor<K> { /** * Run something in the future, but replace it with the latest if it is taking too long + * * @param key what to use to dedupe things. - * @param r what you want to run. + * @param r what you want to run. */ public void execute(final K key, Runnable r) { Runnable old = latest.put(key, r); @@ -55,6 +56,6 @@ public class OnlyLatestExecutor<K> { }); } else { LOG.debug("Replacing runnable for {} - {}", key, r); - } + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java index e065f38..b617345 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.supervisor; @@ -28,7 +22,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; - import org.apache.storm.DaemonConfig; import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.daemon.supervisor.Slot.MachineState; @@ -51,7 +44,17 @@ import org.slf4j.LoggerFactory; public class ReadClusterState implements Runnable, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class); - + private static final long ERROR_MILLIS = 60_000; //1 min. This really means something is wrong. Even on a very slow node + public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = (slot) -> { + throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut down slot " + slot); + }; + public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = (slot) -> { + LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, Utils.threadDump()); + DEFAULT_ON_ERROR_TIMEOUT.call(slot); + }; + private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second. Workers commit suicide after this + public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = + (slot) -> LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS, slot); private final Map<String, Object> superConf; private final IStormClusterState stormClusterState; private final Map<Integer, Slot> slots = new HashMap<>(); @@ -90,14 +93,14 @@ public class ReadClusterState implements Runnable, AutoCloseable { } @SuppressWarnings("unchecked") - List<Number> ports = (List<Number>)superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS); - for (Number port: ports) { + List<Number> ports = (List<Number>) superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS); + for (Number port : ports) { slots.put(port.intValue(), mkSlot(port.intValue())); } - + try { Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf); - for (Slot slot: slots.values()) { + for (Slot slot : slots.values()) { String workerId = slot.getWorkerId(); if (workerId != null) { workers.remove(workerId); @@ -109,35 +112,35 @@ public class ReadClusterState implements Runnable, AutoCloseable { } catch (Exception e) { LOG.warn("Error trying to clean up old workers", e); } - - for (Slot slot: slots.values()) { + + for (Slot slot : slots.values()) { slot.start(); } } private Slot mkSlot(int port) throws Exception { return new Slot(localizer, superConf, launcher, host, port, - localState, stormClusterState, iSuper, cachedAssignments, metricsExec, metricsProcessor); + localState, stormClusterState, iSuper, cachedAssignments, metricsExec, metricsProcessor); } - + @Override public synchronized void run() { try { List<String> stormIds = stormClusterState.assignments(null); Map<String, Assignment> assignmentsSnapshot = getAssignmentsSnapshot(stormClusterState); - + Map<Integer, LocalAssignment> allAssignments = readAssignments(assignmentsSnapshot); if (allAssignments == null) { //Something odd happened try again later return; } Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(stormClusterState, stormIds); - + HashSet<Integer> assignedPorts = new HashSet<>(); LOG.debug("Synchronizing supervisor"); LOG.debug("All assignment: {}", allAssignments); LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions); - for (Integer port: allAssignments.keySet()) { + for (Integer port : allAssignments.keySet()) { if (iSuper.confirmAssigned(port)) { assignedPorts.add(port); } @@ -145,12 +148,12 @@ public class ReadClusterState implements Runnable, AutoCloseable { HashSet<Integer> allPorts = new HashSet<>(assignedPorts); iSuper.assigned(allPorts); allPorts.addAll(slots.keySet()); - + Map<Integer, Set<TopoProfileAction>> filtered = new HashMap<>(); - for (Entry<String, List<ProfileRequest>> entry: topoIdToProfilerActions.entrySet()) { + for (Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) { String topoId = entry.getKey(); if (entry.getValue() != null) { - for (ProfileRequest req: entry.getValue()) { + for (ProfileRequest req : entry.getValue()) { NodeInfo ni = req.get_nodeInfo(); if (host.equals(ni.get_node())) { Long port = ni.get_port().iterator().next(); @@ -164,8 +167,8 @@ public class ReadClusterState implements Runnable, AutoCloseable { } } } - - for (Integer port: allPorts) { + + for (Integer port : allPorts) { Slot slot = slots.get(port); if (slot == null) { slot = mkSlot(port); @@ -175,18 +178,19 @@ public class ReadClusterState implements Runnable, AutoCloseable { slot.setNewAssignment(allAssignments.get(port)); slot.addProfilerActions(filtered.get(port)); } - + } catch (Exception e) { LOG.error("Failed to Sync Supervisor", e); throw new RuntimeException(e); } } - + protected Map<String, Assignment> getAssignmentsSnapshot(IStormClusterState stormClusterState) throws Exception { return stormClusterState.assignmentsInfo(); } - - protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception { + + protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws + Exception { Map<String, List<ProfileRequest>> ret = new HashMap<String, List<ProfileRequest>>(); for (String stormId : stormIds) { List<ProfileRequest> profileRequests = stormClusterState.getTopologyProfileRequests(stormId); @@ -194,7 +198,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { } return ret; } - + protected Map<Integer, LocalAssignment> readAssignments(Map<String, Assignment> assignmentsSnapshot) { try { Map<Integer, LocalAssignment> portLA = new HashMap<>(); @@ -214,7 +218,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { portLA.put(port, la); } else { throw new RuntimeException("Should not have multiple topologies assigned to one port " - + port + " " + la + " " + portLA); + + port + " " + la + " " + portLA); } } } @@ -230,7 +234,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { return null; } } - + protected Map<Integer, LocalAssignment> readMyExecutors(String topoId, String assignmentId, Assignment assignment) { Map<Integer, LocalAssignment> portTasks = new HashMap<>(); Map<Long, WorkerResources> slotsResources = new HashMap<>(); @@ -276,7 +280,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { } List<ExecutorInfo> executorInfoList = localAssignment.get_executors(); executorInfoList.add(new ExecutorInfo(entry.getKey().get(0).intValue(), - entry.getKey().get(entry.getKey().size() - 1).intValue())); + entry.getKey().get(entry.getKey().size() - 1).intValue())); } } } @@ -284,34 +288,23 @@ public class ReadClusterState implements Runnable, AutoCloseable { return portTasks; } - private static final long WARN_MILLIS = 1_000; //Initial timeout 1 second. Workers commit suicide after this - private static final long ERROR_MILLIS = 60_000; //1 min. This really means something is wrong. Even on a very slow node - public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = (slot) -> { - throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut down slot " + slot); - }; - public static final UniFunc<Slot> DEFAULT_ON_WARN_TIMEOUT = (slot) -> LOG.warn("It has taken {}ms so far and {} is still not shut down.", WARN_MILLIS, slot); - public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = (slot) -> { - LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, Utils.threadDump()); - DEFAULT_ON_ERROR_TIMEOUT.call(slot); - }; - public synchronized void shutdownAllWorkers(UniFunc<Slot> onWarnTimeout, UniFunc<Slot> onErrorTimeout) { - for (Slot slot: slots.values()) { + for (Slot slot : slots.values()) { LOG.info("Setting {} assignment to null", slot); slot.setNewAssignment(null); } - + if (onWarnTimeout == null) { onWarnTimeout = DEFAULT_ON_WARN_TIMEOUT; } - + if (onErrorTimeout == null) { onErrorTimeout = DEFAULT_ON_ERROR_TIMEOUT; } - + long startTime = Time.currentTimeMillis(); Exception exp = null; - for (Slot slot: slots.values()) { + for (Slot slot : slots.values()) { LOG.info("Waiting for {} to be EMPTY, currently {}", slot, slot.getMachineState()); try { while (slot.getMachineState() != MachineState.EMPTY) { @@ -319,7 +312,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { if (timeSpentMillis > ERROR_MILLIS) { onErrorTimeout.call(slot); } - + if (timeSpentMillis > WARN_MILLIS) { onWarnTimeout.call(slot); } @@ -335,15 +328,15 @@ public class ReadClusterState implements Runnable, AutoCloseable { } if (exp != null) { if (exp instanceof RuntimeException) { - throw (RuntimeException)exp; + throw (RuntimeException) exp; } throw new RuntimeException(exp); } } - + @Override public void close() { - for (Slot slot: slots.values()) { + for (Slot slot : slots.values()) { try { slot.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java index 2559d7a..eadd635 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainer.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.File; @@ -22,12 +23,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; - import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.utils.LocalState; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; -import org.apache.storm.utils.LocalState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,15 +39,15 @@ public class RunAsUserContainer extends BasicContainer { ResourceIsolationInterface resourceIsolationManager, LocalState localState, String workerId) throws IOException { this(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, workerId, - null, null, null); + null, null, null); } - + RunAsUserContainer(Container.ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException { super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, - workerId, topoConf, ops, profileCmd); + workerId, topoConf, ops, profileCmd); if (Utils.isOnWindows()) { throw new UnsupportedOperationException("ERROR: Windows doesn't support running workers as different users yet"); } @@ -56,22 +56,23 @@ public class RunAsUserContainer extends BasicContainer { private void signal(long pid, int signal) throws IOException { List<String> commands = Arrays.asList("signal", String.valueOf(pid), String.valueOf(signal)); String user = getWorkerUser(); - String logPrefix = "kill -"+signal+" " + pid; + String logPrefix = "kill -" + signal + " " + pid; ClientSupervisorUtils.processLauncherAndWait(_conf, user, commands, null, logPrefix); } - + @Override protected void kill(long pid) throws IOException { signal(pid, 15); } - + @Override protected void forceKill(long pid) throws IOException { signal(pid, 9); } - + @Override - protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws IOException, InterruptedException { + protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, File targetDir) throws + IOException, InterruptedException { String user = this.getWorkerUser(); String td = targetDir.getAbsolutePath(); LOG.info("Running as user: {} command: {}", user, command); @@ -90,8 +91,8 @@ public class RunAsUserContainer extends BasicContainer { } @Override - protected void launchWorkerProcess(List<String> command, Map<String, String> env, - String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException { + protected void launchWorkerProcess(List<String> command, Map<String, String> env, + String logPrefix, ExitCodeCallback processExitCallback, File targetDir) throws IOException { String workerDir = targetDir.getAbsolutePath(); String user = this.getWorkerUser(); List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env)); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java index e6439db..c0bb47f 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java @@ -1,38 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.IOException; import java.util.Map; - import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.daemon.supervisor.Container.ContainerType; import org.apache.storm.generated.LocalAssignment; import org.apache.storm.utils.LocalState; public class RunAsUserContainerLauncher extends ContainerLauncher { + protected final ResourceIsolationInterface _resourceIsolationManager; private final Map<String, Object> _conf; private final String _supervisorId; private final int _supervisorPort; - protected final ResourceIsolationInterface _resourceIsolationManager; - + public RunAsUserContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort, - ResourceIsolationInterface resourceIsolationManager) throws IOException { + ResourceIsolationInterface resourceIsolationManager) throws IOException { _conf = conf; _supervisorId = supervisorId; _supervisorPort = supervisorPort; @@ -42,7 +36,7 @@ public class RunAsUserContainerLauncher extends ContainerLauncher { @Override public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { Container container = new RunAsUserContainer(ContainerType.LAUNCH, _conf, _supervisorId, _supervisorPort, port, - assignment, _resourceIsolationManager, state, null, null, null, null); + assignment, _resourceIsolationManager, state, null, null, null, null); container.setup(); container.launch(); return container; @@ -51,13 +45,13 @@ public class RunAsUserContainerLauncher extends ContainerLauncher { @Override public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { return new RunAsUserContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, _supervisorPort, port, - assignment, _resourceIsolationManager, state, null, null, null, null); + assignment, _resourceIsolationManager, state, null, null, null, null); } - + @Override public Killable recoverContainer(String workerId, LocalState localState) throws IOException { return new RunAsUserContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, _supervisorPort, -1, null, - _resourceIsolationManager, localState, workerId, null, null, null); + _resourceIsolationManager, localState, workerId, null, null, null); } }
