Repository: storm Updated Branches: refs/heads/master 42a6dbd0f -> 7ab4f2418
Moved storm.js to multilang folder. Added link from dev/resources folder Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b609c21 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b609c21 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b609c21 Branch: refs/heads/master Commit: 3b609c21fee42951b6260d5344b7fc88fa2d96ab Parents: a9fb9c5 Author: Itai Frenkel <[email protected]> Authored: Tue Sep 23 17:30:53 2014 +0300 Committer: Itai Frenkel <[email protected]> Committed: Tue Sep 23 18:33:31 2014 +0300 ---------------------------------------------------------------------- storm-core/src/dev/resources/storm.js | 350 +---------------------------- storm-core/src/multilang/js/storm.js | 349 ++++++++++++++++++++++++++++ 2 files changed, 350 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3b609c21/storm-core/src/dev/resources/storm.js ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js deleted file mode 100755 index 5c78072..0000000 --- a/storm-core/src/dev/resources/storm.js +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Base classes in node-js for storm Bolt and Spout. - * Implements the storm multilang protocol for nodejs. - */ - - -var fs = require('fs'); - -function Storm() { - this.messagePart = ""; - this.taskIdsCallbacks = []; - this.isFirstMessage = true; - this.separator = '\nend\n'; -} - -Storm.prototype.sendMsgToParent = function(msg) { - var str = JSON.stringify(msg); - process.stdout.write(str + this.separator); -} - -Storm.prototype.sync = function() { - this.sendMsgToParent({"command":"sync"}); -} - -Storm.prototype.sendPid = function(heartbeatdir) { - var pid = process.pid; - fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); - this.sendMsgToParent({"pid": pid}) -} - -Storm.prototype.log = function(msg) { - this.sendMsgToParent({"command": "log", "msg": msg}); -} - -Storm.prototype.initSetupInfo = function(setupInfo) { - var self = this; - var callback = function() { - self.sendPid(setupInfo['pidDir']); - } - this.initialize(setupInfo['conf'], setupInfo['context'], callback); -} - -Storm.prototype.startReadingInput = function() { - var self = this; - process.stdin.on('readable', function() { - var chunk = process.stdin.read(); - var messages = self.handleNewChunk(chunk); - messages.forEach(function(message) { - self.handleNewMessage(message); - }) - - }); -} - -/** - * receives a new string chunk and returns a list of new messages with the separator removed - * stores state in this.messagePart - * @param chunk - */ -Storm.prototype.handleNewChunk = function(chunk) { - //invariant: this.messagePart has no separator otherwise we would have parsed it already - var messages = []; - if (chunk && chunk.length !== 0) { - //"{}".split("\nend\n") ==> ['{}'] - //"\nend\n".split("\nend\n") ==> ['' , ''] - //"{}\nend\n".split("\nend\n") ==> ['{}', ''] - //"\nend\n{}".split("\nend\n") ==> ['' , '{}'] - // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ] - this.messagePart = this.messagePart + chunk; - var newMessageParts = this.messagePart.split(this.separator); - while (newMessageParts.length > 0) { - var potentialMessage = newMessageParts.shift(); - var anotherMessageAhead = newMessageParts.length > 0; - if (!anotherMessageAhead) { - this.messagePart = potentialMessage; - } - else if (potentialMessage.length > 0) { - messages.push(potentialMessage); - } - } - } - return messages; - } - -Storm.prototype.isTaskIds = function(msg) { - return (msg instanceof Array); -} - -Storm.prototype.handleNewMessage = function(msg) { - var parsedMsg = JSON.parse(msg); - - if (this.isFirstMessage) { - this.initSetupInfo(parsedMsg); - this.isFirstMessage = false; - } else if (this.isTaskIds(parsedMsg)) { - this.handleNewTaskId(parsedMsg); - } else { - this.handleNewCommand(parsedMsg); - } -} - -Storm.prototype.handleNewTaskId = function(taskIds) { - //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called. - //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply - //take the first callback in the list and be sure it is the right one. - - var callback = this.taskIdsCallbacks.shift(); - if (callback) { - callback(taskIds); - } else { - throw new Error('Something went wrong, we off the split of task id callbacks'); - } -} - - - -/** - * - * @param messageDetails json with the emit details. - * - * For bolt, the json must contain the required fields: - * - tuple - the value to emit - * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source - * tuple and return ack when all components successfully finished to process it. - * and may contain the optional fields: - * - stream (if empty - emit to default stream) - * - * For spout, the json must contain the required fields: - * - tuple - the value to emit - * - * and may contain the optional fields: - * - id - pass id for reliable emit (and receive ack/fail later). - * - stream - if empty - emit to default stream. - * - * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received). - */ -Storm.prototype.emit = function(messageDetails, onTaskIds) { - //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible - //through the callback (will be called when the response arrives). The callback is stored in a list until the - //corresponding task id list arrives. - if (messageDetails.task) { - throw new Error('Illegal input - task. To emit to specific task use emit direct!'); - } - - if (!onTaskIds) { - throw new Error('You must pass a onTaskIds callback when using emit!') - } - - this.taskIdsCallbacks.push(onTaskIds); - this.__emit(messageDetails);; -} - - -/** - * Emit message to specific task. - * @param messageDetails json with the emit details. - * - * For bolt, the json must contain the required fields: - * - tuple - the value to emit - * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source - * tuple and return ack when all components successfully finished to process it. - * - task - indicate the task to send the tuple to. - * and may contain the optional fields: - * - stream (if empty - emit to default stream) - * - * For spout, the json must contain the required fields: - * - tuple - the value to emit - * - task - indicate the task to send the tuple to. - * and may contain the optional fields: - * - id - pass id for reliable emit (and receive ack/fail later). - * - stream - if empty - emit to default stream. - * - * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received). - */ -Storm.prototype.emitDirect = function(commandDetails) { - if (!commandDetails.task) { - throw new Error("Emit direct must receive task id!") - } - this.__emit(commandDetails); -} - -/** - * Initialize storm component according to the configuration received. - * @param conf configuration object accrding to storm protocol. - * @param context context object according to storm protocol. - * @param done callback. Call this method when finished initializing. - */ -Storm.prototype.initialize = function(conf, context, done) { - done(); -} - -Storm.prototype.run = function() { - process.stdout.setEncoding('utf8'); - process.stdin.setEncoding('utf8'); - this.startReadingInput(); -} - -function Tuple(id, component, stream, task, values) { - this.id = id; - this.component = component; - this.stream = stream; - this.task = task; - this.values = values; -} - -/** - * Base class for storm bolt. - * To create a bolt implement 'process' method. - * You may also implement initialize method to - */ -function BasicBolt() { - Storm.call(this); - this.anchorTuple = null; -}; - -BasicBolt.prototype = Object.create(Storm.prototype); -BasicBolt.prototype.constructor = BasicBolt; - -/** - * Emit message. - * @param commandDetails json with the required fields: - * - tuple - the value to emit - * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source - * tuple and return ack when all components successfully finished to process it. - * and the optional fields: - * - stream (if empty - emit to default stream) - * - task (pass only to emit to specific task) - */ -BasicBolt.prototype.__emit = function(commandDetails) { - var self = this; - - var message = { - command: "emit", - tuple: commandDetails.tuple, - stream: commandDetails.stream, - task: commandDetails.task, - anchors: [commandDetails.anchorTupleId] - }; - - this.sendMsgToParent(message); -} - -BasicBolt.prototype.handleNewCommand = function(command) { - var self = this; - var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); - var callback = function(err) { - if (err) { - self.fail(tup, err); - return; - } - self.ack(tup); - } - this.process(tup, callback); -} - -/** - * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what - * should it do?). - * @param tuple the input of the bolt - what to process. - * @param done call this method when done processing. - */ -BasicBolt.prototype.process = function(tuple, done) {}; - -BasicBolt.prototype.ack = function(tup) { - this.sendMsgToParent({"command": "ack", "id": tup.id}); -} - -BasicBolt.prototype.fail = function(tup, err) { - this.sendMsgToParent({"command": "fail", "id": tup.id}); -} - - -/** - * Base class for storm spout. - * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail - * can stay empty). - * You may also implement initialize method. - * - */ -function Spout() { - Storm.call(this); -}; - -Spout.prototype = Object.create(Storm.prototype); - -Spout.prototype.constructor = Spout; - -/** - * This method will be called when an ack is received for preciously sent tuple. One may implement it. - * @param id The id of the tuple. - * @param done Call this method when finished and ready to receive more tuples. - */ -Spout.prototype.ack = function(id, done) {}; - -/** - * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example - - * log the failure or send the tuple again). - * @param id The id of the tuple. - * @param done Call this method when finished and ready to receive more tuples. - */ -Spout.prototype.fail = function(id, done) {}; - -/** - * Method the indicates its time to emit the next tuple. - * @param done call this method when done sending the output. - */ -Spout.prototype.nextTuple = function(done) {}; - -Spout.prototype.handleNewCommand = function(command) { - var self = this; - var callback = function() { - self.sync(); - } - - if (command["command"] === "next") { - this.nextTuple(callback); - } - - if (command["command"] === "ack") { - this.ack(command["id"], callback); - } - - if (command["command"] === "fail") { - this.fail(command["id"], callback); - } -} - -/** - * @param commandDetails json with the required fields: - * - tuple - the value to emit. - * and the optional fields: - * - id - pass id for reliable emit (and receive ack/fail later). - * - stream - if empty - emit to default stream. - * - task - pass only to emit to specific task. - */ -Spout.prototype.__emit = function(commandDetails) { - var message = { - command: "emit", - tuple: commandDetails.tuple, - id: commandDetails.id, - stream: commandDetails.stream, - task: commandDetails.task - }; - - this.sendMsgToParent(message); -} - -module.exports.BasicBolt = BasicBolt; -module.exports.Spout = Spout; diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js new file mode 120000 index 0000000..a5fc98b --- /dev/null +++ b/storm-core/src/dev/resources/storm.js @@ -0,0 +1 @@ +../../multilang/js/storm.js \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/3b609c21/storm-core/src/multilang/js/storm.js ---------------------------------------------------------------------- diff --git a/storm-core/src/multilang/js/storm.js b/storm-core/src/multilang/js/storm.js new file mode 100755 index 0000000..5c78072 --- /dev/null +++ b/storm-core/src/multilang/js/storm.js @@ -0,0 +1,349 @@ +/** + * Base classes in node-js for storm Bolt and Spout. + * Implements the storm multilang protocol for nodejs. + */ + + +var fs = require('fs'); + +function Storm() { + this.messagePart = ""; + this.taskIdsCallbacks = []; + this.isFirstMessage = true; + this.separator = '\nend\n'; +} + +Storm.prototype.sendMsgToParent = function(msg) { + var str = JSON.stringify(msg); + process.stdout.write(str + this.separator); +} + +Storm.prototype.sync = function() { + this.sendMsgToParent({"command":"sync"}); +} + +Storm.prototype.sendPid = function(heartbeatdir) { + var pid = process.pid; + fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); + this.sendMsgToParent({"pid": pid}) +} + +Storm.prototype.log = function(msg) { + this.sendMsgToParent({"command": "log", "msg": msg}); +} + +Storm.prototype.initSetupInfo = function(setupInfo) { + var self = this; + var callback = function() { + self.sendPid(setupInfo['pidDir']); + } + this.initialize(setupInfo['conf'], setupInfo['context'], callback); +} + +Storm.prototype.startReadingInput = function() { + var self = this; + process.stdin.on('readable', function() { + var chunk = process.stdin.read(); + var messages = self.handleNewChunk(chunk); + messages.forEach(function(message) { + self.handleNewMessage(message); + }) + + }); +} + +/** + * receives a new string chunk and returns a list of new messages with the separator removed + * stores state in this.messagePart + * @param chunk + */ +Storm.prototype.handleNewChunk = function(chunk) { + //invariant: this.messagePart has no separator otherwise we would have parsed it already + var messages = []; + if (chunk && chunk.length !== 0) { + //"{}".split("\nend\n") ==> ['{}'] + //"\nend\n".split("\nend\n") ==> ['' , ''] + //"{}\nend\n".split("\nend\n") ==> ['{}', ''] + //"\nend\n{}".split("\nend\n") ==> ['' , '{}'] + // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ] + this.messagePart = this.messagePart + chunk; + var newMessageParts = this.messagePart.split(this.separator); + while (newMessageParts.length > 0) { + var potentialMessage = newMessageParts.shift(); + var anotherMessageAhead = newMessageParts.length > 0; + if (!anotherMessageAhead) { + this.messagePart = potentialMessage; + } + else if (potentialMessage.length > 0) { + messages.push(potentialMessage); + } + } + } + return messages; + } + +Storm.prototype.isTaskIds = function(msg) { + return (msg instanceof Array); +} + +Storm.prototype.handleNewMessage = function(msg) { + var parsedMsg = JSON.parse(msg); + + if (this.isFirstMessage) { + this.initSetupInfo(parsedMsg); + this.isFirstMessage = false; + } else if (this.isTaskIds(parsedMsg)) { + this.handleNewTaskId(parsedMsg); + } else { + this.handleNewCommand(parsedMsg); + } +} + +Storm.prototype.handleNewTaskId = function(taskIds) { + //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called. + //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply + //take the first callback in the list and be sure it is the right one. + + var callback = this.taskIdsCallbacks.shift(); + if (callback) { + callback(taskIds); + } else { + throw new Error('Something went wrong, we off the split of task id callbacks'); + } +} + + + +/** + * + * @param messageDetails json with the emit details. + * + * For bolt, the json must contain the required fields: + * - tuple - the value to emit + * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source + * tuple and return ack when all components successfully finished to process it. + * and may contain the optional fields: + * - stream (if empty - emit to default stream) + * + * For spout, the json must contain the required fields: + * - tuple - the value to emit + * + * and may contain the optional fields: + * - id - pass id for reliable emit (and receive ack/fail later). + * - stream - if empty - emit to default stream. + * + * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received). + */ +Storm.prototype.emit = function(messageDetails, onTaskIds) { + //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible + //through the callback (will be called when the response arrives). The callback is stored in a list until the + //corresponding task id list arrives. + if (messageDetails.task) { + throw new Error('Illegal input - task. To emit to specific task use emit direct!'); + } + + if (!onTaskIds) { + throw new Error('You must pass a onTaskIds callback when using emit!') + } + + this.taskIdsCallbacks.push(onTaskIds); + this.__emit(messageDetails);; +} + + +/** + * Emit message to specific task. + * @param messageDetails json with the emit details. + * + * For bolt, the json must contain the required fields: + * - tuple - the value to emit + * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source + * tuple and return ack when all components successfully finished to process it. + * - task - indicate the task to send the tuple to. + * and may contain the optional fields: + * - stream (if empty - emit to default stream) + * + * For spout, the json must contain the required fields: + * - tuple - the value to emit + * - task - indicate the task to send the tuple to. + * and may contain the optional fields: + * - id - pass id for reliable emit (and receive ack/fail later). + * - stream - if empty - emit to default stream. + * + * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received). + */ +Storm.prototype.emitDirect = function(commandDetails) { + if (!commandDetails.task) { + throw new Error("Emit direct must receive task id!") + } + this.__emit(commandDetails); +} + +/** + * Initialize storm component according to the configuration received. + * @param conf configuration object accrding to storm protocol. + * @param context context object according to storm protocol. + * @param done callback. Call this method when finished initializing. + */ +Storm.prototype.initialize = function(conf, context, done) { + done(); +} + +Storm.prototype.run = function() { + process.stdout.setEncoding('utf8'); + process.stdin.setEncoding('utf8'); + this.startReadingInput(); +} + +function Tuple(id, component, stream, task, values) { + this.id = id; + this.component = component; + this.stream = stream; + this.task = task; + this.values = values; +} + +/** + * Base class for storm bolt. + * To create a bolt implement 'process' method. + * You may also implement initialize method to + */ +function BasicBolt() { + Storm.call(this); + this.anchorTuple = null; +}; + +BasicBolt.prototype = Object.create(Storm.prototype); +BasicBolt.prototype.constructor = BasicBolt; + +/** + * Emit message. + * @param commandDetails json with the required fields: + * - tuple - the value to emit + * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source + * tuple and return ack when all components successfully finished to process it. + * and the optional fields: + * - stream (if empty - emit to default stream) + * - task (pass only to emit to specific task) + */ +BasicBolt.prototype.__emit = function(commandDetails) { + var self = this; + + var message = { + command: "emit", + tuple: commandDetails.tuple, + stream: commandDetails.stream, + task: commandDetails.task, + anchors: [commandDetails.anchorTupleId] + }; + + this.sendMsgToParent(message); +} + +BasicBolt.prototype.handleNewCommand = function(command) { + var self = this; + var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); + var callback = function(err) { + if (err) { + self.fail(tup, err); + return; + } + self.ack(tup); + } + this.process(tup, callback); +} + +/** + * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what + * should it do?). + * @param tuple the input of the bolt - what to process. + * @param done call this method when done processing. + */ +BasicBolt.prototype.process = function(tuple, done) {}; + +BasicBolt.prototype.ack = function(tup) { + this.sendMsgToParent({"command": "ack", "id": tup.id}); +} + +BasicBolt.prototype.fail = function(tup, err) { + this.sendMsgToParent({"command": "fail", "id": tup.id}); +} + + +/** + * Base class for storm spout. + * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail + * can stay empty). + * You may also implement initialize method. + * + */ +function Spout() { + Storm.call(this); +}; + +Spout.prototype = Object.create(Storm.prototype); + +Spout.prototype.constructor = Spout; + +/** + * This method will be called when an ack is received for preciously sent tuple. One may implement it. + * @param id The id of the tuple. + * @param done Call this method when finished and ready to receive more tuples. + */ +Spout.prototype.ack = function(id, done) {}; + +/** + * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example - + * log the failure or send the tuple again). + * @param id The id of the tuple. + * @param done Call this method when finished and ready to receive more tuples. + */ +Spout.prototype.fail = function(id, done) {}; + +/** + * Method the indicates its time to emit the next tuple. + * @param done call this method when done sending the output. + */ +Spout.prototype.nextTuple = function(done) {}; + +Spout.prototype.handleNewCommand = function(command) { + var self = this; + var callback = function() { + self.sync(); + } + + if (command["command"] === "next") { + this.nextTuple(callback); + } + + if (command["command"] === "ack") { + this.ack(command["id"], callback); + } + + if (command["command"] === "fail") { + this.fail(command["id"], callback); + } +} + +/** + * @param commandDetails json with the required fields: + * - tuple - the value to emit. + * and the optional fields: + * - id - pass id for reliable emit (and receive ack/fail later). + * - stream - if empty - emit to default stream. + * - task - pass only to emit to specific task. + */ +Spout.prototype.__emit = function(commandDetails) { + var message = { + command: "emit", + tuple: commandDetails.tuple, + id: commandDetails.id, + stream: commandDetails.stream, + task: commandDetails.task + }; + + this.sendMsgToParent(message); +} + +module.exports.BasicBolt = BasicBolt; +module.exports.Spout = Spout;
