This commit doesn't make sense to me for a couple of reasons. First, the way applications deal with the effects of events *is* by handling them. Second, if there is a reason to exit the event processing loop prior to processing all outstanding events, there is already an API for doing that: you can call pn_reactor_yield(...) in one of your event handlers. I'm not sure I quite understand the reason for this commit, but I suspect there is a less brittle fix. Can you explain what is going on here? As is, I believe this change may introduce a stall.
--Rafael On Mon, Feb 23, 2015 at 4:31 PM, <[email protected]> wrote: > Repository: qpid-proton > Updated Branches: > refs/heads/master 20d574bc9 -> 1455f3257 > > > PROTON-825: Reactor should yield before quiescing. > > The reactor needs to yield before quiescing so that the applications > processing loop > can handle the effects of any events processed so far before the reactor > blocks > for more. > > > Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo > Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1455f325 > Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1455f325 > Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1455f325 > > Branch: refs/heads/master > Commit: 1455f32575921ac8ed90cce539273233058f2b70 > Parents: 20d574b > Author: Alan Conway <[email protected]> > Authored: Mon Feb 23 16:15:45 2015 -0500 > Committer: Alan Conway <[email protected]> > Committed: Mon Feb 23 16:15:45 2015 -0500 > > ---------------------------------------------------------------------- > examples/python/server.py | 5 +- > examples/python/sync_client.py | 80 ++++++++------------------- > proton-c/bindings/python/proton/utils.py | 2 +- > proton-c/include/proton/reactor.h | 9 +++ > proton-c/src/reactor/reactor.c | 5 ++ > 5 files changed, 41 insertions(+), 60 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/examples/python/server.py > ---------------------------------------------------------------------- > diff --git a/examples/python/server.py b/examples/python/server.py > index fc9ac8d..a8d63f8 100755 > --- a/examples/python/server.py > +++ b/examples/python/server.py > @@ -46,10 +46,11 @@ class Server(MessagingHandler): > if not sender: > sender = self.container.create_sender(self.conn, > event.message.reply_to) > self.senders[event.message.reply_to] = sender > - sender.send(Message(address=event.message.reply_to, 
> body=event.message.body.upper())) > + sender.send(Message(address=event.message.reply_to, > body=event.message.body.upper(), > + correlation_id=event.message.correlation_id)) > > try: > - Container(Server("localhost:5672", "examples")).run() > + Container(Server("0.0.0.0:5672", "examples")).run() > except KeyboardInterrupt: pass > > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/examples/python/sync_client.py > ---------------------------------------------------------------------- > diff --git a/examples/python/sync_client.py > b/examples/python/sync_client.py > index 86cd7c2..82cd85f 100755 > --- a/examples/python/sync_client.py > +++ b/examples/python/sync_client.py > @@ -24,65 +24,31 @@ Demonstrates the client side of the synchronous > request-response pattern > > """ > > +import optparse > from proton import Message, Url, ConnectionException, Timeout > -from proton.utils import BlockingConnection > +from proton.utils import SyncRequestResponse, BlockingConnection > from proton.handlers import IncomingMessageHandler > import sys > > -class SyncRequestClient(IncomingMessageHandler): > - """ > - Implementation of the synchronous request-responce (aka RPC) pattern. > - Create an instance and call invoke() to send a request and wait for a > response. 
> - """ > +parser = optparse.OptionParser(usage="usage: %prog [options]", > + description="Send requests to the supplied > address and print responses.") > +parser.add_option("-a", "--address", default="localhost:5672/examples", > + help="address to which messages are sent (default > %default)") > +parser.add_option("-t", "--timeout", type="float", default=5, > + help="Give up after this time out (default %default)") > +opts, args = parser.parse_args() > + > +url = Url(opts.address) > +client = SyncRequestResponse(BlockingConnection(url, > timeout=opts.timeout), url.path) > + > +try: > + REQUESTS= ["Twas brillig, and the slithy toves", > + "Did gire and gymble in the wabe.", > + "All mimsy were the borogroves,", > + "And the mome raths outgrabe."] > + for request in REQUESTS: > + response = client.call(Message(body=request)) > + print "%s => %s" % (request, response.body) > +finally: > + client.connection.close() > > - def __init__(self, url, timeout=None): > - """ > - @param url: a proton.Url or a URL string of the form > 'host:port/path' > - host:port is used to connect, path is used to identify the > remote messaging endpoint. > - """ > - super(SyncRequestClient, self).__init__() > - self.connection = BlockingConnection(Url(url).defaults(), > timeout=timeout) > - self.sender = self.connection.create_sender(url.path) > - # dynamic=true generates a unique address dynamically for this > receiver. > - # credit=1 because we want to receive 1 response message > initially. > - self.receiver = self.connection.create_receiver(None, > dynamic=True, credit=1, handler=self) > - self.response = None > - > - def invoke(self, request): > - """Send a request, wait for and return the response""" > - request.reply_to = self.reply_to > - self.sender.send(request) > - self.connection.wait(lambda: self.response, msg="Waiting for > response") > - response = self.response > - self.response = None # Ready for next response. 
> - self.receiver.flow(1) # Set up credit for the next response. > - return response > - > - @property > - def reply_to(self): > - """Return the dynamic address of our receiver.""" > - return self.receiver.remote_source.address > - > - def on_message(self, event): > - """Called when we receive a message for our receiver.""" > - self.response = event.message # Store the response > - > - def close(self): > - self.connection.close() > - > - > -if __name__ == '__main__': > - url = Url("0.0.0.0/examples") > - if len(sys.argv) > 1: url = Url(sys.argv[1]) > - > - invoker = SyncRequestClient(url, timeout=2) > - try: > - REQUESTS= ["Twas brillig, and the slithy toves", > - "Did gire and gymble in the wabe.", > - "All mimsy were the borogroves,", > - "And the mome raths outgrabe."] > - for request in REQUESTS: > - response = invoker.invoke(Message(body=request)) > - print "%s => %s" % (request, response.body) > - finally: > - invoker.close() > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/bindings/python/proton/utils.py > ---------------------------------------------------------------------- > diff --git a/proton-c/bindings/python/proton/utils.py > b/proton-c/bindings/python/proton/utils.py > index a47ffcc..c09330c 100644 > --- a/proton-c/bindings/python/proton/utils.py > +++ b/proton-c/bindings/python/proton/utils.py > @@ -221,7 +221,7 @@ class BlockingConnection(Handler): > while self.container.process(): pass > > def wait(self, condition, timeout=False, msg=None): > - """Call do_work until condition() is true""" > + """Call process until condition() is true""" > if timeout is False: > timeout = self.timeout > if timeout is None: > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/include/proton/reactor.h > ---------------------------------------------------------------------- > diff --git a/proton-c/include/proton/reactor.h > b/proton-c/include/proton/reactor.h > index 36ee336..27e8fe4 100644 > --- 
a/proton-c/include/proton/reactor.h > +++ b/proton-c/include/proton/reactor.h > @@ -79,6 +79,15 @@ PN_EXTERN pn_connection_t > *pn_reactor_connection(pn_reactor_t *reactor, pn_handl > PN_EXTERN int pn_reactor_wakeup(pn_reactor_t *reactor); > PN_EXTERN void pn_reactor_start(pn_reactor_t *reactor); > PN_EXTERN bool pn_reactor_quiesced(pn_reactor_t *reactor); > + > +/** > + * Process any available events and return. If there are no events > available, > + * wait up to the timeout for more to arrive. > + * > + *@return True if there may be more events to process in future (there > are still open > + * connection, timers, etc.) False if there is nothing registered with > the reactor > + * that can produce further events. > + */ > PN_EXTERN bool pn_reactor_process(pn_reactor_t *reactor); > PN_EXTERN void pn_reactor_stop(pn_reactor_t *reactor); > PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor); > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/src/reactor/reactor.c > ---------------------------------------------------------------------- > diff --git a/proton-c/src/reactor/reactor.c > b/proton-c/src/reactor/reactor.c > index cd709b1..dde3766 100644 > --- a/proton-c/src/reactor/reactor.c > +++ b/proton-c/src/reactor/reactor.c > @@ -421,6 +421,11 @@ bool pn_reactor_process(pn_reactor_t *reactor) { > } > } > } > + // If the next event will quiesce the reactor, then give the thread > back > + // before we process it so the application's processing thread can > deal with > + // the effects of events processed so far. > + if (pn_reactor_quiesced(reactor)) > + return true; > } > } > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > >
