[ https://issues.apache.org/jira/browse/STORM-138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13998135#comment-13998135 ]
ASF GitHub Bot commented on STORM-138:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/84#discussion_r12665246
--- Diff: storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java ---
@@ -0,0 +1,164 @@
+package backtype.storm.multilang;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Utils;
+
+/**
+ * JsonSerializer implements the JSON multilang protocol.
+ */
+public class JsonSerializer implements ISerializer {
+ private DataOutputStream processIn;
+ private BufferedReader processOut;
+
+ public void initialize(OutputStream processIn, InputStream processOut) {
+ this.processIn = new DataOutputStream(processIn);
+ this.processOut = new BufferedReader(new InputStreamReader(processOut));
+ }
+
+ public Number connect(Map conf, TopologyContext context)
+ throws IOException, NoOutputException {
+ JSONObject setupInfo = new JSONObject();
+ setupInfo.put("pidDir", context.getPIDDir());
+ setupInfo.put("conf", conf);
+ setupInfo.put("context", context);
+ writeMessage(setupInfo);
+
+ Number pid = (Number) ((JSONObject) readMessage()).get("pid");
+ return pid;
+ }
+
+ public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
+ JSONObject obj = new JSONObject();
+ obj.put("id", boltMsg.getId());
+ obj.put("comp", boltMsg.getComp());
+ obj.put("stream", boltMsg.getStream());
+ obj.put("task", boltMsg.getTask());
+ obj.put("tuple", boltMsg.getTuple());
+ writeMessage(obj);
+ }
+
+ public void writeSpoutMsg(SpoutMsg msg) throws IOException {
+ JSONObject obj = new JSONObject();
+ obj.put("command", msg.getCommand());
+ obj.put("id", msg.getId());
+ writeMessage(obj);
+ }
+
+ public void writeTaskIds(List<Integer> taskIds) throws IOException {
+ writeMessage(taskIds);
+ }
+
+ private void writeMessage(Object msg) throws IOException {
+ writeString(JSONValue.toJSONString(msg));
+ }
+
+ private void writeString(String str) throws IOException {
+ byte[] strBytes = str.getBytes("UTF-8");
+ processIn.write(strBytes, 0, strBytes.length);
+ processIn.writeBytes("\nend\n");
+ processIn.flush();
+ }
+
+ public ShellMsg readShellMsg() throws IOException, NoOutputException {
+ JSONObject msg = (JSONObject) readMessage();
+ ShellMsg shellMsg = new ShellMsg();
+
+ String command = (String) msg.get("command");
+ shellMsg.setCommand(command);
+
+ Object id = msg.get("id");
+ shellMsg.setId(id);
+
+ String log = (String) msg.get("msg");
+ shellMsg.setMsg(log);
+
+ String stream = (String) msg.get("stream");
+ if (stream == null)
+ stream = Utils.DEFAULT_STREAM_ID;
--- End diff --
Tiny nit: Can we put this in a block?
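For illustration, the braced form the nit asks for could look like the minimal sketch below. The class name StreamDefaultExample, the method resolveStream and the local DEFAULT_STREAM_ID constant are hypothetical stand-ins (the constant stands in for Utils.DEFAULT_STREAM_ID, and its value here is an assumption) so that the snippet compiles without Storm on the classpath.

```java
public class StreamDefaultExample {
    // Stand-in for Utils.DEFAULT_STREAM_ID; the value is assumed for this sketch.
    public static final String DEFAULT_STREAM_ID = "default";

    // Returns the given stream id, or the default when none was supplied.
    public static String resolveStream(String stream) {
        // Braces around the single statement, as requested in the review.
        if (stream == null) {
            stream = DEFAULT_STREAM_ID;
        }
        return stream;
    }

    public static void main(String[] args) {
        System.out.println(resolveStream(null));
        System.out.println(resolveStream("errors"));
    }
}
```

The behavior is identical to the unbraced version; the block only guards against a later edit adding a second statement that silently falls outside the if.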
> Pluggable serialization for multilang
> -------------------------------------
>
> Key: STORM-138
> URL: https://issues.apache.org/jira/browse/STORM-138
> Project: Apache Storm (Incubating)
> Issue Type: New Feature
> Reporter: James Xu
> Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/373
> Currently JSON is used to serialize tuples for multilang. It would be great
> if the serialization mechanism were pluggable so that using richer types with
> multilang would be possible.
> ---------
> francis-liberty: Hello, I am a newbie here, and I wanted to pick up this
> issue. I also noticed a recent PR here #697 by jsgilmore, is it feasible for
> this issue, too?
> I looked around the source code, and I would like to talk about my opinions
> on this issue here.
> For now, ShellProcess supports only JSON for communicating with the multilang
> process (read and write), and ShellSpout and ShellBolt talk to ShellProcess
> through JSON, too. This is because ShellProcess's interface uses
> JSONObject only. Conceptually, ShellProcess should encapsulate the multilang
> details and talk to Bolt and Spout using Tuple. (jsgilmore introduced two
> new classes, Immission and Emission, but I think all the information Bolt and
> Spout need is already in Tuple, so no new data structures are needed.) So I think
> it would be much cleaner to do serialization in ShellProcess only, so that
> ShellSpout and ShellBolt know nothing about how ShellProcess converts
> between Tuple and strings.
> So, I suppose I can do the following work:
> 1. Change the interface of ShellProcess to return and accept the Tuple data
> structure instead of JSONObject.
> 2. Make ShellSpout and ShellBolt work on Tuple; all information such as task_id,
> stream_id and tuples should be retrieved from or encapsulated in this data structure.
> 3. What other serialization formats would you like to add? I think in the end
> we need to add an example other than JSON to storm-starter storm.py/rb,
> which I would also like to work on.
> ----------
> jsgilmore: Hi, all serialisation is done in the JSONSerialiser, so no
> serialisation is done in ShellBolt, ShellProcess or ShellSpout. They just
> send around the Emission and Immission classes. The point of the ISerializer
> interface is to achieve the separation of serialisation.
> I come from the multilang side of Storm, so I'm not that familiar with the
> internal Storm structures. If there is a class that the ISerializer interface
> can use, instead of the Emission and Immission classes, I'm open to it.
> I would recommend that further discussion of PR #697 rather happen in the PR
> thread itself though.
> I created an issue to add protocol buffer serialisation for multilang to
> Storm in issue #654 , but I didn't see this issue. The whole purpose of PR
> #697 is to solve this issue.
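
The separation discussed in the thread above (callers hand over a message, a pluggable component decides the wire format) can be sketched roughly as follows. This is not Storm's ISerializer: the MessageEncoder interface, the toy PipeEncoder and the class name PluggableFramingSketch are all hypothetical, and only the "\nend\n" terminator is taken from writeString in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class PluggableFramingSketch {
    /** Hypothetical plug-in point: turns a message into text. */
    public interface MessageEncoder {
        String encode(String streamId, List<String> values);
    }

    /** Toy encoder for illustration only; a real one might emit JSON or protobuf. */
    public static class PipeEncoder implements MessageEncoder {
        @Override
        public String encode(String streamId, List<String> values) {
            return streamId + "|" + String.join(",", values);
        }
    }

    /** Writes the encoded message, then the "\nend\n" terminator used by writeString. */
    public static void writeFramed(OutputStream out, MessageEncoder encoder,
                                   String streamId, List<String> values) throws IOException {
        out.write(encoder.encode(streamId, values).getBytes(StandardCharsets.UTF_8));
        out.write("\nend\n".getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    /** Encodes one sample message into memory and returns the framed text. */
    public static String demo() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            writeFramed(out, new PipeEncoder(), "default", Arrays.asList("a", "b"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

The caller of writeFramed never sees the encoding, so swapping PipeEncoder for another MessageEncoder changes the wire format without touching the caller, which is the property both commenters are after.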