PROTON-1778: [ruby] threaded broker example for thread safe work_queue

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8694821e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8694821e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8694821e

Branch: refs/heads/go1
Commit: 8694821e5f71a534957c00f4857478d79906636d
Parents: c0d8399
Author: Alan Conway <acon...@redhat.com>
Authored: Fri Mar 23 15:31:05 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Fri Mar 23 16:30:13 2018 -0400

----------------------------------------------------------------------
 examples/ruby/README.md                       |  36 ++--
 examples/ruby/broker.rb                       | 190 +++++++++++----------
 proton-c/bindings/ruby/README.rdoc            |  21 +++
 proton-c/bindings/ruby/lib/core/connection.rb |   3 +-
 proton-c/bindings/ruby/lib/core/container.rb  |   6 +-
 proton-c/bindings/ruby/lib/core/transfer.rb   |   3 +
 proton-c/bindings/ruby/lib/core/work_queue.rb |  25 ++-
 7 files changed, 175 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/examples/ruby/README.md
----------------------------------------------------------------------
diff --git a/examples/ruby/README.md b/examples/ruby/README.md
index 66f6b31..61e01ca 100644
--- a/examples/ruby/README.md
+++ b/examples/ruby/README.md
@@ -58,19 +58,35 @@ In this set of examples we see the following event 
occurring, in addition to wha
 
 ## Now About That Broker example
 
-The **broker.rb** example application is a nice demonstration of doing 
something more interesting in Ruby with Proton.
+The **broker.rb** example application is a nice demonstration of doing something more interesting in Ruby with Proton, and shows how to use multiple threads.
 
-The way the broker works is to listen to incoming connections, examine the 
components of the address for that connection, attach that connection to an 
exchange managing that address and then it sends any messages destined for that 
address to them.
+The broker listens for incoming connections and sender/receiver links. It uses the source and target address of senders and receivers to identify a queue. Messages from receivers go on the queue, and are sent via senders.
 
 The components of the broker example include:
- * **Broker** - A class that extends the MessagingHandler class. It accepts 
incoming connections, manages subscribing them to exchanges, and transfers 
messages between them.
- * **MessageQueue** - Distributes messages to subscriptions.
-
-The Broker manages a map connecting a queue address to the instance of 
Exchange that holds references to the endpoints of interest.
+ * **Broker** is a Listener::Handler that accepts connections, and manages the set of named queues.
+ * **BrokerHandler** extends MessagingHandler to accept incoming connections, senders and receivers and transfers messages between them and the Broker's queues.
+ * **MessageQueue** - A queue of messages that keeps track of waiting senders.
 
 The broker application demonstrates a new set of events:
 
- * **on_link_open** - Fired when a remote link is opened. From this event the 
broker grabs the address and subscribes the link to an exchange for that 
address.
- * **on_link_close** - Fired when a remote link is closed. From this event the 
broker grabs the address and unsubscribes the link from that exchange.
- * **on_connection_close** - Fired when a remote connection is closed but the 
local end is still open.
- * **on_transport_close** - Fired when the protocol transport has closed. The 
broker removes all links for the disconnected connection, avoiding workign with 
endpoints that are now gone.
+ * **on_sender_open** - Fired when a sender link is opened. The broker gets the address and starts sending messages from the corresponding queue.
+ * **on_sender_close** - Fired when a sender link is closed. The broker removes the sender from the queue so no more messages are sent.
+ * **on_connection_close** - Fired when the remote connection is closed. The broker treats every sender on the connection as closed.
+ * **on_transport_close** - Fired when the transport (socket) has closed. The broker treats every sender on the connection as closed.
+
+It also demonstrates aspects of multi-threaded proton:
+
+ * **Thread safe MessageQueue** Uses a Mutex to make actions atomic when called concurrently.
+
+ * **Using WorkQueue** Proton objects like Sender are not thread safe.  They are
+   normally only used in MessagingHandler#on_ callbacks.  To request work from a
+   different thread you can add a code block to a WorkQueue, as shown in
+   MessageQueue#push.
+
+ * **Listener::Handler** The broker creates a new BrokerHandler instance for
+   each accepted connection. The container ensures that calls on each handler instance
+   are serialized even if there are multiple threads in the container.
+
+ * **Calling Container#run in multiple threads** The Container uses threads that call
+   #run as a thread pool to dispatch calls to MessagingHandler instances. Even
+   if there are multiple threads, calls to each handler instance are serialized.
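
The hand-off described in the bullets above (a Mutex around the queue, plus a block added to the sender's work queue) can be tried without Proton installed. The following is a minimal sketch, not the broker itself: `StubWorkQueue` and `StubSender` are hypothetical stand-ins invented for illustration, and only `MessageQueue` mirrors the class in broker.rb from this commit.

```ruby
# Hypothetical stand-in for a per-connection work queue (NOT Proton's WorkQueue):
# blocks added from any thread are run one at a time by a single worker thread.
class StubWorkQueue
  def initialize
    @jobs = Queue.new
    @worker = Thread.new do
      while (job = @jobs.pop)   # a nil job ends the loop
        job.call
      end
    end
  end

  def add(&block)
    @jobs << block
  end

  def close
    @jobs << nil
    @worker.join
  end
end

# Hypothetical stand-in for a Proton sender: it has credit, a work queue,
# and records what it was asked to send.
class StubSender
  attr_reader :work_queue, :sent, :credit

  def initialize(credit)
    @work_queue = StubWorkQueue.new
    @credit = credit
    @sent = []
  end

  def send(message)
    @credit -= 1
    @sent << message
  end
end

# Same shape as MessageQueue in broker.rb.
class MessageQueue
  def initialize
    @lock = Mutex.new
    @messages = []
    @waiting = []
  end

  def push(message)
    @lock.synchronize do
      @messages << message
      # Do not touch the sender here: ask its work queue to call send_to later.
      @waiting.each { |s| s.work_queue.add { send_to(s) } }
      @waiting.clear
    end
  end

  def pop(sender)
    @lock.synchronize do
      if @messages.empty?
        @waiting << sender
        nil
      else
        @messages.shift
      end
    end
  end

  def send_to(sender)
    while sender.credit > 0 && (message = pop(sender))
      sender.send(message)
    end
  end
end

queue = MessageQueue.new
sender = StubSender.new(2)
queue.send_to(sender)   # queue is empty, so the sender is recorded as waiting
producers = 2.times.map { |i| Thread.new { queue.push("m#{i}") } }
producers.each(&:join)
sender.work_queue.close # run any pending blocks, then stop the worker
puts sender.sent.sort.inspect
```

The producer threads never call `sender.send` themselves. They only append to the array under the lock and queue a block, so all use of the sender happens on the one thread that drains its work queue.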

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/examples/ruby/broker.rb
----------------------------------------------------------------------
diff --git a/examples/ruby/broker.rb b/examples/ruby/broker.rb
index 9a023ba..8dbfe81 100644
--- a/examples/ruby/broker.rb
+++ b/examples/ruby/broker.rb
@@ -21,139 +21,142 @@ require 'qpid_proton'
 require 'optparse'
 require 'pathname'
 
+# Thread safe message queue that notifies waiting senders when messages arrive.
 class MessageQueue
 
-  def initialize(dynamic = false)
-    @dynamic = dynamic
-    @queue = Queue.new
-    @consumers = []
-  end
-
-  def subscribe(consumer)
-    @consumers << (consumer)
-  end
-
-  def unsubscribe(consumer)
-    if @consumers.include?(consumer)
-      @consumers.delete(consumer)
+  def initialize
+    @lock = Mutex.new           # Make actions on the queue atomic
+    @messages = []              # Messages on the queue
+    @waiting = []               # Senders that are waiting for messages
+  end
+
+  # Push a message onto the queue and notify any waiting senders
+  def push(message)
+    @lock.synchronize do
+      @messages << message
+      unless @waiting.empty?    # Notify waiting senders
+        # NOTE: the call to self.send_to is added to the sender's work_queue,
+        # and will be executed in the sender's thread
+        @waiting.each { |s| s.work_queue.add { self.send_to(s); } }
+        @waiting.clear
+      end
     end
-    @consumers.empty? && (@dynamic || @queue.empty?)
-  end
-
-  def publish(message)
-    @queue << message
-    self.dispatch
   end
 
-  def dispatch(consumer = nil)
-    if consumer
-      c = [consumer]
-    else
-      c = @consumers
-    end
-
-    while self.deliver_to(c) do
+  # Pop a message off the queue.
+  # If no messages available, record sender as waiting and return nil.
+  def pop(sender)
+    @lock.synchronize do
+      if @messages.empty?
+        @waiting << sender
+        nil
+      else
+        @messages.shift
+      end
     end
   end
 
-  def deliver_to(consumers)
-    result = false
-    consumers.each do |consumer|
-      if consumer.credit > 0 && !@queue.empty?
-        consumer.send(@queue.pop(true))
-        result = true
-      end
+  # NOTE: Called in sender's thread.
+  # Pull messages from the queue as long as sender has credit.
+  # If queue runs out of messages, record sender as waiting.
+  def send_to(sender)
+    while sender.credit > 0 && (message = pop(sender))
+      sender.send(message)
     end
-    return result
   end
 
+  def forget(sender)
+    @lock.synchronize { @waiting.delete(sender) }
+  end
 end
 
-class Broker < Qpid::Proton::MessagingHandler
-
-  def initialize(url)
-    super()
-    @url = url
-    @queues = {}
-    begin          # Optional SSL setup, ignore if we don't find cert files 
etc.
-      @ssl_domain = 
Qpid::Proton::SSLDomain.new(Qpid::Proton::SSLDomain::MODE_SERVER)
-      cert_passsword = "tserverpw"
-      if Gem.win_platform?       # Use P12 certs for windows schannel
-        @ssl_domain.credentials("ssl_certs/tserver-certificate.p12", "", 
cert_passsword)
-      else
-        @ssl_domain.credentials("ssl_certs/tserver-certificate.pem", 
"ssl_certs/tserver-private-key.pem", cert_passsword)
-      end
-      @ssl_domain.allow_unsecured_client # SSL is optional, this is not secure.
-    rescue
-      @ssl_domain = nil # Don't worry if we can't set up SSL.
-    end
-  end
 
-  def on_container_start(container)
-    # Options for incoming connections, provide SSL configuration if we have 
it.
-    opts = {:ssl_domain => @ssl_domain} if @ssl_domain
-    @listener = container.listen(@url, 
Qpid::Proton::Listener::Handler.new(opts))
-    STDOUT.puts "Listening on #{@url.inspect}"; STDOUT.flush
-  end
+# Handler for broker connections. In a multi-threaded application you should
+# normally create a separate handler instance for each connection.
+class BrokerHandler < Qpid::Proton::MessagingHandler
 
-  def queue(address)
-    unless @queues.has_key?(address)
-      @queues[address] = MessageQueue.new
-    end
-    @queues[address]
+  def initialize(broker)
+    @broker = broker
   end
 
   def on_sender_open(sender)
     if sender.remote_source.dynamic?
-      address = SecureRandom.uuid
-      sender.source.address = address
-      q = MessageQueue.new(true)
-      @queues[address] = q
-      q.subscribe(sender)
+      sender.source.address = SecureRandom.uuid
     elsif sender.remote_source.address
       sender.source.address = sender.remote_source.address
-      self.queue(sender.source.address).subscribe(sender)
+    else
+      sender.connection.close("no source address")
+      return
     end
+    q = @broker.queue(sender.source.address)
+    q.send_to(sender)
   end
 
   def on_receiver_open(receiver)
     if receiver.remote_target.address
       receiver.target.address = receiver.remote_target.address
-    end
-  end
-
-  def unsubscribe(link)
-    if @queues.has_key?(link.source.address)
-      if @queues[link.source.address].unsubscribe(link)
-        @queues.delete(link.source.address)
-      end
+    else
+      receiver.connection.close("no target address")
     end
   end
 
   def on_sender_close(sender)
-    self.unsubscribe(sender)
+    q = @broker.queue(sender.source.address)
+    q.forget(sender) if q
   end
 
   def on_connection_close(connection)
-    self.remove_stale_consumers(connection)
+    connection.each_sender { |s| on_sender_close(s) }
   end
 
   def on_transport_close(transport)
-    self.remove_stale_consumers(transport.connection)
-  end
-
-  def remove_stale_consumers(connection)
-    connection.each_sender { |s| unsubscribe(s) }
+    transport.connection.each_sender { |s| on_sender_close(s) }
   end
 
   def on_sendable(sender)
-    q = self.queue(sender.source.address)
-    q.dispatch(sender)
+    @broker.queue(sender.source.address).send_to(sender)
   end
 
   def on_message(delivery, message)
-    q = self.queue(delivery.link.target.address)
-    q.publish(message)
+    @broker.queue(delivery.receiver.target.address).push(message)
+  end
+end
+
+# Broker manages the queues and accepts incoming connections.
+class Broker < Qpid::Proton::Listener::Handler
+
+  def initialize
+    @queues = {}
+    @connection_options = {}
+    ssl_setup
+  end
+
+  def ssl_setup
+    # Optional SSL setup
+    ssl = Qpid::Proton::SSLDomain.new(Qpid::Proton::SSLDomain::MODE_SERVER)
+    cert_passsword = "tserverpw"
+    if Gem.win_platform?       # Use P12 certs for windows schannel
+      ssl.credentials("ssl_certs/tserver-certificate.p12", "", cert_passsword)
+    else
+      ssl.credentials("ssl_certs/tserver-certificate.pem", "ssl_certs/tserver-private-key.pem", cert_passsword)
+    end
+    ssl.allow_unsecured_client # SSL is optional, this is not secure.
+    @connection_options[:ssl_domain] = ssl if ssl
+  rescue
+    # Don't worry if we can't set up SSL.
+  end
+
+  def on_open(l)
+    STDOUT.puts "Listening on #{l}\n"; STDOUT.flush
+  end
+
+  # Create a new BrokerHandler instance for each connection we accept
+  def on_accept(l)
+    { :handler => BrokerHandler.new(self) }.update(@connection_options)
+  end
+
+  def queue(address)
+    @queues[address] ||= MessageQueue.new
   end
 
 end
@@ -164,4 +167,9 @@ Start an example broker listening on URL"
   return 1
 end
 url, = ARGV
-Qpid::Proton::Container.new(Broker.new(url)).run
+container = Qpid::Proton::Container.new
+container.listen(url, Broker.new)
+
+# Run the container in multiple threads.
+threads = 4.times.map { Thread.new {  container.run }}
+threads.each { |t| t.join }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/README.rdoc
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/README.rdoc 
b/proton-c/bindings/ruby/README.rdoc
index 5eec00f..ab623bb 100644
--- a/proton-c/bindings/ruby/README.rdoc
+++ b/proton-c/bindings/ruby/README.rdoc
@@ -118,4 +118,25 @@ connection and link can be reestablished) but it is 
possible for multiple copies
 of the same message to be delivered, so the receiver must be aware of that. This
 is known as _at_least_once_ reliability.
 
+== Multi-threaded applications
 
+{Qpid::Proton::Container#run} can be called by multiple threads concurrently,
+giving the container a thread-pool to execute handler methods in parallel.
+
+Instances of {Qpid::Proton::Connection} and objects associated with it
+({Qpid::Proton::Session}, {Qpid::Proton::Sender}, {Qpid::Proton::Receiver},
+{Qpid::Proton::Delivery}, {Qpid::Proton::Tracker}) are not thread-safe and must
+be used correctly when multiple threads call {Qpid::Proton::Container#run}.
+
+Calls to {Qpid::Proton::MessagingHandler} and {Qpid::Proton::Listener::Handler}
+methods by the {Qpid::Proton::Container} are automatically serialized for each
+connection instance.
+
+Other threads may have code similarly serialized by adding it to the
+{Qpid::Proton::Connection#work_queue} for the connection.  Each object related
+to a {Qpid::Proton::Connection} also provides a +work_queue+ method.
+
+You also need to use the {Qpid::Proton::WorkQueue} to communicate between a
+{Qpid::Proton::MessagingHandler} method call for one connection instance, and a
+different {Qpid::Proton::Connection} instance in the same container, as separate
+connections can be processed in parallel.
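
To illustrate what "serialized" means in the section above, here is a small sketch using only the Ruby standard library. `SerialQueue` and `StubConnection` are hypothetical names standing in for a connection and its work queue. They are not Proton classes, and the real container dispatches work differently. The point is only that state owned by one connection needs no lock of its own if every access goes through that connection's queue.

```ruby
# Hypothetical stand-in for a connection's work queue: one worker thread runs
# the queued blocks strictly one after another, in the order they were added.
class SerialQueue
  def initialize
    @jobs = Queue.new
    @worker = Thread.new do
      while (job = @jobs.pop)   # a nil job ends the loop
        job.call
      end
    end
  end

  def add(&block)
    @jobs << block
  end

  def close
    @jobs << nil
    @worker.join
  end
end

# Hypothetical stand-in for a connection with unsynchronized state.
class StubConnection
  attr_reader :work_queue, :log

  def initialize
    @work_queue = SerialQueue.new
    @log = []   # no Mutex: only ever touched inside work_queue blocks
  end
end

conn = StubConnection.new

# Four "other" threads, e.g. handlers running for different connections.
threads = 4.times.map do |t|
  Thread.new do
    100.times { |i| conn.work_queue.add { conn.log << [t, i] } }
  end
end
threads.each(&:join)
conn.work_queue.close

puts conn.log.size
```

Each producer's entries appear in the log in the order that producer added them, because the queue is first-in first-out. Entries from different producers may interleave.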

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb 
b/proton-c/bindings/ruby/lib/core/connection.rb
index c0d161e..dc2590f 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -288,8 +288,7 @@ module Qpid::Proton
       @link_prefix + "/" +  (@link_count += 1).to_s(32)
     end
 
-    # @return [WorkQueue] work queue for code that should be run in the thread
-    # context for this connection
+    # @return [WorkQueue] work queue to execute code serialized correctly for this connection
     attr_reader :work_queue
 
     protected

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb 
b/proton-c/bindings/ruby/lib/core/container.rb
index 78f8013..85dbe69 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -59,6 +59,7 @@ module Qpid::Proton
       when 1 then
         @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? 
Symbol)
         @handler = args[0] unless @id
+      when 0 then
       else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2)"
       end
       # Use an empty messaging adapter to give default behaviour if there's no 
global handler.
@@ -169,10 +170,11 @@ module Qpid::Proton
 
     # Run the container: wait for IO activity, dispatch events to handlers.
     #
-    # *Multiple threads* : More than one thread can call {#run} concurrently,
+    # *Multi-threading* : More than one thread can call {#run} concurrently,
     # the container will use all {#run} threads as a thread pool. Calls to
     # {MessagingHandler} or {Listener::Handler} methods are serialized for each
-    # connection or listener, even if the container has multiple threads.
+    # connection or listener. See {WorkQueue} for coordinating with other
+    # threads.
     #
     # *Exceptions*: If any handler method raises an exception it will stop the
     # container, and the exception will be raised by all calls to {#run}. For

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/transfer.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transfer.rb 
b/proton-c/bindings/ruby/lib/core/transfer.rb
index 4fc21a9..53adf69 100644
--- a/proton-c/bindings/ruby/lib/core/transfer.rb
+++ b/proton-c/bindings/ruby/lib/core/transfer.rb
@@ -88,6 +88,9 @@ module Qpid::Proton
     # @return [Transport] The parent connection's transport.
     def transport() self.connection.transport; end
 
+    # @return [WorkQueue] The parent connection's work-queue.
+    def work_queue() self.connection.work_queue; end
+
     # @deprecated internal use only
     proton_caller :writable?
     # @deprecated internal use only

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8694821e/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb 
b/proton-c/bindings/ruby/lib/core/work_queue.rb
index d08d883..d205a13 100644
--- a/proton-c/bindings/ruby/lib/core/work_queue.rb
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -17,10 +17,24 @@
 
 module Qpid::Proton
 
-  # A queue of work items to be executed, possibly in a different thread.
+  # A thread-safe queue of work for multi-threaded programs.
+  #
+  # Instances of {Connection} and objects associated with it ({Session}, {Sender},
+  # {Receiver}, {Delivery}, {Tracker}) are not thread-safe and must be
+  # used correctly when multiple threads call {Container#run}.
+  #
+  # Calls to {MessagingHandler} methods by the {Container} are automatically
+  # serialized for each connection instance. Other threads may have code
+  # similarly serialized by adding it to the {Connection#work_queue} for the
+  # connection.  Each object related to a {Connection} also provides a
+  # +work_queue+ method.
+  #
   class WorkQueue
 
-    # Add code to be executed by the WorkQueue immediately.
+    # Add code to be executed in series with other {Container} operations on the
+    # work queue's owner. The code will be executed as soon as possible.
+    #
+    # @note Thread Safe: may be called in any thread.
     # @param non_block [Boolean] if true raise {ThreadError} if the operation 
would block.
     # @yield [ ] the block will be invoked with no parameters in the 
{WorkQueue} context,
     #  which may be a different thread.
@@ -32,9 +46,12 @@ module Qpid::Proton
       @container.send :wake
     end
 
-    # Schedule work to be executed by the WorkQueue after a delay.
-    # Note that tasks scheduled after the WorkQueue closes will be silently 
dropped
+    # Schedule code to be executed after +delay+ seconds in series with other
+    # {Container} operations on the work queue's owner.
+    #
+    # Work scheduled after the {WorkQueue} has closed is silently dropped.
     #
+    # @note (see #add)
     # @param delay delay in seconds until the block is added to the queue.
     # @param (see #add)
     # @yield (see #add)
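
The documented behaviour of `add` and `schedule` (run in series, delay before the block is added, work arriving after close is dropped) can be mimicked with a small stand-alone sketch. This is an assumption-laden illustration, not the code in work_queue.rb: `StubWorkQueue` is a hypothetical class, its `add` omits the real `non_block` parameter, and the timer-thread approach is chosen only to keep the example short.

```ruby
# Hypothetical stand-in that imitates the documented contract of WorkQueue.
class StubWorkQueue
  def initialize
    @lock = Mutex.new
    @closed = false
    @jobs = Queue.new
    @worker = Thread.new do
      while (job = @jobs.pop)   # a nil job ends the loop
        job.call
      end
    end
  end

  # Run the block as soon as possible, in series with other queued blocks.
  # Blocks added after close are silently dropped.
  def add(&block)
    @lock.synchronize { @jobs << block unless @closed }
  end

  # Add the block to the queue after +delay+ seconds.
  # Returns the timer thread so callers in this sketch can wait for it.
  def schedule(delay, &block)
    Thread.new do
      sleep(delay)
      add(&block)
    end
  end

  def close
    @lock.synchronize do
      @closed = true
      @jobs << nil
    end
    @worker.join
  end
end

work_queue = StubWorkQueue.new
order = []

timer = work_queue.schedule(0.2) { order << :late }
work_queue.add { order << :now }
timer.join          # wait until the delayed block has been added
work_queue.close    # drain the queue and stop the worker

# Scheduled after close: the block is dropped and never runs.
work_queue.schedule(0.01) { order << :dropped }.join

puts order.inspect
```

Nothing in the sketch raises when the last block is dropped, which matches the "silently dropped" wording in the comment above.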


