Allow ShellBolt,ShellSpout to set env vars (particularly PATH)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e81e54e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e81e54e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e81e54e Branch: refs/heads/master Commit: 7e81e54e16feba742ec8501d7b9497ce2c2bf68f Parents: 27a3a6b Author: Shyam Rajendran <srajend...@yahoo-inc.com> Authored: Wed Jun 10 09:53:48 2015 -0500 Committer: Shyam Rajendran <rshyam....@gmail.com> Committed: Wed Jul 1 13:10:54 2015 -0500 ---------------------------------------------------------------------- .../jvm/backtype/storm/spout/ShellSpout.java | 26 +++++++++++++------ .../src/jvm/backtype/storm/task/ShellBolt.java | 10 ++++++++ .../jvm/backtype/storm/utils/ShellProcess.java | 27 ++++++++++++++++---- 3 files changed, 50 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7e81e54e/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index ece11ee..1abee52 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -25,29 +25,31 @@ import backtype.storm.multilang.ShellMsg; import backtype.storm.multilang.SpoutMsg; import backtype.storm.task.TopologyContext; import backtype.storm.utils.ShellProcess; -import java.util.Map; +import clojure.lang.RT; +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import clojure.lang.RT; -import com.google.common.util.concurrent.MoreExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ShellSpout implements ISpout { public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class); private SpoutOutputCollector _collector; private String[] _command; + private Map<String, String> env = new HashMap<String, String>(); private ShellProcess _process; - + private TopologyContext _context; - + private SpoutMsg _spoutMsg; private int workerTimeoutMills; @@ -62,6 +64,11 @@ public class ShellSpout implements ISpout { _command = command; } + public ShellSpout setEnv(Map<String, String> env) { + this.env = env; + return this; + } + public void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; @@ -70,6 +77,9 @@ public class ShellSpout implements ISpout { workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); _process = new ShellProcess(_command); + if (!env.isEmpty()) { + _process.setEnv(env); + } Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); http://git-wip-us.apache.org/repos/asf/storm/blob/7e81e54e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index eac8a90..b246784 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -74,6 +74,7 @@ public class ShellBolt implements IBolt { Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>(); private String[] _command; + private Map<String, String> env = new HashMap<String, String>(); private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; @@ -98,6 +99,12 @@ public class ShellBolt implements IBolt { _command = command; } + + public ShellBolt setEnv(Map<String, String> env) { + this.env = env; + return this; + } + public void prepare(Map stormConf, TopologyContext context, final OutputCollector collector) { Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING); @@ -112,6 +119,9 @@ public class ShellBolt implements IBolt { workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); _process = new ShellProcess(_command); + if (!env.isEmpty()) { + _process.setEnv(env); + } //subprocesses must send their pid first thing Number subpid = _process.launch(stormConf, context); http://git-wip-us.apache.org/repos/asf/storm/blob/7e81e54e/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java index 82eabf1..8134be7 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java @@ -18,16 +18,14 @@ package backtype.storm.utils; import backtype.storm.Config; -import backtype.storm.multilang.ISerializer; -import backtype.storm.multilang.BoltMsg; -import backtype.storm.multilang.NoOutputException; -import backtype.storm.multilang.ShellMsg; -import backtype.storm.multilang.SpoutMsg; +import backtype.storm.multilang.*; import backtype.storm.task.TopologyContext; + import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,6 +38,7 @@ public class ShellProcess implements Serializable { private Process _subprocess; private InputStream processErrorStream; private String[] command; + private Map<String, String> env = new HashMap<String, String>(); public ISerializer serializer; public Number pid; public String componentName; @@ -48,8 +47,26 @@ public class ShellProcess implements Serializable { this.command = command; } + public void setEnv(Map<String, String> env) { + this.env = env; + } + + private void modifyEnvironment(Map<String, String> buildEnv) { + for (Map.Entry<String, String> entry : env.entrySet()) { + if (entry.getKey().equals("PATH")) { + buildEnv.put("PATH", buildEnv.get("PATH") + ":" + env.get("PATH")); + } else { + buildEnv.put(entry.getKey(), entry.getValue()); + } + } + } + public Number launch(Map conf, TopologyContext context) { ProcessBuilder builder = new ProcessBuilder(command); + if (!env.isEmpty()) { + Map<String, String> buildEnv = builder.environment(); + modifyEnvironment(buildEnv); + } builder.directory(new File(context.getCodeDir())); ShellLogger = LoggerFactory.getLogger(context.getThisComponentId());