sync copies of storm.rb
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a9333360 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a9333360 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a9333360 Branch: refs/heads/master Commit: a9333360678107954209a2f04e0e4836230ad2ad Parents: b9322a5 Author: P. Taylor Goetz <ptgo...@gmail.com> Authored: Fri Nov 14 16:46:43 2014 -0500 Committer: P. Taylor Goetz <ptgo...@gmail.com> Committed: Fri Nov 14 16:46:43 2014 -0500 ---------------------------------------------------------------------- storm-core/src/dev/resources/storm.rb | 55 +++++++++++++++++------------- 1 file changed, 32 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a9333360/storm-core/src/dev/resources/storm.rb ---------------------------------------------------------------------- diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb index 17232d1..816694e 100644 --- a/storm-core/src/dev/resources/storm.rb +++ b/storm-core/src/dev/resources/storm.rb @@ -41,26 +41,26 @@ module Storm def read_task_ids Storm::Protocol.pending_taskids.shift || - begin - msg = read_message - until msg.is_a? Array - Storm::Protocol.pending_commands.push(msg) + begin msg = read_message + until msg.is_a? Array + Storm::Protocol.pending_commands.push(msg) + msg = read_message + end + msg end - msg - end end def read_command Storm::Protocol.pending_commands.shift || - begin - msg = read_message - while msg.is_a? Array - Storm::Protocol.pending_taskids.push(msg) + begin msg = read_message + while msg.is_a? Array + Storm::Protocol.pending_taskids.push(msg) + msg = read_message + end + msg end - msg - end end def send_msg_to_parent(msg) @@ -105,10 +105,10 @@ module Storm def emit(*args) case Storm::Protocol.mode - when 'spout' - emit_spout(*args) - when 'bolt' - emit_bolt(*args) + when 'spout' + emit_spout(*args) + when 'bolt' + emit_bolt(*args) end end @@ -169,6 +169,10 @@ module Storm def self.from_hash(hash) Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple")) end + + def is_heartbeat + task == -1 and stream == '__heartbeat' + end end class Bolt @@ -183,7 +187,12 @@ module Storm prepare(*handshake) begin while true - process Tuple.from_hash(read_command) + tuple = Tuple.from_hash(read_command) + if tuple.is_heartbeat + sync + else + process tuple + end end rescue Exception => e reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n') @@ -210,12 +219,12 @@ module Storm while true msg = read_command case msg['command'] - when 'next' - nextTuple - when 'ack' - ack(msg['id']) - when 'fail' - fail(msg['id']) + when 'next' + nextTuple + when 'ack' + ack(msg['id']) + when 'fail' + fail(msg['id']) end sync end