Repository: qpid-proton Updated Branches: refs/heads/master 71d75e953 -> 04d0d01b6
PROTON-805: Add dispatch request-response extension to utils.py Added SyncRequestResponse class to utils.py. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/04d0d01b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/04d0d01b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/04d0d01b Branch: refs/heads/master Commit: 04d0d01b6216ca55c3d4274954d46d874f27a8f2 Parents: 71d75e9 Author: Alan Conway <[email protected]> Authored: Fri Jan 23 15:33:08 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Fri Jan 23 15:33:08 2015 -0500 ---------------------------------------------------------------------- proton-c/bindings/python/proton/__init__.py | 4 +- proton-c/bindings/python/proton/handlers.py | 8 +-- proton-c/bindings/python/proton/reactors.py | 12 ++-- proton-c/bindings/python/proton/utils.py | 78 ++++++++++++++++++++++-- 4 files changed, 86 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 317a8b4..127ddd4 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -33,7 +33,7 @@ The proton APIs consist of the following classes: from cproton import * from wrapper import Wrapper -import weakref, re, socket, sys +import weakref, socket, sys try: import uuid except ImportError: @@ -68,7 +68,7 @@ except ImportError: def __hash__(self): return self.bytes.__hash__() - import os, random, socket, time + import os, random, time rand = random.Random() rand.seed((os.getpid(), time.time(), socket.gethostname())) def random_uuid(): 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py index b18bda6..e403d26 100644 --- a/proton-c/bindings/python/proton/handlers.py +++ b/proton-c/bindings/python/proton/handlers.py @@ -243,7 +243,7 @@ class EndpointStateHandler(Handler): def print_error(cls, endpoint, endpoint_type): if endpoint.remote_condition: logging.error(endpoint.remote_condition.description) - elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint): + elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint): logging.error("%s closed by peer" % endpoint_type) def on_link_remote_close(self, event): @@ -334,20 +334,20 @@ class EndpointStateHandler(Handler): if self.delegate: dispatch(self.delegate, 'on_connection_error', event) else: - self.print_error(event.connection, "connection") + self.log_error(event.connection, "connection") def on_session_error(self, event): if self.delegate: dispatch(self.delegate, 'on_session_error', event) else: - self.print_error(event.session, "session") + self.log_error(event.session, "session") event.connection.close() def on_link_error(self, event): if self.delegate: dispatch(self.delegate, 'on_link_error', event) else: - self.print_error(event.link, "link") + self.log_error(event.link, "link") event.connection.close() def on_connection_closed(self, event): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/reactors.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py index 4e2664a..41e9543 100644 --- a/proton-c/bindings/python/proton/reactors.py +++ b/proton-c/bindings/python/proton/reactors.py @@ -408,9 +408,13 @@ class SelectLoop(object): 
def do_work(self, timeout=None): """@return True if some work was done, False if time-out expired""" tick = self.events.timer.tick() - while self.events.process(): - if self._abort: return + + if self.events.process(): tick = self.events.timer.tick() + while self.events.process(): + if self._abort: return + tick = self.events.timer.tick() + return True # Did work, let caller check their conditions, don't select. stable = False while not stable: @@ -429,7 +433,7 @@ class SelectLoop(object): stable = len(closed) == 0 if self.redundant: - return + return False if tick: timeout = _min(tick - time.time(), timeout) @@ -836,7 +840,7 @@ class Container(object): return self.loop.do_work(timeout) import traceback -from proton import WrappedHandler, _chandler, Connection, secs2millis, millis2secs, Selectable +from proton import WrappedHandler, _chandler, secs2millis, millis2secs, Selectable from wrapper import Wrapper from cproton import * http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/utils.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py index f4740b5..7a35362 100644 --- a/proton-c/bindings/python/proton/utils.py +++ b/proton-c/bindings/python/proton/utils.py @@ -16,10 +16,10 @@ # specific language governing permissions and limitations # under the License. 
# -import collections, Queue, socket, time +import collections, Queue, socket, time, threading from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg -from proton.handlers import MessagingHandler, ScopedHandler +from proton.handlers import MessagingHandler, ScopedHandler, IncomingMessageHandler class BlockingLink(object): def __init__(self, connection, link): @@ -82,10 +82,7 @@ class BlockingConnection(Handler): def __init__(self, url, timeout=None, container=None): self.timeout = timeout self.container = container or Container() - if isinstance(url, basestring): - self.url = Url(url) - else: - self.url = url + self.url = Url(url).defaults() self.conn = self.container.connect(url=self.url, handler=self) self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), msg="Opening connection") @@ -146,3 +143,72 @@ class BlockingConnection(Handler): else: txt += " by peer" raise ConnectionException(txt) + + +def atomic_count(start=0, step=1): + """Thread-safe atomic count iterator""" + lock = threading.Lock() + count = start + while True: + with lock: + count += step; + yield count + + +class SyncRequestResponse(IncomingMessageHandler): + """ + Implementation of the synchronous request-response (aka RPC) pattern. + """ + + correlation_id = atomic_count() + + def __init__(self, connection, address=None): + """ + Send requests and receive responses. A single instance can send many requests + to the same or different addresses. + + @param connection: A L{BlockingConnection} + @param address: Address for all requests. + If not specified, each request must have the address property set. + Successive messages may have different addresses. + + @ivar address: Address for all requests, may be None. + @ivar connection: Connection for requests and responses. 
+ """ + super(SyncRequestResponse, self).__init__() + self.connection = connection + self.address = address + self.sender = self.connection.create_sender(self.address) + # dynamic=true generates a unique address dynamically for this receiver. + # credit=1 because we want to receive 1 response message initially. + self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) + self.response = None + + def call(self, request): + """ + Send a request message, wait for and return the response message. + + @param request: A L{proton.Message}. If L{self.address} is not set the + L{request.address} must be set and will be used. + """ + if not self.address and not request.address: + raise ValueError("Request message has no address: %s" % request) + request.reply_to = self.reply_to + request.correlation_id = correlation_id = self.correlation_id.next() + self.sender.send_msg(request) + def wakeup(): + return self.response and (self.response.correlation_id == correlation_id) + self.connection.wait(wakeup, msg="Waiting for response") + response = self.response + self.response = None # Ready for next response. + self.receiver.flow(1) # Set up credit for the next response. + return response + + @property + def reply_to(self): + """Return the dynamic address of our receiver.""" + return self.receiver.remote_source.address + + def on_message(self, event): + """Called when we receive a message for our receiver.""" + self.response = event.message --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
