Send fail(tup) when BasicBolt.process() throws an exception
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/899eaec5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/899eaec5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/899eaec5 Branch: refs/heads/security Commit: 899eaec53a9b0895924c703b825dc0118e47cc83 Parents: cfe7e63 Author: Itai Frenkel <[email protected]> Authored: Sat Oct 11 15:46:35 2014 +0300 Committer: Itai Frenkel <[email protected]> Committed: Sat Oct 11 15:46:35 2014 +0300 ---------------------------------------------------------------------- storm-core/src/dev/resources/storm.py | 8 ++++++-- storm-core/src/multilang/py/storm.py | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/899eaec5/storm-core/src/dev/resources/storm.py ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py index d2a3082..a2aa976 100755 --- a/storm-core/src/dev/resources/storm.py +++ b/storm-core/src/dev/resources/storm.py @@ -210,8 +210,12 @@ class BasicBolt(object): while True: tup = readTuple() ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) + try: + self.process(tup) + ack(tup) + except Exception, e: + reportError(traceback.format_exc(e)) + fail(tup) except Exception, e: reportError(traceback.format_exc(e)) http://git-wip-us.apache.org/repos/asf/storm/blob/899eaec5/storm-core/src/multilang/py/storm.py ---------------------------------------------------------------------- diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py index d2a3082..a2aa976 100755 --- a/storm-core/src/multilang/py/storm.py +++ b/storm-core/src/multilang/py/storm.py @@ -210,8 +210,12 @@ class BasicBolt(object): while True: tup = readTuple() ANCHOR_TUPLE = tup - self.process(tup) - ack(tup) + try: + 
self.process(tup) + ack(tup) + except Exception, e: + reportError(traceback.format_exc(e)) + fail(tup) except Exception, e: reportError(traceback.format_exc(e))
