PROTON-752: Provide a non-blocking means to receive messages in Ruby.

To avoid changing the APIs or their intentions, a new class method is
added:

  Qpid::Proton::Messenger.receive_and_call

The method takes either an existing instance of Messenger or, when none
is given, the options (such as :addresses) needed to create one. It also
takes a code block, which is called with each message received.

The messenger is then put into passive mode and a new thread is started.
That thread monitors the messenger and, when a new message is received,
passes it to the block for processing.

To stop receiving, the calling code invokes Messenger#interrupt. The
thread then exits and the messenger stops processing.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0820a372
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0820a372
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0820a372

Branch: refs/heads/examples
Commit: 0820a3722b6ab5c2a5a4dbfac3428de7d22c1c6e
Parents: 2335465
Author: Darryl L. Pierce <[email protected]>
Authored: Fri Nov 21 09:57:40 2014 -0500
Committer: Darryl L. Pierce <[email protected]>
Committed: Mon Nov 24 12:58:10 2014 -0500

----------------------------------------------------------------------
 examples/messenger/ruby/passive_recv.rb         | 122 +++---------------
 .../bindings/ruby/lib/qpid_proton/messenger.rb  | 128 +++++++++++++++++++
 .../bindings/ruby/lib/qpid_proton/selectable.rb |  21 ++-
 3 files changed, 163 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0820a372/examples/messenger/ruby/passive_recv.rb
----------------------------------------------------------------------
diff --git a/examples/messenger/ruby/passive_recv.rb b/examples/messenger/ruby/passive_recv.rb
index a3625ac..878c801 100644
--- a/examples/messenger/ruby/passive_recv.rb
+++ b/examples/messenger/ruby/passive_recv.rb
@@ -31,110 +31,26 @@ end
 
 addresses = ["~0.0.0.0"] if addresses.empty?
 
-messenger = Qpid::Proton::Messenger.new
-messenger.passive = true
-
-begin
-  messenger.start
-rescue ProtonError => error
-  puts "ERROR: #{error.message}"
-  puts error.backtrace.join("\n")
-  exit
-end
-
-addresses.each do |address|
-  begin
-    messenger.subscribe(address)
-  rescue Qpid::Proton::ProtonError => error
-    puts "ERROR: #{error.message}"
-    exit
-  end
-end
-
-msg = Qpid::Proton::Message.new
-
-read_array = []
-write_array = []
-selectables = {}
-
-loop do
-
-  # wait for incoming messages
-  sel = messenger.selectable
-  while !sel.nil?
-    if sel.terminal?
-      selectables.delete(sel.fileno)
-      read_array.delete(sel)
-      write_array.delete(sel)
-      sel.free
-    else
-      sel.capacity
-      sel.pending
-      if !sel.registered?
-        read_array << sel
-        write_array << sel
-        selectables[sel.fileno] = sel
-        sel.registered = true
-      end
-    end
-    sel = messenger.selectable
+msgr = Qpid::Proton::Messenger.receive_and_call(nil, :addresses => addresses) 
do |message|
+  puts "Address: #{message.address}"
+  subject = message.subject || "(no subject)"
+  puts "Subject: #{subject}"
+  puts "Body: #{message.body}"
+  puts "Properties: #{message.properties}"
+  puts "Instructions: #{message.instructions}"
+  puts "Annotations: #{message.annotations}"
+
+  if message.reply_to
+    puts "=== Sending a reply to #{message.reply_to}"
+    reply = Qpid::Proton::Message.new
+    reply.address = message.reply_to
+    reply.subject = "RE: #{message.subject}"
+    reply.content = "Thanks for the message!"
+
+    messenger.put(reply)
+    messenger.send
   end
 
-  unless selectables.empty?
-    rarray = []; read_array.each {|fd| rarray << fd.to_io }
-    warray = []; write_array.each {|fd| warray << fd.to_io }
-
-    if messenger.deadline > 0.0
-      result = IO.select(rarray, warray, nil, messenger.deadline)
-    else
-      result = IO.select(rarray, warray)
-    end
-
-    unless result.nil? && result.empty?
-      result.flatten.each do |io|
-        sel = selectables[io.fileno]
-
-        sel.writable if sel.pending > 0
-        sel.readable if sel.capacity > 0
-      end
-    end
-
-    begin
-      messenger.receive(10)
-    rescue Qpid::Proton::ProtonError => error
-      puts "ERROR: #{error.message}"
-      exit
-    end
-
-    while messenger.incoming.nonzero?
-      begin
-        messenger.get(msg)
-      rescue Qpid::Proton::Error => error
-        puts "ERROR: #{error.message}"
-        exit
-      end
-
-      puts "Address: #{msg.address}"
-      subject = msg.subject || "(no subject)"
-      puts "Subject: #{subject}"
-      puts "Body: #{msg.body}"
-      puts "Properties: #{msg.properties}"
-      puts "Instructions: #{msg.instructions}"
-      puts "Annotations: #{msg.annotations}"
-
-      if msg.reply_to
-        puts "=== Sending a reply to #{msg.reply_to}"
-        reply = Qpid::Proton::Message.new
-        reply.address = msg.reply_to
-        reply.subject = "RE: #{msg.subject}"
-        reply.content = "Thanks for the message!"
-
-        messenger.put(reply)
-        messenger.send
-      end
-    end
-  end
 end
 
-messenger.stop
-
+Thread.list[1].join

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0820a372/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
index 5a16c50..a8f7330 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
@@ -75,6 +75,7 @@ module Qpid # :nodoc:
       #
       def initialize(name = nil)
         @impl = Cproton.pn_messenger(name)
+        @interrupted = false
         @selectables = {}
         ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
       end
@@ -407,6 +408,7 @@ module Qpid # :nodoc:
       # originated the interrupt.
       #
       def interrupt
+        @interrupted = true
         Cproton.pn_messenger_interrupt(@impl)
       end
 
@@ -695,6 +697,132 @@ module Qpid # :nodoc:
         !window.nil? && [Float, Fixnum].include?(window.class)
       end
 
+      public
+
+
+      #--
+      # The following are class methods.
+      #++
+
+      # Receives messages from the provided instance of Messenger, and then
+      # calls the supplied block for each Message received. If no instance
+      # is provided then one is created using the provided list of options.
+      #
+      # This starts a new thread which will loop, waiting for and processing
+      # incoming messages.
+      #
+      # ==== Arguments
+      #
+      # * messenger - The instance of Messenger.
+      #
+      # ==== Options
+      #
+      # * :addresses - An array of addresses to which to subscribe. Addresses
+      #   are required if no Messenger was supplied.
+      #
+      # ==== Examples
+      #
+      #   # create a Messenger
+      #   messenger = Qpid::Proton::Messenger.new
+      #   # begin receiving messages
+      #   Qpid::Proton::Messenger.receive_and_call(messenger) do |message|
+      #      puts "Received: #{message.body}"
+      #   end
+      #
+      def self.receive_and_call(messenger, options = {}, &block)
+        # if the messenger wasn't created then create one
+        if messenger.nil?
+          # if no addresses were supplied then raise an exception
+          raise ArgumentError.new("no addresses") if options[:addresses].nil?
+          # if no block was supplied then raise an exception
+          raise ArgumentError.new("missing block") if block.nil?
+
+          messenger = Qpid::Proton::Messenger.new
+          Array(options[:addresses]).each do |address|
+            messenger.subscribe address
+          end
+        end
+
+        # set the messenger to passive mode
+        messenger.passive = true
+        messenger.start
+
+        Thread.new(messenger, block) do |messenger, &block|
+          read_array = []
+          write_array = []
+          selectables = {}
+
+          aborted = false
+
+          while !aborted do
+            # refresh the list of fds to be processed
+            sel = messenger.selectable
+            while !sel.nil?
+              if sel.terminal?
+                selectables.delete(sel.fileno)
+                read_array.delete(sel)
+                write_array.delete(sel)
+                sel.free
+              else
+                sel.capacity
+                sel.pending
+                if !sel.registered?
+                  read_array << sel
+                  write_array << sel
+                  selectables[sel.fileno] = sel
+                  sel.registered = true
+                end
+              end
+              sel = messenger.selectable
+            end
+
+            unless selectables.empty?
+              rarray = []; read_array.each {|fd| rarray << fd.to_io}
+              warray = []; write_array.each {|fd| warray << fd.to_io}
+
+              if messenger.deadline > 0.0
+                result = IO.select(rarray, warray, nil, messenger.deadline)
+              else
+                result = IO.select(rarray, warray)
+              end
+
+              unless result.nil? && result.empty?
+                result.flatten.each do |io|
+                  sel = selectables[io.fileno]
+
+                  sel.writable if sel.pending > 0
+                  sel.readable if sel.capacity > 0
+                end
+              end
+
+              messenger.receive(10)
+
+              # if this was interrupted then exit
+              messenger.instance_eval do
+                aborted = @interrupted
+                @interrupted = false
+              end
+
+              if !aborted
+                # process each message received
+                while messenger.incoming.nonzero?
+                  message = Qpid::Proton::Message.new
+                  messenger.get(message)
+                  yield message
+                end
+              end
+
+            end
+
+          end
+
+        end
+
+        # return the messenger
+        messenger
+
+      end
+
     end
 
   end

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0820a372/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
index 33554cd..8b1214a 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton/selectable.rb
@@ -37,6 +37,14 @@ module Qpid # :nodoc:
         @impl = impl
         @io = nil
         @freed = false
+
+        ObjectSpace.define_finalizer(self, self.class.finalize!(@impl))
+      end
+
+      def self.finalize!(impl) # :nodoc:
+        proc {
+          impl.free
+        }
       end
 
       # Returns the underlying file descriptor.
@@ -48,7 +56,11 @@ module Qpid # :nodoc:
       end
 
       def to_io
-        @io ||= IO.new(fileno)
+        if @io.nil?
+          fileno = self.fileno
+          @io = IO.new(fileno)
+        end
+        @io
       end
 
       # The number of bytes the selectable is capable of consuming.
@@ -97,15 +109,14 @@ module Qpid # :nodoc:
       end
 
       def to_s
-        "fileno=#{self.fileno} registered=#{self.registered?} 
terminal=#{self.terminal?}"
+        return super if @freed
+        "#{super} fileno=#{self.fileno} registered=#{self.registered?} 
terminal=#{self.terminal?}"
       end
 
       def free
-        return if @freed
         @freed = true
         @messenger.unregister_selectable(fileno)
-        @io.close unless @io.nil?
-        Cproton.pn_selectable_free(@impl)
+        @messenger = nil
         @impl = nil
       end
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to