http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java index 2b9de22..0b0e70c 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java @@ -1,32 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.nimbus; -import static org.apache.storm.blobstore.BlobStoreAclHandler.READ; - import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import javax.security.auth.Subject; - import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.BlobStoreAclHandler; import org.apache.storm.generated.AccessControl; @@ -41,6 +31,8 @@ import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.storm.blobstore.BlobStoreAclHandler.READ; + /** * Cache topologies and topology confs from the blob store. * Makes reading this faster because it can skip @@ -48,22 +40,10 @@ import org.slf4j.LoggerFactory; */ public class TopoCache { public static final Logger LOG = LoggerFactory.getLogger(TopoCache.class); - - private static final class WithAcl<T> { - public final List<AccessControl> acl; - public final T data; - - public WithAcl(List<AccessControl> acl, T data) { - this.acl = acl; - this.data = data; - } - } - private final BlobStore store; private final BlobStoreAclHandler aclHandler; private final ConcurrentHashMap<String, WithAcl<StormTopology>> topos = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, WithAcl<Map<String, Object>>> confs = new ConcurrentHashMap<>(); - public TopoCache(BlobStore store, Map<String, Object> conf) { this.store = store; aclHandler = new BlobStoreAclHandler(conf); @@ -243,4 +223,14 @@ public class TopoCache { confs.clear(); topos.clear(); } + + private static final class WithAcl<T> { + public final List<AccessControl> acl; + public final T data; + + public WithAcl(List<AccessControl> acl, T data) { + this.acl = acl; + this.data = data; + } + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java index 6a4a67a..bc41da5 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyActions.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.nimbus; /** http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java index 2812cce..bb932ed 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyResources.java @@ -1,26 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.nimbus; import java.util.Collection; import java.util.Map; - import org.apache.storm.generated.Assignment; import org.apache.storm.generated.NodeInfo; import org.apache.storm.generated.WorkerResources; @@ -29,44 +22,6 @@ import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; public final class TopologyResources { - private static Collection<WorkerResources> getWorkerResources(SchedulerAssignment assignment) { - Collection<WorkerResources> ret = null; - if (assignment != null) { - Map<WorkerSlot, WorkerResources> allResources = assignment.getScheduledResources(); - if (allResources != null) { - ret = allResources.values(); - } - } - return ret; - } - - private static Collection<WorkerResources> getWorkerResources(Assignment assignment) { - Collection<WorkerResources> ret = null; - if (assignment != null) { - Map<NodeInfo, WorkerResources> allResources = assignment.get_worker_resources(); - if (allResources != null) { - ret = allResources.values(); - } - } - return ret; - } - - private static Map<String, Double> getNodeIdToSharedOffHeap(SchedulerAssignment assignment) { - Map<String, Double> ret = null; - if (assignment != null) { - ret = assignment.getNodeIdToTotalSharedOffHeapMemory(); - } - return ret; - } - - private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment assignment) { - Map<String, Double> ret = null; - if (assignment != null) { - ret = assignment.get_total_shared_off_heap(); - } - return ret; - } - private final double requestedMemOnHeap; private final double requestedMemOffHeap; private final double requestedSharedMemOnHeap; @@ -81,7 +36,6 @@ public final class TopologyResources { private double assignedNonSharedMemOnHeap; private double assignedNonSharedMemOffHeap; private double assignedCpu; - private TopologyResources(TopologyDetails td, Collection<WorkerResources> workers, Map<String, Double> sharedOffHeap) { requestedMemOnHeap = td.getTotalRequestedMemOnHeap(); @@ -125,15 +79,12 @@ public final class TopologyResources { assignedMemOffHeap += sharedOff; } } - public TopologyResources(TopologyDetails td, SchedulerAssignment assignment) { this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment)); } - public TopologyResources(TopologyDetails td, Assignment assignment) { this(td, getWorkerResources(assignment), getNodeIdToSharedOffHeap(assignment)); } - public TopologyResources() { this(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); } @@ -169,6 +120,44 @@ public final class TopologyResources { this.assignedCpu = assignedCpu; } + private static Collection<WorkerResources> getWorkerResources(SchedulerAssignment assignment) { + Collection<WorkerResources> ret = null; + if (assignment != null) { + Map<WorkerSlot, WorkerResources> allResources = assignment.getScheduledResources(); + if (allResources != null) { + ret = allResources.values(); + } + } + return ret; + } + + private static Collection<WorkerResources> getWorkerResources(Assignment assignment) { + Collection<WorkerResources> ret = null; + if (assignment != null) { + Map<NodeInfo, WorkerResources> allResources = assignment.get_worker_resources(); + if (allResources != null) { + ret = allResources.values(); + } + } + return ret; + } + + private static Map<String, Double> getNodeIdToSharedOffHeap(SchedulerAssignment assignment) { + Map<String, Double> ret = null; + if (assignment != null) { + ret = assignment.getNodeIdToTotalSharedOffHeapMemory(); + } + return ret; + } + + private static Map<String, Double> getNodeIdToSharedOffHeap(Assignment assignment) { + Map<String, Double> ret = null; + if (assignment != null) { + ret = assignment.get_total_shared_off_heap(); + } + return ret; + } + public double getRequestedMemOnHeap() { return requestedMemOnHeap; } @@ -209,6 +198,10 @@ public final class TopologyResources { return assignedSharedMemOnHeap; } + public void setAssignedSharedMemOnHeap(double assignedSharedMemOnHeap) { + this.assignedSharedMemOnHeap = assignedSharedMemOnHeap; + } + public double getRequestedSharedMemOnHeap() { return requestedSharedMemOnHeap; } @@ -225,10 +218,6 @@ public final class TopologyResources { return requestedNonSharedMemOffHeap; } - public void setAssignedSharedMemOnHeap(double assignedSharedMemOnHeap) { - this.assignedSharedMemOnHeap = assignedSharedMemOnHeap; - } - public double getAssignedSharedMemOffHeap() { return assignedSharedMemOffHeap; } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java index 39151e4..37019b9 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopologyStateTransition.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.nimbus; import org.apache.storm.generated.StormBase; http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java index e2ecbfb..2358f2a 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -8,7 +8,7 @@ * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,12 +18,8 @@ package org.apache.storm.daemon.supervisor; -import static org.apache.storm.daemon.nimbus.Nimbus.MIN_VERSION_SUPPORT_RPC_HEARTBEAT; -import static org.apache.storm.utils.Utils.OR; - import com.google.common.base.Joiner; import com.google.common.collect.Lists; - import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -38,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; - import org.apache.commons.lang.StringUtils; import org.apache.storm.Config; import org.apache.storm.DaemonConfig; @@ -60,84 +55,75 @@ import org.apache.storm.utils.VersionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.storm.daemon.nimbus.Nimbus.MIN_VERSION_SUPPORT_RPC_HEARTBEAT; +import static org.apache.storm.utils.Utils.OR; + /** * A container that runs processes on the local box. */ public class BasicContainer extends Container { + static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache(); private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class); private static final FilenameFilter jarFilter = (dir, name) -> name.endsWith(".jar"); - private static final Joiner CPJ = - Joiner.on(File.pathSeparator).skipNulls(); - + private static final Joiner CPJ = + Joiner.on(File.pathSeparator).skipNulls(); protected final LocalState _localState; protected final String _profileCmd; protected final String _stormHome = System.getProperty("storm.home"); - protected volatile boolean _exitedEarly = false; - protected volatile long memoryLimitMB; - protected volatile long memoryLimitExceededStart; protected final double hardMemoryLimitMultiplier; protected final long hardMemoryLimitOver; protected final long lowMemoryThresholdMB; protected final long mediumMemoryThresholdMb; protected final long mediumMemoryGracePeriodMs; - - private class ProcessExitCallback implements ExitCodeCallback { - private final String _logPrefix; - - public ProcessExitCallback(String logPrefix) { - _logPrefix = logPrefix; - } - - @Override - public void call(int exitCode) { - LOG.info("{} exited with code: {}", _logPrefix, exitCode); - _exitedEarly = true; - } - } + protected volatile boolean _exitedEarly = false; + protected volatile long memoryLimitMB; + protected volatile long memoryLimitExceededStart; /** * Create a new BasicContainer. - * @param type the type of container being made. - * @param conf the supervisor config - * @param supervisorId the ID of the supervisor this is a part of. - * @param supervisorPort the thrift server port of the supervisor this is a part of. - * @param port the port the container is on. Should be <= 0 if only a partial recovery - * @param assignment the assignment for this container. Should be null if only a partial recovery. + * + * @param type the type of container being made. + * @param conf the supervisor config + * @param supervisorId the ID of the supervisor this is a part of. + * @param supervisorPort the thrift server port of the supervisor this is a part of. + * @param port the port the container is on. Should be <= 0 if only a partial recovery + * @param assignment the assignment for this container. Should be null if only a partial recovery. * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used. - * @param localState the local state of the supervisor. May be null if partial recovery - * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. + * @param localState the local state of the supervisor. May be null if partial recovery + * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. */ public BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, - int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, - LocalState localState, String workerId) throws IOException { + int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + LocalState localState, String workerId) throws IOException { this(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, - workerId, null, null, null); + workerId, null, null, null); } /** * Create a new BasicContainer. - * @param type the type of container being made. - * @param conf the supervisor config - * @param supervisorId the ID of the supervisor this is a part of. - * @param supervisorPort the thrift server port of the supervisor this is a part of. - * @param port the port the container is on. Should be <= 0 if only a partial recovery - * @param assignment the assignment for this container. Should be null if only a partial recovery. + * + * @param type the type of container being made. + * @param conf the supervisor config + * @param supervisorId the ID of the supervisor this is a part of. + * @param supervisorPort the thrift server port of the supervisor this is a part of. + * @param port the port the container is on. Should be <= 0 if only a partial recovery + * @param assignment the assignment for this container. Should be null if only a partial recovery. * @param resourceIsolationManager used to isolate resources for a container can be null if no isolation is used. - * @param localState the local state of the supervisor. May be null if partial recovery - * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. - * @param ops file system operations (mostly for testing) if null a new one is made - * @param topoConf the config of the topology (mostly for testing) if null - * and not a partial recovery the real conf is read. - * @param profileCmd the command to use when profiling (used for testing) - * @throws IOException on any error + * @param localState the local state of the supervisor. May be null if partial recovery + * @param workerId the id of the worker to use. Must not be null if doing a partial recovery. + * @param ops file system operations (mostly for testing) if null a new one is made + * @param topoConf the config of the topology (mostly for testing) if null and not a partial recovery the real conf is + * read. + * @param profileCmd the command to use when profiling (used for testing) + * @throws IOException on any error * @throws ContainerRecoveryException if the Container could not be recovered. */ BasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, int port, - LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, - LocalState localState, String workerId, Map<String, Object> topoConf, - AdvancedFSOps ops, String profileCmd) throws IOException { + LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + LocalState localState, String workerId, Map<String, Object> topoConf, + AdvancedFSOps ops, String profileCmd) throws IOException { super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, workerId, topoConf, ops); - assert(localState != null); + assert (localState != null); _localState = localState; if (type.isRecovery() && !type.isOnlyKillable()) { @@ -161,7 +147,7 @@ public class BasicContainer extends Container { if (profileCmd == null) { profileCmd = _stormHome + File.separator + "bin" + File.separator - + conf.get(DaemonConfig.WORKER_PROFILER_COMMAND); + + conf.get(DaemonConfig.WORKER_PROFILER_COMMAND); } _profileCmd = profileCmd; @@ -181,14 +167,33 @@ public class BasicContainer extends Container { } } + private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) { + for (Iterator<Entry<String, Integer>> i = workerToPort.entrySet().iterator(); i.hasNext(); ) { + Entry<String, Integer> found = i.next(); + if (_port == found.getValue().intValue()) { + LOG.warn("Deleting worker {} from state", found.getKey()); + i.remove(); + } + } + } + + public static List<String> getDependencyLocationsFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, + String stormRoot) throws IOException { + return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getDepLocs(); + } + + public static String getStormVersionFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, + String stormRoot) throws IOException { + return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getStormVersion(); + } + /** - * Create a new worker ID for this process and store in in this object and - * in the local state. Never call this if a worker is currently up and running. - * We will lose track of the process. + * Create a new worker ID for this process and store in in this object and in the local state. Never call this if a worker is currently + * up and running. We will lose track of the process. */ protected void createNewWorkerId() { _type.assertFull(); - assert(_workerId == null); + assert (_workerId == null); synchronized (_localState) { _workerId = Utils.uuid(); Map<String, Integer> workerToPort = _localState.getApprovedWorkers(); @@ -202,16 +207,6 @@ public class BasicContainer extends Container { } } - private static void removeWorkersOn(Map<String, Integer> workerToPort, int _port) { - for (Iterator<Entry<String, Integer>> i = workerToPort.entrySet().iterator(); i.hasNext();) { - Entry<String, Integer> found = i.next(); - if (_port == found.getValue().intValue()) { - LOG.warn("Deleting worker {} from state", found.getKey()); - i.remove(); - } - } - } - @Override public void cleanUpForRestart() throws IOException { String origWorkerId = _workerId; @@ -243,22 +238,17 @@ public class BasicContainer extends Container { /** * Run the given command for profiling. * - * @param command - * the command to run - * @param env - * the environment to run the command - * @param logPrefix - * the prefix to include in the logs - * @param targetDir - * the working directory to run the command in + * @param command the command to run + * @param env the environment to run the command + * @param logPrefix the prefix to include in the logs + * @param targetDir the working directory to run the command in * @return true if it ran successfully, else false - * @throws IOException - * on any error - * @throws InterruptedException - * if interrupted wile waiting for the process to exit. + * + * @throws IOException on any error + * @throws InterruptedException if interrupted wile waiting for the process to exit. */ protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, - File targetDir) throws IOException, InterruptedException { + File targetDir) throws IOException, InterruptedException { _type.assertFull(); Process p = ClientSupervisorUtils.launchProcess(command, env, logPrefix, null, targetDir); int ret = p.waitFor(); @@ -282,7 +272,7 @@ public class BasicContainer extends Container { ProfileAction profileAction = request.get_action(); String logPrefix = "ProfilerAction process " + _topologyId + ":" + _port + " PROFILER_ACTION: " + profileAction - + " "; + + " "; List<String> command = mkProfileCommand(profileAction, stop, workerPid, targetDir); @@ -296,14 +286,15 @@ public class BasicContainer extends Container { /** * Get the command to run when doing profiling. - * @param action the profiling action to perform - * @param stop if this is meant to stop the profiling or start it + * + * @param action the profiling action to perform + * @param stop if this is meant to stop the profiling or start it * @param workerPid the PID of the process to profile * @param targetDir the current working directory of the worker process * @return the command to run for profiling. */ private List<String> mkProfileCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { - switch(action) { + switch (action) { case JMAP_DUMP: return jmapDumpCmd(workerPid, targetDir); case JSTACK_DUMP: @@ -347,10 +338,11 @@ public class BasicContainer extends Container { } /** - * Compute the java.library.path that should be used for the worker. - * This helps it to load JNI libraries that are packaged in the uber jar. + * Compute the java.library.path that should be used for the worker. This helps it to load JNI libraries that are packaged in the uber + * jar. + * * @param stormRoot the root directory of the worker process - * @param conf the config for the supervisor. + * @param conf the config for the supervisor. * @return the java.library.path/LD_LIBRARY_PATH to use so native libraries load correctly. */ protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) { @@ -359,13 +351,13 @@ public class BasicContainer extends Container { String arch = System.getProperty("os.arch"); String archResourceRoot = resourceRoot + File.separator + os + "-" + arch; String ret = CPJ.join(archResourceRoot, resourceRoot, - conf.get(DaemonConfig.JAVA_LIBRARY_PATH)); + conf.get(DaemonConfig.JAVA_LIBRARY_PATH)); return ret; } /** - * Returns a path with a wildcard as the final element, so that the JVM will expand - * that to all JARs in the directory. + * Returns a path with a wildcard as the final element, so that the JVM will expand that to all JARs in the directory. + * * @param dir the directory to which a wildcard will be appended * @return the path with wildcard ("*") suffix */ @@ -376,7 +368,7 @@ public class BasicContainer extends Container { protected List<String> frameworkClasspath(SimpleVersion topoVersion) { File stormWorkerLibDir = new File(_stormHome, "lib-worker"); String topoConfDir = - System.getenv("STORM_CONF_DIR") != null ? + System.getenv("STORM_CONF_DIR") != null ? System.getenv("STORM_CONF_DIR") : new File(_stormHome, "conf").getAbsolutePath(); File stormExtlibDir = new File(_stormHome, "extlib"); @@ -401,7 +393,7 @@ public class BasicContainer extends Container { //Have not moved to a java worker yet defaultWorkerGuess = "org.apache.storm.daemon.worker"; } - NavigableMap<SimpleVersion,String> mains = Utils.getConfiguredWorkerMainVersions(_conf); + NavigableMap<SimpleVersion, String> mains = Utils.getConfiguredWorkerMainVersions(_conf); return Utils.getCompatibleVersion(mains, topoVersion, "worker main class", defaultWorkerGuess); } @@ -411,25 +403,26 @@ public class BasicContainer extends Container { //Prior to the org.apache change defaultGuess = "backtype.storm.LogWriter"; } - NavigableMap<SimpleVersion,String> mains = Utils.getConfiguredWorkerLogWriterVersions(_conf); + NavigableMap<SimpleVersion, String> mains = Utils.getConfiguredWorkerLogWriterVersions(_conf); return Utils.getCompatibleVersion(mains, topoVersion, "worker log writer class", defaultGuess); } @SuppressWarnings("unchecked") private List<String> asStringList(Object o) { if (o instanceof String) { - return Arrays.asList((String)o); + return Arrays.asList((String) o); } else if (o instanceof List) { - return (List<String>)o; + return (List<String>) o; } return Collections.EMPTY_LIST; } /** * Compute the classpath for the worker process. - * @param stormJar the topology jar + * + * @param stormJar the topology jar * @param dependencyLocations any dependencies from the topology - * @param topoVersion the version of the storm framework to use + * @param topoVersion the version of the storm framework to use * @return the full classpath */ protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations, SimpleVersion topoVersion) { @@ -469,7 +462,7 @@ public class BasicContainer extends Container { String string = substituteChildOptsInternal((String) value, memOnheap); if (StringUtils.isNotBlank(string)) { String[] strings = string.split("\\s+"); - for (String s: strings) { + for (String s : strings) { if (StringUtils.isNotBlank(s)) { rets.add(s); } @@ -491,24 +484,19 @@ public class BasicContainer extends Container { /** * Launch the worker process (non-blocking). * - * @param command - * the command to run - * @param env - * the environment to run the command - * @param processExitCallback - * a callback for when the process exits - * @param logPrefix - * the prefix to include in the logs - * @param targetDir - * the working directory to run the command in + * @param command the command to run + * @param env the environment to run the command + * @param processExitCallback a callback for when the process exits + * @param logPrefix the prefix to include in the logs + * @param targetDir the working directory to run the command in * @return true if it ran successfully, else false - * @throws IOException - * on any error + * + * @throws IOException on any error */ protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix, - ExitCodeCallback processExitCallback, File targetDir) throws IOException { + ExitCodeCallback processExitCallback, File targetDir) throws IOException { if (_resourceIsolationManager != null) { - command = _resourceIsolationManager.getLaunchCommand(_workerId, command); + command = _resourceIsolationManager.getLaunchCommand(_workerId, command); } ClientSupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir); } @@ -530,107 +518,12 @@ public class BasicContainer extends Container { return log4jConfigurationDir + File.separator + "worker.xml"; } - private static class TopologyMetaData { - private boolean _dataCached = false; - private List<String> _depLocs = null; - private String _stormVersion = null; - private final Map<String, Object> _conf; - private final String _topologyId; - private final AdvancedFSOps _ops; - private final String _stormRoot; - - public TopologyMetaData(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) { - _conf = conf; - _topologyId = topologyId; - _ops = ops; - _stormRoot = stormRoot; - } - - public String toString() { - List<String> data; - String stormVersion; - synchronized(this) { - data = _depLocs; - stormVersion = _stormVersion; - } - return "META for " + _topologyId +" DEP_LOCS => " + data + " STORM_VERSION => " + stormVersion; - } - - private synchronized void readData() throws IOException { - final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops); - final List<String> dependencyLocations = new ArrayList<>(); - if (stormTopology.get_dependency_jars() != null) { - for (String dependency : stormTopology.get_dependency_jars()) { - dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath()); - } - } - - if (stormTopology.get_dependency_artifacts() != null) { - for (String dependency : stormTopology.get_dependency_artifacts()) { - dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath()); - } - } - _depLocs = dependencyLocations; - _stormVersion = stormTopology.get_storm_version(); - _dataCached = true; - } - - public synchronized List<String> getDepLocs() throws IOException { - if (!_dataCached) { - readData(); - } - return _depLocs; - } - - public synchronized String getStormVersion() throws IOException { - if (!_dataCached) { - readData(); - } - return _stormVersion; - } - } - - static class TopoMetaLRUCache { - public final int _maxSize = 100; //We could make this configurable in the future... - - @SuppressWarnings("serial") - private LinkedHashMap<String, TopologyMetaData> _cache = new LinkedHashMap<String, TopologyMetaData>() { - @Override - protected boolean removeEldestEntry(Map.Entry<String,TopologyMetaData> eldest) { - return (size() > _maxSize); - } - }; - - public synchronized TopologyMetaData get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) { - //Only go off of the topology id for now. - TopologyMetaData dl = _cache.get(topologyId); - if (dl == null) { - _cache.putIfAbsent(topologyId, new TopologyMetaData(conf, topologyId, ops, stormRoot)); - dl = _cache.get(topologyId); - } - return dl; - } - - public synchronized void clear() { - _cache.clear(); - } - } - - static final TopoMetaLRUCache TOPO_META_CACHE = new TopoMetaLRUCache(); - - public static List<String> getDependencyLocationsFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException { - return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getDepLocs(); - } - - public static String getStormVersionFor(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException { - return TOPO_META_CACHE.get(conf, topologyId, ops, stormRoot).getStormVersion(); - } - /** - * Get parameters for the class path of the worker process. Also used by the - * log Writer. + * Get parameters for the class path of the worker process. Also used by the log Writer. + * * @param stormRoot the root dist dir for the topology * @return the classpath for the topology as command line arguments. + * * @throws IOException on any error. */ private List<String> getClassPathParams(final String stormRoot, final SimpleVersion topoVersion) throws IOException { @@ -645,8 +538,9 @@ public class BasicContainer extends Container { } /** - * Get a set of java properties that are common to both the log writer and the worker processes. - * These are mostly system properties that are used by logging. + * Get a set of java properties that are common to both the log writer and the worker processes. These are mostly system properties that + * are used by logging. + * * @return a list of command line options */ private List<String> getCommonParams() { @@ -667,7 +561,7 @@ public class BasicContainer extends Container { commonParams.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector"); commonParams.add("-Dstorm.local.dir=" + _conf.get(Config.STORM_LOCAL_DIR)); if (memoryLimitMB > 0) { - commonParams.add("-Dworker.memory_limit_mb="+ memoryLimitMB); + commonParams.add("-Dworker.memory_limit_mb=" + memoryLimitMB); } return commonParams; } @@ -675,7 +569,7 @@ public class BasicContainer extends Container { private int getMemOnHeap(WorkerResources resources) { int memOnheap = 0; if (resources != null && resources.is_set_mem_on_heap() && - resources.get_mem_on_heap() > 0) { + resources.get_mem_on_heap() > 0) { memOnheap = (int) Math.ceil(resources.get_mem_on_heap()); } else { // set the default heap memory size for supervisor-test @@ -705,21 +599,23 @@ public class BasicContainer extends Container { /** * Create the command to launch the worker process. + * * @param memOnheap the on heap memory for the worker * @param stormRoot the root dist dir for the topology - * @param jlp java library path for the topology + * @param jlp java library path for the topology * @return the command to run + * * @throws IOException on any error. */ private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot, - final String jlp) throws IOException { + final String jlp) throws IOException { final String javaCmd = javaCmd("java"); final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); final String topoConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId); String topoVersionString = getStormVersionFor(_conf, _topologyId, _ops, stormRoot); if (topoVersionString == null) { - topoVersionString = (String)_conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion()); + topoVersionString = (String) _conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion()); } final SimpleVersion topoVersion = new SimpleVersion(topoVersionString); @@ -744,8 +640,8 @@ public class BasicContainer extends Container { commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap)); commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap)); commandList.addAll(substituteChildopts(Utils.OR( - _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), - _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap)); + _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), + _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap)); commandList.addAll(getWorkerProfilerChildOpts(memOnheap)); commandList.add("-Djava.library.path=" + jlp); commandList.add("-Dstorm.conf.file=" + topoConfFile); @@ -759,112 +655,112 @@ public class BasicContainer extends Container { // supervisor port should be only presented to worker which supports RPC heartbeat // unknown version should be treated as "current version", which supports RPC heartbeat if ((topoVersion.getMajor() == -1 && topoVersion.getMinor() == -1) || - topoVersion.compareTo(MIN_VERSION_SUPPORT_RPC_HEARTBEAT) >= 0) { + topoVersion.compareTo(MIN_VERSION_SUPPORT_RPC_HEARTBEAT) >= 0) { commandList.add(String.valueOf(_supervisorPort)); } commandList.add(String.valueOf(_port)); commandList.add(_workerId); - + return commandList; } - - @Override - public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException { - if (super.isMemoryLimitViolated(withUpdatedLimits)) { - return true; - } - if (_resourceIsolationManager != null) { - // In the short term the goal is to not shoot anyone unless we really need to. - // The on heap should limit the memory usage in most cases to a reasonable amount - // If someone is using way more than they requested this is a bug and we should - // not allow it - long usageMb; - long memoryLimitMb; - long hardMemoryLimitOver; - String typeOfCheck; - - if (withUpdatedLimits.is_set_total_node_shared()) { - //We need to do enforcement on a topology level, not a single worker level... - // Because in for cgroups each page in shared memory goes to the worker that touched it - // first. We may need to make this more plugable in the future and let the resource - // isolation manager tell us what to do - usageMb = getTotalTopologyMemoryUsed(); - memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits); - hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology(); - typeOfCheck = "TOPOLOGY " + _topologyId; - } else { - usageMb = getMemoryUsageMb(); - memoryLimitMb = this.memoryLimitMB; - hardMemoryLimitOver = this.hardMemoryLimitOver; - typeOfCheck = "WORKER " + _workerId; - } - LOG.debug( - "Enforcing memory usage for {} with usage of {} out of {} total and a hard limit of {}", - typeOfCheck, - usageMb, - memoryLimitMb, - hardMemoryLimitOver); - - if (usageMb <= 0) { - //Looks like usage might not be supported - return false; - } - long hardLimitMb = Math.max((long)(memoryLimitMb * hardMemoryLimitMultiplier), memoryLimitMb + hardMemoryLimitOver); - if (usageMb > hardLimitMb) { - LOG.warn( - "{} is using {} MB > adjusted hard limit {} MB", typeOfCheck, usageMb, hardLimitMb); - return true; - } - if (usageMb > memoryLimitMb) { - //For others using too much it is really a question of how much memory is free in the system - // to be use. If we cannot calculate it assume that it is bad - long systemFreeMemoryMb = 0; - try { - systemFreeMemoryMb = _resourceIsolationManager.getSystemFreeMemoryMb(); - } catch (IOException e) { - LOG.warn("Error trying to calculate free memory on the system {}", e); - } - LOG.debug("SYSTEM MEMORY FREE {} MB", systemFreeMemoryMb); - //If the system is low on memory we cannot be kind and need to shoot something - if (systemFreeMemoryMb <= lowMemoryThresholdMB) { - LOG.warn( - "{} is using {} MB > memory limit {} MB and system is low on memory {} free", - typeOfCheck, - usageMb, - memoryLimitMb, - systemFreeMemoryMb); - return true; - } - - //If the system still has some free memory give them a grace period to - // drop back down. - if (systemFreeMemoryMb < mediumMemoryThresholdMb) { - if (memoryLimitExceededStart < 0) { - memoryLimitExceededStart = Time.currentTimeMillis(); - } else { - long timeInViolation = Time.currentTimeMillis() - memoryLimitExceededStart; - if (timeInViolation > mediumMemoryGracePeriodMs) { - LOG.warn( - "{} is using {} MB > memory limit {} MB for {} seconds", - typeOfCheck, - usageMb, - memoryLimitMb, - timeInViolation / 1000); - return true; + + @Override + public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException { + if (super.isMemoryLimitViolated(withUpdatedLimits)) { + return true; + } + if (_resourceIsolationManager != null) { + // In the short term the goal is to not shoot anyone unless we really need to. + // The on heap should limit the memory usage in most cases to a reasonable amount + // If someone is using way more than they requested this is a bug and we should + // not allow it + long usageMb; + long memoryLimitMb; + long hardMemoryLimitOver; + String typeOfCheck; + + if (withUpdatedLimits.is_set_total_node_shared()) { + //We need to do enforcement on a topology level, not a single worker level... + // Because in for cgroups each page in shared memory goes to the worker that touched it + // first. We may need to make this more plugable in the future and let the resource + // isolation manager tell us what to do + usageMb = getTotalTopologyMemoryUsed(); + memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits); + hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology(); + typeOfCheck = "TOPOLOGY " + _topologyId; + } else { + usageMb = getMemoryUsageMb(); + memoryLimitMb = this.memoryLimitMB; + hardMemoryLimitOver = this.hardMemoryLimitOver; + typeOfCheck = "WORKER " + _workerId; + } + LOG.debug( + "Enforcing memory usage for {} with usage of {} out of {} total and a hard limit of {}", + typeOfCheck, + usageMb, + memoryLimitMb, + hardMemoryLimitOver); + + if (usageMb <= 0) { + //Looks like usage might not be supported + return false; + } + long hardLimitMb = Math.max((long) (memoryLimitMb * hardMemoryLimitMultiplier), memoryLimitMb + hardMemoryLimitOver); + if (usageMb > hardLimitMb) { + LOG.warn( + "{} is using {} MB > adjusted hard limit {} MB", typeOfCheck, usageMb, hardLimitMb); + return true; + } + if (usageMb > memoryLimitMb) { + //For others using too much it is really a question of how much memory is free in the system + // to be use. If we cannot calculate it assume that it is bad + long systemFreeMemoryMb = 0; + try { + systemFreeMemoryMb = _resourceIsolationManager.getSystemFreeMemoryMb(); + } catch (IOException e) { + LOG.warn("Error trying to calculate free memory on the system {}", e); + } + LOG.debug("SYSTEM MEMORY FREE {} MB", systemFreeMemoryMb); + //If the system is low on memory we cannot be kind and need to shoot something + if (systemFreeMemoryMb <= lowMemoryThresholdMB) { + LOG.warn( + "{} is using {} MB > memory limit {} MB and system is low on memory {} free", + typeOfCheck, + usageMb, + memoryLimitMb, + systemFreeMemoryMb); + return true; + } + + //If the system still has some free memory give them a grace period to + // drop back down. + if (systemFreeMemoryMb < mediumMemoryThresholdMb) { + if (memoryLimitExceededStart < 0) { + memoryLimitExceededStart = Time.currentTimeMillis(); + } else { + long timeInViolation = Time.currentTimeMillis() - memoryLimitExceededStart; + if (timeInViolation > mediumMemoryGracePeriodMs) { + LOG.warn( + "{} is using {} MB > memory limit {} MB for {} seconds", + typeOfCheck, + usageMb, + memoryLimitMb, + timeInViolation / 1000); + return true; + } + } + } else { + //Otherwise don't bother them + LOG.debug("{} is using {} MB > memory limit {} MB", typeOfCheck, usageMb, memoryLimitMb); + memoryLimitExceededStart = -1; + } + } else { + memoryLimitExceededStart = -1; } - } - } else { - //Otherwise don't bother them - LOG.debug("{} is using {} MB > memory limit {} MB", typeOfCheck, usageMb, memoryLimitMb); - memoryLimitExceededStart = -1; } - } else { - memoryLimitExceededStart = -1; - } + return false; } - return false; - } @Override public long getMemoryUsageMb() { @@ -883,35 +779,35 @@ public class BasicContainer extends Container { } } - @Override - public long getMemoryReservationMb() { - return memoryLimitMB; - } + @Override + public long getMemoryReservationMb() { + return memoryLimitMB; + } - private long calculateMemoryLimit(final WorkerResources resources, final int memOnHeap) { - long ret = memOnHeap; - if (_resourceIsolationManager != null) { - final int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); - final int extraMem = - (int) - (Math.ceil( - ObjectReader.getDouble( - _conf.get(DaemonConfig.STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB), - 0.0))); - ret += memoffheap + extraMem; + private long calculateMemoryLimit(final WorkerResources resources, final int memOnHeap) { + long ret = memOnHeap; + if (_resourceIsolationManager != null) { + final int memoffheap = (int) Math.ceil(resources.get_mem_off_heap()); + final int extraMem = + (int) + (Math.ceil( + ObjectReader.getDouble( + _conf.get(DaemonConfig.STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB), + 0.0))); + ret += memoffheap + extraMem; + } + return ret; } - return ret; - } - + @Override public void launch() throws IOException { _type.assertFull(); LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment, - _supervisorId, _port, _workerId); + _supervisorId, _port, _workerId); String logPrefix = "Worker Process " + _workerId; ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix); _exitedEarly = false; - + final WorkerResources resources = _assignment.get_resources(); final int memOnHeap = getMemOnHeap(resources); memoryLimitMB = calculateMemoryLimit(resources, memOnHeap); @@ -929,7 +825,7 @@ public class BasicContainer extends Container { if (ld_library_path != null) { jlp = jlp + System.getProperty("path.separator") + ld_library_path; } - + topEnvironment.put("LD_LIBRARY_PATH", jlp); if (_resourceIsolationManager != null) { @@ -939,11 +835,112 @@ public class BasicContainer extends Container { } List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp); - + LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList)); String workerDir = ConfigUtils.workerRoot(_conf, _workerId); launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir)); } + + private static class TopologyMetaData { + private final Map<String, Object> _conf; + private final String _topologyId; + private final AdvancedFSOps _ops; + private final String _stormRoot; + private boolean _dataCached = false; + private List<String> _depLocs = null; + private String _stormVersion = null; + + public TopologyMetaData(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) { + _conf = conf; + _topologyId = topologyId; + _ops = ops; + _stormRoot = stormRoot; + } + + public String toString() { + List<String> data; + String stormVersion; + synchronized (this) { + data = _depLocs; + stormVersion = _stormVersion; + } + return "META for " + _topologyId + " DEP_LOCS => " + data + " STORM_VERSION => " + stormVersion; + } + + private synchronized void readData() throws IOException { + final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops); + final List<String> dependencyLocations = new ArrayList<>(); + if (stormTopology.get_dependency_jars() != null) { + for (String dependency : stormTopology.get_dependency_jars()) { + dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath()); + } + } + + if (stormTopology.get_dependency_artifacts() != null) { + for (String dependency : stormTopology.get_dependency_artifacts()) { + dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath()); + } + } + _depLocs = dependencyLocations; + _stormVersion = stormTopology.get_storm_version(); + _dataCached = true; + } + + public synchronized List<String> getDepLocs() throws IOException { + if (!_dataCached) { + readData(); + } + return _depLocs; + } + + public synchronized String getStormVersion() throws IOException { + if (!_dataCached) { + readData(); + } + return _stormVersion; + } + } + + static class TopoMetaLRUCache { + public final int _maxSize = 100; //We could make this configurable in the future... + + @SuppressWarnings("serial") + private LinkedHashMap<String, TopologyMetaData> _cache = new LinkedHashMap<String, TopologyMetaData>() { + @Override + protected boolean removeEldestEntry(Map.Entry<String, TopologyMetaData> eldest) { + return (size() > _maxSize); + } + }; + + public synchronized TopologyMetaData get(final Map<String, Object> conf, final String topologyId, final AdvancedFSOps ops, + String stormRoot) { + //Only go off of the topology id for now. + TopologyMetaData dl = _cache.get(topologyId); + if (dl == null) { + _cache.putIfAbsent(topologyId, new TopologyMetaData(conf, topologyId, ops, stormRoot)); + dl = _cache.get(topologyId); + } + return dl; + } + + public synchronized void clear() { + _cache.clear(); + } + } + + private class ProcessExitCallback implements ExitCodeCallback { + private final String _logPrefix; + + public ProcessExitCallback(String logPrefix) { + _logPrefix = logPrefix; + } + + @Override + public void call(int exitCode) { + LOG.info("{} exited with code: {}", _logPrefix, exitCode); + _exitedEarly = true; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java index e2043e8..715fd61 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java @@ -1,25 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.daemon.supervisor; import java.io.IOException; import java.util.Map; - import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.daemon.supervisor.Container.ContainerType; import org.apache.storm.generated.LocalAssignment; @@ -29,13 +23,13 @@ import org.apache.storm.utils.LocalState; * Launch containers with no security using standard java commands */ public class BasicContainerLauncher extends ContainerLauncher { + protected final ResourceIsolationInterface _resourceIsolationManager; private final Map<String, Object> _conf; private final String _supervisorId; private final int _supervisorPort; - protected final ResourceIsolationInterface _resourceIsolationManager; - + public BasicContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort, - ResourceIsolationInterface resourceIsolationManager) throws IOException { + ResourceIsolationInterface resourceIsolationManager) throws IOException { _conf = conf; _supervisorId = supervisorId; _supervisorPort = supervisorPort; @@ -45,7 +39,7 @@ public class BasicContainerLauncher extends ContainerLauncher { @Override public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { Container container = new BasicContainer(ContainerType.LAUNCH, _conf, _supervisorId, _supervisorPort, port, - assignment, _resourceIsolationManager, state, null); + assignment, _resourceIsolationManager, state, null); container.setup(); container.launch(); return container; @@ -54,12 +48,12 @@ public class BasicContainerLauncher extends ContainerLauncher { @Override public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException { return new BasicContainer(ContainerType.RECOVER_FULL, _conf, _supervisorId, _supervisorPort, port, assignment, - _resourceIsolationManager, state, null); + _resourceIsolationManager, state, null); } @Override public Killable recoverContainer(String workerId, LocalState localState) throws IOException { return new BasicContainer(ContainerType.RECOVER_PARTIAL, _conf, _supervisorId, _supervisorPort, -1, null, - _resourceIsolationManager, localState, workerId); + _resourceIsolationManager, localState, workerId); } }
