On Tue, 2015-02-24 at 07:59 -0500, Rafael Schloming wrote: > This commit doesn't make sense to me for a couple of reasons. First, the > way applications deal with the effects of events *is* by handling them. > Secondly, if there is a reason to exit the event processing loop prior to > processing all outstanding events, there is an API for doing that, you can > call pn_reactor_yield(...) in one of your event handlers. I'm not sure I > quite understand the reason for this commit, but I suspect there is a less > brittle fix. Can you explain what is going on here? As is, I believe this > change may introduce a potential stall.
Gotcha, will revert and replace with a yield as you suggest. The scenario is blocking use of proton, which is why it doesn't show up in most of the existing examples. The SyncRequestResponse utility uses a BlockingConnection to send a request, and has an on_message that records the response. It uses BlockingConnection.wait() to wait for the response, which calls reactor.process() in a loop until the response has appeared. The problem was that although on_message was called correctly by process, instead of returning control it carries on to process a quiesce event and blocks up to the reactor timeout before returning. Putting a reactor.yield in on_message will solve my problem just as well and with less clutter in the reactors process loop. We should document the need for it somewhere where we discuss blocking use of proton... > > --Rafael > > On Mon, Feb 23, 2015 at 4:31 PM, <[email protected]> wrote: > > > Repository: qpid-proton > > Updated Branches: > > refs/heads/master 20d574bc9 -> 1455f3257 > > > > > > PROTON-825: Reactor should yield before quiescing. > > > > The reactor needs to yield before quiescing so that the applications > > processing loop > > can handle the effects of any events processed so far before the reactor > > blocks > > for more. > > > > > > Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo > > Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1455f325 > > Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1455f325 > > Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1455f325 > > > > Branch: refs/heads/master > > Commit: 1455f32575921ac8ed90cce539273233058f2b70 > > Parents: 20d574b > > Author: Alan Conway <[email protected]> > > Authored: Mon Feb 23 16:15:45 2015 -0500 > > Committer: Alan Conway <[email protected]> > > Committed: Mon Feb 23 16:15:45 2015 -0500 > > > > ---------------------------------------------------------------------- > > examples/python/server.py | 5 +- > > examples/python/sync_client.py | 80 ++++++++------------------- > > proton-c/bindings/python/proton/utils.py | 2 +- > > proton-c/include/proton/reactor.h | 9 +++ > > proton-c/src/reactor/reactor.c | 5 ++ > > 5 files changed, 41 insertions(+), 60 deletions(-) > > ---------------------------------------------------------------------- > > > > > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/examples/python/server.py > > ---------------------------------------------------------------------- > > diff --git a/examples/python/server.py b/examples/python/server.py > > index fc9ac8d..a8d63f8 100755 > > --- a/examples/python/server.py > > +++ b/examples/python/server.py > > @@ -46,10 +46,11 @@ class Server(MessagingHandler): > > if not sender: > > sender = self.container.create_sender(self.conn, > > event.message.reply_to) > > self.senders[event.message.reply_to] = sender > > - sender.send(Message(address=event.message.reply_to, > > body=event.message.body.upper())) > > + sender.send(Message(address=event.message.reply_to, > > body=event.message.body.upper(), > > + correlation_id=event.message.correlation_id)) > > > > try: > > - Container(Server("localhost:5672", "examples")).run() > > + Container(Server("0.0.0.0:5672", "examples")).run() > > except KeyboardInterrupt: pass > > > > > > > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/examples/python/sync_client.py > > ---------------------------------------------------------------------- > > diff --git a/examples/python/sync_client.py > > b/examples/python/sync_client.py > > index 86cd7c2..82cd85f 100755 > > --- a/examples/python/sync_client.py > > +++ b/examples/python/sync_client.py > > @@ -24,65 +24,31 @@ Demonstrates the client side of the synchronous > > request-response pattern > > > > """ > > > > +import optparse > > from proton import Message, Url, ConnectionException, Timeout > > -from proton.utils import BlockingConnection > > +from proton.utils import SyncRequestResponse, BlockingConnection > > from proton.handlers import IncomingMessageHandler > > import sys > > > > -class SyncRequestClient(IncomingMessageHandler): > > - """ > > - Implementation of the synchronous request-responce (aka RPC) pattern. > > - Create an instance and call invoke() to send a request and wait for a > > response. > > - """ > > +parser = optparse.OptionParser(usage="usage: %prog [options]", > > + description="Send requests to the supplied > > address and print responses.") > > +parser.add_option("-a", "--address", default="localhost:5672/examples", > > + help="address to which messages are sent (default > > %default)") > > +parser.add_option("-t", "--timeout", type="float", default=5, > > + help="Give up after this time out (default %default)") > > +opts, args = parser.parse_args() > > + > > +url = Url(opts.address) > > +client = SyncRequestResponse(BlockingConnection(url, > > timeout=opts.timeout), url.path) > > + > > +try: > > + REQUESTS= ["Twas brillig, and the slithy toves", > > + "Did gire and gymble in the wabe.", > > + "All mimsy were the borogroves,", > > + "And the mome raths outgrabe."] > > + for request in REQUESTS: > > + response = client.call(Message(body=request)) > > + print "%s => %s" % (request, response.body) > > +finally: > > + client.connection.close() > > > > - def __init__(self, url, timeout=None): > > - """ > > - @param url: a proton.Url or a URL string of the form > > 'host:port/path' > > - host:port is used to connect, path is used to identify the > > remote messaging endpoint. > > - """ > > - super(SyncRequestClient, self).__init__() > > - self.connection = BlockingConnection(Url(url).defaults(), > > timeout=timeout) > > - self.sender = self.connection.create_sender(url.path) > > - # dynamic=true generates a unique address dynamically for this > > receiver. > > - # credit=1 because we want to receive 1 response message > > initially. > > - self.receiver = self.connection.create_receiver(None, > > dynamic=True, credit=1, handler=self) > > - self.response = None > > - > > - def invoke(self, request): > > - """Send a request, wait for and return the response""" > > - request.reply_to = self.reply_to > > - self.sender.send(request) > > - self.connection.wait(lambda: self.response, msg="Waiting for > > response") > > - response = self.response > > - self.response = None # Ready for next response. > > - self.receiver.flow(1) # Set up credit for the next response. > > - return response > > - > > - @property > > - def reply_to(self): > > - """Return the dynamic address of our receiver.""" > > - return self.receiver.remote_source.address > > - > > - def on_message(self, event): > > - """Called when we receive a message for our receiver.""" > > - self.response = event.message # Store the response > > - > > - def close(self): > > - self.connection.close() > > - > > - > > -if __name__ == '__main__': > > - url = Url("0.0.0.0/examples") > > - if len(sys.argv) > 1: url = Url(sys.argv[1]) > > - > > - invoker = SyncRequestClient(url, timeout=2) > > - try: > > - REQUESTS= ["Twas brillig, and the slithy toves", > > - "Did gire and gymble in the wabe.", > > - "All mimsy were the borogroves,", > > - "And the mome raths outgrabe."] > > - for request in REQUESTS: > > - response = invoker.invoke(Message(body=request)) > > - print "%s => %s" % (request, response.body) > > - finally: > > - invoker.close() > > > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/bindings/python/proton/utils.py > > ---------------------------------------------------------------------- > > diff --git a/proton-c/bindings/python/proton/utils.py > > b/proton-c/bindings/python/proton/utils.py > > index a47ffcc..c09330c 100644 > > --- a/proton-c/bindings/python/proton/utils.py > > +++ b/proton-c/bindings/python/proton/utils.py > > @@ -221,7 +221,7 @@ class BlockingConnection(Handler): > > while self.container.process(): pass > > > > def wait(self, condition, timeout=False, msg=None): > > - """Call do_work until condition() is true""" > > + """Call process until condition() is true""" > > if timeout is False: > > timeout = self.timeout > > if timeout is None: > > > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/include/proton/reactor.h > > ---------------------------------------------------------------------- > > diff --git a/proton-c/include/proton/reactor.h > > b/proton-c/include/proton/reactor.h > > index 36ee336..27e8fe4 100644 > > --- a/proton-c/include/proton/reactor.h > > +++ b/proton-c/include/proton/reactor.h > > @@ -79,6 +79,15 @@ PN_EXTERN pn_connection_t > > *pn_reactor_connection(pn_reactor_t *reactor, pn_handl > > PN_EXTERN int pn_reactor_wakeup(pn_reactor_t *reactor); > > PN_EXTERN void pn_reactor_start(pn_reactor_t *reactor); > > PN_EXTERN bool pn_reactor_quiesced(pn_reactor_t *reactor); > > + > > +/** > > + * Process any available events and return. If there are no events > > available, > > + * wait up to the timeout for more to arrive. > > + * > > + *@return True if there may be more events to process in future (there > > are still open > > + * connection, timers, etc.) False if there is nothing registered with > > the reactor > > + * that can produce further events. > > + */ > > PN_EXTERN bool pn_reactor_process(pn_reactor_t *reactor); > > PN_EXTERN void pn_reactor_stop(pn_reactor_t *reactor); > > PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor); > > > > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1455f325/proton-c/src/reactor/reactor.c > > ---------------------------------------------------------------------- > > diff --git a/proton-c/src/reactor/reactor.c > > b/proton-c/src/reactor/reactor.c > > index cd709b1..dde3766 100644 > > --- a/proton-c/src/reactor/reactor.c > > +++ b/proton-c/src/reactor/reactor.c > > @@ -421,6 +421,11 @@ bool pn_reactor_process(pn_reactor_t *reactor) { > > } > > } > > } > > + // If the next event will quiesce the reactor, then give the thread > > back > > + // before we process it so the application's processing thread can > > deal with > > + // the effects of events processed so far. > > + if (pn_reactor_quiesced(reactor)) > > + return true; > > } > > } > > > > > > > > --------------------------------------------------------------------- > > To unsubscribe, e-mail: [email protected] > > For additional commands, e-mail: [email protected] > > > > --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
