PROTON-752: Provide a non-blocking means to receive messages in Ruby. To avoid changing the APIs or their intentions, a new class method is added:
Qpid::Proton::Receiver::receive_and_call The method takes as arguments either an existing instance of Messenger or else the parameters to create an instance, and also a code block to be called with each message received. The messenger is then put into passive mode and a new thread started. It then monitors the messenger and, when a new message is received, passes it to the block for processing. To exit the messenger, the code would call Messenger#interrupt. The thread then exits and the messenger stops processing. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0820a372 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0820a372 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0820a372 Branch: refs/heads/examples Commit: 0820a3722b6ab5c2a5a4dbfac3428de7d22c1c6e Parents: 2335465 Author: Darryl L. Pierce <[email protected]> Authored: Fri Nov 21 09:57:40 2014 -0500 Committer: Darryl L. Pierce <[email protected]> Committed: Mon Nov 24 12:58:10 2014 -0500 ---------------------------------------------------------------------- examples/messenger/ruby/passive_recv.rb | 122 +++--------------- .../bindings/ruby/lib/qpid_proton/messenger.rb | 128 +++++++++++++++++++ .../bindings/ruby/lib/qpid_proton/selectable.rb | 21 ++- 3 files changed, 163 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0820a372/examples/messenger/ruby/passive_recv.rb ---------------------------------------------------------------------- diff --git a/examples/messenger/ruby/passive_recv.rb b/examples/messenger/ruby/passive_recv.rb index a3625ac..878c801 100644 --- a/examples/messenger/ruby/passive_recv.rb +++ b/examples/messenger/ruby/passive_recv.rb @@ -31,110 +31,26 @@ end addresses = ["~0.0.0.0"] if addresses.empty? -messenger = Qpid::Proton::Messenger.new -messenger.passive = true - -begin - messenger.start -rescue ProtonError => error - puts "ERROR: #{error.message}" - puts error.backtrace.join("\n") - exit -end - -addresses.each do |address| - begin - messenger.subscribe(address) - rescue Qpid::Proton::ProtonError => error - puts "ERROR: #{error.message}" - exit - end -end - -msg = Qpid::Proton::Message.new - -read_array = [] -write_array = [] -selectables = {} - -loop do - - # wait for incoming messages - sel = messenger.selectable - while !sel.nil? - if sel.terminal? - selectables.delete(sel.fileno) - read_array.delete(sel) - write_array.delete(sel) - sel.free - else - sel.capacity - sel.pending - if !sel.registered? - read_array << sel - write_array << sel - selectables[sel.fileno] = sel - sel.registered = true - end - end - sel = messenger.selectable +msgr = Qpid::Proton::Messenger.receive_and_call(nil, :addresses => addresses) do |message| + puts "Address: #{message.address}" + subject = message.subject || "(no subject)" + puts "Subject: #{subject}" + puts "Body: #{message.body}" + puts "Properties: #{message.properties}" + puts "Instructions: #{message.instructions}" + puts "Annotations: #{message.annotations}" + + if message.reply_to + puts "=== Sending a reply to #{message.reply_to}" + reply = Qpid::Proton::Message.new + reply.address = message.reply_to + reply.subject = "RE: #{message.subject}" + reply.content = "Thanks for the message!" + + messenger.put(reply) + messenger.send end - unless selectables.empty? - rarray = []; read_array.each {|fd| rarray << fd.to_io } - warray = []; write_array.each {|fd| warray << fd.to_io } - - if messenger.deadline > 0.0 - result = IO.select(rarray, warray, nil, messenger.deadline) - else - result = IO.select(rarray, warray) - end - - unless result.nil? && result.empty? - result.flatten.each do |io| - sel = selectables[io.fileno] - - sel.writable if sel.pending > 0 - sel.readable if sel.capacity > 0 - end - end - - begin - messenger.receive(10) - rescue Qpid::Proton::ProtonError => error - puts "ERROR: #{error.message}" - exit - end - - while messenger.incoming.nonzero? - begin - messenger.get(msg) - rescue Qpid::Proton::Error => error - puts "ERROR: #{error.message}" - exit - end - - puts "Address: #{msg.address}" - subject = msg.subject || "(no subject)" - puts "Subject: #{subject}" - puts "Body: #{msg.body}" - puts "Properties: #{msg.properties}" - puts "Instructions: #{msg.instructions}" - puts "Annotations: #{msg.annotations}" - - if msg.reply_to - puts "=== Sending a reply to #{msg.reply_to}" - reply = Qpid::Proton::Message.new - reply.address = msg.reply_to - reply.subject = "RE: #{msg.subject}" - reply.content = "Thanks for the message!" - - messenger.put(reply) - messenger.send - end - end - end end -messenger.stop - +Thread.list[1].join http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0820a372/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb index 5a16c50..a8f7330 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb @@ -75,6 +75,7 @@ module Qpid # :nodoc: # def initialize(name = nil) @impl = Cproton.pn_messenger(name) + @interrupted = false @selectables = {} ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end @@ -407,6 +408,7 @@ module Qpid # :nodoc: # originated the interrupt. # def interrupt + @interrupted = true Cproton.pn_messenger_interrupt(@impl) end @@ -695,6 +697,132 @@ module Qpid # :nodoc: !window.nil? && [Float, Fixnum].include?(window.class) end + public + + + #-- + # The following are class methods. + #++ + + # Receives messages from the provided instance of Messenger, and then + # calls the supplied block for each Message received. If no instance + # is provided then one is created using the provided list of options. + # + # This starts a new thread which will loop, waiting for and processing + # incoming messages. + # + # ==== Arguments + # + # * messenger - The instance of Messenger. + # + # ==== Options + # + # * :addresses - An array of addresses to which to subscribe. Addresses + # are required if no Messenger was supplied. + # + # ==== Examples + # + # # create a Messenger + # messenger = Qpid::Proton::Messenger.new + # # begin receiving messages + # Qpid::Proton::Messenger.receive_and_call(messenger) do |message| + # puts "Received: #{message.body}" + # end + # + def self.receive_and_call(messenger, options = {}, &block) + # if the messenger wasn't created then create one + if messenger.nil? + # if no addresses were supplied then raise an exception + raise ArgumentError.new("no addresses") if options[:addresses].nil? + # if no block was supplied then raise an exception + raise ArgumentError.new("missing block") if block.nil? + + messenger = Qpid::Proton::Messenger.new + Array(options[:addresses]).each do |address| + messenger.subscribe address + end + end + + # set the messenger to passive mode + messenger.passive = true + messenger.start + + Thread.new(messenger, block) do |messenger, &block| + read_array = [] + write_array = [] + selectables = {} + + aborted = false + + while !aborted do + # refresh the list of fds to be processed + sel = messenger.selectable + while !sel.nil? + if sel.terminal? + selectables.delete(sel.fileno) + read_array.delete(sel) + write_array.delete(sel) + sel.free + else + sel.capacity + sel.pending + if !sel.registered? + read_array << sel + write_array << sel + selectables[sel.fileno] = sel + sel.registered = true + end + end + sel = messenger.selectable + end + + unless selectables.empty? + rarray = []; read_array.each {|fd| rarray << fd.to_io} + warray = []; write_array.each {|fd| warray << fd.to_io} + + if messenger.deadline > 0.0 + result = IO.select(rarray, warray, nil, messenger.deadline) + else + result = IO.select(rarray, warray) + end + + unless result.nil? && result.empty? + result.flatten.each do |io| + sel = selectables[io.fileno] + + sel.writable if sel.pending > 0 + sel.readable if sel.capacity > 0 + end + end + + messenger.receive(10) + + # if this was interrupted then exit + messenger.instance_eval do + aborted = @interrupted + @interrupted = false + end + + if !aborted + # process each message received + while messenger.incoming.nonzero? + message = Qpid::Proton::Message.new + messenger.get(message) + yield message + end + end + + end + + end + + end + + # return the messenger + messenger + + end + end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0820a372/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb index 33554cd..8b1214a 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb @@ -37,6 +37,14 @@ module Qpid # :nodoc: @impl = impl @io = nil @freed = false + + ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) + end + + def self.finalize!(impl) # :nodoc: + proc { + impl.free + } end # Returns the underlying file descriptor. @@ -48,7 +56,11 @@ module Qpid # :nodoc: end def to_io - @io ||= IO.new(fileno) + if @io.nil? + fileno = self.fileno + @io = IO.new(fileno) + end + @io end # The number of bytes the selectable is capable of consuming. @@ -97,15 +109,14 @@ module Qpid # :nodoc: end def to_s - "fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}" + return super if @freed + "#{super} fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}" end def free - return if @freed @freed = true @messenger.unregister_selectable(fileno) - @io.close unless @io.nil? - Cproton.pn_selectable_free(@impl) + @messenger = nil @impl = nil end --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
