Integrate Flux with the Storm build
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2094a08e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2094a08e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2094a08e Branch: refs/heads/master Commit: 2094a08ea6ccdf81126c103930d9ec3e77fcd5c1 Parents: b21a98d Author: P. Taylor Goetz <[email protected]> Authored: Wed May 6 14:13:13 2015 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Wed May 6 14:13:13 2015 -0400 ---------------------------------------------------------------------- external/flux/README.md | 15 +- external/flux/flux-core/pom.xml | 13 +- external/flux/flux-examples/pom.xml | 13 +- external/flux/flux-wrappers/pom.xml | 24 +- .../main/resources/resources/randomsentence.js | 93 ----- .../main/resources/resources/splitsentence.py | 24 -- .../src/main/resources/resources/storm.js | 373 ------------------- .../src/main/resources/resources/storm.py | 260 ------------- external/flux/pom.xml | 19 +- pom.xml | 1 + 10 files changed, 41 insertions(+), 794 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/README.md ---------------------------------------------------------------------- diff --git a/external/flux/README.md b/external/flux/README.md index 6f27219..d09a73c 100644 --- a/external/flux/README.md +++ b/external/flux/README.md @@ -829,17 +829,6 @@ topologySource: methodName: "getTopologyWithDifferentMethodName" ``` -## Author -P. Taylor Goetz +## Committer Sponsors -## Contributors - - -## Contributing - -Contributions in any form are more than welcome. - -The intent of this project is that it will be donated to Apache Storm. - -By offering any contributions to this project, you should be willing and able to submit an -[Apache ICLA](http://www.apache.org/licenses/icla.txt), if you have not done so already. \ No newline at end of file + * P. 
Taylor Goetz ([[email protected]](mailto:[email protected])) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-core/pom.xml ---------------------------------------------------------------------- diff --git a/external/flux/flux-core/pom.xml b/external/flux/flux-core/pom.xml index 600613d..c3842bd 100644 --- a/external/flux/flux-core/pom.xml +++ b/external/flux/flux-core/pom.xml @@ -19,13 +19,12 @@ <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.github.ptgoetz</groupId> + <groupId>org.apache.storm</groupId> <artifactId>flux</artifactId> - <version>0.3.1-SNAPSHOT</version> + <version>0.11.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <groupId>com.github.ptgoetz</groupId> <artifactId>flux-core</artifactId> <packaging>jar</packaging> @@ -34,26 +33,26 @@ <dependencies> <dependency> - <groupId>com.github.ptgoetz</groupId> + <groupId>org.apache.storm</groupId> <artifactId>flux-wrappers</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> - <version>${storm.version}</version> + <version>${project.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId> - <version>${storm.version}</version> + <version>${project.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hbase</artifactId> - <version>${storm.version}</version> + <version>${project.version}</version> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-examples/pom.xml ---------------------------------------------------------------------- diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml index 0b9796e..709b20b 100644 --- a/external/flux/flux-examples/pom.xml +++ 
b/external/flux/flux-examples/pom.xml @@ -19,13 +19,12 @@ <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.github.ptgoetz</groupId> + <groupId>org.apache.storm</groupId> <artifactId>flux</artifactId> - <version>0.3.1-SNAPSHOT</version> + <version>0.11.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <groupId>com.github.ptgoetz</groupId> <artifactId>flux-examples</artifactId> <packaging>jar</packaging> @@ -34,12 +33,12 @@ <dependencies> <dependency> - <groupId>com.github.ptgoetz</groupId> + <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> <version>${project.version}</version> </dependency> <dependency> - <groupId>com.github.ptgoetz</groupId> + <groupId>org.apache.storm</groupId> <artifactId>flux-wrappers</artifactId> <version>${project.version}</version> </dependency> @@ -47,12 +46,12 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId> - <version>${storm.version}</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hbase</artifactId> - <version>${storm.version}</version> + <version>${project.version}</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-wrappers/pom.xml ---------------------------------------------------------------------- diff --git a/external/flux/flux-wrappers/pom.xml b/external/flux/flux-wrappers/pom.xml index 6784141..be042ff 100644 --- a/external/flux/flux-wrappers/pom.xml +++ b/external/flux/flux-wrappers/pom.xml @@ -19,17 +19,33 @@ <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.github.ptgoetz</groupId> + <groupId>org.apache.storm</groupId> <artifactId>flux</artifactId> - <version>0.3.1-SNAPSHOT</version> + <version>0.11.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <groupId>com.github.ptgoetz</groupId> <artifactId>flux-wrappers</artifactId> <packaging>jar</packaging> 
<name>flux-wrappers</name> - <url>https://github.com/ptgoetz/flux</url> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>multilang-javascript</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>multilang-ruby</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>multilang-python</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js ---------------------------------------------------------------------- diff --git a/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js b/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js deleted file mode 100644 index 36fc5f5..0000000 --- a/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Example for storm spout. Emits random sentences. 
- * The original class in java - storm.starter.spout.RandomSentenceSpout. - * - */ - -var storm = require('./storm'); -var Spout = storm.Spout; - - -var SENTENCES = [ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature"] - -function RandomSentenceSpout(sentences) { - Spout.call(this); - this.runningTupleId = 0; - this.sentences = sentences; - this.pending = {}; -}; - -RandomSentenceSpout.prototype = Object.create(Spout.prototype); -RandomSentenceSpout.prototype.constructor = RandomSentenceSpout; - -RandomSentenceSpout.prototype.getRandomSentence = function() { - return this.sentences[getRandomInt(0, this.sentences.length - 1)]; -} - -RandomSentenceSpout.prototype.nextTuple = function(done) { - var self = this; - var sentence = this.getRandomSentence(); - var tup = [sentence]; - var id = this.createNextTupleId(); - this.pending[id] = tup; - //This timeout can be removed if TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS is configured to 100 - setTimeout(function() { - self.emit({tuple: tup, id: id}, function(taskIds) { - self.log(tup + ' sent to task ids - ' + taskIds); - }); - done(); - },100); -} - -RandomSentenceSpout.prototype.createNextTupleId = function() { - var id = this.runningTupleId; - this.runningTupleId++; - return id; -} - -RandomSentenceSpout.prototype.ack = function(id, done) { - this.log('Received ack for - ' + id); - delete this.pending[id]; - done(); -} - -RandomSentenceSpout.prototype.fail = function(id, done) { - var self = this; - this.log('Received fail for - ' + id + '. 
Retrying.'); - this.emit({tuple: this.pending[id], id:id}, function(taskIds) { - self.log(self.pending[id] + ' sent to task ids - ' + taskIds); - }); - done(); -} - -/** - * Returns a random integer between min (inclusive) and max (inclusive) - */ -function getRandomInt(min, max) { - return Math.floor(Math.random() * (max - min + 1)) + min; -} - -new RandomSentenceSpout(SENTENCES).run(); http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py ---------------------------------------------------------------------- diff --git a/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py b/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py deleted file mode 100644 index 300105f..0000000 --- a/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import storm - -class SplitSentenceBolt(storm.BasicBolt): - def process(self, tup): - words = tup.values[0].split(" ") - for word in words: - storm.emit([word]) - -SplitSentenceBolt().run() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-wrappers/src/main/resources/resources/storm.js ---------------------------------------------------------------------- diff --git a/external/flux/flux-wrappers/src/main/resources/resources/storm.js b/external/flux/flux-wrappers/src/main/resources/resources/storm.js deleted file mode 100755 index 355c2d2..0000000 --- a/external/flux/flux-wrappers/src/main/resources/resources/storm.js +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Base classes in node-js for storm Bolt and Spout. - * Implements the storm multilang protocol for nodejs. 
- */ - - -var fs = require('fs'); - -function Storm() { - this.messagePart = ""; - this.taskIdsCallbacks = []; - this.isFirstMessage = true; - this.separator = '\nend\n'; -} - -Storm.prototype.sendMsgToParent = function(msg) { - var str = JSON.stringify(msg); - process.stdout.write(str + this.separator); -} - -Storm.prototype.sync = function() { - this.sendMsgToParent({"command":"sync"}); -} - -Storm.prototype.sendPid = function(heartbeatdir) { - var pid = process.pid; - fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); - this.sendMsgToParent({"pid": pid}) -} - -Storm.prototype.log = function(msg) { - this.sendMsgToParent({"command": "log", "msg": msg}); -} - -Storm.prototype.initSetupInfo = function(setupInfo) { - var self = this; - var callback = function() { - self.sendPid(setupInfo['pidDir']); - } - this.initialize(setupInfo['conf'], setupInfo['context'], callback); -} - -Storm.prototype.startReadingInput = function() { - var self = this; - process.stdin.on('readable', function() { - var chunk = process.stdin.read(); - var messages = self.handleNewChunk(chunk); - messages.forEach(function(message) { - self.handleNewMessage(message); - }) - - }); -} - -/** - * receives a new string chunk and returns a list of new messages with the separator removed - * stores state in this.messagePart - * @param chunk - */ -Storm.prototype.handleNewChunk = function(chunk) { - //invariant: this.messagePart has no separator otherwise we would have parsed it already - var messages = []; - if (chunk && chunk.length !== 0) { - //"{}".split("\nend\n") ==> ['{}'] - //"\nend\n".split("\nend\n") ==> ['' , ''] - //"{}\nend\n".split("\nend\n") ==> ['{}', ''] - //"\nend\n{}".split("\nend\n") ==> ['' , '{}'] - // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ] - this.messagePart = this.messagePart + chunk; - var newMessageParts = this.messagePart.split(this.separator); - while (newMessageParts.length > 0) { - var potentialMessage = newMessageParts.shift(); - var anotherMessageAhead 
= newMessageParts.length > 0; - if (!anotherMessageAhead) { - this.messagePart = potentialMessage; - } - else if (potentialMessage.length > 0) { - messages.push(potentialMessage); - } - } - } - return messages; -} - -Storm.prototype.isTaskIds = function(msg) { - return (msg instanceof Array); -} - -Storm.prototype.handleNewMessage = function(msg) { - var parsedMsg = JSON.parse(msg); - - if (this.isFirstMessage) { - this.initSetupInfo(parsedMsg); - this.isFirstMessage = false; - } else if (this.isTaskIds(parsedMsg)) { - this.handleNewTaskId(parsedMsg); - } else { - this.handleNewCommand(parsedMsg); - } -} - -Storm.prototype.handleNewTaskId = function(taskIds) { - //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called. - //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply - //take the first callback in the list and be sure it is the right one. - - var callback = this.taskIdsCallbacks.shift(); - if (callback) { - callback(taskIds); - } else { - throw new Error('Something went wrong, we off the split of task id callbacks'); - } -} - - - -/** - * - * @param messageDetails json with the emit details. - * - * For bolt, the json must contain the required fields: - * - tuple - the value to emit - * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source - * tuple and return ack when all components successfully finished to process it. - * and may contain the optional fields: - * - stream (if empty - emit to default stream) - * - * For spout, the json must contain the required fields: - * - tuple - the value to emit - * - * and may contain the optional fields: - * - id - pass id for reliable emit (and receive ack/fail later). - * - stream - if empty - emit to default stream. - * - * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received). 
- */ -Storm.prototype.emit = function(messageDetails, onTaskIds) { - //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible - //through the callback (will be called when the response arrives). The callback is stored in a list until the - //corresponding task id list arrives. - if (messageDetails.task) { - throw new Error('Illegal input - task. To emit to specific task use emit direct!'); - } - - if (!onTaskIds) { - throw new Error('You must pass a onTaskIds callback when using emit!') - } - - this.taskIdsCallbacks.push(onTaskIds); - this.__emit(messageDetails);; -} - - -/** - * Emit message to specific task. - * @param messageDetails json with the emit details. - * - * For bolt, the json must contain the required fields: - * - tuple - the value to emit - * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source - * tuple and return ack when all components successfully finished to process it. - * - task - indicate the task to send the tuple to. - * and may contain the optional fields: - * - stream (if empty - emit to default stream) - * - * For spout, the json must contain the required fields: - * - tuple - the value to emit - * - task - indicate the task to send the tuple to. - * and may contain the optional fields: - * - id - pass id for reliable emit (and receive ack/fail later). - * - stream - if empty - emit to default stream. - * - * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received). - */ -Storm.prototype.emitDirect = function(commandDetails) { - if (!commandDetails.task) { - throw new Error("Emit direct must receive task id!") - } - this.__emit(commandDetails); -} - -/** - * Initialize storm component according to the configuration received. - * @param conf configuration object accrding to storm protocol. - * @param context context object according to storm protocol. 
- * @param done callback. Call this method when finished initializing. - */ -Storm.prototype.initialize = function(conf, context, done) { - done(); -} - -Storm.prototype.run = function() { - process.stdout.setEncoding('utf8'); - process.stdin.setEncoding('utf8'); - this.startReadingInput(); -} - -function Tuple(id, component, stream, task, values) { - this.id = id; - this.component = component; - this.stream = stream; - this.task = task; - this.values = values; -} - -/** - * Base class for storm bolt. - * To create a bolt implement 'process' method. - * You may also implement initialize method to - */ -function BasicBolt() { - Storm.call(this); - this.anchorTuple = null; -}; - -BasicBolt.prototype = Object.create(Storm.prototype); -BasicBolt.prototype.constructor = BasicBolt; - -/** - * Emit message. - * @param commandDetails json with the required fields: - * - tuple - the value to emit - * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source - * tuple and return ack when all components successfully finished to process it. - * and the optional fields: - * - stream (if empty - emit to default stream) - * - task (pass only to emit to specific task) - */ -BasicBolt.prototype.__emit = function(commandDetails) { - var self = this; - - var message = { - command: "emit", - tuple: commandDetails.tuple, - stream: commandDetails.stream, - task: commandDetails.task, - anchors: [commandDetails.anchorTupleId] - }; - - this.sendMsgToParent(message); -} - -BasicBolt.prototype.handleNewCommand = function(command) { - var self = this; - var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); - - if (tup.task === -1 && tup.stream === "__heartbeat") { - self.sync(); - return; - } - - var callback = function(err) { - if (err) { - self.fail(tup, err); - return; - } - self.ack(tup); - } - this.process(tup, callback); -} - -/** - * Implement this method when creating a bolt. 
This is the main method that provides the logic of the bolt (what - * should it do?). - * @param tuple the input of the bolt - what to process. - * @param done call this method when done processing. - */ -BasicBolt.prototype.process = function(tuple, done) {}; - -BasicBolt.prototype.ack = function(tup) { - this.sendMsgToParent({"command": "ack", "id": tup.id}); -} - -BasicBolt.prototype.fail = function(tup, err) { - this.sendMsgToParent({"command": "fail", "id": tup.id}); -} - - -/** - * Base class for storm spout. - * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail - * can stay empty). - * You may also implement initialize method. - * - */ -function Spout() { - Storm.call(this); -}; - -Spout.prototype = Object.create(Storm.prototype); - -Spout.prototype.constructor = Spout; - -/** - * This method will be called when an ack is received for preciously sent tuple. One may implement it. - * @param id The id of the tuple. - * @param done Call this method when finished and ready to receive more tuples. - */ -Spout.prototype.ack = function(id, done) {}; - -/** - * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example - - * log the failure or send the tuple again). - * @param id The id of the tuple. - * @param done Call this method when finished and ready to receive more tuples. - */ -Spout.prototype.fail = function(id, done) {}; - -/** - * Method the indicates its time to emit the next tuple. - * @param done call this method when done sending the output. 
- */ -Spout.prototype.nextTuple = function(done) {}; - -Spout.prototype.handleNewCommand = function(command) { - var self = this; - var callback = function() { - self.sync(); - } - - if (command["command"] === "next") { - this.nextTuple(callback); - } - - if (command["command"] === "ack") { - this.ack(command["id"], callback); - } - - if (command["command"] === "fail") { - this.fail(command["id"], callback); - } -} - -/** - * @param commandDetails json with the required fields: - * - tuple - the value to emit. - * and the optional fields: - * - id - pass id for reliable emit (and receive ack/fail later). - * - stream - if empty - emit to default stream. - * - task - pass only to emit to specific task. - */ -Spout.prototype.__emit = function(commandDetails) { - var message = { - command: "emit", - tuple: commandDetails.tuple, - id: commandDetails.id, - stream: commandDetails.stream, - task: commandDetails.task - }; - - this.sendMsgToParent(message); -} - -module.exports.BasicBolt = BasicBolt; -module.exports.Spout = Spout; http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/flux-wrappers/src/main/resources/resources/storm.py ---------------------------------------------------------------------- diff --git a/external/flux/flux-wrappers/src/main/resources/resources/storm.py b/external/flux/flux-wrappers/src/main/resources/resources/storm.py deleted file mode 100644 index 642c393..0000000 --- a/external/flux/flux-wrappers/src/main/resources/resources/storm.py +++ /dev/null @@ -1,260 +0,0 @@ -# -*- coding: utf-8 -*- - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import os -import traceback -from collections import deque - -try: - import simplejson as json -except ImportError: - import json - -json_encode = lambda x: json.dumps(x) -json_decode = lambda x: json.loads(x) - -#reads lines and reconstructs newlines appropriately -def readMsg(): - msg = "" - while True: - line = sys.stdin.readline() - if not line: - raise Exception('Read EOF from stdin') - if line[0:-1] == "end": - break - msg = msg + line - return json_decode(msg[0:-1]) - -MODE = None -ANCHOR_TUPLE = None - -#queue up commands we read while trying to read taskids -pending_commands = deque() - -def readTaskIds(): - if pending_taskids: - return pending_taskids.popleft() - else: - msg = readMsg() - while type(msg) is not list: - pending_commands.append(msg) - msg = readMsg() - return msg - -#queue up taskids we read while trying to read commands/tuples -pending_taskids = deque() - -def readCommand(): - if pending_commands: - return pending_commands.popleft() - else: - msg = readMsg() - while type(msg) is list: - pending_taskids.append(msg) - msg = readMsg() - return msg - -def readTuple(): - cmd = readCommand() - return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"]) - -def sendMsgToParent(msg): - print json_encode(msg) - print "end" - sys.stdout.flush() - -def sync(): - sendMsgToParent({'command':'sync'}) - -def sendpid(heartbeatdir): - pid = os.getpid() - sendMsgToParent({'pid':pid}) - open(heartbeatdir + "/" + str(pid), "w").close() - -def emit(*args, **kwargs): - __emit(*args, **kwargs) - return readTaskIds() - -def 
emitDirect(task, *args, **kwargs): - kwargs["directTask"] = task - __emit(*args, **kwargs) - -def __emit(*args, **kwargs): - global MODE - if MODE == Bolt: - emitBolt(*args, **kwargs) - elif MODE == Spout: - emitSpout(*args, **kwargs) - -def emitBolt(tup, stream=None, anchors = [], directTask=None): - global ANCHOR_TUPLE - if ANCHOR_TUPLE is not None: - anchors = [ANCHOR_TUPLE] - m = {"command": "emit"} - if stream is not None: - m["stream"] = stream - m["anchors"] = map(lambda a: a.id, anchors) - if directTask is not None: - m["task"] = directTask - m["tuple"] = tup - sendMsgToParent(m) - -def emitSpout(tup, stream=None, id=None, directTask=None): - m = {"command": "emit"} - if id is not None: - m["id"] = id - if stream is not None: - m["stream"] = stream - if directTask is not None: - m["task"] = directTask - m["tuple"] = tup - sendMsgToParent(m) - -def ack(tup): - sendMsgToParent({"command": "ack", "id": tup.id}) - -def fail(tup): - sendMsgToParent({"command": "fail", "id": tup.id}) - -def reportError(msg): - sendMsgToParent({"command": "error", "msg": msg}) - -def log(msg, level=2): - sendMsgToParent({"command": "log", "msg": msg, "level":level}) - -def logTrace(msg): - log(msg, 0) - -def logDebug(msg): - log(msg, 1) - -def logInfo(msg): - log(msg, 2) - -def logWarn(msg): - log(msg, 3) - -def logError(msg): - log(msg, 4) - -def rpcMetrics(name, params): - sendMsgToParent({"command": "metrics", "name": name, "params": params}) - -def initComponent(): - setupInfo = readMsg() - sendpid(setupInfo['pidDir']) - return [setupInfo['conf'], setupInfo['context']] - -class Tuple(object): - def __init__(self, id, component, stream, task, values): - self.id = id - self.component = component - self.stream = stream - self.task = task - self.values = values - - def __repr__(self): - return '<%s%s>' % ( - self.__class__.__name__, - ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) - - def is_heartbeat_tuple(self): - return self.task == -1 and 
self.stream == "__heartbeat" - -class Bolt(object): - def initialize(self, stormconf, context): - pass - - def process(self, tuple): - pass - - def run(self): - global MODE - MODE = Bolt - conf, context = initComponent() - try: - self.initialize(conf, context) - while True: - tup = readTuple() - if tup.is_heartbeat_tuple(): - sync() - else: - self.process(tup) - except Exception, e: - reportError(traceback.format_exc(e)) - -class BasicBolt(object): - def initialize(self, stormconf, context): - pass - - def process(self, tuple): - pass - - def run(self): - global MODE - MODE = Bolt - global ANCHOR_TUPLE - conf, context = initComponent() - try: - self.initialize(conf, context) - while True: - tup = readTuple() - if tup.is_heartbeat_tuple(): - sync() - else: - ANCHOR_TUPLE = tup - try: - self.process(tup) - ack(tup) - except Exception, e: - reportError(traceback.format_exc(e)) - fail(tup) - except Exception, e: - reportError(traceback.format_exc(e)) - -class Spout(object): - def initialize(self, conf, context): - pass - - def ack(self, id): - pass - - def fail(self, id): - pass - - def nextTuple(self): - pass - - def run(self): - global MODE - MODE = Spout - conf, context = initComponent() - try: - self.initialize(conf, context) - while True: - msg = readCommand() - if msg["command"] == "next": - self.nextTuple() - if msg["command"] == "ack": - self.ack(msg["id"]) - if msg["command"] == "fail": - self.fail(msg["id"]) - sync() - except Exception, e: - reportError(traceback.format_exc(e)) http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/external/flux/pom.xml ---------------------------------------------------------------------- diff --git a/external/flux/pom.xml b/external/flux/pom.xml index 5ea1b40..bf975cb 100644 --- a/external/flux/pom.xml +++ b/external/flux/pom.xml @@ -18,23 +18,17 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.github.ptgoetz</groupId> <artifactId>flux</artifactId> - <version>0.3.1-SNAPSHOT</version> <packaging>pom</packaging> <name>flux</name> - <url>https://github.com/ptgoetz/flux</url> + <parent> - <groupId>org.sonatype.oss</groupId> - <artifactId>oss-parent</artifactId> - <version>7</version> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>0.11.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> </parent> - <scm> - <connection>scm:git:[email protected]:ptgoetz/flux.git</connection> - <developerConnection>scm:git:[email protected]:ptgoetz/flux.git</developerConnection> - <url>:[email protected]:ptgoetz/flux.git</url> - </scm> <developers> <developer> @@ -46,7 +40,6 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <storm.version>0.9.3</storm.version> <!-- see comment below... This fixes an annoyance with intellij --> <provided.scope>provided</provided.scope> </properties> @@ -75,7 +68,7 @@ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> - <version>${storm.version}</version> + <version>${project.version}</version> <scope>${provided.scope}</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/storm/blob/2094a08e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2e0c898..02417b8 100644 --- a/pom.xml +++ b/pom.xml @@ -169,6 +169,7 @@ <module>external/storm-jdbc</module> <module>external/storm-redis</module> <module>external/storm-eventhubs</module> + <module>external/flux</module> </modules> <scm>
