Merge branch 'master' of https://github.com/apache/storm into STORM-513

Conflicts:
        storm-core/src/multilang/py/storm.py

* also fixes diverged py files (there're 3 storm.py files but seems to diverged)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69ddc67a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69ddc67a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69ddc67a

Branch: refs/heads/0.9.3-branch
Commit: 69ddc67a2b0ca527368ca7a27ac58621c745037b
Parents: 12d31b4 7602f43
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Wed Nov 5 06:44:04 2014 +0900
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Wed Nov 5 06:44:04 2014 +0900

----------------------------------------------------------------------
 CHANGELOG.md                                    |  14 +-
 DEVELOPER.md                                    |   3 +
 LICENSE                                         |  20 +-
 README.markdown                                 |   1 +
 bin/storm                                       |   1 +
 bin/storm.cmd                                   |  11 +-
 .../storm-starter/multilang/resources/storm.py  |   8 +-
 examples/storm-starter/pom.xml                  |   2 +-
 external/storm-hbase/pom.xml                    |   2 +-
 .../storm/hbase/bolt/HBaseLookupBolt.java       |   2 +-
 .../hbase/bolt/mapper/HBaseValueMapper.java     |   4 +-
 .../storm/hbase/trident/state/HBaseState.java   |   6 +-
 .../hbase/topology/WordCountValueMapper.java    |   3 +-
 external/storm-hdfs/pom.xml                     |   2 +-
 external/storm-kafka/README.md                  |   7 +-
 external/storm-kafka/pom.xml                    |   2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |  63 ++--
 .../src/jvm/storm/kafka/PartitionManager.java   |  11 +-
 .../jvm/storm/kafka/UpdateOffsetException.java  |   5 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   6 +-
 logback/cluster.xml                             |   4 +-
 pom.xml                                         |   2 +-
 .../maven-shade-clojure-transformer/pom.xml     |   2 +-
 storm-core/pom.xml                              |  10 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   4 +-
 storm-core/src/clj/backtype/storm/testing.clj   |   6 +-
 storm-core/src/clj/backtype/storm/testing4j.clj |   3 +
 storm-core/src/clj/backtype/storm/ui/core.clj   |  50 +--
 storm-core/src/dev/resources/storm.py           | 261 +++++++++++++-
 storm-core/src/dev/resources/storm.rb           | 228 +++++++++++-
 .../backtype/storm/messaging/netty/Server.java  |  14 +-
 .../jvm/backtype/storm/utils/LocalState.java    |  30 +-
 storm-core/src/multilang/js/storm.js            | 349 +++++++++++++++++++
 storm-core/src/multilang/py/storm.py            |   8 +-
 storm-core/src/ui/public/component.html         |   8 +-
 storm-core/src/ui/public/js/purl.js             | 267 --------------
 storm-core/src/ui/public/js/script.js           |   3 +-
 storm-core/src/ui/public/js/url.min.js          |   1 +
 storm-core/src/ui/public/js/visualization.js    |   2 +-
 .../templates/component-page-template.html      |   6 +-
 .../public/templates/index-page-template.html   |   2 +-
 .../templates/topology-page-template.html       |  14 +-
 storm-core/src/ui/public/topology.html          |   8 +-
 .../clj/backtype/storm/local_state_test.clj     |  14 +-
 .../test/clj/backtype/storm/supervisor_test.clj |  12 +-
 storm-dist/binary/LICENSE                       |  10 +-
 storm-dist/binary/pom.xml                       |   2 +-
 storm-dist/source/pom.xml                       |   2 +-
 48 files changed, 1075 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/69ddc67a/examples/storm-starter/multilang/resources/storm.py
----------------------------------------------------------------------
diff --cc examples/storm-starter/multilang/resources/storm.py
index acc02ad,fdf7751..642c393
--- a/examples/storm-starter/multilang/resources/storm.py
+++ b/examples/storm-starter/multilang/resources/storm.py
@@@ -211,20 -179,17 +211,24 @@@ class BasicBolt(object)
          MODE = Bolt
          global ANCHOR_TUPLE
          conf, context = initComponent()
 -        self.initialize(conf, context)
          try:
 +            self.initialize(conf, context)
              while True:
                  tup = readTuple()
 -                ANCHOR_TUPLE = tup
 -                self.process(tup)
 -                ack(tup)
 +                if tup.is_heartbeat_tuple():
 +                    sync()
 +                else:
 +                    ANCHOR_TUPLE = tup
-                     self.process(tup)
-                     ack(tup)
++                    try:
++                        self.process(tup)
++                        ack(tup)
++                    except Exception, e:
++                        reportError(traceback.format_exc(e))
++                        fail(tup)
          except Exception, e:
 -            log(traceback.format_exc(e))
 +            reportError(traceback.format_exc(e))
  
 -class Spout:
 +class Spout(object):
      def initialize(self, conf, context):
          pass
  

http://git-wip-us.apache.org/repos/asf/storm/blob/69ddc67a/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --cc storm-core/src/dev/resources/storm.py
index 5e73111,a2aa976..642c393
mode 120000,100755..100755
--- a/storm-core/src/dev/resources/storm.py
+++ b/storm-core/src/dev/resources/storm.py
@@@ -1,1 -1,251 +1,260 @@@
- ../../multilang/py/storm.py
+ # -*- coding: utf-8 -*-
+ 
+ # Licensed to the Apache Software Foundation (ASF) under one
+ # or more contributor license agreements.  See the NOTICE file
+ # distributed with this work for additional information
+ # regarding copyright ownership.  The ASF licenses this file
+ # to you under the Apache License, Version 2.0 (the
+ # "License"); you may not use this file except in compliance
+ # with the License.  You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ 
+ import sys
+ import os
+ import traceback
+ from collections import deque
+ 
+ try:
+     import simplejson as json
+ except ImportError:
+     import json
+ 
+ json_encode = lambda x: json.dumps(x)
+ json_decode = lambda x: json.loads(x)
+ 
+ #reads lines and reconstructs newlines appropriately
+ def readMsg():
+     msg = ""
+     while True:
+         line = sys.stdin.readline()
+         if not line:
+             raise Exception('Read EOF from stdin')
+         if line[0:-1] == "end":
+             break
+         msg = msg + line
+     return json_decode(msg[0:-1])
+ 
+ MODE = None
+ ANCHOR_TUPLE = None
+ 
+ #queue up commands we read while trying to read taskids
+ pending_commands = deque()
+ 
+ def readTaskIds():
+     if pending_taskids:
+         return pending_taskids.popleft()
+     else:
+         msg = readMsg()
+         while type(msg) is not list:
+             pending_commands.append(msg)
+             msg = readMsg()
+         return msg
+ 
+ #queue up taskids we read while trying to read commands/tuples
+ pending_taskids = deque()
+ 
+ def readCommand():
+     if pending_commands:
+         return pending_commands.popleft()
+     else:
+         msg = readMsg()
+         while type(msg) is list:
+             pending_taskids.append(msg)
+             msg = readMsg()
+         return msg
+ 
+ def readTuple():
+     cmd = readCommand()
+     return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], 
cmd["tuple"])
+ 
+ def sendMsgToParent(msg):
+     print json_encode(msg)
+     print "end"
+     sys.stdout.flush()
+ 
+ def sync():
+     sendMsgToParent({'command':'sync'})
+ 
+ def sendpid(heartbeatdir):
+     pid = os.getpid()
+     sendMsgToParent({'pid':pid})
+     open(heartbeatdir + "/" + str(pid), "w").close()
+ 
+ def emit(*args, **kwargs):
+     __emit(*args, **kwargs)
+     return readTaskIds()
+ 
+ def emitDirect(task, *args, **kwargs):
+     kwargs["directTask"] = task
+     __emit(*args, **kwargs)
+ 
+ def __emit(*args, **kwargs):
+     global MODE
+     if MODE == Bolt:
+         emitBolt(*args, **kwargs)
+     elif MODE == Spout:
+         emitSpout(*args, **kwargs)
+ 
+ def emitBolt(tup, stream=None, anchors = [], directTask=None):
+     global ANCHOR_TUPLE
+     if ANCHOR_TUPLE is not None:
+         anchors = [ANCHOR_TUPLE]
+     m = {"command": "emit"}
+     if stream is not None:
+         m["stream"] = stream
+     m["anchors"] = map(lambda a: a.id, anchors)
+     if directTask is not None:
+         m["task"] = directTask
+     m["tuple"] = tup
+     sendMsgToParent(m)
+ 
+ def emitSpout(tup, stream=None, id=None, directTask=None):
+     m = {"command": "emit"}
+     if id is not None:
+         m["id"] = id
+     if stream is not None:
+         m["stream"] = stream
+     if directTask is not None:
+         m["task"] = directTask
+     m["tuple"] = tup
+     sendMsgToParent(m)
+ 
+ def ack(tup):
+     sendMsgToParent({"command": "ack", "id": tup.id})
+ 
+ def fail(tup):
+     sendMsgToParent({"command": "fail", "id": tup.id})
+ 
+ def reportError(msg):
+     sendMsgToParent({"command": "error", "msg": msg})
+ 
+ def log(msg, level=2):
+     sendMsgToParent({"command": "log", "msg": msg, "level":level})
+ 
+ def logTrace(msg):
+     log(msg, 0)
+ 
+ def logDebug(msg):
+     log(msg, 1)
+ 
+ def logInfo(msg):
+     log(msg, 2)
+ 
+ def logWarn(msg):
+     log(msg, 3)
+ 
+ def logError(msg):
+     log(msg, 4)
+ 
+ def rpcMetrics(name, params):
+     sendMsgToParent({"command": "metrics", "name": name, "params": params})
+ 
+ def initComponent():
+     setupInfo = readMsg()
+     sendpid(setupInfo['pidDir'])
+     return [setupInfo['conf'], setupInfo['context']]
+ 
+ class Tuple(object):
+     def __init__(self, id, component, stream, task, values):
+         self.id = id
+         self.component = component
+         self.stream = stream
+         self.task = task
+         self.values = values
+ 
+     def __repr__(self):
+         return '<%s%s>' % (
 -                self.__class__.__name__,
 -                ''.join(' %s=%r' % (k, self.__dict__[k]) for k in 
sorted(self.__dict__.keys())))
++            self.__class__.__name__,
++            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in 
sorted(self.__dict__.keys())))
++
++    def is_heartbeat_tuple(self):
++        return self.task == -1 and self.stream == "__heartbeat"
+ 
+ class Bolt(object):
+     def initialize(self, stormconf, context):
+         pass
+ 
+     def process(self, tuple):
+         pass
+ 
+     def run(self):
+         global MODE
+         MODE = Bolt
+         conf, context = initComponent()
+         try:
+             self.initialize(conf, context)
+             while True:
+                 tup = readTuple()
 -                self.process(tup)
++                if tup.is_heartbeat_tuple():
++                    sync()
++                else:
++                    self.process(tup)
+         except Exception, e:
+             reportError(traceback.format_exc(e))
+ 
+ class BasicBolt(object):
+     def initialize(self, stormconf, context):
+         pass
+ 
+     def process(self, tuple):
+         pass
+ 
+     def run(self):
+         global MODE
+         MODE = Bolt
+         global ANCHOR_TUPLE
+         conf, context = initComponent()
+         try:
+             self.initialize(conf, context)
+             while True:
+                 tup = readTuple()
 -                ANCHOR_TUPLE = tup
 -                try:
 -                    self.process(tup)
 -                    ack(tup)
 -                except Exception, e:
 -                    reportError(traceback.format_exc(e))
 -                    fail(tup)
++                if tup.is_heartbeat_tuple():
++                    sync()
++                else:
++                    ANCHOR_TUPLE = tup
++                    try:
++                        self.process(tup)
++                        ack(tup)
++                    except Exception, e:
++                        reportError(traceback.format_exc(e))
++                        fail(tup)
+         except Exception, e:
+             reportError(traceback.format_exc(e))
+ 
+ class Spout(object):
+     def initialize(self, conf, context):
+         pass
+ 
+     def ack(self, id):
+         pass
+ 
+     def fail(self, id):
+         pass
+ 
+     def nextTuple(self):
+         pass
+ 
+     def run(self):
+         global MODE
+         MODE = Spout
+         conf, context = initComponent()
+         try:
+             self.initialize(conf, context)
+             while True:
+                 msg = readCommand()
+                 if msg["command"] == "next":
+                     self.nextTuple()
+                 if msg["command"] == "ack":
+                     self.ack(msg["id"])
+                 if msg["command"] == "fail":
+                     self.fail(msg["id"])
+                 sync()
+         except Exception, e:
+             reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/69ddc67a/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --cc storm-core/src/multilang/py/storm.py
index acc02ad,a2aa976..642c393
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@@ -215,12 -209,13 +215,16 @@@ class BasicBolt(object)
              self.initialize(conf, context)
              while True:
                  tup = readTuple()
 -                ANCHOR_TUPLE = tup
 -                try:
 -                    self.process(tup)
 -                    ack(tup)
 -                except Exception, e:
 -                    reportError(traceback.format_exc(e))
 -                    fail(tup)
 +                if tup.is_heartbeat_tuple():
 +                    sync()
 +                else:
 +                    ANCHOR_TUPLE = tup
-                     self.process(tup)
-                     ack(tup)
++                    try:
++                        self.process(tup)
++                        ack(tup)
++                    except Exception, e:
++                        reportError(traceback.format_exc(e))
++                        fail(tup)
          except Exception, e:
              reportError(traceback.format_exc(e))
  

Reply via email to