Send fail(tup) when BasicBolt.process() throws an Exception

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/899eaec5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/899eaec5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/899eaec5

Branch: refs/heads/security
Commit: 899eaec53a9b0895924c703b825dc0118e47cc83
Parents: cfe7e63
Author: Itai Frenkel <[email protected]>
Authored: Sat Oct 11 15:46:35 2014 +0300
Committer: Itai Frenkel <[email protected]>
Committed: Sat Oct 11 15:46:35 2014 +0300

----------------------------------------------------------------------
 storm-core/src/dev/resources/storm.py | 8 ++++++--
 storm-core/src/multilang/py/storm.py  | 8 ++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/899eaec5/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
index d2a3082..a2aa976 100755
--- a/storm-core/src/dev/resources/storm.py
+++ b/storm-core/src/dev/resources/storm.py
@@ -210,8 +210,12 @@ class BasicBolt(object):
             while True:
                 tup = readTuple()
                 ANCHOR_TUPLE = tup
-                self.process(tup)
-                ack(tup)
+                try:
+                    self.process(tup)
+                    ack(tup)
+                except Exception, e:
+                    reportError(traceback.format_exc(e))
+                    fail(tup)
         except Exception, e:
             reportError(traceback.format_exc(e))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/899eaec5/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index d2a3082..a2aa976 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -210,8 +210,12 @@ class BasicBolt(object):
             while True:
                 tup = readTuple()
                 ANCHOR_TUPLE = tup
-                self.process(tup)
-                ack(tup)
+                try:
+                    self.process(tup)
+                    ack(tup)
+                except Exception, e:
+                    reportError(traceback.format_exc(e))
+                    fail(tup)
         except Exception, e:
             reportError(traceback.format_exc(e))
 

Reply via email to