Repository: qpid-proton Updated Branches: refs/heads/master 0820a3722 -> 3ac2e3bd3
Revert "PROTON-752: Provide a non-blocking means to receive messages in Ruby." This reverts commit 0820a3722b6ab5c2a5a4dbfac3428de7d22c1c6e. Reverting this change so we can have a review first. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ac2e3bd Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ac2e3bd Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ac2e3bd Branch: refs/heads/master Commit: 3ac2e3bd34b90a7026103af47b1cb8fd2f7d5271 Parents: 0820a37 Author: Darryl L. Pierce <[email protected]> Authored: Mon Nov 24 14:25:26 2014 -0500 Committer: Darryl L. Pierce <[email protected]> Committed: Mon Nov 24 14:25:26 2014 -0500 ---------------------------------------------------------------------- examples/messenger/ruby/passive_recv.rb | 122 +++++++++++++++--- .../bindings/ruby/lib/qpid_proton/messenger.rb | 128 ------------------- .../bindings/ruby/lib/qpid_proton/selectable.rb | 21 +-- 3 files changed, 108 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/examples/messenger/ruby/passive_recv.rb ---------------------------------------------------------------------- diff --git a/examples/messenger/ruby/passive_recv.rb b/examples/messenger/ruby/passive_recv.rb index 878c801..a3625ac 100644 --- a/examples/messenger/ruby/passive_recv.rb +++ b/examples/messenger/ruby/passive_recv.rb @@ -31,26 +31,110 @@ end addresses = ["~0.0.0.0"] if addresses.empty? -msgr = Qpid::Proton::Messenger.receive_and_call(nil, :addresses => addresses) do |message| - puts "Address: #{message.address}" - subject = message.subject || "(no subject)" - puts "Subject: #{subject}" - puts "Body: #{message.body}" - puts "Properties: #{message.properties}" - puts "Instructions: #{message.instructions}" - puts "Annotations: #{message.annotations}" - - if message.reply_to - puts "=== Sending a reply to #{message.reply_to}" - reply = Qpid::Proton::Message.new - reply.address = message.reply_to - reply.subject = "RE: #{message.subject}" - reply.content = "Thanks for the message!" - - messenger.put(reply) - messenger.send +messenger = Qpid::Proton::Messenger.new +messenger.passive = true + +begin + messenger.start +rescue ProtonError => error + puts "ERROR: #{error.message}" + puts error.backtrace.join("\n") + exit +end + +addresses.each do |address| + begin + messenger.subscribe(address) + rescue Qpid::Proton::ProtonError => error + puts "ERROR: #{error.message}" + exit + end +end + +msg = Qpid::Proton::Message.new + +read_array = [] +write_array = [] +selectables = {} + +loop do + + # wait for incoming messages + sel = messenger.selectable + while !sel.nil? + if sel.terminal? + selectables.delete(sel.fileno) + read_array.delete(sel) + write_array.delete(sel) + sel.free + else + sel.capacity + sel.pending + if !sel.registered? + read_array << sel + write_array << sel + selectables[sel.fileno] = sel + sel.registered = true + end + end + sel = messenger.selectable end + unless selectables.empty? + rarray = []; read_array.each {|fd| rarray << fd.to_io } + warray = []; write_array.each {|fd| warray << fd.to_io } + + if messenger.deadline > 0.0 + result = IO.select(rarray, warray, nil, messenger.deadline) + else + result = IO.select(rarray, warray) + end + + unless result.nil? && result.empty? + result.flatten.each do |io| + sel = selectables[io.fileno] + + sel.writable if sel.pending > 0 + sel.readable if sel.capacity > 0 + end + end + + begin + messenger.receive(10) + rescue Qpid::Proton::ProtonError => error + puts "ERROR: #{error.message}" + exit + end + + while messenger.incoming.nonzero? + begin + messenger.get(msg) + rescue Qpid::Proton::Error => error + puts "ERROR: #{error.message}" + exit + end + + puts "Address: #{msg.address}" + subject = msg.subject || "(no subject)" + puts "Subject: #{subject}" + puts "Body: #{msg.body}" + puts "Properties: #{msg.properties}" + puts "Instructions: #{msg.instructions}" + puts "Annotations: #{msg.annotations}" + + if msg.reply_to + puts "=== Sending a reply to #{msg.reply_to}" + reply = Qpid::Proton::Message.new + reply.address = msg.reply_to + reply.subject = "RE: #{msg.subject}" + reply.content = "Thanks for the message!" + + messenger.put(reply) + messenger.send + end + end + end end -Thread.list[1].join +messenger.stop + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb index a8f7330..5a16c50 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb @@ -75,7 +75,6 @@ module Qpid # :nodoc: # def initialize(name = nil) @impl = Cproton.pn_messenger(name) - @interrupted = false @selectables = {} ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end @@ -408,7 +407,6 @@ module Qpid # :nodoc: # originated the interrupt. # def interrupt - @interrupted = true Cproton.pn_messenger_interrupt(@impl) end @@ -697,132 +695,6 @@ module Qpid # :nodoc: !window.nil? && [Float, Fixnum].include?(window.class) end - public - - - #-- - # The following are class methods. - #++ - - # Receives messages from the provided instance of Messenger, and then - # calls the supplied block for each Message received. If no instance - # is provided then one is created using the provided list of options. - # - # This starts a new thread which will loop, waiting for and processing - # incoming messages. - # - # ==== Arguments - # - # * messenger - The instance of Messenger. - # - # ==== Options - # - # * :addresses - An array of addresses to which to subscribe. Addresses - # are required if no Messenger was supplied. - # - # ==== Examples - # - # # create a Messenger - # messenger = Qpid::Proton::Messenger.new - # # begin receiving messages - # Qpid::Proton::Messenger.receive_and_call(messenger) do |message| - # puts "Received: #{message.body}" - # end - # - def self.receive_and_call(messenger, options = {}, &block) - # if the messenger wasn't created then create one - if messenger.nil? - # if no addresses were supplied then raise an exception - raise ArgumentError.new("no addresses") if options[:addresses].nil? - # if no block was supplied then raise an exception - raise ArgumentError.new("missing block") if block.nil? - - messenger = Qpid::Proton::Messenger.new - Array(options[:addresses]).each do |address| - messenger.subscribe address - end - end - - # set the messenger to passive mode - messenger.passive = true - messenger.start - - Thread.new(messenger, block) do |messenger, &block| - read_array = [] - write_array = [] - selectables = {} - - aborted = false - - while !aborted do - # refresh the list of fds to be processed - sel = messenger.selectable - while !sel.nil? - if sel.terminal? - selectables.delete(sel.fileno) - read_array.delete(sel) - write_array.delete(sel) - sel.free - else - sel.capacity - sel.pending - if !sel.registered? - read_array << sel - write_array << sel - selectables[sel.fileno] = sel - sel.registered = true - end - end - sel = messenger.selectable - end - - unless selectables.empty? - rarray = []; read_array.each {|fd| rarray << fd.to_io} - warray = []; write_array.each {|fd| warray << fd.to_io} - - if messenger.deadline > 0.0 - result = IO.select(rarray, warray, nil, messenger.deadline) - else - result = IO.select(rarray, warray) - end - - unless result.nil? && result.empty? - result.flatten.each do |io| - sel = selectables[io.fileno] - - sel.writable if sel.pending > 0 - sel.readable if sel.capacity > 0 - end - end - - messenger.receive(10) - - # if this was interrupted then exit - messenger.instance_eval do - aborted = @interrupted - @interrupted = false - end - - if !aborted - # process each message received - while messenger.incoming.nonzero? - message = Qpid::Proton::Message.new - messenger.get(message) - yield message - end - end - - end - - end - - end - - # return the messenger - messenger - - end - end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ac2e3bd/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb index 8b1214a..33554cd 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb @@ -37,14 +37,6 @@ module Qpid # :nodoc: @impl = impl @io = nil @freed = false - - ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) - end - - def self.finalize!(impl) # :nodoc: - proc { - impl.free - } end # Returns the underlying file descriptor. @@ -56,11 +48,7 @@ module Qpid # :nodoc: end def to_io - if @io.nil? - fileno = self.fileno - @io = IO.new(fileno) - end - @io + @io ||= IO.new(fileno) end # The number of bytes the selectable is capable of consuming. @@ -109,14 +97,15 @@ module Qpid # :nodoc: end def to_s - return super if @freed - "#{super} fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}" + "fileno=#{self.fileno} registered=#{self.registered?} terminal=#{self.terminal?}" end def free + return if @freed @freed = true @messenger.unregister_selectable(fileno) - @messenger = nil + @io.close unless @io.nil? + Cproton.pn_selectable_free(@impl) @impl = nil end --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
