Merge branch 'master' into supervisor and update supervisor based STORM-1631&STORM-1636
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f03b8bec Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f03b8bec Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f03b8bec Branch: refs/heads/master Commit: f03b8bec105e88282211bf3e7dd4be4aeed484d8 Parents: 42928c2 2886737 Author: xiaojian.fxj <[email protected]> Authored: Wed Mar 23 13:53:00 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Wed Mar 23 14:40:22 2016 +0800 ---------------------------------------------------------------------- .gitignore | 1 + CHANGELOG.md | 18 +- README.markdown | 4 +- docs/Acking-framework-implementation.md | 36 + docs/Clojure-DSL.md | 266 + docs/Command-line-client.md | 104 + docs/Common-patterns.md | 100 + docs/Concepts.md | 115 + docs/Configuration.md | 31 + docs/Contributing-to-Storm.md | 33 + docs/Creating-a-new-Storm-project.md | 25 + docs/DSLs-and-multilang-adapters.md | 10 + docs/Daemon-Fault-Tolerance.md | 30 + ...Defining-a-non-jvm-language-dsl-for-storm.md | 38 + docs/Distributed-RPC.md | 199 + docs/Documentation.md | 50 + docs/FAQ.md | 127 + docs/Fault-tolerance.md | 28 + docs/Guaranteeing-message-processing.md | 181 + docs/Hooks.md | 9 + docs/Implementation-docs.md | 13 + docs/Installing-native-dependencies.md | 38 + docs/Kestrel-and-Storm.md | 200 + docs/Lifecycle-of-a-topology.md | 82 + docs/Local-mode.md | 29 + docs/Logs.md | 30 + docs/Maven.md | 22 + docs/Message-passing-implementation.md | 30 + docs/Metrics.md | 36 + docs/Multilang-protocol.md | 287 + docs/Pacemaker.md | 113 + docs/Powered-By.md | 1028 +++ docs/Project-ideas.md | 6 + docs/README.md | 61 + docs/Rationale.md | 33 + docs/Resource_Aware_Scheduler_overview.md | 232 + ...unning-topologies-on-a-production-cluster.md | 77 + docs/SECURITY.md | 478 ++ docs/STORM-UI-REST-API.md | 1017 +++ docs/Serialization-(prior-to-0.6.0).md | 50 + docs/Serialization.md | 62 + docs/Serializers.md | 4 + 
docs/Setting-up-a-Storm-cluster.md | 117 + docs/Setting-up-a-Storm-project-in-Eclipse.md | 1 + docs/Setting-up-development-environment.md | 33 + docs/Spout-implementations.md | 10 + docs/State-checkpointing.md | 160 + ...guage-protocol-(versions-0.7.0-and-below).md | 122 + docs/Structure-of-the-codebase.md | 134 + docs/Support-for-non-java-languages.md | 9 + docs/Transactional-topologies.md | 361 + docs/Trident-API-Overview.md | 525 ++ docs/Trident-spouts.md | 44 + docs/Trident-state.md | 331 + docs/Trident-tutorial.md | 254 + docs/Troubleshooting.md | 182 + docs/Tutorial.md | 320 + ...nding-the-parallelism-of-a-Storm-topology.md | 123 + docs/Using-non-JVM-languages-with-Storm.md | 53 + docs/Windowing.md | 239 + docs/_config.yml | 18 + docs/_includes/footer.html | 55 + docs/_includes/head.html | 34 + docs/_includes/header.html | 59 + docs/_layouts/about.html | 43 + docs/_layouts/default.html | 18 + docs/_layouts/documentation.html | 9 + docs/_layouts/page.html | 5 + docs/_layouts/post.html | 61 + docs/_plugins/releases.rb | 84 + docs/_sass/_syntax-highlighting.scss | 70 + docs/assets/css/bootstrap-theme.css | 470 ++ docs/assets/css/bootstrap-theme.css.map | 1 + docs/assets/css/bootstrap-theme.min.css | 5 + docs/assets/css/bootstrap.css | 6800 ++++++++++++++++++ docs/assets/css/bootstrap.css.map | 1 + docs/assets/css/bootstrap.min.css | 5 + docs/assets/css/font-awesome.min.css | 4 + docs/assets/css/main.scss | 48 + docs/assets/css/owl.carousel.css | 71 + docs/assets/css/owl.theme.css | 79 + docs/assets/css/style.css | 503 ++ docs/assets/css/theme.css | 18 + docs/assets/favicon.ico | Bin 0 -> 1150 bytes .../fonts/glyphicons-halflings-regular.eot | Bin 0 -> 20335 bytes .../fonts/glyphicons-halflings-regular.svg | 229 + .../fonts/glyphicons-halflings-regular.ttf | Bin 0 -> 41280 bytes .../fonts/glyphicons-halflings-regular.woff | Bin 0 -> 23320 bytes docs/assets/js/bootstrap.js | 2320 ++++++ docs/assets/js/bootstrap.min.js | 7 + docs/assets/js/jquery.min.js | 6 + 
docs/assets/js/npm.js | 13 + docs/assets/js/owl.carousel.min.js | 47 + docs/assets/js/storm.js | 67 + docs/cgroups_in_storm.md | 71 + docs/css/style.css | 553 ++ docs/distcache-blobstore.md | 740 ++ docs/dynamic-log-level-settings.md | 45 + docs/dynamic-worker-profiling.md | 37 + docs/favicon.ico | Bin 0 -> 1150 bytes docs/flux.md | 835 +++ docs/images/ack_tree.png | Bin 0 -> 31463 bytes docs/images/architecture.png | Bin 0 -> 69825 bytes docs/images/architecture.svg | 1458 ++++ docs/images/batched-stream.png | Bin 0 -> 66336 bytes docs/images/bolt.png | Bin 0 -> 24796 bytes docs/images/bolt.svg | 743 ++ docs/images/bullet.gif | Bin 0 -> 82 bytes docs/images/download.png | Bin 0 -> 16272 bytes docs/images/drpc-workflow.png | Bin 0 -> 66199 bytes docs/images/dynamic_log_level_settings_1.png | Bin 0 -> 93689 bytes docs/images/dynamic_log_level_settings_2.png | Bin 0 -> 78785 bytes docs/images/dynamic_profiling_debugging_1.png | Bin 0 -> 56876 bytes docs/images/dynamic_profiling_debugging_2.png | Bin 0 -> 99164 bytes docs/images/dynamic_profiling_debugging_3.png | Bin 0 -> 96974 bytes docs/images/dynamic_profiling_debugging_4.png | Bin 0 -> 121994 bytes docs/images/eclipse-project-properties.png | Bin 0 -> 80810 bytes docs/images/example-of-a-running-topology.png | Bin 0 -> 81430 bytes docs/images/footer-bg.png | Bin 0 -> 138 bytes docs/images/grouping.png | Bin 0 -> 39701 bytes docs/images/hdfs_blobstore.png | Bin 0 -> 82180 bytes docs/images/header-bg.png | Bin 0 -> 470 bytes docs/images/incubator-logo.png | Bin 0 -> 11651 bytes docs/images/ld-library-path-eclipse-linux.png | Bin 0 -> 114597 bytes docs/images/loading.gif | Bin 0 -> 12150 bytes docs/images/local_blobstore.png | Bin 0 -> 81212 bytes docs/images/logo.png | Bin 0 -> 26889 bytes docs/images/logos/aeris.jpg | Bin 0 -> 7420 bytes docs/images/logos/alibaba.jpg | Bin 0 -> 10317 bytes docs/images/logos/bai.jpg | Bin 0 -> 10026 bytes docs/images/logos/cerner.jpg | Bin 0 -> 7244 bytes 
docs/images/logos/flipboard.jpg | Bin 0 -> 8318 bytes docs/images/logos/fullcontact.jpg | Bin 0 -> 6172 bytes docs/images/logos/groupon.jpg | Bin 0 -> 9849 bytes docs/images/logos/health-market-science.jpg | Bin 0 -> 6509 bytes docs/images/logos/images.png | Bin 0 -> 7339 bytes docs/images/logos/infochimp.jpg | Bin 0 -> 5290 bytes docs/images/logos/klout.jpg | Bin 0 -> 7251 bytes docs/images/logos/loggly.jpg | Bin 0 -> 9258 bytes docs/images/logos/ooyala.jpg | Bin 0 -> 5675 bytes docs/images/logos/parc.png | Bin 0 -> 13720 bytes docs/images/logos/premise.jpg | Bin 0 -> 5391 bytes docs/images/logos/qiy.jpg | Bin 0 -> 7441 bytes docs/images/logos/quicklizard.jpg | Bin 0 -> 7382 bytes docs/images/logos/rocketfuel.jpg | Bin 0 -> 10007 bytes docs/images/logos/rubicon.jpg | Bin 0 -> 7120 bytes docs/images/logos/spider.jpg | Bin 0 -> 6265 bytes docs/images/logos/spotify.jpg | Bin 0 -> 6445 bytes docs/images/logos/taobao.jpg | Bin 0 -> 16814 bytes docs/images/logos/the-weather-channel.jpg | Bin 0 -> 13295 bytes docs/images/logos/twitter.jpg | Bin 0 -> 7139 bytes docs/images/logos/verisign.jpg | Bin 0 -> 5982 bytes docs/images/logos/webmd.jpg | Bin 0 -> 8226 bytes docs/images/logos/wego.jpg | Bin 0 -> 6836 bytes docs/images/logos/yahoo-japan.jpg | Bin 0 -> 10350 bytes docs/images/logos/yahoo.png | Bin 0 -> 13067 bytes docs/images/logos/yelp.jpg | Bin 0 -> 7220 bytes docs/images/mailinglist.png | Bin 0 -> 4245 bytes docs/images/nimbus_ha_blobstore.png | Bin 0 -> 113991 bytes .../nimbus_ha_leader_election_and_failover.png | Bin 0 -> 154316 bytes docs/images/nimbus_ha_topology_submission.png | Bin 0 -> 134180 bytes ...onships-worker-processes-executors-tasks.png | Bin 0 -> 54804 bytes docs/images/search-a-topology.png | Bin 0 -> 671031 bytes docs/images/search-for-a-single-worker-log.png | Bin 0 -> 736579 bytes docs/images/security.png | Bin 0 -> 72415 bytes docs/images/security.svg | 1779 +++++ docs/images/spout-vs-state.png | Bin 0 -> 24804 bytes docs/images/spout.png | Bin 
0 -> 22911 bytes docs/images/spout.svg | 833 +++ docs/images/storm-cluster.png | Bin 0 -> 34604 bytes docs/images/storm-flow.png | Bin 0 -> 59688 bytes docs/images/storm-sql-internal-example.png | Bin 0 -> 28377 bytes docs/images/storm-sql-internal-workflow.png | Bin 0 -> 20020 bytes docs/images/storm.svg | 1326 ++++ docs/images/storm_header.png | Bin 0 -> 17291 bytes docs/images/storm_logo_tagline_color.png | Bin 0 -> 33568 bytes docs/images/top_bg.gif | Bin 0 -> 113 bytes docs/images/topology-tasks.png | Bin 0 -> 45960 bytes docs/images/topology.png | Bin 0 -> 23147 bytes docs/images/topology.svg | 1044 +++ docs/images/topology_dark.png | Bin 0 -> 49692 bytes docs/images/topology_dark.svg | 1101 +++ docs/images/transactional-batches.png | Bin 0 -> 23293 bytes docs/images/transactional-commit-flow.png | Bin 0 -> 17725 bytes docs/images/transactional-design-2.png | Bin 0 -> 13537 bytes docs/images/transactional-spout-structure.png | Bin 0 -> 25067 bytes docs/images/trident-to-storm1.png | Bin 0 -> 67173 bytes docs/images/trident-to-storm2.png | Bin 0 -> 68943 bytes docs/images/tuple-dag.png | Bin 0 -> 18849 bytes docs/images/tuple_tree.png | Bin 0 -> 58186 bytes docs/images/ui_topology_viz.png | Bin 0 -> 112831 bytes docs/index.md | 81 + docs/nimbus-ha-design.md | 222 + docs/storm-eventhubs.md | 40 + docs/storm-hbase.md | 241 + docs/storm-hdfs.md | 368 + docs/storm-hive.md | 111 + docs/storm-jdbc.md | 285 + docs/storm-kafka.md | 287 + .../storm-metrics-profiling-internal-actions.md | 70 + docs/storm-redis.md | 258 + docs/storm-solr.md | 184 + docs/storm-sql-internal.md | 55 + docs/storm-sql.md | 97 + .../storm/starter/spout/RandomIntegerSpout.java | 15 +- .../src/jvm/storm/starter/StatefulTopology.java | 1 + external/storm-kafka/README.md | 1 - .../apache/storm/kafka/PartitionManager.java | 12 +- external/storm-mongodb/README.md | 195 + external/storm-mongodb/pom.xml | 74 + .../storm/mongodb/bolt/AbstractMongoBolt.java | 56 + 
.../storm/mongodb/bolt/MongoInsertBolt.java | 62 + .../storm/mongodb/bolt/MongoUpdateBolt.java | 75 + .../storm/mongodb/common/MongoDBClient.java | 91 + .../mongodb/common/QueryFilterCreator.java | 38 + .../common/SimpleQueryFilterCreator.java | 39 + .../mongodb/common/mapper/MongoMapper.java | 38 + .../common/mapper/SimpleMongoMapper.java | 40 + .../common/mapper/SimpleMongoUpdateMapper.java | 41 + .../storm/mongodb/trident/state/MongoState.java | 97 + .../trident/state/MongoStateFactory.java | 42 + .../trident/state/MongoStateUpdater.java | 34 + .../storm/mongodb/topology/InsertWordCount.java | 81 + .../storm/mongodb/topology/UpdateWordCount.java | 91 + .../storm/mongodb/topology/WordCounter.java | 67 + .../storm/mongodb/topology/WordSpout.java | 88 + .../storm/mongodb/trident/WordCountTrident.java | 85 + log4j2/cluster.xml | 15 - log4j2/worker.xml | 15 + pom.xml | 1 + .../src/clj/org/apache/storm/clojure.clj | 3 + .../clj/org/apache/storm/command/heartbeats.clj | 5 +- .../src/clj/org/apache/storm/converter.clj | 46 +- .../org/apache/storm/daemon/builtin_metrics.clj | 33 +- .../clj/org/apache/storm/daemon/executor.clj | 33 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 117 +- .../src/clj/org/apache/storm/daemon/task.clj | 10 +- .../src/clj/org/apache/storm/daemon/worker.clj | 19 +- .../clj/org/apache/storm/internal/clojure.clj | 3 + .../apache/storm/scheduler/DefaultScheduler.clj | 80 - .../apache/storm/scheduler/EvenScheduler.clj | 98 - .../storm/scheduler/IsolationScheduler.clj | 4 +- storm-core/src/clj/org/apache/storm/stats.clj | 1568 ---- storm-core/src/clj/org/apache/storm/testing.clj | 8 +- .../clj/org/apache/storm/trident/testing.clj | 12 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 71 +- storm-core/src/clj/org/apache/storm/util.clj | 11 - .../org/apache/storm/blobstore/BlobStore.java | 5 + .../storm/cluster/IStormClusterState.java | 2 + .../storm/cluster/StormClusterStateImpl.java | 33 +- .../container/ResourceIsolationInterface.java | 8 + 
.../storm/container/cgroup/CgroupManager.java | 16 +- .../storm/coordination/CoordinatedBolt.java | 4 + .../src/jvm/org/apache/storm/daemon/Acker.java | 18 +- .../org/apache/storm/daemon/StormCommon.java | 4 + .../daemon/supervisor/SupervisorUtils.java | 7 +- .../supervisor/timer/RunProfilerActions.java | 10 +- .../workermanager/DefaultWorkerManager.java | 14 +- .../storm/scheduler/DefaultScheduler.java | 111 + .../apache/storm/scheduler/EvenScheduler.java | 168 + .../apache/storm/scheduler/TopologyDetails.java | 3 +- .../apache/storm/stats/BoltExecutorStats.java | 105 + .../jvm/org/apache/storm/stats/CommonStats.java | 112 + .../apache/storm/stats/SpoutExecutorStats.java | 79 + .../jvm/org/apache/storm/stats/StatsUtil.java | 2441 +++++++ .../org/apache/storm/task/IOutputCollector.java | 1 + .../org/apache/storm/task/OutputCollector.java | 11 + .../storm/topology/BasicOutputCollector.java | 10 + .../topology/CheckpointTupleForwarder.java | 22 +- .../ComponentConfigurationDeclarer.java | 5 +- .../storm/topology/IBasicOutputCollector.java | 2 + .../apache/storm/topology/IStatefulBolt.java | 7 +- .../apache/storm/topology/ResourceDeclarer.java | 28 + .../storm/topology/StatefulBoltExecutor.java | 46 +- .../apache/storm/topology/TopologyBuilder.java | 5 +- .../jvm/org/apache/storm/trident/Stream.java | 31 +- .../org/apache/storm/trident/TridentState.java | 27 +- .../apache/storm/trident/TridentTopology.java | 91 +- .../org/apache/storm/trident/graph/Group.java | 22 +- .../operation/DefaultResourceDeclarer.java | 66 + .../trident/operation/ITridentResource.java | 32 + .../org/apache/storm/trident/planner/Node.java | 5 +- .../trident/topology/TridentBoltExecutor.java | 4 + .../jvm/org/apache/storm/utils/ConfigUtils.java | 10 +- .../src/jvm/org/apache/storm/utils/Utils.java | 38 +- .../org/apache/storm/integration_test.clj | 53 +- .../apache/storm/trident/integration_test.clj | 110 +- .../apache/storm/messaging/netty_unit_test.clj | 14 +- 
.../test/clj/org/apache/storm/nimbus_test.clj | 152 +- .../clj/org/apache/storm/scheduler_test.clj | 34 +- .../apache/storm/security/auth/auth_test.clj | 15 +- .../storm/security/auth/drpc_auth_test.clj | 15 +- .../storm/security/auth/nimbus_auth_test.clj | 15 +- .../clj/org/apache/storm/supervisor_test.clj | 2 +- .../apache/storm/blobstore/BlobStoreTest.java | 171 +- .../cluster/StormClusterStateImplTest.java | 116 + .../apache/storm/localizer/LocalizerTest.java | 7 +- .../topology/StatefulBoltExecutorTest.java | 1 + storm-dist/binary/src/main/assembly/binary.xml | 14 + 299 files changed, 37982 insertions(+), 2220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java index bb2525a,0000000..a567956 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@@ -1,268 -1,0 +1,271 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + + private static final SupervisorUtils INSTANCE = new SupervisorUtils(); + private static SupervisorUtils _instance = INSTANCE; + public static void setInstance(SupervisorUtils u) { + _instance = u; + } + public static void resetInstance() { + _instance = INSTANCE; + } + - public static Process processLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix, ++ public static Process processLauncher(Map conf, String user, List<String> commandPrefix, List<String> args, Map<String, String> environment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { + if (StringUtils.isBlank(user)) { + throw new 
IllegalArgumentException("User cannot be blank when calling processLauncher."); + } + String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); + String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); + String wl; + if (StringUtils.isNotBlank(wlinitial)) { + wl = wlinitial; + } else { + wl = stormHome + "/bin/worker-launcher"; + } + List<String> commands = new ArrayList<>(); ++ if (commandPrefix != null){ ++ commands.addAll(commandPrefix); ++ } + commands.add(wl); + commands.add(user); + commands.addAll(args); + LOG.info("Running as user: {} command: {}", user, commands); + return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); + } + + public static int processLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix) + throws IOException { + int ret = 0; - Process process = processLauncher(conf, user, args, environment, logPreFix, null, null); ++ Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); + if (StringUtils.isNotBlank(logPreFix)) + Utils.readAndLogStream(logPreFix, process.getInputStream()); + try { + process.waitFor(); + } catch (InterruptedException e) { + LOG.info("{} interrupted.", logPreFix); + } + ret = process.exitValue(); + return ret; + } + + public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + String logPrefix = "setup conf for " + dir; + List<String> commands = new ArrayList<>(); + commands.add("code-dir"); + commands.add(dir); + processLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix); + } + } + + public static void rmrAsUser(Map conf, String id, String path) throws IOException { + String user = Utils.getFileOwner(path); + String logPreFix = "rmr " + id; + List<String> commands = new 
ArrayList<>(); + commands.add("rmr"); + commands.add(path); + SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPreFix); + if (Utils.checkFileExists(path)) { + throw new RuntimeException(path + " was not deleted."); + } + } + + /** + * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then + * returns false + * + * @param blobInfo + * @return + */ + public static Boolean shouldUncompressBlob(Map<String, Object> blobInfo) { + return Utils.getBoolean(blobInfo.get("uncompress"), false); + } + + /** + * Returns a list of LocalResources based on the blobstore-map passed in + * + * @param blobstoreMap + * @return + */ + public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) { + List<LocalResource> localResourceList = new ArrayList<>(); + if (blobstoreMap != null) { + for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) { + LocalResource localResource = new LocalResource(map.getKey(), shouldUncompressBlob(map.getValue())); + localResourceList.add(localResource); + } + } + return localResourceList; + } + + /** + * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart. 
+ * + * @param localizer + * @param stormId + * @param conf + */ + public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException { + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); + String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); + String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME); + List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); + if (blobstoreMap != null) { + localizer.addReferences(localresources, user, topoName); + } + } + + public static Set<String> readDownLoadedStormIds(Map conf) throws IOException { + Set<String> stormIds = new HashSet<>(); + String path = ConfigUtils.supervisorStormDistRoot(conf); + Collection<String> rets = Utils.readDirContents(path); + for (String ret : rets) { + stormIds.add(URLDecoder.decode(ret)); + } + return stormIds; + } + + public static Collection<String> supervisorWorkerIds(Map conf) { + String workerRoot = ConfigUtils.workerRoot(conf); + return Utils.readDirContents(workerRoot); + } + + public static boolean doRequiredTopoFilesExist(Map conf, String stormId) throws IOException { + String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot); + String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot); + String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot); + if (!Utils.checkFileExists(stormroot)) + return false; + if (!Utils.checkFileExists(stormcodepath)) + return false; + if (!Utils.checkFileExists(stormconfpath)) + return false; + if (ConfigUtils.isLocalMode(conf) || Utils.checkFileExists(stormjarpath)) + return true; + return false; + } + + /** + * map from worker id to heartbeat + * + * @param conf + * @return + * @throws Exception + */ + 
public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception { + return _instance.readWorkerHeartbeatsImpl(conf); + } + + public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map conf) throws Exception { + Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>(); + + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + + for (String workerId : workerIds) { + LSWorkerHeartbeat whb = readWorkerHeartbeat(conf, workerId); + // ATTENTION: whb can be null + workerHeartbeats.put(workerId, whb); + } + return workerHeartbeats; + } + + + /** + * get worker heartbeat by workerId + * + * @param conf + * @param workerId + * @return + * @throws IOException + */ + public static LSWorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) { + return _instance.readWorkerHeartbeatImpl(conf, workerId); + } + + public LSWorkerHeartbeat readWorkerHeartbeatImpl(Map conf, String workerId) { + try { + LocalState localState = ConfigUtils.workerState(conf, workerId); + return localState.getWorkerHeartBeat(); + } catch (Exception e) { + LOG.warn("Failed to read local heartbeat for workerId : {},Ignoring exception.", workerId, e); + return null; + } + } + + public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map conf) { + return _instance.isWorkerHbTimedOutImpl(now, whb, conf); + } + + public boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map conf) { + boolean result = false; + if ((now - whb.get_time_secs()) > Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) { + result = true; + } + return result; + } + + public static String javaCmd(String cmd) { + return _instance.javaCmdImpl(cmd); + } + + public String javaCmdImpl(String cmd) { + String ret = null; + String javaHome = System.getenv().get("JAVA_HOME"); + if (StringUtils.isNotBlank(javaHome)) { + ret = javaHome + Utils.FILE_PATH_SEPARATOR + "bin" + Utils.FILE_PATH_SEPARATOR + cmd; + } else { + ret = cmd; + } + 
return ret; + } + + public final static List<ACL> supervisorZkAcls() { + final List<ACL> acls = new ArrayList<>(); + acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0)); + acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE)); + return acls; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java index ec29855,0000000..6b294f2 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java @@@ -1,221 -1,0 +1,221 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { + private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + + private Map conf; + private IStormClusterState stormClusterState; + private String hostName; + + private String profileCmd; + + private SupervisorData supervisorData; + + private class ActionExitCallback implements Utils.ExitCodeCallable { + private String stormId; + private ProfileRequest profileRequest; + private String logPrefix; + + public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { + this.stormId = stormId; + this.profileRequest = profileRequest; + this.logPrefix = logPrefix; + } + + @Override + public Object call() throws Exception { + return null; + } + + @Override + public Object call(int exitCode) { + LOG.info("{} profile-action exited for {}", logPrefix, exitCode); + try { + stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); + } catch (Exception e) { + LOG.warn("failed delete profileRequest: " + profileRequest); + } + return null; + } + } + + public RunProfilerActions(SupervisorData supervisorData) { + this.conf = supervisorData.getConf(); + this.stormClusterState = supervisorData.getStormClusterState(); + this.hostName = supervisorData.getHostName(); + this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); 
+         this.supervisorData = supervisorData;
+     }
+
+     /**
+      * Processes every pending profile request addressed to this host. Each request is handled
+      * independently: a failure is logged and the request is picked up again on a later run.
+      */
+     @Override
+     public void run() {
+         try {
+             Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions().get();
+             if (stormIdToActions == null) {
+                 return;
+             }
+             for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) {
+                 String stormId = entry.getKey();
+                 List<ProfileRequest> requests = entry.getValue();
+                 if (requests == null) {
+                     continue;
+                 }
+                 for (ProfileRequest profileRequest : requests) {
+                     if (!profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+                         continue;
+                     }
+                     try {
+                         processRequest(stormId, profileRequest);
+                     } catch (Exception e) {
+                         // isolate failures so that one broken topology/request does not block the others
+                         LOG.error("Error processing profile request {} for {}, will retry later", profileRequest, stormId, e);
+                     }
+                 }
+             }
+         } catch (Exception e) {
+             LOG.error("Error running profiler actions, will retry again later", e);
+         }
+     }
+
+     /**
+      * Builds and launches the profiler command for one request that targets a worker on this host.
+      *
+      * @param stormId        id of the topology that owns the target worker
+      * @param profileRequest the request; the first port of its node info selects the worker
+      * @throws IOException if the supervisor-local topology config cannot be read
+      */
+     private void processRequest(String stormId, ProfileRequest profileRequest) throws IOException {
+         // once the request's timestamp has passed, the request is treated as "stop"
+         boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp();
+         Long port = profileRequest.get_nodeInfo().get_port().iterator().next();
+         // same per-topology, per-port artifacts directory that holds the worker's pid file (see below)
+         String targetDir = ConfigUtils.workerArtifactsRoot(conf, stormId, port.intValue());
+         Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+         String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+         Map<String, String> env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+         if (env == null) {
+             env = new HashMap<String, String>();
+         }
+
+         String workerPid = readWorkerPid(ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()));
+         ProfileAction profileAction = profileRequest.get_action();
+         String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+         // Until the JPROFILE_STOP request's stop time is reached, keep launching profiler start in case the
+         // worker restarted. The profiler plugin script validates if the JVM is recording before starting another recording.
+         String command = mkCommand(profileAction, stop, workerPid, targetDir);
+         List<String> listCommand = new ArrayList<>();
+         if (command != null) {
+             listCommand.addAll(Arrays.asList(command.split(" ")));
+         }
+         // NOTE(review): for actions mkCommand does not handle, an empty command is still launched (unchanged behavior)
+         try {
+             ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix);
+             launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix);
+         } catch (IOException | RuntimeException e) {
+             LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port, e);
+         }
+     }
+
+     /**
+      * Reads a worker's pid file.
+      *
+      * @param pidPath path of the pid file
+      * @return the trimmed file content; an empty string (or partial content) if the file could not be read
+      */
+     private String readWorkerPid(String pidPath) {
+         StringBuilder pid = new StringBuilder();
+         try (BufferedReader br = new BufferedReader(new FileReader(pidPath))) {
+             int c;
+             while ((c = br.read()) >= 0) {
+                 // the cast matters: append(int) would append the decimal character code instead of the character
+                 pid.append((char) c);
+             }
+         } catch (IOException e) {
+             LOG.warn("Unable to read worker pid file {}", pidPath, e);
+         }
+         return pid.toString().trim();
+     }
+
+     /**
+      * Runs a profiler command for a worker, either through the worker launcher as the topology
+      * user (when supervisor.run.worker.as.user is enabled) or directly as a child process.
+      */
+     private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment,
+                                                final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException {
+         File targetFile = new File(targetDir);
+         if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+             LOG.info("Running as user:{} command:{}", user, commands);
+             String containerFile = Utils.containerFilePath(targetDir);
+             if (Utils.checkFileExists(containerFile)) {
+                 SupervisorUtils.rmrAsUser(conf, containerFile, containerFile);
+             }
+             String scriptFile = Utils.scriptFilePath(targetDir);
+             if (Utils.checkFileExists(scriptFile)) {
+                 SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile);
+             }
+             String script = Utils.writeScript(targetDir, commands, environment);
-             List<String> newCommands = new ArrayList<>();
-             newCommands.add("profiler");
-             newCommands.add(targetDir);
-             newCommands.add(script);
-             SupervisorUtils.processLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile);
++            List<String> args = new ArrayList<>();
++            args.add("profiler");
++            args.add(targetDir);
++            args.add(script);
++            SupervisorUtils.processLauncher(conf, user, null, args, environment, logPrefix, exitCodeCallable, targetFile);
+         } else {
+             Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile);
+         }
+     }
+
+     /**
+      * Maps a profile action to the profiler script invocation; returns null for actions it does not handle.
+      */
+     private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) {
+         if (action == ProfileAction.JMAP_DUMP) {
+             return jmapDumpCmd(workerPid, targetDir);
+         } else if (action == ProfileAction.JSTACK_DUMP) {
+             return jstackDumpCmd(workerPid, targetDir);
+         } else if (action == ProfileAction.JPROFILE_DUMP) {
+             return jprofileDump(workerPid, targetDir);
+         } else if (action == ProfileAction.JVM_RESTART) {
+             return jprofileJvmRestart(workerPid);
+         } else if (!stop && action == ProfileAction.JPROFILE_STOP) {
+             return jprofileStart(workerPid);
+         } else if (stop && action == ProfileAction.JPROFILE_STOP) {
+             return jprofileStop(workerPid, targetDir);
+         }
+         return null;
+     }
+
+     private String jmapDumpCmd(String pid, String targetDir) {
+         return profileCmd + " " + pid + " jmap " + targetDir;
+     }
+
+     private String jstackDumpCmd(String pid, String targetDir) {
+         return profileCmd + " " + pid + " jstack " + targetDir;
+     }
+
+     private String jprofileStart(String pid) {
+         return profileCmd + " " + pid + " start";
+     }
+
+     private String jprofileStop(String pid, String targetDir) {
+         return profileCmd + " " + pid + " stop " + targetDir;
+     }
+
+     private String jprofileDump(String pid, String targetDir) {
+         return profileCmd + " " + pid + " dump " + targetDir;
+     }
+
+     private String jprofileJvmRestart(String pid) {
+         return profileCmd + " " + pid + " kill";
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
----------------------------------------------------------------------
diff --cc
storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java index a73a9bd,0000000..9529b1a mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java @@@ -1,402 -1,0 +1,408 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+  */
+ package org.apache.storm.daemon.supervisor.workermanager;
+
+ import org.apache.commons.lang.StringUtils;
+ import org.apache.storm.Config;
+ import org.apache.storm.ProcessSimulator;
+ import org.apache.storm.container.cgroup.CgroupManager;
+ import org.apache.storm.daemon.supervisor.SupervisorUtils;
+ import org.apache.storm.generated.WorkerResources;
+ import org.apache.storm.localizer.Localizer;
+ import org.apache.storm.utils.ConfigUtils;
+ import org.apache.storm.utils.Time;
+ import org.apache.storm.utils.Utils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.*;
+
+ /**
+  * Default {@link IWorkerManager}: launches worker JVMs (optionally as the topology user and/or through the
+  * configured resource isolation plugin), signals them on shutdown and removes their local state on cleanup.
+  */
+ public class DefaultWorkerManager implements IWorkerManager {
+
+     private static final Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class);
+
+     private Map conf;
+     private CgroupManager resourceIsolationManager;
+     private boolean runWorkerAsUser;
+
+     @Override
+     public void prepareWorker(Map conf, Localizer localizer) {
+         this.conf = conf;
+         if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
+             try {
+                 this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+                 this.resourceIsolationManager.prepare(conf);
+                 LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+             } catch (IOException e) {
+                 throw Utils.wrapInRuntime(e);
+             }
+         } else {
+             this.resourceIsolationManager = null;
+         }
+         this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+     }
+
+     @Override
+     public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources,
+                                       Utils.ExitCodeCallable workerExitCallback) {
+         try {
+
+             String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+             String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+             String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+             String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId);
+
+             String stormLogDir = ConfigUtils.getLogDir();
+             String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+             String stormLog4j2ConfDir;
+             if (StringUtils.isNotBlank(stormLogConfDir)) {
+                 if (Utils.isAbsolutePath(stormLogConfDir)) {
+                     stormLog4j2ConfDir = stormLogConfDir;
+                 } else {
+                     stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
+                 }
+             } else {
+                 stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2";
+             }
+
+             String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
+
+             String jlp = jlp(stormRoot, conf);
+
+             String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
+
+             Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+             String workerClassPath = getWorkerClassPath(stormJar, stormConf);
+
+             int memOnheap = 0;
+             if (resources.get_mem_on_heap() > 0) {
+                 memOnheap = (int) Math.ceil(resources.get_mem_on_heap());
+             } else {
+                 // set the default heap memory size for supervisor-test
+                 memOnheap = Utils.getInt(stormConf.get(Config.WORKER_HEAP_MEMORY_MB), 768);
+             }
+
+             int memoffheap = (int) Math.ceil(resources.get_mem_off_heap());
+
+             int cpu = (int) Math.ceil(resources.get_cpu());
+
+             // Topology GC opts take precedence over the cluster-wide ones. The raw config value is passed so that a
+             // single string holding several options is whitespace-split like every other childopts value.
+             List<String> gcOpts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
+             if (gcOpts.isEmpty()) {
+                 gcOpts = substituteChildopts(conf.get(Config.WORKER_GC_CHILDOPTS), workerId, stormId, port, memOnheap);
+             }
+
+             Object topoWorkerLogwriterObject = stormConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS);
+             List<String> topoWorkerLogwriterChildopts = new ArrayList<>();
+             if (topoWorkerLogwriterObject instanceof String) {
+                 topoWorkerLogwriterChildopts.add((String) topoWorkerLogwriterObject);
+             } else if (topoWorkerLogwriterObject instanceof List) {
+                 topoWorkerLogwriterChildopts.addAll((List<String>) topoWorkerLogwriterObject);
+             }
+
+             String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
+
+             String logfileName = "worker.log";
+
+             String workersArtifacets = ConfigUtils.workerArtifactsRoot(conf);
+
+             String loggingSensitivity = (String) stormConf.get(Config.TOPOLOGY_LOGGING_SENSITIVITY);
+             if (loggingSensitivity == null) {
+                 loggingSensitivity = "S3";
+             }
+
+             List<String> workerChildopts = substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+             List<String> topWorkerChildopts = substituteChildopts(stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), workerId, stormId, port, memOnheap);
+
+             List<String> workerProfilerChildopts = null;
+             if (Utils.getBoolean(conf.get(Config.WORKER_PROFILER_ENABLED), false)) {
+                 workerProfilerChildopts = substituteChildopts(conf.get(Config.WORKER_PROFILER_CHILDOPTS), workerId, stormId, port, memOnheap);
+             } else {
+                 workerProfilerChildopts = new ArrayList<>();
+             }
+
+             Map<String, String> topEnvironment = new HashMap<String, String>();
+             Map<String, String> environment = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+             if (environment != null) {
+                 topEnvironment.putAll(environment);
+             }
+             topEnvironment.put("LD_LIBRARY_PATH", jlp);
+
+             String log4jConfigurationFile = null;
+             if (System.getProperty("os.name").startsWith("Windows") && !stormLog4j2ConfDir.startsWith("file:")) {
+                 log4jConfigurationFile = "file:///" + stormLog4j2ConfDir;
+             } else {
+                 log4jConfigurationFile = stormLog4j2ConfDir;
+             }
+             log4jConfigurationFile = log4jConfigurationFile + Utils.FILE_PATH_SEPARATOR + "worker.xml";
+
+             // LogWriter JVM, which in turn runs the worker JVM specified after it
+             List<String> commandList = new ArrayList<>();
+             commandList.add(SupervisorUtils.javaCmd("java"));
+             commandList.add("-cp");
+             commandList.add(workerClassPath);
+             commandList.addAll(topoWorkerLogwriterChildopts);
+             commandList.add("-Dlogfile.name=" + logfileName);
+             commandList.add("-Dstorm.home=" + stormHome);
+             commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+             commandList.add("-Dstorm.id=" + stormId);
+             commandList.add("-Dworker.id=" + workerId);
+             commandList.add("-Dworker.port=" + port);
+             commandList.add("-Dstorm.log.dir=" + stormLogDir);
+             commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+             commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+             commandList.add("org.apache.storm.LogWriter");
+
+             commandList.add(SupervisorUtils.javaCmd("java"));
+             commandList.add("-server");
+             commandList.addAll(workerChildopts);
+             commandList.addAll(topWorkerChildopts);
+             commandList.addAll(gcOpts);
+             commandList.addAll(workerProfilerChildopts);
+             commandList.add("-Djava.library.path=" + jlp);
+             commandList.add("-Dlogfile.name=" + logfileName);
+             commandList.add("-Dstorm.home=" + stormHome);
+             commandList.add("-Dworkers.artifacts=" + workersArtifacets);
+             commandList.add("-Dstorm.conf.file=" + stormConfFile);
+             commandList.add("-Dstorm.options=" + stormOptions);
+             commandList.add("-Dstorm.log.dir=" + stormLogDir);
+             commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
+             commandList.add("-Dlogging.sensitivity=" + loggingSensitivity);
+             commandList.add("-Dlog4j.configurationFile=" + log4jConfigurationFile);
+             commandList.add("-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector");
+             commandList.add("-Dstorm.id=" + stormId);
+             commandList.add("-Dworker.id=" + workerId);
+             commandList.add("-Dworker.port=" + port);
+             commandList.add("-cp");
+             commandList.add(workerClassPath);
+             commandList.add("org.apache.storm.daemon.worker");
+             commandList.add(stormId);
+             commandList.add(assignmentId);
+             commandList.add(String.valueOf(port));
+             commandList.add(workerId);
+
+             // {"cpu" cpu "memory" (+ mem-onheap mem-offheap (int (Math/ceil (conf STORM-CGROUP-MEMORY-LIMIT-TOLERANCE-MARGIN-MB))))
+             if (resourceIsolationManager != null) {
+                 // the margin may be configured as an integer or a floating point number; a (double) cast of the
+                 // raw Object would throw ClassCastException for Integer/Long values
+                 int cGroupMem = (int) Math.ceil(((Number) conf.get(Config.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB)).doubleValue());
+                 int memoryValue = memoffheap + memOnheap + cGroupMem;
+                 int cpuValue = cpu;
+                 Map<String, Number> map = new HashMap<>();
+                 map.put("cpu", cpuValue);
+                 map.put("memory", memoryValue);
+                 resourceIsolationManager.reserveResourcesForWorker(workerId, map);
+                 commandList = resourceIsolationManager.getLaunchCommand(workerId, commandList);
+             }
+
+             LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));
+
+             String logPrefix = "Worker Process " + workerId;
+             String workerDir = ConfigUtils.workerRoot(conf, workerId);
+
+             if (runWorkerAsUser) {
+                 List<String> args = new ArrayList<>();
+                 args.add("worker");
+                 args.add(workerDir);
+                 args.add(Utils.writeScript(workerDir, commandList, topEnvironment));
-                 SupervisorUtils.processLauncher(conf, user, args, null, logPrefix, workerExitCallback, new File(workerDir));
++                List<String> commandPrefix = null;
++                if (resourceIsolationManager != null) {
++                    commandPrefix = resourceIsolationManager.getLaunchCommandPrefix(workerId);
++                }
++                SupervisorUtils.processLauncher(conf, user, commandPrefix, args, null, logPrefix, workerExitCallback, new File(workerDir));
+             } else {
+                 Utils.launchProcess(commandList, topEnvironment, logPrefix, workerExitCallback, new File(workerDir));
+             }
+         } catch (IOException e) {
+             throw Utils.wrapInRuntime(e);
+         }
+         return null;
+     }
+
+     @Override
+     public IWorkerResult shutdownWorker(String supervisorId, String workerId, Map<String, String> workerThreadPids) {
+         try {
+             LOG.info("Shutting down {}:{}", supervisorId, workerId);
+             Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId));
+             Integer shutdownSleepSecs = Utils.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
+             String user = ConfigUtils.getWorkerUser(conf, workerId);
+             String threadPid = workerThreadPids.get(workerId);
+             if (StringUtils.isNotBlank(threadPid)) {
+                 ProcessSimulator.killProcess(threadPid);
+             }
+
+             for (String pid : pids) {
+                 if (runWorkerAsUser) {
+                     List<String> commands = new ArrayList<>();
+                     commands.add("signal");
+                     commands.add(pid);
+                     commands.add("15");
+                     String logPrefix = "kill -15 " + pid;
+                     SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+                 } else {
+                     Utils.killProcessWithSigTerm(pid);
+                 }
+             }
+
+             if (pids.size() > 0) {
+                 LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs);
+                 Time.sleepSecs(shutdownSleepSecs);
+             }
+
+             for (String pid : pids) {
+                 if (runWorkerAsUser) {
+                     List<String> commands = new ArrayList<>();
+                     commands.add("signal");
+                     commands.add(pid);
+                     commands.add("9");
+                     String logPrefix = "kill -9 " + pid;
+                     SupervisorUtils.processLauncherAndWait(conf, user, commands, null, logPrefix);
+                 } else {
+                     Utils.forceKillProcess(pid);
+                 }
+                 String path = ConfigUtils.workerPidPath(conf, workerId, pid);
+                 if (runWorkerAsUser) {
+                     SupervisorUtils.rmrAsUser(conf, workerId, path);
+                 } else {
+                     try {
+                         LOG.debug("Removing path {}", path);
+                         new File(path).delete();
+                     } catch (Exception e) {
+                         // on windows, the supervisor may still holds the lock on the worker directory
+                         // ignore
+                     }
+                 }
+             }
+             LOG.info("Shut down {}:{}", supervisorId, workerId);
+         } catch (Exception e) {
+             throw Utils.wrapInRuntime(e);
+         }
+         return null;
+     }
+
+     @Override
+     public boolean cleanupWorker(String workerId) {
+         try {
++            //clean up for resource isolation if enabled
++            if (resourceIsolationManager != null) {
++                resourceIsolationManager.releaseResourcesForWorker(workerId);
++            }
++            //Always make sure to clean up everything else before worker directory
++            //is removed since that is what is going to trigger the retry for cleanup
+             String workerRoot = ConfigUtils.workerRoot(conf, workerId);
+             if (Utils.checkFileExists(workerRoot)) {
+                 if (runWorkerAsUser) {
+                     SupervisorUtils.rmrAsUser(conf, workerId, workerRoot);
+                 } else {
+                     Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId));
+                     Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId));
+                     Utils.forceDelete(ConfigUtils.workerTmpRoot(conf, workerId));
+                     Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId));
+                 }
+                 ConfigUtils.removeWorkerUserWSE(conf, workerId);
+             }
-             if (resourceIsolationManager != null) {
-                 resourceIsolationManager.releaseResourcesForWorker(workerId);
-             }
+             return true;
+         } catch (IOException e) {
+             LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+         } catch (RuntimeException e) {
+             LOG.warn("Failed to cleanup worker {}. Will retry later", workerId, e);
+         }
+         return false;
+     }
+
+     @Override
+     public IWorkerResult resizeWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources) {
+         return null;
+     }
+
+     /**
+      * Builds the worker's java.library.path: the OS/arch-specific resources dir, the topology resources dir,
+      * then the configured java.library.path, joined with the class-path separator.
+      */
+     protected String jlp(String stormRoot, Map conf) {
+         String resourceRoot = stormRoot + Utils.FILE_PATH_SEPARATOR + ConfigUtils.RESOURCES_SUBDIR;
+         String os = System.getProperty("os.name").replaceAll("\\s+", "_");
+         String arch = System.getProperty("os.arch");
+         String archResourceRoot = resourceRoot + Utils.FILE_PATH_SEPARATOR + os + "-" + arch;
+         String ret = archResourceRoot + Utils.CLASS_PATH_SEPARATOR + resourceRoot + Utils.CLASS_PATH_SEPARATOR + conf.get(Config.JAVA_LIBRARY_PATH);
+         return ret;
+     }
+
+     /**
+      * Builds the worker classpath: the supervisor's worker classpath, the topology jar, then any
+      * topology-specific classpath entries (topology.classpath as a string or a list of strings).
+      */
+     protected String getWorkerClassPath(String stormJar, Map stormConf) {
+         List<String> topoClasspath = new ArrayList<>();
+         Object object = stormConf.get(Config.TOPOLOGY_CLASSPATH);
+
+         if (object instanceof List) {
+             topoClasspath.addAll((List<String>) object);
+         } else if (object instanceof String) {
+             topoClasspath.add((String) object);
+         } else if (object != null) {
+             // an unset topology.classpath is normal; only a value of an unexpected type is an error
+             LOG.error("topology specific classpath is invalid: {}", object);
+         }
+         String classPath = Utils.workerClasspath();
+         String classAddPath = Utils.addToClasspath(classPath, Arrays.asList(stormJar));
+         return Utils.addToClasspath(classAddPath, topoClasspath);
+     }
+
+     /**
+      * Generates runtime childopts by replacing the placeholders %ID%, %WORKER-ID%, %TOPOLOGY-ID%,
+      * %WORKER-PORT% and %HEAP-MEM% with the given values.
+      *
+      * @param value     a whitespace-separated string of options, or a list of individual options; anything else yields no options
+      * @param workerId  substituted for %WORKER-ID%
+      * @param stormId   substituted for %TOPOLOGY-ID%
+      * @param port      substituted for %ID% and %WORKER-PORT%
+      * @param memOnheap substituted for %HEAP-MEM%
+      * @return the substituted options; blank entries are dropped
+      */
+     public List<String> substituteChildopts(Object value, String workerId, String stormId, Long port, int memOnheap) {
+         List<String> rets = new ArrayList<>();
+         if (value instanceof String) {
+             String string = (String) value;
+             if (StringUtils.isNotBlank(string)) {
+                 // trim before splitting: leading whitespace would otherwise produce an empty first token,
+                 // i.e. an empty argument on the worker command line
+                 String substituted = substituteOpt(string, workerId, stormId, port, memOnheap).trim();
+                 rets.addAll(Arrays.asList(substituted.split("\\s+")));
+             }
+         } else if (value instanceof List) {
+             for (Object object : (List<Object>) value) {
+                 String str = (String) object;
+                 if (StringUtils.isNotBlank(str)) {
+                     rets.add(substituteOpt(str, workerId, stormId, port, memOnheap));
+                 }
+             }
+         }
+         return rets;
+     }
+
+     /** Applies the childopts placeholder replacements to a single option string. */
+     private static String substituteOpt(String opt, String workerId, String stormId, Long port, int memOnheap) {
+         return opt.replace("%ID%", String.valueOf(port))
+                 .replace("%WORKER-ID%", workerId)
+                 .replace("%TOPOLOGY-ID%", stormId)
+                 .replace("%WORKER-PORT%", String.valueOf(port))
+                 .replace("%HEAP-MEM%", String.valueOf(memOnheap));
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f03b8bec/storm-core/test/clj/org/apache/storm/supervisor_test.clj
---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj index 8f11f8a,ade1c2f..0d6603d --- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj +++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj @@@ -728,7 -689,7 +728,7 @@@ (launchProcessImpl [& _] nil))] (with-open [_ (UtilsInstaller. utils-proxy)] (is (try - (SupervisorUtils/processLauncher {} nil (ArrayList.) {} nil nil nil) - (supervisor/worker-launcher {} nil "") ++ (SupervisorUtils/processLauncher {} nil nil (ArrayList.) {} nil nil nil) false (catch Throwable t (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
