Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 1ddb7d0de -> 1c3c1f5be


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/proton_future/handlers.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/proton_future/handlers.py b/python/qpid_dispatch_internal/proton_future/handlers.py
deleted file mode 100644
index badb4c8..0000000
--- a/python/qpid_dispatch_internal/proton_future/handlers.py
+++ /dev/null
@@ -1,442 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import heapq, os, Queue, re, socket, time, types
-from . import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
-from . import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
-from . import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
-from select import select
-
-class FlowController(Handler):
-    """
-    A handler that controls a configured credit window for associated
-    receivers.
-    """
-    def __init__(self, window=1):
-        self.window = window
-
-    def top_up(self, link):
-        delta = self.window - link.credit
-        link.flow(delta)
-
-    def on_link_local_open(self, event):
-        if event.link.is_receiver:
-            self.top_up(event.link)
-
-    def on_link_remote_open(self, event):
-        if event.link.is_receiver:
-            self.top_up(event.link)
-
-    def on_link_flow(self, event):
-        if event.link.is_receiver:
-            self.top_up(event.link)
-
-    def on_delivery(self, event):
-        if not event.delivery.released and event.delivery.link.is_receiver:
-            self.top_up(event.delivery.link)
-
-def nested_handlers(handlers):
-    # currently only allows for a single level of nesting
-    nested = []
-    for h in handlers:
-        nested.append(h)
-        if hasattr(h, 'handlers'):
-            nested.extend(getattr(h, 'handlers'))
-    return nested
-
-def add_nested_handler(handler, nested):
-    if hasattr(handler, 'handlers'):
-        getattr(handler, 'handlers').append(nested)
-    else:
-        handler.handlers = [nested]
-
-class ScopedHandler(Handler):
-    """
-    An internal handler that checks for handlers scoped to the engine
-    objects an event relates to. E.g it allows delivery, link, session
-    or connection scoped handlers that will only be called with events
-    for the object to which they are scoped.
-    """
-    scopes = ["delivery", "link", "session", "connection"]
-
-    def on_unhandled(self, method, args):
-        event = args[0]
-        if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:
-            return
-
-        objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
-        targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
-        handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]
-        for h in handlers:
-            h(event)
-
-
-class OutgoingMessageHandler(Handler):
-    """
-    A utility for simpler and more intuitive handling of delivery
-    events related to outgoing i.e. sent messages.
-    """
-    def __init__(self, auto_settle=True, delegate=None):
-        self.auto_settle = auto_settle
-        self.delegate = delegate
-
-    def on_link_flow(self, event):
-        if event.link.is_sender and event.link.credit:
-            self.on_credit(event)
-
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if dlv.released: return
-        if dlv.link.is_sender and dlv.updated:
-            if dlv.remote_state == Delivery.ACCEPTED:
-                self.on_accepted(event)
-            elif dlv.remote_state == Delivery.REJECTED:
-                self.on_rejected(event)
-            elif dlv.remote_state == Delivery.RELEASED:
-                self.on_released(event)
-            elif dlv.remote_state == Delivery.MODIFIED:
-                self.on_modified(event)
-            if dlv.settled:
-                self.on_settled(event)
-            if self.auto_settle:
-                dlv.settle()
-
-    def on_credit(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_credit', event)
-
-    def on_accepted(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_accepted', event)
-
-    def on_rejected(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_rejected', event)
-
-    def on_released(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_released', event)
-
-    def on_modified(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_modified', event)
-
-    def on_settled(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_settled', event)
-
-def recv_msg(delivery):
-    msg = Message()
-    msg.decode(delivery.link.recv(delivery.pending))
-    delivery.link.advance()
-    return msg
-
-class Reject(ProtonException):
-  """
-  An exception that indicate a message should be rejected
-  """
-  pass
-
-class Acking(object):
-    def accept(self, delivery):
-        self.settle(delivery, Delivery.ACCEPTED)
-
-    def reject(self, delivery):
-        self.settle(delivery, Delivery.REJECTED)
-
-    def release(self, delivery, delivered=True):
-        if delivered:
-            self.settle(delivery, Delivery.MODIFIED)
-        else:
-            self.settle(delivery, Delivery.RELEASED)
-
-    def settle(self, delivery, state=None):
-        if state:
-            delivery.update(state)
-        delivery.settle()
-
-class IncomingMessageHandler(Handler, Acking):
-    """
-    A utility for simpler and more intuitive handling of delivery
-    events related to incoming i.e. received messages.
-    """
-
-    def __init__(self, auto_accept=True, delegate=None):
-        self.delegate = delegate
-        self.auto_accept = auto_accept
-
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if dlv.released or not dlv.link.is_receiver: return
-        if dlv.readable and not dlv.partial:
-            event.message = recv_msg(dlv)
-            try:
-                self.on_message(event)
-                if self.auto_accept:
-                    dlv.update(Delivery.ACCEPTED)
-                    dlv.settle()
-            except Reject:
-                dlv.update(Delivery.REJECTED)
-                dlv.settle()
-        elif dlv.updated and dlv.settled:
-            self.on_settled(event)
-
-    def on_message(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_message', event)
-
-    def on_settled(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_settled', event)
-
-class EndpointStateHandler(Handler):
-    """
-    A utility that exposes 'endpoint' events i.e. the open/close for
-    links, sessions and connections in a more intuitive manner. A
-    XXX_opened method will be called when both local and remote peers
-    have opened the link, session or connection. This can be used to
-    confirm a locally initiated action for example. A XXX_opening
-    method will be called when the remote peer has requested an open
-    that was not initiated locally. By default this will simply open
-    locally, which then triggers the XXX_opened call. The same applies
-    to close.
-    """
-
-    def __init__(self, peer_close_is_error=False, delegate=None):
-        self.delegate = delegate
-        self.peer_close_is_error = peer_close_is_error
-
-    def is_local_open(self, endpoint):
-        return endpoint.state & Endpoint.LOCAL_ACTIVE
-
-    def is_local_uninitialised(self, endpoint):
-        return endpoint.state & Endpoint.LOCAL_UNINIT
-
-    def is_local_closed(self, endpoint):
-        return endpoint.state & Endpoint.LOCAL_CLOSED
-
-    def is_remote_open(self, endpoint):
-        return endpoint.state & Endpoint.REMOTE_ACTIVE
-
-    def is_remote_closed(self, endpoint):
-        return endpoint.state & Endpoint.REMOTE_CLOSED
-
-    def log_error(self, endpoint, endpoint_type):
-        pass
-        # TODO aconway 2014-12-11: log error message properly
-        # if endpoint.remote_condition:
-        #     print endpoint.remote_condition.description
-        # elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
-        #     print "%s closed by peer" % endpoint_type
-
-    def on_link_remote_close(self, event):
-        if event.link.remote_condition:
-            self.on_link_error(event)
-        elif self.is_local_closed(event.link):
-            self.on_link_closed(event)
-        else:
-            self.on_link_closing(event)
-        event.link.close()
-
-    def on_session_remote_close(self, event):
-        if event.session.remote_condition:
-            self.on_session_error(event)
-        elif self.is_local_closed(event.session):
-            self.on_session_closed(event)
-        else:
-            self.on_session_closing(event)
-        event.session.close()
-
-    def on_connection_remote_close(self, event):
-        if event.connection.remote_condition:
-            self.on_connection_error(event)
-        elif self.is_local_closed(event.connection):
-            self.on_connection_closed(event)
-        else:
-            self.on_connection_closing(event)
-        event.connection.close()
-
-    def on_connection_local_open(self, event):
-        if self.is_remote_open(event.connection):
-            self.on_connection_opened(event)
-
-    def on_connection_remote_open(self, event):
-        if self.is_local_open(event.connection):
-            self.on_connection_opened(event)
-        elif self.is_local_uninitialised(event.connection):
-            self.on_connection_opening(event)
-            event.connection.open()
-
-    def on_session_local_open(self, event):
-        if self.is_remote_open(event.session):
-            self.on_session_opened(event)
-
-    def on_session_remote_open(self, event):
-        if self.is_local_open(event.session):
-            self.on_session_opened(event)
-        elif self.is_local_uninitialised(event.session):
-            self.on_session_opening(event)
-            event.session.open()
-
-    def on_link_local_open(self, event):
-        if self.is_remote_open(event.link):
-            self.on_link_opened(event)
-
-    def on_link_remote_open(self, event):
-        if self.is_local_open(event.link):
-            self.on_link_opened(event)
-        elif self.is_local_uninitialised(event.link):
-            self.on_link_opening(event)
-            event.link.open()
-
-    def on_connection_opened(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_connection_opened', event)
-
-    def on_session_opened(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_session_opened', event)
-
-    def on_link_opened(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_link_opened', event)
-
-    def on_connection_opening(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_connection_opening', event)
-
-    def on_session_opening(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_session_opening', event)
-
-    def on_link_opening(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_link_opening', event)
-
-    def on_connection_error(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_connection_error', event)
-        else:
-            self.log_error(event.connection, "connection")
-
-    def on_session_error(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_session_error', event)
-        else:
-            self.log_error(event.session, "session")
-            event.connection.close()
-
-    def on_link_error(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_link_error', event)
-        else:
-            self.log_error(event.link, "link")
-            event.connection.close()
-
-    def on_connection_closed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_connection_closed', event)
-
-    def on_session_closed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_session_closed', event)
-
-    def on_link_closed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_link_closed', event)
-
-    def on_connection_closing(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_connection_closing', event)
-        elif self.peer_close_is_error:
-            self.on_connection_error(event)
-
-    def on_session_closing(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_session_closing', event)
-        elif self.peer_close_is_error:
-            self.on_session_error(event)
-
-    def on_link_closing(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_link_closing', event)
-        elif self.peer_close_is_error:
-            self.on_link_error(event)
-
-class MessagingHandler(Handler, Acking):
-    """
-    A general purpose handler that makes the proton-c events somewhat
-    simpler to deal with and.or avoids repetitive tasks for common use
-    cases.
-    """
-    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
-        self.handlers = []
-        # FlowController if used needs to see event before
-        # IncomingMessageHandler, as the latter may involve the
-        # delivery being released
-        if prefetch:
-            self.handlers.append(FlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
-        self.handlers.append(IncomingMessageHandler(auto_accept, self))
-        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
-
-class TransactionalAcking(object):
-    def accept(self, delivery, transaction):
-        transaction.accept(delivery)
-
-class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
-    def __init__(self, auto_settle=True, delegate=None):
-        super(TransactionHandler, self).__init__(auto_settle, delegate)
-
-    def on_settled(self, event):
-        if hasattr(event.delivery, "transaction"):
-            event.transaction = event.delivery.transaction
-            event.delivery.transaction.handle_outcome(event)
-
-    def on_transaction_declared(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_declared', event)
-
-    def on_transaction_committed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_committed', event)
-
-    def on_transaction_aborted(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_aborted', event)
-
-    def on_transaction_declare_failed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_declare_failed', event)
-
-    def on_transaction_commit_failed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_commit_failed', event)
-
-class TransactionalClientHandler(Handler, TransactionalAcking):
-    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
-        super(TransactionalClientHandler, self).__init__()
-        self.handlers = []
-        # FlowController if used needs to see event before
-        # IncomingMessageHandler, as the latter may involve the
-        # delivery being released
-        if prefetch:
-            self.handlers.append(FlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
-        self.handlers.append(IncomingMessageHandler(auto_accept, self))
-        self.handlers.append(TransactionHandler(auto_settle, self))

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/proton_future/reactors.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/proton_future/reactors.py b/python/qpid_dispatch_internal/proton_future/reactors.py
deleted file mode 100644
index 5312be1..0000000
--- a/python/qpid_dispatch_internal/proton_future/reactors.py
+++ /dev/null
@@ -1,833 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import os, Queue, socket, time, types
-from heapq import heappush, heappop, nsmallest
-from . import Collector, Connection, ConnectionException, Delivery, Described, dispatch
-from . import Endpoint, Event, EventType, generate_uuid, Handler, Link, Message
-from . import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
-from . import Terminus, Timeout, Transport, TransportException, ulong, Url
-from select import select
-from .handlers import nested_handlers, ScopedHandler
-
-class AmqpSocket(object):
-    """
-    Associates a transport with a connection and a socket and can be
-    used in an io loop to track the io for an AMQP 1.0 connection.
-    """
-
-    def __init__(self, conn, sock, events, heartbeat=None):
-        self.events = events
-        self.conn = conn
-        self.transport = Transport()
-        if heartbeat: self.transport.idle_timeout = heartbeat
-        self.transport.bind(self.conn)
-        self.socket = sock
-        self.socket.setblocking(0)
-        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-        self.write_done = False
-        self.read_done = False
-        self._closed = False
-
-    def accept(self, force_sasl=True):
-        if force_sasl:
-            sasl = self.transport.sasl()
-            sasl.mechanisms("ANONYMOUS")
-            sasl.server()
-            sasl.done(SASL.OK)
-        #TODO: use SASL anyway if requested by peer
-        return self
-
-    def connect(self, host, port=None, username=None, password=None, force_sasl=True):
-        if username and password:
-            sasl = self.transport.sasl()
-            sasl.plain(username, password)
-        elif force_sasl:
-            sasl = self.transport.sasl()
-            sasl.mechanisms('ANONYMOUS')
-            sasl.client()
-        try:
-            self.socket.connect_ex((host, port or 5672))
-        except socket.gaierror, e:
-            raise ConnectionException("Cannot resolve '%s': %s" % (host, e))
-        return self
-
-    def _closed_cleanly(self):
-        return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED
-
-    def closed(self):
-        if not self._closed and self.write_done and self.read_done:
-            self.close()
-            return True
-        else:
-            return False
-
-    def close(self):
-        self.socket.close()
-        self._closed = True
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    def reading(self):
-        if self.read_done: return False
-        c = self.transport.capacity()
-        if c > 0:
-            return True
-        elif c < 0:
-            self.read_done = True
-        return False
-
-    def writing(self):
-        if self.write_done: return False
-        try:
-            p = self.transport.pending()
-            if p > 0:
-                return True
-            elif p < 0:
-                self.write_done = True
-                return False
-            else: # p == 0
-                return False
-        except TransportException, e:
-            self.write_done = True
-            return False
-
-    def readable(self):
-        c = self.transport.capacity()
-        if c > 0:
-            try:
-                data = self.socket.recv(c)
-                if data:
-                    self.transport.push(data)
-                else:
-                    if not self._closed_cleanly():
-                        self.read_done = True
-                        self.write_done = True
-                    else:
-                        self.transport.close_tail()
-            except TransportException, e:
-                # TODO aconway 2014-12-11: log error: print "Error on read: %s" % e 
-                self.read_done = True
-            except socket.error, e:
-                # TODO aconway 2014-12-11: log error: print "Error on recv: %s" % e
-                self.read_done = True
-                self.write_done = True
-        elif c < 0:
-            self.read_done = True
-
-    def writable(self):
-        try:
-            p = self.transport.pending()
-            if p > 0:
-                data = self.transport.peek(p)
-                n = self.socket.send(data)
-                self.transport.pop(n)
-            elif p < 0:
-                self.write_done = True
-        except TransportException, e:
-            # TODO aconway 2014-12-11: log error: print "Error on write: %s" % e
-            self.write_done = True
-        except socket.error, e:
-            # TODO aconway 2014-12-11: log error: print "Error on send: %s" % e
-            self.write_done = True
-
-    def removed(self):
-        if not self._closed_cleanly():
-            self.transport.unbind()
-            self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
-
-    def tick(self):
-        t = self.transport.tick(time.time())
-        if t: return t
-        else: return None
-
-class AmqpAcceptor:
-    """
-    Listens for incoming sockets, creates an AmqpSocket for them and
-    adds that to the list of tracked 'selectables'. The acceptor can
-    itself be added to an io loop.
-    """
-
-    def __init__(self, events, loop, host, port):
-        self.events = events
-        self.loop = loop
-        self.socket = socket.socket()
-        self.socket.setblocking(0)
-        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.socket.bind((host, port))
-        self.socket.listen(5)
-        self.loop.add(self)
-        self._closed = False
-
-    def closed(self):
-        if self._closed:
-            self.socket.close()
-            return True
-        else:
-            return False
-
-    def close(self):
-        self._closed = True
-
-    def fileno(self):
-        return self.socket.fileno()
-
-    def reading(self):
-        return not self._closed
-
-    def writing(self):
-        return False
-
-    def readable(self):
-        sock, addr = self.socket.accept()
-        if sock:
-            self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept())
-
-    def removed(self): pass
-    def tick(self): return None
-
-
-class EventInjector(object):
-    """
-    Can be added to an io loop to allow events to be triggered by an
-    external thread but handled on the event thread associated with
-    the loop.
-    """
-    def __init__(self, collector):
-        self.collector = collector
-        self.queue = Queue.Queue()
-        self.pipe = os.pipe()
-        self._closed = False
-
-    def trigger(self, event):
-        self.queue.put(event)
-        os.write(self.pipe[1], "!")
-
-    def closed(self):
-        return self._closed and self.queue.empty()
-
-    def close(self):
-        self._closed = True
-
-    def fileno(self):
-        return self.pipe[0]
-
-    def reading(self):
-        return True
-
-    def writing(self):
-        return False
-
-    def readable(self):
-        os.read(self.pipe[0], 512)
-        while not self.queue.empty():
-            event = self.queue.get()
-            self.collector.put(event.context, event.type)
-
-    def removed(self): pass
-    def tick(self): return None
-
-class PQueue:
-
-    def __init__(self):
-        self.entries = []
-
-    def add(self, priority, task):
-        heappush(self.entries, (priority, task))
-
-    def peek(self):
-        if self.entries:
-            return nsmallest(1, self.entries)[0]
-        else:
-            return None
-
-    def pop(self):
-        if self.entries:
-            return heappop(self.entries)
-        else:
-            return None
-
-    def __nonzero__(self):
-        if self.entries:
-            return True
-        else:
-            return False
-
-class Timer:
-    def __init__(self, collector):
-        self.collector = collector
-        self.events = PQueue()
-
-    def schedule(self, deadline, event):
-        self.events.add(deadline, event)
-
-    def tick(self):
-        while self.events:
-            deadline, event = self.events.peek()
-            if time.time() > deadline:
-                self.events.pop()
-                self.collector.put(event.context, event.type)
-            else:
-                return deadline
-        return None
-
-    @property
-    def pending(self):
-        return bool(self.events)
-
-class Events(object):
-    def __init__(self, *handlers):
-        self.collector = Collector()
-        self.timer = Timer(self.collector)
-        self.handlers = handlers
-
-    def connection(self):
-        conn = Connection()
-        conn.collect(self.collector)
-        return conn
-
-    def process(self):
-        result = False
-        while True:
-            ev = self.collector.peek()
-            if ev:
-                self.dispatch(ev)
-                self.collector.pop()
-                result = True
-            else:
-                return result
-
-    def dispatch(self, event):
-        for h in self.handlers:
-            event.dispatch(h)
-
-    @property
-    def empty(self):
-        return self.collector.peek() == None and not self.timer.pending
-
-class Names(object):
-    def __init__(self, base=10000):
-        self.names = []
-        self.base = base
-
-    def number(self, name):
-        if name not in self.names:
-            self.names.append(name)
-        return self.names.index(name) + self.base
-
-class ExtendedEventType(EventType):
-    USED = Names()
-    """
-    Event type identifier for events defined outside the proton-c
-    library
-    """
-    def __init__(self, name, number=None):
-        super(ExtendedEventType, self).__init__(number or ExtendedEventType.USED.number(name), "on_%s" % name)
-        self.name = name
-
-class ApplicationEventContext(object):
-    def __init__(self, connection=None, session=None, link=None, delivery=None, subject=None):
-        self.connection = connection
-        self.session = session
-        self.link = link
-        self.delivery = delivery
-        if self.delivery:
-            self.link = self.delivery.link
-        if self.link:
-            self.session = self.link.session
-        if self.session:
-            self.connection = self.session.connection
-        self.subject = subject
-
-    def __repr__(self):
-        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
-        return ", ".join([str(o) for o in objects if o is not None])
-
-class ApplicationEvent(Event):
-    """
-    Application defined event, which can optionally be associated with
-    an engine object and or an arbitrary subject
-    """
-    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
-        super(ApplicationEvent, self).__init__(PN_PYREF, ApplicationEventContext(connection, session, link, delivery, subject), ExtendedEventType(typename))
-
-class StartEvent(ApplicationEvent):
-    def __init__(self, container):
-        super(StartEvent, self).__init__("start")
-        self.container = container
-
-def _min(a, b):
-    if a and b: return min(a, b)
-    elif a: return a
-    else: return b
-
-class SelectLoop(object):
-    """
-    An io loop based on select()
-    """
-    def __init__(self, events):
-        self.events = events
-        self.selectables = []
-        self._abort = False
-
-    def abort(self):
-        self._abort = True
-
-    def add(self, selectable):
-        self.selectables.append(selectable)
-
-    def remove(self, selectable):
-        self.selectables.remove(selectable)
-
-    @property
-    def redundant(self):
-        return self.events.empty and not self.selectables
-
-    @property
-    def aborted(self):
-        return self._abort
-
-    def run(self):
-        while not (self._abort or self.redundant):
-            self.do_work()
-
-    def do_work(self, timeout=None):
-        """@return True if some work was done, False if time-out expired"""
-        tick = self.events.timer.tick()
-
-        if self.events.process():
-            tick = self.events.timer.tick()
-            while self.events.process():
-                if self._abort: return
-                tick = self.events.timer.tick()
-            return True # Did work, let caller check their conditions, don't select.
-
-        stable = False
-        while not stable:
-            reading = []
-            writing = []
-            closed = []
-            for s in self.selectables:
-                if s.reading(): reading.append(s)
-                if s.writing(): writing.append(s)
-                if s.closed(): closed.append(s)
-                else: tick = _min(tick, s.tick())
-
-            for s in closed:
-                self.selectables.remove(s)
-                s.removed()
-            stable = len(closed) == 0
-
-        if self.redundant:
-            return False
-
-        if tick:
-            timeout = _min(tick - time.time(), timeout)
-        if timeout and timeout < 0:
-            timeout = 0
-        if reading or writing or timeout:
-            readable, writable, _ = select(reading, writing, [], timeout)
-            for s in self.selectables:
-                s.tick()
-            for s in readable:
-                s.readable()
-            for s in writable:
-                s.writable()
-
-            return bool(readable or writable)
-        else:
-            return False
-
-def delivery_tags():
-    count = 1
-    while True:
-        yield str(count)
-        count += 1
-
-def send_msg(sender, msg, tag=None, handler=None, transaction=None):
-    dlv = sender.delivery(tag or next(sender.tags))
-    if transaction:
-        dlv.local.data = [transaction.id]
-        dlv.update(0x34)
-    if handler:
-        dlv.context = handler
-    sender.send(msg.encode())
-    sender.advance()
-    return dlv
-
-def _send_msg(self, msg, tag=None, handler=None, transaction=None):
-    return send_msg(self, msg, tag, handler, transaction)
-
-
-class Transaction(object):
-    """
-    Class to track state of an AMQP 1.0 transaction.
-    """
-    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
-        self.txn_ctrl = txn_ctrl
-        self.handler = handler
-        self.id = None
-        self._declare = None
-        self._discharge = None
-        self.failed = False
-        self._pending = []
-        self.settle_before_discharge = settle_before_discharge
-        self.declare()
-
-    def commit(self):
-        self.discharge(False)
-
-    def abort(self):
-        self.discharge(True)
-
-    def declare(self):
-        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
-
-    def discharge(self, failed):
-        self.failed = failed
-        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
-
-    def _send_ctrl(self, descriptor, value):
-        delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler)
-        delivery.transaction = self
-        return delivery
-
-    def accept(self, delivery):
-        self.update(delivery, PN_ACCEPTED)
-        if self.settle_before_discharge:
-            delivery.settle()
-        else:
-            self._pending.append(delivery)
-
-    def update(self, delivery, state=None):
-        if state:
-            delivery.local.data = [self.id, Described(ulong(state), [])]
-            delivery.update(0x34)
-
-    def _release_pending(self):
-        for d in self._pending:
-            d.update(Delivery.RELEASED)
-            d.settle()
-        self._clear_pending()
-
-    def _clear_pending(self):
-        self._pending = []
-
-    def handle_outcome(self, event):
-        if event.delivery == self._declare:
-            if event.delivery.remote.data:
-                self.id = event.delivery.remote.data[0]
-                self.handler.on_transaction_declared(event)
-            elif event.delivery.remote_state == Delivery.REJECTED:
-                self.handler.on_transaction_declare_failed(event)
-            else:
-                # TODO aconway 2014-12-11: log error: print "Unexpected outcome for declare: %s" % event.delivery.remote_state
-                self.handler.on_transaction_declare_failed(event)
-        elif event.delivery == self._discharge:
-            if event.delivery.remote_state == Delivery.REJECTED:
-                if not self.failed:
-                    self.handler.on_transaction_commit_failed(event)
-                    self._release_pending() # make this optional?
-            else:
-                if self.failed:
-                    self.handler.on_transaction_aborted(event)
-                    self._release_pending()
-                else:
-                    self.handler.on_transaction_committed(event)
-            self._clear_pending()
-
-class LinkOption(object):
-    """
-    Abstract interface for link configuration options
-    """
-    def apply(self, link):
-        """
-        Subclasses will implement any configuration logic in this
-        method
-        """
-        pass
-    def test(self, link):
-        """
-        Subclasses can override this to selectively apply an option
-        e.g. based on some link criteria
-        """
-        return True
-
-class AtMostOnce(LinkOption):
-    def apply(self, link):
-        link.snd_settle_mode = Link.SND_SETTLED
-
-class AtLeastOnce(LinkOption):
-    def apply(self, link):
-        link.snd_settle_mode = Link.SND_UNSETTLED
-        link.rcv_settle_mode = Link.RCV_FIRST
-
-class SenderOption(LinkOption):
-    def apply(self, sender): pass
-    def test(self, link): return link.is_sender
-
-class ReceiverOption(LinkOption):
-    def apply(self, receiver): pass
-    def test(self, link): return link.is_receiver
-
-class Filter(ReceiverOption):
-    def __init__(self, filter_set={}):
-        self.filter_set = filter_set
-
-    def apply(self, receiver):
-        receiver.source.filter.put_dict(self.filter_set)
-
-class Selector(Filter):
-    """
-    Configures a link with a message selector filter
-    """
-    def __init__(self, value, name='selector'):
-        super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
-
-def _apply_link_options(options, link):
-    if options:
-        if isinstance(options, list):
-            for o in options:
-                if o.test(link): o.apply(link)
-        else:
-            if options.test(link): options.apply(link)
-
-def _create_session(connection, handler=None):
-    session = connection.session()
-    session.open()
-    return session
-
-
-def _get_attr(target, name):
-    if hasattr(target, name):
-        return getattr(target, name)
-    else:
-        return None
-
-class SessionPerConnection(object):
-    def __init__(self):
-        self._default_session = None
-
-    def session(self, connection):
-        if not self._default_session:
-            self._default_session = _create_session(connection)
-            self._default_session.context = self
-        return self._default_session
-
-    def on_session_remote_close(self, event):
-        event.connection.close()
-        self._default_session = None
-
-class Connector(Handler):
-    """
-    Internal handler that triggers the necessary socket connect for an
-    opened connection.
-    """
-    def __init__(self, loop):
-        self.loop = loop
-
-    def _connect(self, connection):
-        host, port = connection.address.next()
-        heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None
-        self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
-        connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
-
-    def on_connection_local_open(self, event):
-        if hasattr(event.connection, "address"):
-            self._connect(event.connection)
-
-    def on_connection_remote_open(self, event):
-        if hasattr(event.connection, "reconnect"):
-            event.connection.reconnect.reset()
-
-    def on_disconnected(self, event):
-        if hasattr(event.connection, "reconnect"):
-            event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference
-            delay = event.connection.reconnect.next()
-            if delay == 0:
-                # TODO aconway 2014-12-11: log error: print "Disconnected, reconnecting..."
-                self._connect(event.connection)
-            else:
-                # TODO aconway 2014-12-11: log error: print "Disconnected will try to reconnect after %s seconds" % delay
-                self.loop.schedule(time.time() + delay, connection=event.connection, subject=self)
-        else:
-            # TODO aconway 2014-12-11: log error: print "Disconnected"
-            pass
-
-    def on_timer(self, event):
-        if event.subject == self and event.connection:
-            self._connect(event.connection)
-
-class Backoff(object):
-    """
-    A reconnect strategy involving an increasing delay between
-    retries, up to a maximum or 10 seconds.
-    """
-    def __init__(self):
-        self.delay = 0
-
-    def reset(self):
-        self.delay = 0
-
-    def next(self):
-        current = self.delay
-        if current == 0:
-            self.delay = 0.1
-        else:
-            self.delay = min(10, 2*current)
-        return current
-
-class Urls(object):
-    def __init__(self, values):
-        self.values = [Url(v) for v in values]
-        self.i = iter(self.values)
-
-    def __iter__(self):
-        return self
-
-    def _as_pair(self, url):
-        return (url.host, url.port)
-
-    def next(self):
-        try:
-            return self._as_pair(self.i.next())
-        except StopIteration:
-            self.i = iter(self.values)
-            return self._as_pair(self.i.next())
-
-class Container(object):
-    def __init__(self, *handlers):
-        h = [Connector(self), ScopedHandler()]
-        h.extend(nested_handlers(handlers))
-        self.events = Events(*h)
-        self.loop = SelectLoop(self.events)
-        self.trigger = None
-        self.container_id = str(generate_uuid())
-
-    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
-        conn = self.events.connection()
-        conn._pin = conn #circular reference until the open event gets handled
-        if handler:
-            conn.context = handler
-        conn.container = self.container_id or str(generate_uuid())
-        conn.heartbeat = heartbeat
-        if url: conn.address = Urls([url])
-        elif urls: conn.address = Urls(urls)
-        elif address: conn.address = address
-        else: raise ValueError("One of url, urls or address required")
-        if reconnect:
-            conn.reconnect = reconnect
-        elif reconnect is None:
-            conn.reconnect = Backoff()
-        conn._session_policy = SessionPerConnection() #todo: make configurable
-        conn.open()
-        return conn
-
-    def _get_id(self, container, remote, local):
-        if local and remote: "%s-%s-%s" % (container, remote, local)
-        elif local: return "%s-%s" % (container, local)
-        elif remote: return "%s-%s" % (container, remote)
-        else: return "%s-%s" % (container, str(generate_uuid()))
-
-    def _get_session(self, context):
-        if isinstance(context, Url):
-            return self._get_session(self.connect(url=context))
-        elif isinstance(context, Session):
-            return context
-        elif isinstance(context, Connection):
-            if hasattr(context, '_session_policy'):
-                return context._session_policy.session(context)
-            else:
-                return _create_session(context)
-        else:
-            return context.session()
-
-    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
-        if isinstance(context, basestring):
-            context = Url(context)
-        if isinstance(context, Url) and not target:
-            target = context.path
-        session = self._get_session(context)
-        snd = session.sender(name or self._get_id(session.connection.container, target, source))
-        if source:
-            snd.source.address = source
-        if target:
-            snd.target.address = target
-        if handler:
-            snd.context = handler
-        snd.tags = tags or delivery_tags()
-        snd.send_msg = types.MethodType(_send_msg, snd)
-        _apply_link_options(options, snd)
-        snd.open()
-        return snd
-
-    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
-        if isinstance(context, basestring):
-            context = Url(context)
-        if isinstance(context, Url) and not source:
-            source = context.path
-        session = self._get_session(context)
-        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
-        if source:
-            rcv.source.address = source
-        if dynamic:
-            rcv.source.dynamic = True
-        if target:
-            rcv.target.address = target
-        if handler:
-            rcv.context = handler
-        _apply_link_options(options, rcv)
-        rcv.open()
-        return rcv
-
-    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
-        if not _get_attr(context, '_txn_ctrl'):
-            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl')
-            context._txn_ctrl.target.type = Terminus.COORDINATOR
-            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
-        return Transaction(context._txn_ctrl, handler, settle_before_discharge)
-
-    def listen(self, url):
-        host, port = Urls([url]).next()
-        return AmqpAcceptor(self.events, self, host, port)
-
-    def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
-        self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
-
-    def get_event_trigger(self):
-        if not self.trigger or self.trigger.closed():
-            self.trigger = EventInjector(self.events.collector)
-            self.add(self.trigger)
-        return self.trigger
-
-    def add(self, selectable):
-        self.loop.add(selectable)
-
-    def remove(self, selectable):
-        self.loop.remove(selectable)
-
-    def run(self):
-        self.events.dispatch(StartEvent(self))
-        self.loop.run()
-
-    def stop(self):
-        self.loop.abort()
-
-    def do_work(self, timeout=None):
-        return self.loop.do_work(timeout)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/proton_future/utils.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/proton_future/utils.py b/python/qpid_dispatch_internal/proton_future/utils.py
deleted file mode 100644
index 91e76ae..0000000
--- a/python/qpid_dispatch_internal/proton_future/utils.py
+++ /dev/null
@@ -1,173 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import Queue, socket, time, threading
-from . import ConnectionException, Endpoint, Handler, Message, Timeout, Url
-from .reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
-from .handlers import ScopedHandler, IncomingMessageHandler
-
-class BlockingLink(object):
-    def __init__(self, connection, link):
-        self.connection = connection
-        self.link = link
-        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
-                             msg="Opening link %s" % link.name)
-
-    def close(self):
-        self.connection.wait(not (self.link.state & Endpoint.REMOTE_ACTIVE),
-                             msg="Closing link %s" % link.name)
-
-    # Access to other link attributes.
-    def __getattr__(self, name): return getattr(self.link, name)
-
-class BlockingSender(BlockingLink):
-    def __init__(self, connection, sender):
-        super(BlockingSender, self).__init__(connection, sender)
-
-    def send_msg(self, msg):
-        delivery = send_msg(self.link, msg)
-        self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name)
-
-class BlockingReceiver(BlockingLink):
-    def __init__(self, connection, receiver, credit=1):
-        super(BlockingReceiver, self).__init__(connection, receiver)
-        if credit: receiver.flow(credit)
-
-class BlockingConnection(Handler):
-    """
-    A synchronous style connection wrapper.
-    """
-    def __init__(self, url, timeout=None, container=None):
-        self.timeout = timeout
-        self.container = container or Container()
-        self.url = Url(url).defaults()
-        self.conn = self.container.connect(url=self.url, handler=self)
-        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
-                  msg="Opening connection")
-
-    def create_sender(self, address, handler=None):
-        return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler))
-
-    def create_receiver(self, address, credit=1, dynamic=False, handler=None):
-        return BlockingReceiver(
-            self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit)
-
-    def close(self):
-        self.conn.close()
-        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
-                  msg="Closing connection")
-
-    def run(self):
-        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
-        self.container.run()
-
-    def wait(self, condition, timeout=False, msg=None):
-        """Call do_work until condition() is true"""
-        if timeout is False:
-            timeout = self.timeout
-        if timeout is None:
-            while not condition():
-                self.container.do_work()
-        else:
-            deadline = time.time() + timeout
-            while not condition():
-                if not self.container.do_work(deadline - time.time()):
-                    txt = "Connection %s timed out" % self.url
-                    if msg: txt += ": " + msg
-                    raise Timeout(txt)
-
-    def on_link_remote_close(self, event):
-        if event.link.state & Endpoint.LOCAL_ACTIVE:
-            self.closed(event.link.remote_condition)
-
-    def on_connection_remote_close(self, event):
-        if event.connection.state & Endpoint.LOCAL_ACTIVE:
-            self.closed(event.connection.remote_condition)
-
-    def on_disconnected(self, event):
-        raise ConnectionException("Connection %s disconnected" % self.url);
-
-    def closed(self, error=None):
-        txt = "Connection %s closed" % self.url
-        if error:
-            txt += " due to: %s" % error
-        else:
-            txt += " by peer"
-        raise ConnectionException(txt)
-
-
-def atomic_count(start=0, step=1):
-    """Thread-safe atomic count iterator"""
-    lock = threading.Lock()
-    count = start
-    while True:
-        with lock:
-            count += step;
-            yield count
-
-
-class SyncRequestResponse(IncomingMessageHandler):
-    """
-    Implementation of the synchronous request-responce (aka RPC) pattern.
-    Create an instance and call call(request) to send a request and wait for a response.
-    """
-
-    correlation_id = atomic_count()
-
-    def __init__(self, connection, address=None):
-        """
-        @param connection: A L{BlockingConnection}
-        @param address: Address for the sender.
-            If this is not specified, then each request must have an address set.
-        """
-        super(SyncRequestResponse, self).__init__()
-        self.connection = connection
-        self.address = address
-        self.sender = self.connection.create_sender(self.address)
-        # dynamic=true generates a unique address dynamically for this receiver.
-        # credit=1 because we want to receive 1 response message initially.
-        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
-        self.response = None
-
-    def call(self, request):
-        """Send a request, wait for and return the response"""
-        if not self.address and not request.address:
-            raise ValueError("Request message has no address: %s" % request)
-        request.reply_to = self.reply_to
-        request.correlation_id = correlation_id = self.correlation_id.next()
-        self.sender.send_msg(request)
-        def wakeup():
-            return self.response and (self.response.correlation_id == correlation_id)
-        self.connection.wait(wakeup, msg="Waiting for response")
-        response = self.response
-        self.response = None    # Ready for next response.
-        self.receiver.flow(1)   # Set up credit for the next response.
-        return response
-
-    @property
-    def reply_to(self):
-        """Return the dynamic address of our receiver."""
-        return self.receiver.remote_source.address
-
-    def on_message(self, event):
-        """Called when we receive a message for our receiver."""
-        self.response = event.message
-
-    def close(self):
-        self.connection.close()

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1c3c1f5b/python/qpid_dispatch_internal/tools/command.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/tools/command.py b/python/qpid_dispatch_internal/tools/command.py
index 7b941ef..95077cd 100644
--- a/python/qpid_dispatch_internal/tools/command.py
+++ b/python/qpid_dispatch_internal/tools/command.py
@@ -25,11 +25,7 @@ import sys, json, optparse, os
 from collections import Sequence, Mapping
 from qpid_dispatch_site import VERSION
 from proton import SSLDomain, Url
-try:
-    from proton.utils import SyncRequestResponse, BlockingConnection
-except ImportError:
-    from qpid_dispatch_internal.proton_future.utils import SyncRequestResponse, BlockingConnection
-
+from proton.utils import SyncRequestResponse, BlockingConnection
 
 class UsageError(Exception):
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to