Merge branch 'master' of https://github.com/apache/storm into STORM-513
Conflicts: storm-core/src/multilang/py/storm.py * also fixes diverged py files (there are 3 storm.py files, but they seem to have diverged) Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69ddc67a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69ddc67a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69ddc67a Branch: refs/heads/0.9.3-branch Commit: 69ddc67a2b0ca527368ca7a27ac58621c745037b Parents: 12d31b4 7602f43 Author: Jungtaek Lim <kabh...@gmail.com> Authored: Wed Nov 5 06:44:04 2014 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Wed Nov 5 06:44:04 2014 +0900 ---------------------------------------------------------------------- CHANGELOG.md | 14 +- DEVELOPER.md | 3 + LICENSE | 20 +- README.markdown | 1 + bin/storm | 1 + bin/storm.cmd | 11 +- .../storm-starter/multilang/resources/storm.py | 8 +- examples/storm-starter/pom.xml | 2 +- external/storm-hbase/pom.xml | 2 +- .../storm/hbase/bolt/HBaseLookupBolt.java | 2 +- .../hbase/bolt/mapper/HBaseValueMapper.java | 4 +- .../storm/hbase/trident/state/HBaseState.java | 6 +- .../hbase/topology/WordCountValueMapper.java | 3 +- external/storm-hdfs/pom.xml | 2 +- external/storm-kafka/README.md | 7 +- external/storm-kafka/pom.xml | 2 +- .../src/jvm/storm/kafka/KafkaUtils.java | 63 ++-- .../src/jvm/storm/kafka/PartitionManager.java | 11 +- .../jvm/storm/kafka/UpdateOffsetException.java | 5 + .../src/test/storm/kafka/KafkaUtilsTest.java | 6 +- logback/cluster.xml | 4 +- pom.xml | 2 +- .../maven-shade-clojure-transformer/pom.xml | 2 +- storm-core/pom.xml | 10 +- .../clj/backtype/storm/daemon/supervisor.clj | 4 +- storm-core/src/clj/backtype/storm/testing.clj | 6 +- storm-core/src/clj/backtype/storm/testing4j.clj | 3 + storm-core/src/clj/backtype/storm/ui/core.clj | 50 +-- storm-core/src/dev/resources/storm.py | 261 +++++++++++++- storm-core/src/dev/resources/storm.rb | 228 +++++++++++- .../backtype/storm/messaging/netty/Server.java | 14 
+- .../jvm/backtype/storm/utils/LocalState.java | 30 +- storm-core/src/multilang/js/storm.js | 349 +++++++++++++++++++ storm-core/src/multilang/py/storm.py | 8 +- storm-core/src/ui/public/component.html | 8 +- storm-core/src/ui/public/js/purl.js | 267 -------------- storm-core/src/ui/public/js/script.js | 3 +- storm-core/src/ui/public/js/url.min.js | 1 + storm-core/src/ui/public/js/visualization.js | 2 +- .../templates/component-page-template.html | 6 +- .../public/templates/index-page-template.html | 2 +- .../templates/topology-page-template.html | 14 +- storm-core/src/ui/public/topology.html | 8 +- .../clj/backtype/storm/local_state_test.clj | 14 +- .../test/clj/backtype/storm/supervisor_test.clj | 12 +- storm-dist/binary/LICENSE | 10 +- storm-dist/binary/pom.xml | 2 +- storm-dist/source/pom.xml | 2 +- 48 files changed, 1075 insertions(+), 420 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/69ddc67a/examples/storm-starter/multilang/resources/storm.py ---------------------------------------------------------------------- diff --cc examples/storm-starter/multilang/resources/storm.py index acc02ad,fdf7751..642c393 --- a/examples/storm-starter/multilang/resources/storm.py +++ b/examples/storm-starter/multilang/resources/storm.py @@@ -211,20 -179,17 +211,24 @@@ class BasicBolt(object) MODE = Bolt global ANCHOR_TUPLE conf, context = initComponent() - self.initialize(conf, context) try: + self.initialize(conf, context) while True: tup = readTuple() - ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) + if tup.is_heartbeat_tuple(): + sync() + else: + ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) ++ try: ++ self.process(tup) ++ ack(tup) ++ except Exception, e: ++ reportError(traceback.format_exc(e)) ++ fail(tup) except Exception, e: - log(traceback.format_exc(e)) + reportError(traceback.format_exc(e)) -class Spout: +class Spout(object): def initialize(self, conf, context): pass 
http://git-wip-us.apache.org/repos/asf/storm/blob/69ddc67a/storm-core/src/dev/resources/storm.py ---------------------------------------------------------------------- diff --cc storm-core/src/dev/resources/storm.py index 5e73111,a2aa976..642c393 mode 120000,100755..100755 --- a/storm-core/src/dev/resources/storm.py +++ b/storm-core/src/dev/resources/storm.py @@@ -1,1 -1,251 +1,260 @@@ - ../../multilang/py/storm.py + # -*- coding: utf-8 -*- + + # Licensed to the Apache Software Foundation (ASF) under one + # or more contributor license agreements. See the NOTICE file + # distributed with this work for additional information + # regarding copyright ownership. The ASF licenses this file + # to you under the Apache License, Version 2.0 (the + # "License"); you may not use this file except in compliance + # with the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. 
+ + import sys + import os + import traceback + from collections import deque + + try: + import simplejson as json + except ImportError: + import json + + json_encode = lambda x: json.dumps(x) + json_decode = lambda x: json.loads(x) + + #reads lines and reconstructs newlines appropriately + def readMsg(): + msg = "" + while True: + line = sys.stdin.readline() + if not line: + raise Exception('Read EOF from stdin') + if line[0:-1] == "end": + break + msg = msg + line + return json_decode(msg[0:-1]) + + MODE = None + ANCHOR_TUPLE = None + + #queue up commands we read while trying to read taskids + pending_commands = deque() + + def readTaskIds(): + if pending_taskids: + return pending_taskids.popleft() + else: + msg = readMsg() + while type(msg) is not list: + pending_commands.append(msg) + msg = readMsg() + return msg + + #queue up taskids we read while trying to read commands/tuples + pending_taskids = deque() + + def readCommand(): + if pending_commands: + return pending_commands.popleft() + else: + msg = readMsg() + while type(msg) is list: + pending_taskids.append(msg) + msg = readMsg() + return msg + + def readTuple(): + cmd = readCommand() + return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"]) + + def sendMsgToParent(msg): + print json_encode(msg) + print "end" + sys.stdout.flush() + + def sync(): + sendMsgToParent({'command':'sync'}) + + def sendpid(heartbeatdir): + pid = os.getpid() + sendMsgToParent({'pid':pid}) + open(heartbeatdir + "/" + str(pid), "w").close() + + def emit(*args, **kwargs): + __emit(*args, **kwargs) + return readTaskIds() + + def emitDirect(task, *args, **kwargs): + kwargs["directTask"] = task + __emit(*args, **kwargs) + + def __emit(*args, **kwargs): + global MODE + if MODE == Bolt: + emitBolt(*args, **kwargs) + elif MODE == Spout: + emitSpout(*args, **kwargs) + + def emitBolt(tup, stream=None, anchors = [], directTask=None): + global ANCHOR_TUPLE + if ANCHOR_TUPLE is not None: + anchors = [ANCHOR_TUPLE] + m = 
{"command": "emit"} + if stream is not None: + m["stream"] = stream + m["anchors"] = map(lambda a: a.id, anchors) + if directTask is not None: + m["task"] = directTask + m["tuple"] = tup + sendMsgToParent(m) + + def emitSpout(tup, stream=None, id=None, directTask=None): + m = {"command": "emit"} + if id is not None: + m["id"] = id + if stream is not None: + m["stream"] = stream + if directTask is not None: + m["task"] = directTask + m["tuple"] = tup + sendMsgToParent(m) + + def ack(tup): + sendMsgToParent({"command": "ack", "id": tup.id}) + + def fail(tup): + sendMsgToParent({"command": "fail", "id": tup.id}) + + def reportError(msg): + sendMsgToParent({"command": "error", "msg": msg}) + + def log(msg, level=2): + sendMsgToParent({"command": "log", "msg": msg, "level":level}) + + def logTrace(msg): + log(msg, 0) + + def logDebug(msg): + log(msg, 1) + + def logInfo(msg): + log(msg, 2) + + def logWarn(msg): + log(msg, 3) + + def logError(msg): + log(msg, 4) + + def rpcMetrics(name, params): + sendMsgToParent({"command": "metrics", "name": name, "params": params}) + + def initComponent(): + setupInfo = readMsg() + sendpid(setupInfo['pidDir']) + return [setupInfo['conf'], setupInfo['context']] + + class Tuple(object): + def __init__(self, id, component, stream, task, values): + self.id = id + self.component = component + self.stream = stream + self.task = task + self.values = values + + def __repr__(self): + return '<%s%s>' % ( - self.__class__.__name__, - ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) ++ self.__class__.__name__, ++ ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) ++ ++ def is_heartbeat_tuple(self): ++ return self.task == -1 and self.stream == "__heartbeat" + + class Bolt(object): + def initialize(self, stormconf, context): + pass + + def process(self, tuple): + pass + + def run(self): + global MODE + MODE = Bolt + conf, context = initComponent() + try: + self.initialize(conf, context) + 
while True: + tup = readTuple() - self.process(tup) ++ if tup.is_heartbeat_tuple(): ++ sync() ++ else: ++ self.process(tup) + except Exception, e: + reportError(traceback.format_exc(e)) + + class BasicBolt(object): + def initialize(self, stormconf, context): + pass + + def process(self, tuple): + pass + + def run(self): + global MODE + MODE = Bolt + global ANCHOR_TUPLE + conf, context = initComponent() + try: + self.initialize(conf, context) + while True: + tup = readTuple() - ANCHOR_TUPLE = tup - try: - self.process(tup) - ack(tup) - except Exception, e: - reportError(traceback.format_exc(e)) - fail(tup) ++ if tup.is_heartbeat_tuple(): ++ sync() ++ else: ++ ANCHOR_TUPLE = tup ++ try: ++ self.process(tup) ++ ack(tup) ++ except Exception, e: ++ reportError(traceback.format_exc(e)) ++ fail(tup) + except Exception, e: + reportError(traceback.format_exc(e)) + + class Spout(object): + def initialize(self, conf, context): + pass + + def ack(self, id): + pass + + def fail(self, id): + pass + + def nextTuple(self): + pass + + def run(self): + global MODE + MODE = Spout + conf, context = initComponent() + try: + self.initialize(conf, context) + while True: + msg = readCommand() + if msg["command"] == "next": + self.nextTuple() + if msg["command"] == "ack": + self.ack(msg["id"]) + if msg["command"] == "fail": + self.fail(msg["id"]) + sync() + except Exception, e: + reportError(traceback.format_exc(e)) http://git-wip-us.apache.org/repos/asf/storm/blob/69ddc67a/storm-core/src/multilang/py/storm.py ---------------------------------------------------------------------- diff --cc storm-core/src/multilang/py/storm.py index acc02ad,a2aa976..642c393 --- a/storm-core/src/multilang/py/storm.py +++ b/storm-core/src/multilang/py/storm.py @@@ -215,12 -209,13 +215,16 @@@ class BasicBolt(object) self.initialize(conf, context) while True: tup = readTuple() - ANCHOR_TUPLE = tup - try: - self.process(tup) - ack(tup) - except Exception, e: - reportError(traceback.format_exc(e)) - fail(tup) + 
if tup.is_heartbeat_tuple(): + sync() + else: + ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) ++ try: ++ self.process(tup) ++ ack(tup) ++ except Exception, e: ++ reportError(traceback.format_exc(e)) ++ fail(tup) except Exception, e: reportError(traceback.format_exc(e))