[ https://issues.apache.org/jira/browse/STORM-138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14002445#comment-14002445 ]

ASF GitHub Bot commented on STORM-138:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/84#discussion_r12817824
  
    --- Diff: storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---
    @@ -35,91 +35,84 @@
         private SpoutOutputCollector _collector;
         private String[] _command;
         private ShellProcess _process;
    +    private SpoutMsg spoutMsg;
     
         public ShellSpout(ShellComponent component) {
             this(component.get_execution_command(), component.get_script());
         }
    -    
    +
         public ShellSpout(String... command) {
             _command = command;
         }
    -    
    +
         public void open(Map stormConf, TopologyContext context,
                          SpoutOutputCollector collector) {
    -        _process = new ShellProcess(_command);
             _collector = collector;
     
    -        try {
    -            Number subpid = _process.launch(stormConf, context);
    -            LOG.info("Launched subprocess with pid " + subpid);
    -        } catch (IOException e) {
    -            throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
    -        }
    +        _process = new ShellProcess(_command);
    +
    +        Number subpid = _process.launch(stormConf, context);
    +        LOG.info("Launched subprocess with pid " + subpid);
         }
     
         public void close() {
             _process.destroy();
         }
     
    -    private JSONObject _next;
         public void nextTuple() {
    -        if (_next == null) {
    -            _next = new JSONObject();
    -            _next.put("command", "next");
    +        if (spoutMsg == null) {
    +            spoutMsg = new SpoutMsg();
             }
    -
    -        querySubprocess(_next);
    +        spoutMsg.setCommand("next");
    +        spoutMsg.setId("");
    +        querySubprocess();
         }
     
    -    private JSONObject _ack;
         public void ack(Object msgId) {
    -        if (_ack == null) {
    -            _ack = new JSONObject();
    -            _ack.put("command", "ack");
    +        if (spoutMsg == null) {
    +            spoutMsg = new SpoutMsg();
             }
    -
    -        _ack.put("id", msgId);
    -        querySubprocess(_ack);
    +        spoutMsg.setCommand("ack");
    +        spoutMsg.setId(msgId);
    +        querySubprocess();
         }
     
    -    private JSONObject _fail;
         public void fail(Object msgId) {
    -        if (_fail == null) {
    -            _fail = new JSONObject();
    -            _fail.put("command", "fail");
    +        if (spoutMsg == null) {
    +            spoutMsg = new SpoutMsg();
             }
    -
    -        _fail.put("id", msgId);
    -        querySubprocess(_fail);
    +        spoutMsg.setCommand("fail");
    +        spoutMsg.setId(msgId);
    +        querySubprocess();
         }
     
    -    private void querySubprocess(Object query) {
    +    private void querySubprocess() {
             try {
    -            _process.writeMessage(query);
    +            _process.writeSpoutMsg(spoutMsg);
     
                 while (true) {
    -                JSONObject action = _process.readMessage();
    -                String command = (String) action.get("command");
    +                ShellMsg shellMsg = _process.readShellMsg();
    +                String command = shellMsg.getCommand();
                     if (command.equals("sync")) {
                         return;
                     } else if (command.equals("log")) {
    -                    String msg = (String) action.get("msg");
    +                    String msg = shellMsg.getMsg();
                         LOG.info("Shell msg: " + msg);
                     } else if (command.equals("emit")) {
    -                    String stream = (String) action.get("stream");
    -                    if (stream == null) stream = Utils.DEFAULT_STREAM_ID;
    --- End diff --
    
    This is not a big enough issue to block merging this in. If we run
into trouble with this down the road, we can file another JIRA as an
enhancement.


> Pluggable serialization for multilang
> -------------------------------------
>
>                 Key: STORM-138
>                 URL: https://issues.apache.org/jira/browse/STORM-138
>             Project: Apache Storm (Incubating)
>          Issue Type: New Feature
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/373
> Currently JSON is used to serialize tuples for multilang. It would be great 
> if the serialization mechanism were pluggable so that using richer types with 
> multilang would be possible.
> ---------
> francis-liberty: Hello, I am new here and would like to pick up this
> issue. I also noticed a recent PR, #697 by jsgilmore; would it work for
> this issue, too?
> I looked through the source code and would like to share my thoughts on
> this issue here.
> For now, ShellProcess supports only JSON for communicating with the
> multilang process (read and write), and ShellSpout and ShellBolt talk to
> ShellProcess through JSON, too. This is all because ShellProcess's interface
> uses JSONObject only. Conceptually, ShellProcess should encapsulate the
> multilang details and talk to Bolt and Spout using Tuple. (jsgilmore
> introduced two new classes, Immission and Emission, but I think all the
> information Bolt and Spout need is already in Tuple, so no new data
> structures are needed.) So I think it would be much cleaner to do
> serialization in ShellProcess only, so that ShellSpout and ShellBolt know
> nothing about how ShellProcess converts between Tuple and strings.
> So, I suppose I can do the following work:
> 1. Change the interface of ShellProcess to return and accept the Tuple data
> structure instead of JSONObject.
> 2. Make ShellSpout and ShellBolt work on Tuple; all information such as
> task_id, stream_id and tuples should be retrieved from or encapsulated in
> this data structure.
> 3. What other serialization formats would you like to add? I think in the
> end we need to add an example other than JSON to storm-starter storm.py/rb,
> which I would also like to work on.
> ----------
> jsgilmore: Hi, all serialisation is done in the JSONSerialiser, so no 
> serialisation is done in ShellBolt, ShellProcess or ShellSpout. They just 
> send around the Emission and Immission classes. The point of the ISerializer 
> interface is to achieve the separation of serialisation.
> I come from the multilang side of Storm, so I'm not that familiar with the 
> internal Storm structures. If there is a class that the ISerializer interface 
> can use, instead of the Emission and Immission classes, I'm open to it.
> I would recommend that further discussion of PR #697 rather happen in the PR 
> thread itself though.
> I created issue #654 to add protocol buffer serialisation for multilang to
> Storm, but I hadn't seen this issue. The whole purpose of PR
> #697 is to solve this issue.
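
The separation discussed above (the spout builds a message object and a
pluggable serializer decides the wire format) can be sketched roughly as
follows. This is only an illustration under stated assumptions: every type
name here (SketchSpoutMsg, SketchSerializer, SketchJsonSerializer,
SketchKeyValueSerializer, SketchShellSpout) is hypothetical and is not the
API from the pull request or from Storm, and the JSON writer is hand-rolled
for string fields only so that the example stays self-contained.

```java
// Hypothetical message type, loosely modelled on the SpoutMsg seen in the diff.
final class SketchSpoutMsg {
    private final String command;
    private final Object id;

    SketchSpoutMsg(String command, Object id) {
        this.command = command;
        this.id = id;
    }

    String getCommand() { return command; }
    Object getId() { return id; }
}

// Hypothetical plug-in point: the only place that knows the wire format.
interface SketchSerializer {
    String serialize(SketchSpoutMsg msg);
}

// One possible format: a minimal JSON object with two string fields.
final class SketchJsonSerializer implements SketchSerializer {
    @Override
    public String serialize(SketchSpoutMsg msg) {
        return "{\"command\":\"" + escape(msg.getCommand())
            + "\",\"id\":\"" + escape(String.valueOf(msg.getId())) + "\"}";
    }

    // Escapes only backslashes and double quotes; enough for this sketch.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}

// A second, made-up format, to show that the spout code does not change.
final class SketchKeyValueSerializer implements SketchSerializer {
    @Override
    public String serialize(SketchSpoutMsg msg) {
        return "command=" + msg.getCommand() + ";id=" + msg.getId();
    }
}

// Stand-in for the spout side: it builds messages and never touches the format.
final class SketchShellSpout {
    private final SketchSerializer serializer;

    SketchShellSpout(SketchSerializer serializer) {
        this.serializer = serializer;
    }

    // Returns the serialized form that would be written to the subprocess.
    String next() {
        return serializer.serialize(new SketchSpoutMsg("next", ""));
    }

    String ack(Object msgId) {
        return serializer.serialize(new SketchSpoutMsg("ack", msgId));
    }
}
```

Swapping new SketchJsonSerializer() for new SketchKeyValueSerializer() in the
SketchShellSpout constructor changes the bytes sent to the subprocess without
touching next() or ack(), which is the property the issue asks for.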



--
This message was sent by Atlassian JIRA
(v6.2#6252)
