Repository: qpid-dispatch Updated Branches: refs/heads/master 1ddb7d0de -> 1c3c1f5be
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/proton_future/handlers.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/proton_future/handlers.py b/python/qpid_dispatch_internal/proton_future/handlers.py deleted file mode 100644 index badb4c8..0000000 --- a/python/qpid_dispatch_internal/proton_future/handlers.py +++ /dev/null @@ -1,442 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import heapq, os, Queue, re, socket, time, types -from . import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url -from . import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout -from . import Message, Handler, ProtonException, Transport, TransportException, ConnectionException -from select import select - -class FlowController(Handler): - """ - A handler that controls a configured credit window for associated - receivers. - """ - def __init__(self, window=1): - self.window = window - - def top_up(self, link): - delta = self.window - link.credit - link.flow(delta) - - def on_link_local_open(self, event): - if event.link.is_receiver: - self.top_up(event.link) - - def on_link_remote_open(self, event): - if event.link.is_receiver: - self.top_up(event.link) - - def on_link_flow(self, event): - if event.link.is_receiver: - self.top_up(event.link) - - def on_delivery(self, event): - if not event.delivery.released and event.delivery.link.is_receiver: - self.top_up(event.delivery.link) - -def nested_handlers(handlers): - # currently only allows for a single level of nesting - nested = [] - for h in handlers: - nested.append(h) - if hasattr(h, 'handlers'): - nested.extend(getattr(h, 'handlers')) - return nested - -def add_nested_handler(handler, nested): - if hasattr(handler, 'handlers'): - getattr(handler, 'handlers').append(nested) - else: - handler.handlers = [nested] - -class ScopedHandler(Handler): - """ - An internal handler that checks for handlers scoped to the engine - objects an event relates to. E.g it allows delivery, link, session - or connection scoped handlers that will only be called with events - for the object to which they are scoped. - """ - scopes = ["delivery", "link", "session", "connection"] - - def on_unhandled(self, method, args): - event = args[0] - if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]: - return - - objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)] - targets = [getattr(o, "context") for o in objects if hasattr(o, "context")] - handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)] - for h in handlers: - h(event) - - -class OutgoingMessageHandler(Handler): - """ - A utility for simpler and more intuitive handling of delivery - events related to outgoing i.e. sent messages. - """ - def __init__(self, auto_settle=True, delegate=None): - self.auto_settle = auto_settle - self.delegate = delegate - - def on_link_flow(self, event): - if event.link.is_sender and event.link.credit: - self.on_credit(event) - - def on_delivery(self, event): - dlv = event.delivery - if dlv.released: return - if dlv.link.is_sender and dlv.updated: - if dlv.remote_state == Delivery.ACCEPTED: - self.on_accepted(event) - elif dlv.remote_state == Delivery.REJECTED: - self.on_rejected(event) - elif dlv.remote_state == Delivery.RELEASED: - self.on_released(event) - elif dlv.remote_state == Delivery.MODIFIED: - self.on_modified(event) - if dlv.settled: - self.on_settled(event) - if self.auto_settle: - dlv.settle() - - def on_credit(self, event): - if self.delegate: - dispatch(self.delegate, 'on_credit', event) - - def on_accepted(self, event): - if self.delegate: - dispatch(self.delegate, 'on_accepted', event) - - def on_rejected(self, event): - if self.delegate: - dispatch(self.delegate, 'on_rejected', event) - - def on_released(self, event): - if self.delegate: - dispatch(self.delegate, 'on_released', event) - - def on_modified(self, event): - if self.delegate: - dispatch(self.delegate, 'on_modified', event) - - def on_settled(self, event): - if self.delegate: - dispatch(self.delegate, 'on_settled', event) - -def recv_msg(delivery): - msg = Message() - msg.decode(delivery.link.recv(delivery.pending)) - delivery.link.advance() - return msg - -class Reject(ProtonException): - """ - An exception that indicate a message should be rejected - """ - pass - -class Acking(object): - def accept(self, delivery): - self.settle(delivery, Delivery.ACCEPTED) - - def reject(self, delivery): - self.settle(delivery, Delivery.REJECTED) - - def release(self, delivery, delivered=True): - if delivered: - self.settle(delivery, Delivery.MODIFIED) - else: - self.settle(delivery, Delivery.RELEASED) - - def settle(self, delivery, state=None): - if state: - delivery.update(state) - delivery.settle() - -class IncomingMessageHandler(Handler, Acking): - """ - A utility for simpler and more intuitive handling of delivery - events related to incoming i.e. received messages. - """ - - def __init__(self, auto_accept=True, delegate=None): - self.delegate = delegate - self.auto_accept = auto_accept - - def on_delivery(self, event): - dlv = event.delivery - if dlv.released or not dlv.link.is_receiver: return - if dlv.readable and not dlv.partial: - event.message = recv_msg(dlv) - try: - self.on_message(event) - if self.auto_accept: - dlv.update(Delivery.ACCEPTED) - dlv.settle() - except Reject: - dlv.update(Delivery.REJECTED) - dlv.settle() - elif dlv.updated and dlv.settled: - self.on_settled(event) - - def on_message(self, event): - if self.delegate: - dispatch(self.delegate, 'on_message', event) - - def on_settled(self, event): - if self.delegate: - dispatch(self.delegate, 'on_settled', event) - -class EndpointStateHandler(Handler): - """ - A utility that exposes 'endpoint' events i.e. the open/close for - links, sessions and connections in a more intuitive manner. A - XXX_opened method will be called when both local and remote peers - have opened the link, session or connection. This can be used to - confirm a locally initiated action for example. A XXX_opening - method will be called when the remote peer has requested an open - that was not initiated locally. By default this will simply open - locally, which then triggers the XXX_opened call. The same applies - to close. - """ - - def __init__(self, peer_close_is_error=False, delegate=None): - self.delegate = delegate - self.peer_close_is_error = peer_close_is_error - - def is_local_open(self, endpoint): - return endpoint.state & Endpoint.LOCAL_ACTIVE - - def is_local_uninitialised(self, endpoint): - return endpoint.state & Endpoint.LOCAL_UNINIT - - def is_local_closed(self, endpoint): - return endpoint.state & Endpoint.LOCAL_CLOSED - - def is_remote_open(self, endpoint): - return endpoint.state & Endpoint.REMOTE_ACTIVE - - def is_remote_closed(self, endpoint): - return endpoint.state & Endpoint.REMOTE_CLOSED - - def log_error(self, endpoint, endpoint_type): - pass - # TODO aconway 2014-12-11: log error message properly - # if endpoint.remote_condition: - # print endpoint.remote_condition.description - # elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint): - # print "%s closed by peer" % endpoint_type - - def on_link_remote_close(self, event): - if event.link.remote_condition: - self.on_link_error(event) - elif self.is_local_closed(event.link): - self.on_link_closed(event) - else: - self.on_link_closing(event) - event.link.close() - - def on_session_remote_close(self, event): - if event.session.remote_condition: - self.on_session_error(event) - elif self.is_local_closed(event.session): - self.on_session_closed(event) - else: - self.on_session_closing(event) - event.session.close() - - def on_connection_remote_close(self, event): - if event.connection.remote_condition: - self.on_connection_error(event) - elif self.is_local_closed(event.connection): - self.on_connection_closed(event) - else: - self.on_connection_closing(event) - event.connection.close() - - def on_connection_local_open(self, event): - if self.is_remote_open(event.connection): - self.on_connection_opened(event) - - def on_connection_remote_open(self, event): - if self.is_local_open(event.connection): - self.on_connection_opened(event) - elif self.is_local_uninitialised(event.connection): - self.on_connection_opening(event) - event.connection.open() - - def on_session_local_open(self, event): - if self.is_remote_open(event.session): - self.on_session_opened(event) - - def on_session_remote_open(self, event): - if self.is_local_open(event.session): - self.on_session_opened(event) - elif self.is_local_uninitialised(event.session): - self.on_session_opening(event) - event.session.open() - - def on_link_local_open(self, event): - if self.is_remote_open(event.link): - self.on_link_opened(event) - - def on_link_remote_open(self, event): - if self.is_local_open(event.link): - self.on_link_opened(event) - elif self.is_local_uninitialised(event.link): - self.on_link_opening(event) - event.link.open() - - def on_connection_opened(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_opened', event) - - def on_session_opened(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_opened', event) - - def on_link_opened(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_opened', event) - - def on_connection_opening(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_opening', event) - - def on_session_opening(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_opening', event) - - def on_link_opening(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_opening', event) - - def on_connection_error(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_error', event) - else: - self.log_error(event.connection, "connection") - - def on_session_error(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_error', event) - else: - self.log_error(event.session, "session") - event.connection.close() - - def on_link_error(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_error', event) - else: - self.log_error(event.link, "link") - event.connection.close() - - def on_connection_closed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_closed', event) - - def on_session_closed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_closed', event) - - def on_link_closed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_closed', event) - - def on_connection_closing(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_closing', event) - elif self.peer_close_is_error: - self.on_connection_error(event) - - def on_session_closing(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_closing', event) - elif self.peer_close_is_error: - self.on_session_error(event) - - def on_link_closing(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_closing', event) - elif self.peer_close_is_error: - self.on_link_error(event) - -class MessagingHandler(Handler, Acking): - """ - A general purpose handler that makes the proton-c events somewhat - simpler to deal with and.or avoids repetitive tasks for common use - cases. - """ - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False): - self.handlers = [] - # FlowController if used needs to see event before - # IncomingMessageHandler, as the latter may involve the - # delivery being released - if prefetch: - self.handlers.append(FlowController(prefetch)) - self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) - self.handlers.append(IncomingMessageHandler(auto_accept, self)) - self.handlers.append(OutgoingMessageHandler(auto_settle, self)) - -class TransactionalAcking(object): - def accept(self, delivery, transaction): - transaction.accept(delivery) - -class TransactionHandler(OutgoingMessageHandler, TransactionalAcking): - def __init__(self, auto_settle=True, delegate=None): - super(TransactionHandler, self).__init__(auto_settle, delegate) - - def on_settled(self, event): - if hasattr(event.delivery, "transaction"): - event.transaction = event.delivery.transaction - event.delivery.transaction.handle_outcome(event) - - def on_transaction_declared(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_declared', event) - - def on_transaction_committed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_committed', event) - - def on_transaction_aborted(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_aborted', event) - - def on_transaction_declare_failed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_declare_failed', event) - - def on_transaction_commit_failed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_commit_failed', event) - -class TransactionalClientHandler(Handler, TransactionalAcking): - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False): - super(TransactionalClientHandler, self).__init__() - self.handlers = [] - # FlowController if used needs to see event before - # IncomingMessageHandler, as the latter may involve the - # delivery being released - if prefetch: - self.handlers.append(FlowController(prefetch)) - self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) - self.handlers.append(IncomingMessageHandler(auto_accept, self)) - self.handlers.append(TransactionHandler(auto_settle, self)) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/proton_future/reactors.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/proton_future/reactors.py b/python/qpid_dispatch_internal/proton_future/reactors.py deleted file mode 100644 index 5312be1..0000000 --- a/python/qpid_dispatch_internal/proton_future/reactors.py +++ /dev/null @@ -1,833 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import os, Queue, socket, time, types -from heapq import heappush, heappop, nsmallest -from . import Collector, Connection, ConnectionException, Delivery, Described, dispatch -from . import Endpoint, Event, EventType, generate_uuid, Handler, Link, Message -from . import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol -from . import Terminus, Timeout, Transport, TransportException, ulong, Url -from select import select -from .handlers import nested_handlers, ScopedHandler - -class AmqpSocket(object): - """ - Associates a transport with a connection and a socket and can be - used in an io loop to track the io for an AMQP 1.0 connection. - """ - - def __init__(self, conn, sock, events, heartbeat=None): - self.events = events - self.conn = conn - self.transport = Transport() - if heartbeat: self.transport.idle_timeout = heartbeat - self.transport.bind(self.conn) - self.socket = sock - self.socket.setblocking(0) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.write_done = False - self.read_done = False - self._closed = False - - def accept(self, force_sasl=True): - if force_sasl: - sasl = self.transport.sasl() - sasl.mechanisms("ANONYMOUS") - sasl.server() - sasl.done(SASL.OK) - #TODO: use SASL anyway if requested by peer - return self - - def connect(self, host, port=None, username=None, password=None, force_sasl=True): - if username and password: - sasl = self.transport.sasl() - sasl.plain(username, password) - elif force_sasl: - sasl = self.transport.sasl() - sasl.mechanisms('ANONYMOUS') - sasl.client() - try: - self.socket.connect_ex((host, port or 5672)) - except socket.gaierror, e: - raise ConnectionException("Cannot resolve '%s': %s" % (host, e)) - return self - - def _closed_cleanly(self): - return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED - - def closed(self): - if not self._closed and self.write_done and self.read_done: - self.close() - return True - else: - return False - - def close(self): - self.socket.close() - self._closed = True - - def fileno(self): - return self.socket.fileno() - - def reading(self): - if self.read_done: return False - c = self.transport.capacity() - if c > 0: - return True - elif c < 0: - self.read_done = True - return False - - def writing(self): - if self.write_done: return False - try: - p = self.transport.pending() - if p > 0: - return True - elif p < 0: - self.write_done = True - return False - else: # p == 0 - return False - except TransportException, e: - self.write_done = True - return False - - def readable(self): - c = self.transport.capacity() - if c > 0: - try: - data = self.socket.recv(c) - if data: - self.transport.push(data) - else: - if not self._closed_cleanly(): - self.read_done = True - self.write_done = True - else: - self.transport.close_tail() - except TransportException, e: - # TODO aconway 2014-12-11: log error: print "Error on read: %s" % e - self.read_done = True - except socket.error, e: - # TODO aconway 2014-12-11: log error: print "Error on recv: %s" % e - self.read_done = True - self.write_done = True - elif c < 0: - self.read_done = True - - def writable(self): - try: - p = self.transport.pending() - if p > 0: - data = self.transport.peek(p) - n = self.socket.send(data) - self.transport.pop(n) - elif p < 0: - self.write_done = True - except TransportException, e: - # TODO aconway 2014-12-11: log error: print "Error on write: %s" % e - self.write_done = True - except socket.error, e: - # TODO aconway 2014-12-11: log error: print "Error on send: %s" % e - self.write_done = True - - def removed(self): - if not self._closed_cleanly(): - self.transport.unbind() - self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn)) - - def tick(self): - t = self.transport.tick(time.time()) - if t: return t - else: return None - -class AmqpAcceptor: - """ - Listens for incoming sockets, creates an AmqpSocket for them and - adds that to the list of tracked 'selectables'. The acceptor can - itself be added to an io loop. - """ - - def __init__(self, events, loop, host, port): - self.events = events - self.loop = loop - self.socket = socket.socket() - self.socket.setblocking(0) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((host, port)) - self.socket.listen(5) - self.loop.add(self) - self._closed = False - - def closed(self): - if self._closed: - self.socket.close() - return True - else: - return False - - def close(self): - self._closed = True - - def fileno(self): - return self.socket.fileno() - - def reading(self): - return not self._closed - - def writing(self): - return False - - def readable(self): - sock, addr = self.socket.accept() - if sock: - self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept()) - - def removed(self): pass - def tick(self): return None - - -class EventInjector(object): - """ - Can be added to an io loop to allow events to be triggered by an - external thread but handled on the event thread associated with - the loop. - """ - def __init__(self, collector): - self.collector = collector - self.queue = Queue.Queue() - self.pipe = os.pipe() - self._closed = False - - def trigger(self, event): - self.queue.put(event) - os.write(self.pipe[1], "!") - - def closed(self): - return self._closed and self.queue.empty() - - def close(self): - self._closed = True - - def fileno(self): - return self.pipe[0] - - def reading(self): - return True - - def writing(self): - return False - - def readable(self): - os.read(self.pipe[0], 512) - while not self.queue.empty(): - event = self.queue.get() - self.collector.put(event.context, event.type) - - def removed(self): pass - def tick(self): return None - -class PQueue: - - def __init__(self): - self.entries = [] - - def add(self, priority, task): - heappush(self.entries, (priority, task)) - - def peek(self): - if self.entries: - return nsmallest(1, self.entries)[0] - else: - return None - - def pop(self): - if self.entries: - return heappop(self.entries) - else: - return None - - def __nonzero__(self): - if self.entries: - return True - else: - return False - -class Timer: - def __init__(self, collector): - self.collector = collector - self.events = PQueue() - - def schedule(self, deadline, event): - self.events.add(deadline, event) - - def tick(self): - while self.events: - deadline, event = self.events.peek() - if time.time() > deadline: - self.events.pop() - self.collector.put(event.context, event.type) - else: - return deadline - return None - - @property - def pending(self): - return bool(self.events) - -class Events(object): - def __init__(self, *handlers): - self.collector = Collector() - self.timer = Timer(self.collector) - self.handlers = handlers - - def connection(self): - conn = Connection() - conn.collect(self.collector) - return conn - - def process(self): - result = False - while True: - ev = self.collector.peek() - if ev: - self.dispatch(ev) - self.collector.pop() - result = True - else: - return result - - def dispatch(self, event): - for h in self.handlers: - event.dispatch(h) - - @property - def empty(self): - return self.collector.peek() == None and not self.timer.pending - -class Names(object): - def __init__(self, base=10000): - self.names = [] - self.base = base - - def number(self, name): - if name not in self.names: - self.names.append(name) - return self.names.index(name) + self.base - -class ExtendedEventType(EventType): - USED = Names() - """ - Event type identifier for events defined outside the proton-c - library - """ - def __init__(self, name, number=None): - super(ExtendedEventType, self).__init__(number or ExtendedEventType.USED.number(name), "on_%s" % name) - self.name = name - -class ApplicationEventContext(object): - def __init__(self, connection=None, session=None, link=None, delivery=None, subject=None): - self.connection = connection - self.session = session - self.link = link - self.delivery = delivery - if self.delivery: - self.link = self.delivery.link - if self.link: - self.session = self.link.session - if self.session: - self.connection = self.session.connection - self.subject = subject - - def __repr__(self): - objects = [self.connection, self.session, self.link, self.delivery, self.subject] - return ", ".join([str(o) for o in objects if o is not None]) - -class ApplicationEvent(Event): - """ - Application defined event, which can optionally be associated with - an engine object and or an arbitrary subject - """ - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): - super(ApplicationEvent, self).__init__(PN_PYREF, ApplicationEventContext(connection, session, link, delivery, subject), ExtendedEventType(typename)) - -class StartEvent(ApplicationEvent): - def __init__(self, container): - super(StartEvent, self).__init__("start") - self.container = container - -def _min(a, b): - if a and b: return min(a, b) - elif a: return a - else: return b - -class SelectLoop(object): - """ - An io loop based on select() - """ - def __init__(self, events): - self.events = events - self.selectables = [] - self._abort = False - - def abort(self): - self._abort = True - - def add(self, selectable): - self.selectables.append(selectable) - - def remove(self, selectable): - self.selectables.remove(selectable) - - @property - def redundant(self): - return self.events.empty and not self.selectables - - @property - def aborted(self): - return self._abort - - def run(self): - while not (self._abort or self.redundant): - self.do_work() - - def do_work(self, timeout=None): - """@return True if some work was done, False if time-out expired""" - tick = self.events.timer.tick() - - if self.events.process(): - tick = self.events.timer.tick() - while self.events.process(): - if self._abort: return - tick = self.events.timer.tick() - return True # Did work, let caller check their conditions, don't select. - - stable = False - while not stable: - reading = [] - writing = [] - closed = [] - for s in self.selectables: - if s.reading(): reading.append(s) - if s.writing(): writing.append(s) - if s.closed(): closed.append(s) - else: tick = _min(tick, s.tick()) - - for s in closed: - self.selectables.remove(s) - s.removed() - stable = len(closed) == 0 - - if self.redundant: - return False - - if tick: - timeout = _min(tick - time.time(), timeout) - if timeout and timeout < 0: - timeout = 0 - if reading or writing or timeout: - readable, writable, _ = select(reading, writing, [], timeout) - for s in self.selectables: - s.tick() - for s in readable: - s.readable() - for s in writable: - s.writable() - - return bool(readable or writable) - else: - return False - -def delivery_tags(): - count = 1 - while True: - yield str(count) - count += 1 - -def send_msg(sender, msg, tag=None, handler=None, transaction=None): - dlv = sender.delivery(tag or next(sender.tags)) - if transaction: - dlv.local.data = [transaction.id] - dlv.update(0x34) - if handler: - dlv.context = handler - sender.send(msg.encode()) - sender.advance() - return dlv - -def _send_msg(self, msg, tag=None, handler=None, transaction=None): - return send_msg(self, msg, tag, handler, transaction) - - -class Transaction(object): - """ - Class to track state of an AMQP 1.0 transaction. - """ - def __init__(self, txn_ctrl, handler, settle_before_discharge=False): - self.txn_ctrl = txn_ctrl - self.handler = handler - self.id = None - self._declare = None - self._discharge = None - self.failed = False - self._pending = [] - self.settle_before_discharge = settle_before_discharge - self.declare() - - def commit(self): - self.discharge(False) - - def abort(self): - self.discharge(True) - - def declare(self): - self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None]) - - def discharge(self, failed): - self.failed = failed - self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed]) - - def _send_ctrl(self, descriptor, value): - delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler) - delivery.transaction = self - return delivery - - def accept(self, delivery): - self.update(delivery, PN_ACCEPTED) - if self.settle_before_discharge: - delivery.settle() - else: - self._pending.append(delivery) - - def update(self, delivery, state=None): - if state: - delivery.local.data = [self.id, Described(ulong(state), [])] - delivery.update(0x34) - - def _release_pending(self): - for d in self._pending: - d.update(Delivery.RELEASED) - d.settle() - self._clear_pending() - - def _clear_pending(self): - self._pending = [] - - def handle_outcome(self, event): - if event.delivery == self._declare: - if event.delivery.remote.data: - self.id = event.delivery.remote.data[0] - self.handler.on_transaction_declared(event) - elif event.delivery.remote_state == Delivery.REJECTED: - self.handler.on_transaction_declare_failed(event) - else: - # TODO aconway 2014-12-11: log error: print "Unexpected outcome for declare: %s" % event.delivery.remote_state - self.handler.on_transaction_declare_failed(event) - elif event.delivery == self._discharge: - if event.delivery.remote_state == Delivery.REJECTED: - if not self.failed: - self.handler.on_transaction_commit_failed(event) - self._release_pending() # make this optional? - else: - if self.failed: - self.handler.on_transaction_aborted(event) - self._release_pending() - else: - self.handler.on_transaction_committed(event) - self._clear_pending() - -class LinkOption(object): - """ - Abstract interface for link configuration options - """ - def apply(self, link): - """ - Subclasses will implement any configuration logic in this - method - """ - pass - def test(self, link): - """ - Subclasses can override this to selectively apply an option - e.g. based on some link criteria - """ - return True - -class AtMostOnce(LinkOption): - def apply(self, link): - link.snd_settle_mode = Link.SND_SETTLED - -class AtLeastOnce(LinkOption): - def apply(self, link): - link.snd_settle_mode = Link.SND_UNSETTLED - link.rcv_settle_mode = Link.RCV_FIRST - -class SenderOption(LinkOption): - def apply(self, sender): pass - def test(self, link): return link.is_sender - -class ReceiverOption(LinkOption): - def apply(self, receiver): pass - def test(self, link): return link.is_receiver - -class Filter(ReceiverOption): - def __init__(self, filter_set={}): - self.filter_set = filter_set - - def apply(self, receiver): - receiver.source.filter.put_dict(self.filter_set) - -class Selector(Filter): - """ - Configures a link with a message selector filter - """ - def __init__(self, value, name='selector'): - super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)}) - -def _apply_link_options(options, link): - if options: - if isinstance(options, list): - for o in options: - if o.test(link): o.apply(link) - else: - if options.test(link): options.apply(link) - -def _create_session(connection, handler=None): - session = connection.session() - session.open() - return session - - -def _get_attr(target, name): - if hasattr(target, name): - return getattr(target, name) - else: - return None - -class SessionPerConnection(object): - def __init__(self): - self._default_session = None - - def session(self, connection): - if not self._default_session: - self._default_session = _create_session(connection) - self._default_session.context = self - return self._default_session - - def on_session_remote_close(self, event): - event.connection.close() - self._default_session = None - -class Connector(Handler): - """ - Internal handler that triggers the necessary socket connect for an - opened connection. - """ - def __init__(self, loop): - self.loop = loop - - def _connect(self, connection): - host, port = connection.address.next() - heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None - self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port)) - connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference - - def on_connection_local_open(self, event): - if hasattr(event.connection, "address"): - self._connect(event.connection) - - def on_connection_remote_open(self, event): - if hasattr(event.connection, "reconnect"): - event.connection.reconnect.reset() - - def on_disconnected(self, event): - if hasattr(event.connection, "reconnect"): - event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference - delay = event.connection.reconnect.next() - if delay == 0: - # TODO aconway 2014-12-11: log error: print "Disconnected, reconnecting..." - self._connect(event.connection) - else: - # TODO aconway 2014-12-11: log error: print "Disconnected will try to reconnect after %s seconds" % delay - self.loop.schedule(time.time() + delay, connection=event.connection, subject=self) - else: - # TODO aconway 2014-12-11: log error: print "Disconnected" - pass - - def on_timer(self, event): - if event.subject == self and event.connection: - self._connect(event.connection) - -class Backoff(object): - """ - A reconnect strategy involving an increasing delay between - retries, up to a maximum or 10 seconds. - """ - def __init__(self): - self.delay = 0 - - def reset(self): - self.delay = 0 - - def next(self): - current = self.delay - if current == 0: - self.delay = 0.1 - else: - self.delay = min(10, 2*current) - return current - -class Urls(object): - def __init__(self, values): - self.values = [Url(v) for v in values] - self.i = iter(self.values) - - def __iter__(self): - return self - - def _as_pair(self, url): - return (url.host, url.port) - - def next(self): - try: - return self._as_pair(self.i.next()) - except StopIteration: - self.i = iter(self.values) - return self._as_pair(self.i.next()) - -class Container(object): - def __init__(self, *handlers): - h = [Connector(self), ScopedHandler()] - h.extend(nested_handlers(handlers)) - self.events = Events(*h) - self.loop = SelectLoop(self.events) - self.trigger = None - self.container_id = str(generate_uuid()) - - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None): - conn = self.events.connection() - conn._pin = conn #circular reference until the open event gets handled - if handler: - conn.context = handler - conn.container = self.container_id or str(generate_uuid()) - conn.heartbeat = heartbeat - if url: conn.address = Urls([url]) - elif urls: conn.address = Urls(urls) - elif address: conn.address = address - else: raise ValueError("One of url, urls or address required") - if reconnect: - conn.reconnect = reconnect - elif reconnect is None: - conn.reconnect = Backoff() - conn._session_policy = SessionPerConnection() #todo: make configurable - conn.open() - return conn - - def _get_id(self, container, remote, local): - if local and remote: "%s-%s-%s" % (container, remote, local) - elif local: return "%s-%s" % (container, local) - elif remote: return "%s-%s" % (container, remote) - else: return "%s-%s" % (container, str(generate_uuid())) - - def _get_session(self, context): - if isinstance(context, Url): - return self._get_session(self.connect(url=context)) - elif isinstance(context, Session): - return context - elif isinstance(context, Connection): - if hasattr(context, '_session_policy'): - return context._session_policy.session(context) - else: - return _create_session(context) - else: - return context.session() - - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None): - if isinstance(context, basestring): - context = Url(context) - if isinstance(context, Url) and not target: - target = context.path - session = self._get_session(context) - snd = session.sender(name or self._get_id(session.connection.container, target, source)) - if source: - snd.source.address = source - if target: - snd.target.address = target - if handler: - snd.context = handler - snd.tags = tags or delivery_tags() - snd.send_msg = types.MethodType(_send_msg, snd) - _apply_link_options(options, snd) - snd.open() - return snd - - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None): - if isinstance(context, basestring): - context = Url(context) - if isinstance(context, Url) and not source: - source = context.path - session = self._get_session(context) - rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) - if source: - rcv.source.address = source - if dynamic: - rcv.source.dynamic = True - if target: - rcv.target.address = target - if handler: - rcv.context = handler - _apply_link_options(options, rcv) - rcv.open() - return rcv - - def declare_transaction(self, context, handler=None, settle_before_discharge=False): - if not _get_attr(context, '_txn_ctrl'): - context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl') - context._txn_ctrl.target.type = Terminus.COORDINATOR - context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) - return Transaction(context._txn_ctrl, handler, settle_before_discharge) - - def listen(self, url): - host, port = Urls([url]).next() - return AmqpAcceptor(self.events, self, host, port) - - def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): - self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) - - def get_event_trigger(self): - if not self.trigger or self.trigger.closed(): - self.trigger = EventInjector(self.events.collector) - self.add(self.trigger) - return self.trigger - - def add(self, selectable): - self.loop.add(selectable) - - def remove(self, selectable): - self.loop.remove(selectable) - - def run(self): - self.events.dispatch(StartEvent(self)) - self.loop.run() - - def stop(self): - self.loop.abort() - - def do_work(self, timeout=None): - return self.loop.do_work(timeout) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/proton_future/utils.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/proton_future/utils.py b/python/qpid_dispatch_internal/proton_future/utils.py deleted file mode 100644 index 91e76ae..0000000 --- a/python/qpid_dispatch_internal/proton_future/utils.py +++ /dev/null @@ -1,173 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import Queue, socket, time, threading -from . import ConnectionException, Endpoint, Handler, Message, Timeout, Url -from .reactors import AmqpSocket, Container, Events, SelectLoop, send_msg -from .handlers import ScopedHandler, IncomingMessageHandler - -class BlockingLink(object): - def __init__(self, connection, link): - self.connection = connection - self.link = link - self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), - msg="Opening link %s" % link.name) - - def close(self): - self.connection.wait(not (self.link.state & Endpoint.REMOTE_ACTIVE), - msg="Closing link %s" % link.name) - - # Access to other link attributes. - def __getattr__(self, name): return getattr(self.link, name) - -class BlockingSender(BlockingLink): - def __init__(self, connection, sender): - super(BlockingSender, self).__init__(connection, sender) - - def send_msg(self, msg): - delivery = send_msg(self.link, msg) - self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name) - -class BlockingReceiver(BlockingLink): - def __init__(self, connection, receiver, credit=1): - super(BlockingReceiver, self).__init__(connection, receiver) - if credit: receiver.flow(credit) - -class BlockingConnection(Handler): - """ - A synchronous style connection wrapper. - """ - def __init__(self, url, timeout=None, container=None): - self.timeout = timeout - self.container = container or Container() - self.url = Url(url).defaults() - self.conn = self.container.connect(url=self.url, handler=self) - self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), - msg="Opening connection") - - def create_sender(self, address, handler=None): - return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler)) - - def create_receiver(self, address, credit=1, dynamic=False, handler=None): - return BlockingReceiver( - self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit) - - def close(self): - self.conn.close() - self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), - msg="Closing connection") - - def run(self): - """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ - self.container.run() - - def wait(self, condition, timeout=False, msg=None): - """Call do_work until condition() is true""" - if timeout is False: - timeout = self.timeout - if timeout is None: - while not condition(): - self.container.do_work() - else: - deadline = time.time() + timeout - while not condition(): - if not self.container.do_work(deadline - time.time()): - txt = "Connection %s timed out" % self.url - if msg: txt += ": " + msg - raise Timeout(txt) - - def on_link_remote_close(self, event): - if event.link.state & Endpoint.LOCAL_ACTIVE: - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - if event.connection.state & Endpoint.LOCAL_ACTIVE: - self.closed(event.connection.remote_condition) - - def on_disconnected(self, event): - raise ConnectionException("Connection %s disconnected" % self.url); - - def closed(self, error=None): - txt = "Connection %s closed" % self.url - if error: - txt += " due to: %s" % error - else: - txt += " by peer" - raise ConnectionException(txt) - - -def atomic_count(start=0, step=1): - """Thread-safe atomic count iterator""" - lock = threading.Lock() - count = start - while True: - with lock: - count += step; - yield count - - -class SyncRequestResponse(IncomingMessageHandler): - """ - Implementation of the synchronous request-responce (aka RPC) pattern. - Create an instance and call call(request) to send a request and wait for a response. - """ - - correlation_id = atomic_count() - - def __init__(self, connection, address=None): - """ - @param connection: A L{BlockingConnection} - @param address: Address for the sender. - If this is not specified, then each request must have an address set. - """ - super(SyncRequestResponse, self).__init__() - self.connection = connection - self.address = address - self.sender = self.connection.create_sender(self.address) - # dynamic=true generates a unique address dynamically for this receiver. - # credit=1 because we want to receive 1 response message initially. - self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) - self.response = None - - def call(self, request): - """Send a request, wait for and return the response""" - if not self.address and not request.address: - raise ValueError("Request message has no address: %s" % request) - request.reply_to = self.reply_to - request.correlation_id = correlation_id = self.correlation_id.next() - self.sender.send_msg(request) - def wakeup(): - return self.response and (self.response.correlation_id == correlation_id) - self.connection.wait(wakeup, msg="Waiting for response") - response = self.response - self.response = None # Ready for next response. - self.receiver.flow(1) # Set up credit for the next response. - return response - - @property - def reply_to(self): - """Return the dynamic address of our receiver.""" - return self.receiver.remote_source.address - - def on_message(self, event): - """Called when we receive a message for our receiver.""" - self.response = event.message - - def close(self): - self.connection.close() http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/tools/command.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/tools/command.py b/python/qpid_dispatch_internal/tools/command.py index 7b941ef..95077cd 100644 --- a/python/qpid_dispatch_internal/tools/command.py +++ b/python/qpid_dispatch_internal/tools/command.py @@ -25,11 +25,7 @@ import sys, json, optparse, os from collections import Sequence, Mapping from qpid_dispatch_site import VERSION from proton import SSLDomain, Url -try: - from proton.utils import SyncRequestResponse, BlockingConnection -except ImportError: - from qpid_dispatch_internal.proton_future.utils import SyncRequestResponse, BlockingConnection - +from proton.utils import SyncRequestResponse, BlockingConnection class UsageError(Exception): """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
