This commit doesn't make sense to me, for two reasons. First, the way
applications deal with the effects of events *is* by handling them.
Second, if there is a reason to exit the event processing loop before
processing all outstanding events, there is already an API for that: you
can call pn_reactor_yield(...) in one of your event handlers. I don't
quite understand the motivation for this commit, but I suspect there is a
less brittle fix. Can you explain what is going on here? As is, I believe
this change could introduce a stall.
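
To illustrate the second point, here is a toy Python model of a handler
asking the processing loop to return early. This is not Proton code:
ToyReactor, yield_() and process() are made-up names that only mimic the
general shape of pn_reactor_yield and pn_reactor_process, and the event
names are placeholders.

```python
from collections import deque


class ToyReactor:
    """Toy stand-in for a reactor; hypothetical, NOT the Proton API."""

    def __init__(self):
        self.events = deque()   # outstanding events, oldest first
        self.handled = []       # events dispatched so far
        self._yield = False     # set by a handler to request an early return

    def yield_(self):
        # Hypothetical analogue of calling pn_reactor_yield from a handler.
        self._yield = True

    def process(self, handler):
        """Dispatch queued events; return True if events are still queued."""
        self._yield = False
        while self.events:
            event = self.events.popleft()
            handler(self, event)
            self.handled.append(event)
            if self._yield:
                # The handler asked for control to go back to the caller.
                break
        return bool(self.events)


def handler(reactor, event):
    # The application decides, per event, when it wants control back.
    if event == "response":
        reactor.yield_()


reactor = ToyReactor()
reactor.events.extend(["open", "response", "flow", "close"])

more = reactor.process(handler)        # stops right after "response"
first_pass = list(reactor.handled)
more_after = reactor.process(handler)  # drains the remaining events
```

The point of the sketch is that the decision to leave the loop sits in the
handler that knows why it wants control back, rather than in a blanket
check inside the loop itself.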

--Rafael

On Mon, Feb 23, 2015 at 4:31 PM, <[email protected]> wrote:

> Repository: qpid-proton
> Updated Branches:
>   refs/heads/master 20d574bc9 -> 1455f3257
>
>
> PROTON-825: Reactor should yield before quiescing.
>
> The reactor needs to yield before quiescing so that the applications
> processing loop can handle the effects of any events processed so far
> before the reactor blocks for more.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1455f325
> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1455f325
> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1455f325
>
> Branch: refs/heads/master
> Commit: 1455f32575921ac8ed90cce539273233058f2b70
> Parents: 20d574b
> Author: Alan Conway <[email protected]>
> Authored: Mon Feb 23 16:15:45 2015 -0500
> Committer: Alan Conway <[email protected]>
> Committed: Mon Feb 23 16:15:45 2015 -0500
>
> ----------------------------------------------------------------------
>  examples/python/server.py                |  5 +-
>  examples/python/sync_client.py           | 80 ++++++++-------------------
>  proton-c/bindings/python/proton/utils.py |  2 +-
>  proton-c/include/proton/reactor.h        |  9 +++
>  proton-c/src/reactor/reactor.c           |  5 ++
>  5 files changed, 41 insertions(+), 60 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/examples/python/server.py
> ----------------------------------------------------------------------
> diff --git a/examples/python/server.py b/examples/python/server.py
> index fc9ac8d..a8d63f8 100755
> --- a/examples/python/server.py
> +++ b/examples/python/server.py
> @@ -46,10 +46,11 @@ class Server(MessagingHandler):
>          if not sender:
>              sender = self.container.create_sender(self.conn,
> event.message.reply_to)
>              self.senders[event.message.reply_to] = sender
> -        sender.send(Message(address=event.message.reply_to,
> body=event.message.body.upper()))
> +        sender.send(Message(address=event.message.reply_to,
> body=event.message.body.upper(),
> +                            correlation_id=event.message.correlation_id))
>
>  try:
> -    Container(Server("localhost:5672", "examples")).run()
> +    Container(Server("0.0.0.0:5672", "examples")).run()
>  except KeyboardInterrupt: pass
>
>
>
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/examples/python/sync_client.py
> ----------------------------------------------------------------------
> diff --git a/examples/python/sync_client.py
> b/examples/python/sync_client.py
> index 86cd7c2..82cd85f 100755
> --- a/examples/python/sync_client.py
> +++ b/examples/python/sync_client.py
> @@ -24,65 +24,31 @@ Demonstrates the client side of the synchronous
> request-response pattern
>
>  """
>
> +import optparse
>  from proton import Message, Url, ConnectionException, Timeout
> -from proton.utils import BlockingConnection
> +from proton.utils import SyncRequestResponse, BlockingConnection
>  from proton.handlers import IncomingMessageHandler
>  import sys
>
> -class SyncRequestClient(IncomingMessageHandler):
> -    """
> -    Implementation of the synchronous request-responce (aka RPC) pattern.
> -    Create an instance and call invoke() to send a request and wait for a
> response.
> -    """
> +parser = optparse.OptionParser(usage="usage: %prog [options]",
> +                               description="Send requests to the supplied
> address and print responses.")
> +parser.add_option("-a", "--address", default="localhost:5672/examples",
> +                  help="address to which messages are sent (default
> %default)")
> +parser.add_option("-t", "--timeout", type="float", default=5,
> +                  help="Give up after this time out (default %default)")
> +opts, args = parser.parse_args()
> +
> +url = Url(opts.address)
> +client = SyncRequestResponse(BlockingConnection(url,
> timeout=opts.timeout), url.path)
> +
> +try:
> +    REQUESTS= ["Twas brillig, and the slithy toves",
> +               "Did gire and gymble in the wabe.",
> +               "All mimsy were the borogroves,",
> +               "And the mome raths outgrabe."]
> +    for request in REQUESTS:
> +        response = client.call(Message(body=request))
> +        print "%s => %s" % (request, response.body)
> +finally:
> +    client.connection.close()
>
> -    def __init__(self, url, timeout=None):
> -        """
> -        @param url: a proton.Url or a URL string of the form
> 'host:port/path'
> -            host:port is used to connect, path is used to identify the
> remote messaging endpoint.
> -        """
> -        super(SyncRequestClient, self).__init__()
> -        self.connection = BlockingConnection(Url(url).defaults(),
> timeout=timeout)
> -        self.sender = self.connection.create_sender(url.path)
> -        # dynamic=true generates a unique address dynamically for this
> receiver.
> -        # credit=1 because we want to receive 1 response message
> initially.
> -        self.receiver = self.connection.create_receiver(None,
> dynamic=True, credit=1, handler=self)
> -        self.response = None
> -
> -    def invoke(self, request):
> -        """Send a request, wait for and return the response"""
> -        request.reply_to = self.reply_to
> -        self.sender.send(request)
> -        self.connection.wait(lambda: self.response, msg="Waiting for
> response")
> -        response = self.response
> -        self.response = None    # Ready for next response.
> -        self.receiver.flow(1)   # Set up credit for the next response.
> -        return response
> -
> -    @property
> -    def reply_to(self):
> -        """Return the dynamic address of our receiver."""
> -        return self.receiver.remote_source.address
> -
> -    def on_message(self, event):
> -        """Called when we receive a message for our receiver."""
> -        self.response = event.message # Store the response
> -
> -    def close(self):
> -        self.connection.close()
> -
> -
> -if __name__ == '__main__':
> -    url = Url("0.0.0.0/examples")
> -    if len(sys.argv) > 1: url = Url(sys.argv[1])
> -
> -    invoker = SyncRequestClient(url, timeout=2)
> -    try:
> -        REQUESTS= ["Twas brillig, and the slithy toves",
> -                   "Did gire and gymble in the wabe.",
> -                   "All mimsy were the borogroves,",
> -                   "And the mome raths outgrabe."]
> -        for request in REQUESTS:
> -            response = invoker.invoke(Message(body=request))
> -            print "%s => %s" % (request, response.body)
> -    finally:
> -        invoker.close()
>
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/bindings/python/proton/utils.py
> ----------------------------------------------------------------------
> diff --git a/proton-c/bindings/python/proton/utils.py
> b/proton-c/bindings/python/proton/utils.py
> index a47ffcc..c09330c 100644
> --- a/proton-c/bindings/python/proton/utils.py
> +++ b/proton-c/bindings/python/proton/utils.py
> @@ -221,7 +221,7 @@ class BlockingConnection(Handler):
>          while self.container.process(): pass
>
>      def wait(self, condition, timeout=False, msg=None):
> -        """Call do_work until condition() is true"""
> +        """Call process until condition() is true"""
>          if timeout is False:
>              timeout = self.timeout
>          if timeout is None:
>
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/include/proton/reactor.h
> ----------------------------------------------------------------------
> diff --git a/proton-c/include/proton/reactor.h
> b/proton-c/include/proton/reactor.h
> index 36ee336..27e8fe4 100644
> --- a/proton-c/include/proton/reactor.h
> +++ b/proton-c/include/proton/reactor.h
> @@ -79,6 +79,15 @@ PN_EXTERN pn_connection_t
> *pn_reactor_connection(pn_reactor_t *reactor, pn_handl
>  PN_EXTERN int pn_reactor_wakeup(pn_reactor_t *reactor);
>  PN_EXTERN void pn_reactor_start(pn_reactor_t *reactor);
>  PN_EXTERN bool pn_reactor_quiesced(pn_reactor_t *reactor);
> +
> +/**
> + * Process any available events and return.  If there are no events
> available,
> + * wait up to the timeout for more to arrive.
> + *
> + *@return True if there may be more events to process in future (there
> are still open
> + * connection, timers, etc.) False if there is nothing registered with
> the reactor
> + * that can produce further events.
> + */
>  PN_EXTERN bool pn_reactor_process(pn_reactor_t *reactor);
>  PN_EXTERN void pn_reactor_stop(pn_reactor_t *reactor);
>  PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor);
>
>
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/src/reactor/reactor.c
> ----------------------------------------------------------------------
> diff --git a/proton-c/src/reactor/reactor.c
> b/proton-c/src/reactor/reactor.c
> index cd709b1..dde3766 100644
> --- a/proton-c/src/reactor/reactor.c
> +++ b/proton-c/src/reactor/reactor.c
> @@ -421,6 +421,11 @@ bool pn_reactor_process(pn_reactor_t *reactor) {
>          }
>        }
>      }
> +    // If the next event will quiesce the reactor, then give the thread
> back
> +    // before we process it so the application's processing thread can
> deal with
> +    // the effects of events processed so far.
> +    if (pn_reactor_quiesced(reactor))
> +      return true;
>    }
>  }
>
>
>
