[ https://issues.apache.org/jira/browse/STORM-138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998136#comment-13998136 ]

ASF GitHub Bot commented on STORM-138:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/84#discussion_r12665250
  
    --- Diff: storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java ---
    @@ -0,0 +1,164 @@
    +package backtype.storm.multilang;
    +
    +import java.io.BufferedReader;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.json.simple.JSONObject;
    +import org.json.simple.JSONValue;
    +
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Utils;
    +
    +/**
    + * JsonSerializer implements the JSON multilang protocol.
    + */
    +public class JsonSerializer implements ISerializer {
    +    private DataOutputStream processIn;
    +    private BufferedReader processOut;
    +
    +    public void initialize(OutputStream processIn, InputStream processOut) {
    +        this.processIn = new DataOutputStream(processIn);
    +        this.processOut = new BufferedReader(new InputStreamReader(processOut));
    +    }
    +
    +    public Number connect(Map conf, TopologyContext context)
    +            throws IOException, NoOutputException {
    +        JSONObject setupInfo = new JSONObject();
    +        setupInfo.put("pidDir", context.getPIDDir());
    +        setupInfo.put("conf", conf);
    +        setupInfo.put("context", context);
    +        writeMessage(setupInfo);
    +
    +        Number pid = (Number) ((JSONObject) readMessage()).get("pid");
    +        return pid;
    +    }
    +
    +    public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
    +        JSONObject obj = new JSONObject();
    +        obj.put("id", boltMsg.getId());
    +        obj.put("comp", boltMsg.getComp());
    +        obj.put("stream", boltMsg.getStream());
    +        obj.put("task", boltMsg.getTask());
    +        obj.put("tuple", boltMsg.getTuple());
    +        writeMessage(obj);
    +    }
    +
    +    public void writeSpoutMsg(SpoutMsg msg) throws IOException {
    +        JSONObject obj = new JSONObject();
    +        obj.put("command", msg.getCommand());
    +        obj.put("id", msg.getId());
    +        writeMessage(obj);
    +    }
    +
    +    public void writeTaskIds(List<Integer> taskIds) throws IOException {
    +        writeMessage(taskIds);
    +    }
    +
    +    private void writeMessage(Object msg) throws IOException {
    +        writeString(JSONValue.toJSONString(msg));
    +    }
    +
    +    private void writeString(String str) throws IOException {
    +        byte[] strBytes = str.getBytes("UTF-8");
    +        processIn.write(strBytes, 0, strBytes.length);
    +        processIn.writeBytes("\nend\n");
    +        processIn.flush();
    +    }
    +
    +    public ShellMsg readShellMsg() throws IOException, NoOutputException {
    +        JSONObject msg = (JSONObject) readMessage();
    +        ShellMsg shellMsg = new ShellMsg();
    +
    +        String command = (String) msg.get("command");
    +        shellMsg.setCommand(command);
    +
    +        Object id = msg.get("id");
    +        shellMsg.setId(id);
    +
    +        String log = (String) msg.get("msg");
    +        shellMsg.setMsg(log);
    +
    +        String stream = (String) msg.get("stream");
    +        if (stream == null)
    +            stream = Utils.DEFAULT_STREAM_ID;
    +        shellMsg.setStream(stream);
    +
    +        Object taskObj = msg.get("task");
    +        if (taskObj != null) {
    +            shellMsg.setTask((Long) taskObj);
    +        } else {
    +            shellMsg.setTask(0);
    +        }
    +
    +        Object need_task_ids = msg.get("need_task_ids");
    +        if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
    +            shellMsg.setNeedTaskIds(true);
    +        } else {
    +            shellMsg.setNeedTaskIds(false);
    +        }
    +
    +        shellMsg.setTuple((List) msg.get("tuple"));
    +
    +        List<Tuple> anchors = new ArrayList<Tuple>();
    +        Object anchorObj = msg.get("anchors");
    +        if (anchorObj != null) {
    +            if (anchorObj instanceof String) {
    +                anchorObj = Arrays.asList(anchorObj);
    +            }
    +            for (Object o : (List) anchorObj) {
    +                shellMsg.addAnchor((String) o);
    +            }
    +        }
    +
    +        return shellMsg;
    +    }
    +
    +    private Object readMessage() throws IOException, NoOutputException {
    +        String string = readString();
    +        Object msg = JSONValue.parse(string);
    +        if (msg != null) {
    +            return msg;
    +        } else {
    +            throw new IOException("unable to parse: " + string);
    +        }
    +    }
    +
    +    private String readString() throws IOException, NoOutputException {
    +        StringBuilder line = new StringBuilder();
    +
    +        // synchronized (processOut) {
    +        while (true) {
    +            String subline = processOut.readLine();
    +            if (subline == null) {
    +                StringBuilder errorMessage = new StringBuilder();
    +                errorMessage.append("Pipe to subprocess seems to be broken!");
    +                if (line.length() == 0) {
    +                    errorMessage.append(" No output read.\n");
    +                } else {
    +                    errorMessage.append(" Currently read output: "
    +                            + line.toString() + "\n");
    +                }
    +                errorMessage.append("Serializer Exception:\n");
    +                throw new NoOutputException(errorMessage.toString());
    +            }
    +            if (subline.equals("end")) {
    +                break;
    +            }
    +            if (line.length() != 0) {
    +                line.append("\n");
    +            }
    +            line.append(subline);
    +        }
    +        // }
    --- End diff --
    
    The commented-out `synchronized` block can be removed if it is not needed. It 
looks like it may have come from other code that already had it commented out.


> Pluggable serialization for multilang
> -------------------------------------
>
>                 Key: STORM-138
>                 URL: https://issues.apache.org/jira/browse/STORM-138
>             Project: Apache Storm (Incubating)
>          Issue Type: New Feature
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/373
> Currently JSON is used to serialize tuples for multilang. It would be great 
> if the serialization mechanism were pluggable so that using richer types with 
> multilang would be possible.
> ---------
> francis-liberty: Hello, I am a newbie here, and I wanted to pick up this 
> issue. I also noticed a recent PR, #697 by jsgilmore; does it address 
> this issue, too?
> I looked through the source code, and I would like to share my thoughts 
> on this issue here.
> Currently, ShellProcess only supports JSON for communicating (reading and 
> writing) with the multilang process. ShellSpout and ShellBolt also talk to 
> ShellProcess through JSON, because ShellProcess's interface uses only 
> JSONObject. Conceptually, ShellProcess should encapsulate the multilang 
> details and talk to Bolt and Spout using Tuple. (jsgilmore introduced two 
> new classes, Immission and Emission, but I think all the information Bolt and 
> Spout need is already in Tuple, so there is no need for new data structures.) 
> So I think it would be much cleaner to do serialization only in ShellProcess, 
> so that neither ShellSpout nor ShellBolt knows anything about how ShellProcess 
> converts between Tuple and strings.
> So, I could do the following work:
> 1. Change the interface of ShellProcess to return and accept the Tuple data 
> structure instead of JSONObject.
> 2. Make ShellSpout and ShellBolt work with Tuple; all information such as 
> task_id, stream_id and tuples should be retrieved from / encapsulated in this 
> data structure.
> 3. Which other serialization formats would you like to add? I think we 
> eventually need to add an example other than JSON to storm-starter's 
> storm.py/storm.rb, which I would also like to work on.
> ----------
> jsgilmore: Hi, all serialisation is done in the JSONSerialiser, so no 
> serialisation is done in ShellBolt, ShellProcess or ShellSpout. They just 
> send around the Emission and Immission classes. The point of the ISerializer 
> interface is to achieve the separation of serialisation.
> I come from the multilang side of Storm, so I'm not that familiar with the 
> internal Storm structures. If there is a class that the ISerializer interface 
> can use, instead of the Emission and Immission classes, I'm open to it.
> I would recommend that further discussion of PR #697 happen in the PR 
> thread itself, though.
> I created issue #654 to add protocol buffer serialisation for multilang to 
> Storm, but I hadn't seen this issue. The whole purpose of PR 
> #697 is to solve this issue.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to