Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/84#discussion_r12665267
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -35,91 +35,84 @@
         private SpoutOutputCollector _collector;
         private String[] _command;
         private ShellProcess _process;
    +    private SpoutMsg spoutMsg;
     
         public ShellSpout(ShellComponent component) {
             this(component.get_execution_command(), component.get_script());
         }
    -    
    +
         public ShellSpout(String... command) {
             _command = command;
         }
    -    
    +
         public void open(Map stormConf, TopologyContext context,
                          SpoutOutputCollector collector) {
    -        _process = new ShellProcess(_command);
             _collector = collector;
     
    -        try {
    -            Number subpid = _process.launch(stormConf, context);
    -            LOG.info("Launched subprocess with pid " + subpid);
    -        } catch (IOException e) {
    -            throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
    -        }
    +        _process = new ShellProcess(_command);
    +
    +        Number subpid = _process.launch(stormConf, context);
    +        LOG.info("Launched subprocess with pid " + subpid);
         }
     
         public void close() {
             _process.destroy();
         }
     
    -    private JSONObject _next;
         public void nextTuple() {
    -        if (_next == null) {
    -            _next = new JSONObject();
    -            _next.put("command", "next");
    +        if (spoutMsg == null) {
    +            spoutMsg = new SpoutMsg();
             }
    -
    -        querySubprocess(_next);
    +        spoutMsg.setCommand("next");
    +        spoutMsg.setId("");
    +        querySubprocess();
         }
     
    -    private JSONObject _ack;
         public void ack(Object msgId) {
    -        if (_ack == null) {
    -            _ack = new JSONObject();
    -            _ack.put("command", "ack");
    +        if (spoutMsg == null) {
    +            spoutMsg = new SpoutMsg();
             }
    -
    -        _ack.put("id", msgId);
    -        querySubprocess(_ack);
    +        spoutMsg.setCommand("ack");
    +        spoutMsg.setId(msgId);
    +        querySubprocess();
         }
     
    -    private JSONObject _fail;
         public void fail(Object msgId) {
    -        if (_fail == null) {
    -            _fail = new JSONObject();
    -            _fail.put("command", "fail");
    +        if (spoutMsg == null) {
    +            spoutMsg = new SpoutMsg();
             }
    -
    -        _fail.put("id", msgId);
    -        querySubprocess(_fail);
    +        spoutMsg.setCommand("fail");
    +        spoutMsg.setId(msgId);
    +        querySubprocess();
         }
     
    -    private void querySubprocess(Object query) {
    +    private void querySubprocess() {
             try {
    -            _process.writeMessage(query);
    +            _process.writeSpoutMsg(spoutMsg);
     
                 while (true) {
    -                JSONObject action = _process.readMessage();
    -                String command = (String) action.get("command");
    +                ShellMsg shellMsg = _process.readShellMsg();
    +                String command = shellMsg.getCommand();
                     if (command.equals("sync")) {
                         return;
                     } else if (command.equals("log")) {
    -                    String msg = (String) action.get("msg");
    +                    String msg = shellMsg.getMsg();
                         LOG.info("Shell msg: " + msg);
                     } else if (command.equals("emit")) {
    -                    String stream = (String) action.get("stream");
    -                    if (stream == null) stream = Utils.DEFAULT_STREAM_ID;
    --- End diff --
    
    Do we still need to fall back to the default stream id (Utils.DEFAULT_STREAM_ID) if the stream we get is null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to