Allow ShellBolt,ShellSpout to set env vars (particularly PATH)

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e81e54e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e81e54e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e81e54e

Branch: refs/heads/master
Commit: 7e81e54e16feba742ec8501d7b9497ce2c2bf68f
Parents: 27a3a6b
Author: Shyam Rajendran <srajend...@yahoo-inc.com>
Authored: Wed Jun 10 09:53:48 2015 -0500
Committer: Shyam Rajendran <rshyam....@gmail.com>
Committed: Wed Jul 1 13:10:54 2015 -0500

----------------------------------------------------------------------
 .../jvm/backtype/storm/spout/ShellSpout.java    | 26 +++++++++++++------
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 10 ++++++++
 .../jvm/backtype/storm/utils/ShellProcess.java  | 27 ++++++++++++++++----
 3 files changed, 50 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7e81e54e/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java 
b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index ece11ee..1abee52 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -25,29 +25,31 @@ import backtype.storm.multilang.ShellMsg;
 import backtype.storm.multilang.SpoutMsg;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
-import java.util.Map;
+import clojure.lang.RT;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import clojure.lang.RT;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 public class ShellSpout implements ISpout {
     public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
 
     private SpoutOutputCollector _collector;
     private String[] _command;
+    private Map<String, String> env = new HashMap<String, String>();
     private ShellProcess _process;
-    
+
     private TopologyContext _context;
-    
+
     private SpoutMsg _spoutMsg;
 
     private int workerTimeoutMills;
@@ -62,6 +64,11 @@ public class ShellSpout implements ISpout {
         _command = command;
     }
 
+    public ShellSpout setEnv(Map<String, String> env) {
+        this.env = env;
+        return this;
+    }
+
     public void open(Map stormConf, TopologyContext context,
                      SpoutOutputCollector collector) {
         _collector = collector;
@@ -70,6 +77,9 @@ public class ShellSpout implements ISpout {
         workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
 
         _process = new ShellProcess(_command);
+        if (!env.isEmpty()) {
+            _process.setEnv(env);
+        }
 
         Number subpid = _process.launch(stormConf, context);
         LOG.info("Launched subprocess with pid " + subpid);

http://git-wip-us.apache.org/repos/asf/storm/blob/7e81e54e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java 
b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index eac8a90..b246784 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -74,6 +74,7 @@ public class ShellBolt implements IBolt {
     Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
 
     private String[] _command;
+    private Map<String, String> env = new HashMap<String, String>();
     private ShellProcess _process;
     private volatile boolean _running = true;
     private volatile Throwable _exception;
@@ -98,6 +99,12 @@ public class ShellBolt implements IBolt {
         _command = command;
     }
 
+
+    public ShellBolt setEnv(Map<String, String> env) {
+        this.env = env;
+        return this;
+    }
+
     public void prepare(Map stormConf, TopologyContext context,
                         final OutputCollector collector) {
         Object maxPending = 
stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
@@ -112,6 +119,9 @@ public class ShellBolt implements IBolt {
         workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
 
         _process = new ShellProcess(_command);
+        if (!env.isEmpty()) {
+            _process.setEnv(env);
+        }
 
         //subprocesses must send their pid first thing
         Number subpid = _process.launch(stormConf, context);

http://git-wip-us.apache.org/repos/asf/storm/blob/7e81e54e/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java 
b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
index 82eabf1..8134be7 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -18,16 +18,14 @@
 package backtype.storm.utils;
 
 import backtype.storm.Config;
-import backtype.storm.multilang.ISerializer;
-import backtype.storm.multilang.BoltMsg;
-import backtype.storm.multilang.NoOutputException;
-import backtype.storm.multilang.ShellMsg;
-import backtype.storm.multilang.SpoutMsg;
+import backtype.storm.multilang.*;
 import backtype.storm.task.TopologyContext;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -40,6 +38,7 @@ public class ShellProcess implements Serializable {
     private Process      _subprocess;
     private InputStream  processErrorStream;
     private String[]     command;
+    private Map<String, String> env = new HashMap<String, String>();
     public ISerializer   serializer;
     public Number pid;
     public String componentName;
@@ -48,8 +47,26 @@ public class ShellProcess implements Serializable {
         this.command = command;
     }
 
+    public void setEnv(Map<String, String> env) {
+        this.env = env;
+    }
+
+    private void modifyEnvironment(Map<String, String> buildEnv) {
+        for (Map.Entry<String, String> entry : env.entrySet()) {
+            if (entry.getKey().equals("PATH")) {
+                buildEnv.put("PATH", buildEnv.get("PATH") + ":" + 
env.get("PATH"));
+            } else {
+                buildEnv.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
     public Number launch(Map conf, TopologyContext context) {
         ProcessBuilder builder = new ProcessBuilder(command);
+        if (!env.isEmpty()) {
+            Map<String, String> buildEnv = builder.environment();
+            modifyEnvironment(buildEnv);
+        }
         builder.directory(new File(context.getCodeDir()));
 
         ShellLogger = LoggerFactory.getLogger(context.getThisComponentId());

Reply via email to