http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/_compat.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/_compat.py b/proton-c/bindings/python/proton/_compat.py
deleted file mode 100644
index 4585dfc..0000000
--- a/proton-c/bindings/python/proton/_compat.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Utilities to help Proton support both python2 and python3.
-"""
-
-import sys
-import types
-IS_PY2 = sys.version_info[0] == 2
-IS_PY3 = sys.version_info[0] == 3
-
-if IS_PY3:
-    INT_TYPES = (int,)
-    TEXT_TYPES = (str,)
-    STRING_TYPES = (str,)
-    BINARY_TYPES = (bytes,)
-    CLASS_TYPES = (type,)
-
-    def raise_(t, v=None, tb=None):
-        """Mimic the old 2.x raise behavior:
-        Raise an exception of type t with value v using optional traceback tb
-        """
-        if v is None:
-            v = t()
-        if tb is None:
-            raise v
-        else:
-            raise v.with_traceback(tb)
-
-    def iteritems(d):
-        return iter(d.items())
-
-    def unichar(i):
-        return chr(i)
-
-    def str2bin(s, encoding='latin-1'):
-        """Convert str to binary type"""
-        return s.encode(encoding)
-
-    def str2unicode(s):
-        return s
-
-else:
-    INT_TYPES = (int, long)
-    TEXT_TYPES = (unicode,)
-    # includes both unicode and non-unicode strings:
-    STRING_TYPES = (basestring,)
-    BINARY_TYPES = (str,)
-    CLASS_TYPES = (type, types.ClassType)
-
-    # the raise syntax will cause a parse error in Py3, so 'sneak' in a
-    # definition that won't cause the parser to barf
-    exec("""def raise_(t, v=None, tb=None):
-    raise t, v, tb
-""")
-
-    def iteritems(d, **kw):
-        return d.iteritems()
-
-    def unichar(i):
-        return unichr(i)
-
-    def str2bin(s, encoding='latin-1'):
-        return s
-
-    def str2unicode(s):
-        return unicode(s, "unicode_escape")

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
deleted file mode 100644
index a689e65..0000000
--- a/proton-c/bindings/python/proton/handlers.py
+++ /dev/null
@@ -1,637 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import heapq, logging, os, re, socket, time, types
-
-from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
-from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
-from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
-from select import select
-
-
-class OutgoingMessageHandler(Handler):
-    """
-    A utility for simpler and more intuitive handling of delivery
-    events related to outgoing i.e. sent messages.
-    """
-    def __init__(self, auto_settle=True, delegate=None):
-        self.auto_settle = auto_settle
-        self.delegate = delegate
-
-    def on_link_flow(self, event):
-        if event.link.is_sender and event.link.credit \
-           and event.link.state & Endpoint.LOCAL_ACTIVE \
-           and event.link.state & Endpoint.REMOTE_ACTIVE :
-            self.on_sendable(event)
-
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if dlv.link.is_sender and dlv.updated:
-            if dlv.remote_state == Delivery.ACCEPTED:
-                self.on_accepted(event)
-            elif dlv.remote_state == Delivery.REJECTED:
-                self.on_rejected(event)
-            elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED:
-                self.on_released(event)
-            if dlv.settled:
-                self.on_settled(event)
-            if self.auto_settle:
-                dlv.settle()
-
-    def on_sendable(self, event):
-        """
-        Called when the sender link has credit and messages can
-        therefore be transferred.
-        """
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_sendable', event)
-
-    def on_accepted(self, event):
-        """
-        Called when the remote peer accepts an outgoing message.
-        """
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_accepted', event)
-
-    def on_rejected(self, event):
-        """
-        Called when the remote peer rejects an outgoing message.
-        """
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_rejected', event)
-
-    def on_released(self, event):
-        """
-        Called when the remote peer releases an outgoing message. Note
-        that this may be in response to either the RELEASE or MODIFIED
-        state as defined by the AMQP specification.
-        """
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_released', event)
-
-    def on_settled(self, event):
-        """
-        Called when the remote peer has settled the outgoing
-        message. This is the point at which it shouod never be
-        retransmitted.
-        """
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_settled', event)
-
-def recv_msg(delivery):
-    msg = Message()
-    msg.decode(delivery.link.recv(delivery.pending))
-    delivery.link.advance()
-    return msg
-
-class Reject(ProtonException):
-  """
-  An exception that indicate a message should be rejected
-  """
-  pass
-
-class Release(ProtonException):
-  """
-  An exception that indicate a message should be rejected
-  """
-  pass
-
-class Acking(object):
-    def accept(self, delivery):
-        """
-        Accepts a received message.
-        """
-        self.settle(delivery, Delivery.ACCEPTED)
-
-    def reject(self, delivery):
-        """
-        Rejects a received message that is considered invalid or
-        unprocessable.
-        """
-        self.settle(delivery, Delivery.REJECTED)
-
-    def release(self, delivery, delivered=True):
-        """
-        Releases a received message, making it available at the source
-        for any (other) interested receiver. The ``delivered``
-        parameter indicates whether this should be considered a
-        delivery attempt (and the delivery count updated) or not.
-        """
-        if delivered:
-            self.settle(delivery, Delivery.MODIFIED)
-        else:
-            self.settle(delivery, Delivery.RELEASED)
-
-    def settle(self, delivery, state=None):
-        if state:
-            delivery.update(state)
-        delivery.settle()
-
-class IncomingMessageHandler(Handler, Acking):
-    """
-    A utility for simpler and more intuitive handling of delivery
-    events related to incoming i.e. received messages.
-    """
-
-    def __init__(self, auto_accept=True, delegate=None):
-        self.delegate = delegate
-        self.auto_accept = auto_accept
-
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if not dlv.link.is_receiver: return
-        if dlv.readable and not dlv.partial:
-            event.message = recv_msg(dlv)
-            if event.link.state & Endpoint.LOCAL_CLOSED:
-                if self.auto_accept:
-                    dlv.update(Delivery.RELEASED)
-                    dlv.settle()
-            else:
-                try:
-                    self.on_message(event)
-                    if self.auto_accept:
-                        dlv.update(Delivery.ACCEPTED)
-                        dlv.settle()
-                except Reject:
-                    dlv.update(Delivery.REJECTED)
-                    dlv.settle()
-                except Release:
-                    dlv.update(Delivery.MODIFIED)
-                    dlv.settle()
-        elif dlv.updated and dlv.settled:
-            self.on_settled(event)
-
-    def on_message(self, event):
-        """
-        Called when a message is received. The message itself can be
-        obtained as a property on the event. For the purpose of
-        refering to this message in further actions (e.g. if
-        explicitly accepting it, the ``delivery`` should be used, also
-        obtainable via a property on the event.
-        """
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_message', event)
-
-    def on_settled(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_settled', event)
-
-class EndpointStateHandler(Handler):
-    """
-    A utility that exposes 'endpoint' events i.e. the open/close for
-    links, sessions and connections in a more intuitive manner. A
-    XXX_opened method will be called when both local and remote peers
-    have opened the link, session or connection. This can be used to
-    confirm a locally initiated action for example. A XXX_opening
-    method will be called when the remote peer has requested an open
-    that was not initiated locally. By default this will simply open
-    locally, which then triggers the XXX_opened call. The same applies
-    to close.
-    """
-
-    def __init__(self, peer_close_is_error=False, delegate=None):
-        self.delegate = delegate
-        self.peer_close_is_error = peer_close_is_error
-
-    @classmethod
-    def is_local_open(cls, endpoint):
-        return endpoint.state & Endpoint.LOCAL_ACTIVE
-
-    @classmethod
-    def is_local_uninitialised(cls, endpoint):
-        return endpoint.state & Endpoint.LOCAL_UNINIT
-
-    @classmethod
-    def is_local_closed(cls, endpoint):
-        return endpoint.state & Endpoint.LOCAL_CLOSED
-
-    @classmethod
-    def is_remote_open(cls, endpoint):
-        return endpoint.state & Endpoint.REMOTE_ACTIVE
-
-    @classmethod
-    def is_remote_closed(cls, endpoint):
-        return endpoint.state & Endpoint.REMOTE_CLOSED
-
-    @classmethod
-    def print_error(cls, endpoint, endpoint_type):
-        if endpoint.remote_condition:
-            logging.error(endpoint.remote_condition.description or endpoint.remote_condition.name)
-        elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
-            logging.error("%s closed by peer" % endpoint_type)
-
-    def on_link_remote_close(self, event):
-        if event.link.remote_condition:
-            self.on_link_error(event)
-        elif self.is_local_closed(event.link):
-            self.on_link_closed(event)
-        else:
-            self.on_link_closing(event)
-        event.link.close()
-
-    def on_session_remote_close(self, event):
-        if event.session.remote_condition:
-            self.on_session_error(event)
-        elif self.is_local_closed(event.session):
-            self.on_session_closed(event)
-        else:
-            self.on_session_closing(event)
-        event.session.close()
-
-    def on_connection_remote_close(self, event):
-        if event.connection.remote_condition:
-            self.on_connection_error(event)
-        elif self.is_local_closed(event.connection):
-           self.on_connection_closed(event)
-        else:
-            self.on_connection_closing(event)
-        event.connection.close()
-
-    def on_connection_local_open(self, event):
-        if self.is_remote_open(event.connection):
-            self.on_connection_opened(event)
-
-    def on_connection_remote_open(self, event):
-        if self.is_local_open(event.connection):
-            self.on_connection_opened(event)
-        elif self.is_local_uninitialised(event.connection):
-            self.on_connection_opening(event)
-            event.connection.open()
-
-    def on_session_local_open(self, event):
-        if self.is_remote_open(event.session):
-            self.on_session_opened(event)
-
-    def on_session_remote_open(self, event):
-        if self.is_local_open(event.session):
-            self.on_session_opened(event)
-        elif self.is_local_uninitialised(event.session):
-            self.on_session_opening(event)
-            event.session.open()
-
-    def on_link_local_open(self, event):
-        if self.is_remote_open(event.link):
-            self.on_link_opened(event)
-
-    def on_link_remote_open(self, event):
-        if self.is_local_open(event.link):
-            self.on_link_opened(event)
-        elif self.is_local_uninitialised(event.link):
-            self.on_link_opening(event)
-            event.link.open()
-
-    def on_connection_opened(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_connection_opened', event)
-
-    def on_session_opened(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_session_opened', event)
-
-    def on_link_opened(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_link_opened', event)
-
-    def on_connection_opening(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_connection_opening', event)
-
-    def on_session_opening(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_session_opening', event)
-
-    def on_link_opening(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_link_opening', event)
-
-    def on_connection_error(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_connection_error', event)
-        else:
-            self.log_error(event.connection, "connection")
-
-    def on_session_error(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_session_error', event)
-        else:
-            self.log_error(event.session, "session")
-            event.connection.close()
-
-    def on_link_error(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_link_error', event)
-        else:
-            self.log_error(event.link, "link")
-            event.connection.close()
-
-    def on_connection_closed(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_connection_closed', event)
-
-    def on_session_closed(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_session_closed', event)
-
-    def on_link_closed(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_link_closed', event)
-
-    def on_connection_closing(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_connection_closing', event)
-        elif self.peer_close_is_error:
-            self.on_connection_error(event)
-
-    def on_session_closing(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_session_closing', event)
-        elif self.peer_close_is_error:
-            self.on_session_error(event)
-
-    def on_link_closing(self, event):
-        if self.delegate != None:
-            dispatch(self.delegate, 'on_link_closing', event)
-        elif self.peer_close_is_error:
-            self.on_link_error(event)
-
-    def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_closed(self, event):
-        if self.delegate != None and event.connection and self.is_local_open(event.connection):
-            dispatch(self.delegate, 'on_disconnected', event)
-
-class MessagingHandler(Handler, Acking):
-    """
-    A general purpose handler that makes the proton-c events somewhat
-    simpler to deal with and/or avoids repetitive tasks for common use
-    cases.
-    """
-    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
-        self.handlers = []
-        if prefetch:
-            self.handlers.append(CFlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
-        self.handlers.append(IncomingMessageHandler(auto_accept, self))
-        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
-        self.fatal_conditions = ["amqp:unauthorized-access"]
-
-    def on_transport_error(self, event):
-        """
-        Called when some error is encountered with the transport over
-        which the AMQP connection is to be established. This includes
-        authentication errors as well as socket errors.
-        """
-        if event.transport.condition:
-            if event.transport.condition.info:
-                logging.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info))
-            else:
-                logging.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
-            if event.transport.condition.name in self.fatal_conditions:
-                event.connection.close()
-        else:
-            logging.error("Unspecified transport error")
-
-    def on_connection_error(self, event):
-        """
-        Called when the peer closes the connection with an error condition.
-        """
-        EndpointStateHandler.print_error(event.connection, "connection")
-
-    def on_session_error(self, event):
-        """
-        Called when the peer closes the session with an error condition.
-        """
-        EndpointStateHandler.print_error(event.session, "session")
-        event.connection.close()
-
-    def on_link_error(self, event):
-        """
-        Called when the peer closes the link with an error condition.
-        """
-        EndpointStateHandler.print_error(event.link, "link")
-        event.connection.close()
-
-    def on_reactor_init(self, event):
-        """
-        Called when the event loop - the reactor - starts.
-        """
-        if hasattr(event.reactor, 'subclass'):
-            setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
-        self.on_start(event)
-
-    def on_start(self, event):
-        """
-        Called when the event loop starts. (Just an alias for on_reactor_init)
-        """
-        pass
-    def on_connection_closed(self, event):
-        """
-        Called when the connection is closed.
-        """
-        pass
-    def on_session_closed(self, event):
-        """
-        Called when the session is closed.
-        """
-        pass
-    def on_link_closed(self, event):
-        """
-        Called when the link is closed.
-        """
-        pass
-    def on_connection_closing(self, event):
-        """
-        Called when the peer initiates the closing of the connection.
-        """
-        pass
-    def on_session_closing(self, event):
-        """
-        Called when the peer initiates the closing of the session.
-        """
-        pass
-    def on_link_closing(self, event):
-        """
-        Called when the peer initiates the closing of the link.
-        """
-        pass
-    def on_disconnected(self, event):
-        """
-        Called when the socket is disconnected.
-        """
-        pass
-
-    def on_sendable(self, event):
-        """
-        Called when the sender link has credit and messages can
-        therefore be transferred.
-        """
-        pass
-
-    def on_accepted(self, event):
-        """
-        Called when the remote peer accepts an outgoing message.
-        """
-        pass
-
-    def on_rejected(self, event):
-        """
-        Called when the remote peer rejects an outgoing message.
-        """
-        pass
-
-    def on_released(self, event):
-        """
-        Called when the remote peer releases an outgoing message. Note
-        that this may be in response to either the RELEASE or MODIFIED
-        state as defined by the AMQP specification.
-        """
-        pass
-
-    def on_settled(self, event):
-        """
-        Called when the remote peer has settled the outgoing
-        message. This is the point at which it shouod never be
-        retransmitted.
-        """
-        pass
-    def on_message(self, event):
-        """
-        Called when a message is received. The message itself can be
-        obtained as a property on the event. For the purpose of
-        refering to this message in further actions (e.g. if
-        explicitly accepting it, the ``delivery`` should be used, also
-        obtainable via a property on the event.
-        """
-        pass
-
-class TransactionHandler(object):
-    """
-    The interface for transaction handlers, i.e. objects that want to
-    be notified of state changes related to a transaction.
-    """
-    def on_transaction_declared(self, event):
-        pass
-
-    def on_transaction_committed(self, event):
-        pass
-
-    def on_transaction_aborted(self, event):
-        pass
-
-    def on_transaction_declare_failed(self, event):
-        pass
-
-    def on_transaction_commit_failed(self, event):
-        pass
-
-class TransactionalClientHandler(MessagingHandler, TransactionHandler):
-    """
-    An extension to the MessagingHandler for applications using
-    transactions.
-    """
-
-    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
-        super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
-
-    def accept(self, delivery, transaction=None):
-        if transaction:
-            transaction.accept(delivery)
-        else:
-            super(TransactionalClientHandler, self).accept(delivery)
-
-from proton import WrappedHandler
-from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
-
-class CFlowController(WrappedHandler):
-
-    def __init__(self, window=1024):
-        WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
-
-class CHandshaker(WrappedHandler):
-
-    def __init__(self):
-        WrappedHandler.__init__(self, pn_handshaker)
-
-class IOHandler(WrappedHandler):
-
-    def __init__(self):
-        WrappedHandler.__init__(self, pn_iohandler)
-
-class PythonIO:
-
-    def __init__(self):
-        self.selectables = []
-        self.delegate = IOHandler()
-
-    def on_unhandled(self, method, event):
-        event.dispatch(self.delegate)
-
-    def on_selectable_init(self, event):
-        self.selectables.append(event.context)
-
-    def on_selectable_updated(self, event):
-        pass
-
-    def on_selectable_final(self, event):
-        sel = event.context
-        if sel.is_terminal:
-            self.selectables.remove(sel)
-            sel.release()
-
-    def on_reactor_quiesced(self, event):
-        reactor = event.reactor
-        # check if we are still quiesced, other handlers of
-        # on_reactor_quiesced could have produced events to process
-        if not reactor.quiesced: return
-
-        reading = []
-        writing = []
-        deadline = None
-        for sel in self.selectables:
-            if sel.reading:
-                reading.append(sel)
-            if sel.writing:
-                writing.append(sel)
-            if sel.deadline:
-                if deadline is None:
-                    deadline = sel.deadline
-                else:
-                    deadline = min(sel.deadline, deadline)
-
-        if deadline is not None:
-            timeout = deadline - time.time()
-        else:
-            timeout = reactor.timeout
-        if (timeout < 0): timeout = 0
-        timeout = min(timeout, reactor.timeout)
-        readable, writable, _ = select(reading, writing, [], timeout)
-
-        reactor.mark()
-
-        now = time.time()
-
-        for s in readable:
-            s.readable()
-        for s in writable:
-            s.writable()
-        for s in self.selectables:
-            if s.deadline and now > s.deadline:
-                s.expired()
-
-        reactor.yield_()

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
deleted file mode 100644
index e4dab95..0000000
--- a/proton-c/bindings/python/proton/reactor.py
+++ /dev/null
@@ -1,884 +0,0 @@
-from __future__ import absolute_import
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import logging, os, socket, time, types
-from heapq import heappush, heappop, nsmallest
-from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
-from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
-from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
-from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
-from select import select
-from proton.handlers import OutgoingMessageHandler
-from proton import unicode2utf8, utf82unicode
-
-import traceback
-from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
-from .wrapper import Wrapper, PYCTX
-from cproton import *
-from . import _compat
-
-try:
-    import Queue
-except ImportError:
-    import queue as Queue
-
-class Task(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            return Task(impl)
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl, pn_task_attachments)
-
-    def _init(self):
-        pass
-
-    def cancel(self):
-        pn_task_cancel(self._impl)
-
-class Acceptor(Wrapper):
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl)
-
-    def set_ssl_domain(self, ssl_domain):
-        pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
-
-    def close(self):
-        pn_acceptor_close(self._impl)
-
-class Reactor(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            record = pn_reactor_attachments(impl)
-            attrs = pn_void2py(pn_record_get(record, PYCTX))
-            if attrs and 'subclass' in attrs:
-                return attrs['subclass'](impl=impl)
-            else:
-                return Reactor(impl=impl)
-
-    def __init__(self, *handlers, **kwargs):
-        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
-        for h in handlers:
-            self.handler.add(h)
-
-    def _init(self):
-        self.errors = []
-
-    def on_error(self, info):
-        self.errors.append(info)
-        self.yield_()
-
-    def _get_global(self):
-        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
-
-    def _set_global(self, handler):
-        impl = _chandler(handler, self.on_error)
-        pn_reactor_set_global_handler(self._impl, impl)
-        pn_decref(impl)
-
-    global_handler = property(_get_global, _set_global)
-
-    def _get_timeout(self):
-        return millis2timeout(pn_reactor_get_timeout(self._impl))
-
-    def _set_timeout(self, secs):
-        return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
-
-    timeout = property(_get_timeout, _set_timeout)
-
-    def yield_(self):
-        pn_reactor_yield(self._impl)
-
-    def mark(self):
-        return pn_reactor_mark(self._impl)
-
-    def _get_handler(self):
-        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
-
-    def _set_handler(self, handler):
-        impl = _chandler(handler, self.on_error)
-        pn_reactor_set_handler(self._impl, impl)
-        pn_decref(impl)
-
-    handler = property(_get_handler, _set_handler)
-
-    def run(self):
-        self.timeout = 3.14159265359
-        self.start()
-        while self.process(): pass
-        self.stop()
-        self.process()
-        self.global_handler = None
-        self.handler = None
-
-    def wakeup(self):
-        n = pn_reactor_wakeup(self._impl)
-        if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
-
-    def start(self):
-        pn_reactor_start(self._impl)
-
-    @property
-    def quiesced(self):
-        return pn_reactor_quiesced(self._impl)
-
-    def _check_errors(self):
-        if self.errors:
-            for exc, value, tb in self.errors[:-1]:
-                traceback.print_exception(exc, value, tb)
-            exc, value, tb = self.errors[-1]
-            _compat.raise_(exc, value, tb)
-
-    def process(self):
-        result = pn_reactor_process(self._impl)
-        self._check_errors()
-        return result
-
-    def stop(self):
-        pn_reactor_stop(self._impl)
-        self._check_errors()
-
-    def schedule(self, delay, task):
-        impl = _chandler(task, self.on_error)
-        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
-        pn_decref(impl)
-        return task
-
-    def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler, self.on_error)
-        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
-        pn_decref(impl)
-        if aimpl:
-            return Acceptor(aimpl)
-        else:
-            raise IOError("%s (%s:%s)" % pn_error_text(pn_reactor_error(self._impl)), host, port)
-
-    def connection(self, handler=None):
-        """Deprecated: use connection_to_host() instead
-        """
-        impl = _chandler(handler, self.on_error)
-        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
-        if impl: pn_decref(impl)
-        return result
-
-    def connection_to_host(self, host, port, handler=None):
-        """Create an outgoing Connection that will be managed by the reactor.
-        The reator's pn_iohandler will create a socket connection to the host
-        once the connection is opened.
-        """
-        conn = self.connection(handler)
-        self.set_connection_host(conn, host, port)
-        return conn
-
-    def set_connection_host(self, connection, host, port):
-        """Change the address used by the connection.  The address is
-        used by the reactor's iohandler to create an outgoing socket
-        connection.  This must be set prior to opening the connection.
-        """
-        pn_reactor_set_connection_host(self._impl,
-                                       connection._impl,
-                                       unicode2utf8(str(host)),
-                                       unicode2utf8(str(port)))
-
-    def get_connection_address(self, connection):
-        """This may be used to retrieve the remote peer address.
-        @return: string containing the address in URL format or None if no
-        address is available.  Use the proton.Url class to create a Url object
-        from the returned value.
-        """
-        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
-        return utf82unicode(_url)
-
-    def selectable(self, handler=None):
-        impl = _chandler(handler, self.on_error)
-        result = Selectable.wrap(pn_reactor_selectable(self._impl))
-        if impl:
-            record = pn_selectable_attachments(result._impl)
-            pn_record_set_handler(record, impl)
-            pn_decref(impl)
-        return result
-
-    def update(self, sel):
-        pn_reactor_update(self._impl, sel._impl)
-
-    def push_event(self, obj, etype):
-        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-
-from proton import wrappers as _wrappers
-_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
-_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
-
-
-class EventInjector(object):
-    """
-    Can be added to a reactor to allow events to be triggered by an
-    external thread but handled on the event thread associated with
-    the reactor. An instance of this class can be passed to the
-    Reactor.selectable() method of the reactor in order to activate
-    it. The close() method should be called when it is no longer
-    needed, to allow the event loop to end if needed.
-    """
-    def __init__(self):
-        self.queue = Queue.Queue()
-        self.pipe = os.pipe()
-        self._closed = False
-
-    def trigger(self, event):
-        """
-        Request that the given event be dispatched on the event thread
-        of the reactor to which this EventInjector was added.
-        """
-        self.queue.put(event)
-        os.write(self.pipe[1], _compat.str2bin("!"))
-
-    def close(self):
-        """
-        Request that this EventInjector be closed. Existing events
-        will be dispctahed on the reactors event dispactch thread,
-        then this will be removed from the set of interest.
-        """
-        self._closed = True
-        os.write(self.pipe[1], _compat.str2bin("!"))
-
-    def fileno(self):
-        return self.pipe[0]
-
-    def on_selectable_init(self, event):
-        sel = event.context
-        sel.fileno(self.fileno())
-        sel.reading = True
-        event.reactor.update(sel)
-
-    def on_selectable_readable(self, event):
-        os.read(self.pipe[0], 512)
-        while not self.queue.empty():
-            requested = self.queue.get()
-            event.reactor.push_event(requested.context, requested.type)
-        if self._closed:
-            s = event.context
-            s.terminate()
-            event.reactor.update(s)
-
-
-class ApplicationEvent(EventBase):
-    """
-    Application defined event, which can optionally be associated with
-    an engine object and or an arbitrary subject
-    """
-    def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
-        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
-        self.connection = connection
-        self.session = session
-        self.link = link
-        self.delivery = delivery
-        if self.delivery:
-            self.link = self.delivery.link
-        if self.link:
-            self.session = self.link.session
-        if self.session:
-            self.connection = self.session.connection
-        self.subject = subject
-
-    def __repr__(self):
-        objects = [self.connection, self.session, self.link, self.delivery, self.subject]
-        return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
-
-class Transaction(object):
-    """
-    Class to track state of an AMQP 1.0 transaction.
-    """
-    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
-        self.txn_ctrl = txn_ctrl
-        self.handler = handler
-        self.id = None
-        self._declare = None
-        self._discharge = None
-        self.failed = False
-        self._pending = []
-        self.settle_before_discharge = settle_before_discharge
-        self.declare()
-
-    def commit(self):
-        self.discharge(False)
-
-    def abort(self):
-        self.discharge(True)
-
-    def declare(self):
-        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
-
-    def discharge(self, failed):
-        self.failed = failed
-        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
-
-    def _send_ctrl(self, descriptor, value):
-        delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
-        delivery.transaction = self
-        return delivery
-
-    def send(self, sender, msg, tag=None):
-        dlv = sender.send(msg, tag=tag)
-        dlv.local.data = [self.id]
-        dlv.update(0x34)
-        return dlv
-
-    def accept(self, delivery):
-        self.update(delivery, PN_ACCEPTED)
-        if self.settle_before_discharge:
-            delivery.settle()
-        else:
-            self._pending.append(delivery)
-
-    def update(self, delivery, state=None):
-        if state:
-            delivery.local.data = [self.id, Described(ulong(state), [])]
-            delivery.update(0x34)
-
-    def _release_pending(self):
-        for d in self._pending:
-            d.update(Delivery.RELEASED)
-            d.settle()
-        self._clear_pending()
-
-    def _clear_pending(self):
-        self._pending = []
-
-    def handle_outcome(self, event):
-        if event.delivery == self._declare:
-            if event.delivery.remote.data:
-                self.id = event.delivery.remote.data[0]
-                self.handler.on_transaction_declared(event)
-            elif event.delivery.remote_state == Delivery.REJECTED:
-                self.handler.on_transaction_declare_failed(event)
-            else:
-                logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
-                self.handler.on_transaction_declare_failed(event)
-        elif event.delivery == self._discharge:
-            if event.delivery.remote_state == Delivery.REJECTED:
-                if not self.failed:
-                    self.handler.on_transaction_commit_failed(event)
-                    self._release_pending() # make this optional?
-            else:
-                if self.failed:
-                    self.handler.on_transaction_aborted(event)
-                    self._release_pending()
-                else:
-                    self.handler.on_transaction_committed(event)
-            self._clear_pending()
-
-class LinkOption(object):
-    """
-    Abstract interface for link configuration options
-    """
-    def apply(self, link):
-        """
-        Subclasses will implement any configuration logic in this
-        method
-        """
-        pass
-    def test(self, link):
-        """
-        Subclasses can override this to selectively apply an option
-        e.g. based on some link criteria
-        """
-        return True
-
-class AtMostOnce(LinkOption):
-    def apply(self, link):
-        link.snd_settle_mode = Link.SND_SETTLED
-
-class AtLeastOnce(LinkOption):
-    def apply(self, link):
-        link.snd_settle_mode = Link.SND_UNSETTLED
-        link.rcv_settle_mode = Link.RCV_FIRST
-
-class SenderOption(LinkOption):
-    def apply(self, sender): pass
-    def test(self, link): return link.is_sender
-
-class ReceiverOption(LinkOption):
-    def apply(self, receiver): pass
-    def test(self, link): return link.is_receiver
-
-class DynamicNodeProperties(LinkOption):
-    def __init__(self, props={}):
-        self.properties = {}
-        for k in props:
-            if isinstance(k, symbol):
-                self.properties[k] = props[k]
-            else:
-                self.properties[symbol(k)] = props[k]
-
-    def apply(self, link):
-        if link.is_receiver:
-            link.source.properties.put_dict(self.properties)
-        else:
-            link.target.properties.put_dict(self.properties)
-
-class Filter(ReceiverOption):
-    def __init__(self, filter_set={}):
-        self.filter_set = filter_set
-
-    def apply(self, receiver):
-        receiver.source.filter.put_dict(self.filter_set)
-
-class Selector(Filter):
-    """
-    Configures a link with a message selector filter
-    """
-    def __init__(self, value, name='selector'):
-        super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
-
-class DurableSubscription(ReceiverOption):
-    def apply(self, receiver):
-        receiver.source.durability = Terminus.DELIVERIES
-        receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
-
-class Move(ReceiverOption):
-    def apply(self, receiver):
-        receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE
-
-class Copy(ReceiverOption):
-    def apply(self, receiver):
-        receiver.source.distribution_mode = Terminus.DIST_MODE_COPY
-
-def _apply_link_options(options, link):
-    if options:
-        if isinstance(options, list):
-            for o in options:
-                if o.test(link): o.apply(link)
-        else:
-            if options.test(link): options.apply(link)
-
-def _create_session(connection, handler=None):
-    session = connection.session()
-    session.open()
-    return session
-
-
-def _get_attr(target, name):
-    if hasattr(target, name):
-        return getattr(target, name)
-    else:
-        return None
-
-class SessionPerConnection(object):
-    def __init__(self):
-        self._default_session = None
-
-    def session(self, connection):
-        if not self._default_session:
-            self._default_session = _create_session(connection)
-            self._default_session.context = self
-        return self._default_session
-
-    def on_session_remote_close(self, event):
-        event.connection.close()
-        self._default_session = None
-
-class GlobalOverrides(object):
-    """
-    Internal handler that triggers the necessary socket connect for an
-    opened connection.
-    """
-    def __init__(self, base):
-        self.base = base
-
-    def on_unhandled(self, name, event):
-        if not self._override(event):
-            event.dispatch(self.base)
-
-    def _override(self, event):
-        conn = event.connection
-        return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
-
-class Connector(Handler):
-    """
-    Internal handler that triggers the necessary socket connect for an
-    opened connection.
-    """
-    def __init__(self, connection):
-        self.connection = connection
-        self.address = None
-        self.heartbeat = None
-        self.reconnect = None
-        self.ssl_domain = None
-        self.allow_insecure_mechs = True
-        self.allowed_mechs = None
-        self.sasl_enabled = True
-        self.user = None
-        self.password = None
-        self.virtual_host = None
-        self.ssl_sni = None
-
-    def _connect(self, connection, reactor):
-        assert(reactor is not None)
-        url = self.address.next()
-        reactor.set_connection_host(connection, url.host, str(url.port))
-        # if virtual-host not set, use host from address as default
-        if self.virtual_host is None:
-            connection.hostname = url.host
-        logging.debug("connecting to %s..." % url)
-
-        transport = Transport()
-        if self.sasl_enabled:
-            sasl = transport.sasl()
-            sasl.allow_insecure_mechs = self.allow_insecure_mechs
-            if url.username:
-                connection.user = url.username
-            elif self.user:
-                connection.user = self.user
-            if url.password:
-                connection.password = url.password
-            elif self.password:
-                connection.password = self.password
-            if self.allowed_mechs:
-                sasl.allowed_mechs(self.allowed_mechs)
-        transport.bind(connection)
-        if self.heartbeat:
-            transport.idle_timeout = self.heartbeat
-        if url.scheme == 'amqps':
-            if not self.ssl_domain:
-                raise SSLUnavailable("amqps: SSL libraries not found")
-            self.ssl = SSL(transport, self.ssl_domain)
-            self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host
-
-    def on_connection_local_open(self, event):
-        self._connect(event.connection, event.reactor)
-
-    def on_connection_remote_open(self, event):
-        logging.debug("connected to %s" % event.connection.hostname)
-        if self.reconnect:
-            self.reconnect.reset()
-            self.transport = None
-
-    def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_closed(self, event):
-        if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE:
-            if self.reconnect:
-                event.transport.unbind()
-                delay = self.reconnect.next()
-                if delay == 0:
-                    logging.info("Disconnected, reconnecting...")
-                    self._connect(self.connection, event.reactor)
-                else:
-                    logging.info("Disconnected will try to reconnect after %s seconds" % delay)
-                    event.reactor.schedule(delay, self)
-            else:
-                logging.debug("Disconnected")
-                self.connection = None
-
-    def on_timer_task(self, event):
-        self._connect(self.connection, event.reactor)
-
-    def on_connection_remote_close(self, event):
-        self.connection = None
-
-class Backoff(object):
-    """
-    A reconnect strategy involving an increasing delay between
-    retries, up to a maximum or 10 seconds.
-    """
-    def __init__(self):
-        self.delay = 0
-
-    def reset(self):
-        self.delay = 0
-
-    def next(self):
-        current = self.delay
-        if current == 0:
-            self.delay = 0.1
-        else:
-            self.delay = min(10, 2*current)
-        return current
-
-class Urls(object):
-    def __init__(self, values):
-        self.values = [Url(v) for v in values]
-        self.i = iter(self.values)
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        try:
-            return next(self.i)
-        except StopIteration:
-            self.i = iter(self.values)
-            return next(self.i)
-
-class SSLConfig(object):
-    def __init__(self):
-        self.client = SSLDomain(SSLDomain.MODE_CLIENT)
-        self.server = SSLDomain(SSLDomain.MODE_SERVER)
-
-    def set_credentials(self, cert_file, key_file, password):
-        self.client.set_credentials(cert_file, key_file, password)
-        self.server.set_credentials(cert_file, key_file, password)
-
-    def set_trusted_ca_db(self, certificate_db):
-        self.client.set_trusted_ca_db(certificate_db)
-        self.server.set_trusted_ca_db(certificate_db)
-
-
-class Container(Reactor):
-    """A representation of the AMQP concept of a 'container', which
-       lossely speaking is something that establishes links to or from
-       another container, over which messages are transfered. This is
-       an extension to the Reactor class that adds convenience methods
-       for creating connections and sender- or receiver- links.
-    """
-    def __init__(self, *handlers, **kwargs):
-        super(Container, self).__init__(*handlers, **kwargs)
-        if "impl" not in kwargs:
-            try:
-                self.ssl = SSLConfig()
-            except SSLUnavailable:
-                self.ssl = None
-            self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
-            self.trigger = None
-            self.container_id = str(generate_uuid())
-            self.allow_insecure_mechs = True
-            self.allowed_mechs = None
-            self.sasl_enabled = True
-            self.user = None
-            self.password = None
-            Wrapper.__setattr__(self, 'subclass', self.__class__)
-
-    def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
-        """
-        Initiates the establishment of an AMQP connection. Returns an
-        instance of proton.Connection.
-
-        @param url: URL string of process to connect to
-
-        @param urls: list of URL strings of process to try to connect to
-
-        Only one of url or urls should be specified.
-
-        @param reconnect: A value of False will prevent the library
-        form automatically trying to reconnect if the underlying
-        socket is disconnected before the connection has been closed.
-
-        @param heartbeat: A value in milliseconds indicating the
-        desired frequency of heartbeats used to test the underlying
-        socket is alive.
-
-        @param ssl_domain: SSL configuration in the form of an
-        instance of proton.SSLdomain.
-
-        @param handler: a connection scoped handler that will be
-        called to process any events in the scope of this connection
-        or its child links
-
-        @param kwargs: sasl_enabled, which determines whether a sasl layer is
-        used for the connection; allowed_mechs an optional list of SASL
-        mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag
-        indicating whether insecure mechanisms, such as PLAIN over a
-        non-encrypted socket, are allowed; 'virtual_host' the hostname to set
-        in the Open performative used by peer to determine the correct
-        back-end service for the client. If 'virtual_host' is not supplied the
-        host field from the URL is used instead."
-
-        """
-        conn = self.connection(handler)
-        conn.container = self.container_id or str(generate_uuid())
-        conn.offered_capabilities = kwargs.get('offered_capabilities')
-        conn.desired_capabilities = kwargs.get('desired_capabilities')
-        conn.properties = kwargs.get('properties')
-
-        connector = Connector(conn)
-        connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs)
-        connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs)
-        connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled)
-        connector.user = kwargs.get('user', self.user)
-        connector.password = kwargs.get('password', self.password)
-        connector.virtual_host = kwargs.get('virtual_host')
-        if connector.virtual_host:
-            # only set hostname if virtual-host is a non-empty string
-            conn.hostname = connector.virtual_host
-        connector.ssl_sni = kwargs.get('sni')
-
-        conn._overrides = connector
-        if url: connector.address = Urls([url])
-        elif urls: connector.address = Urls(urls)
-        elif address: connector.address = address
-        else: raise ValueError("One of url, urls or address required")
-        if heartbeat:
-            connector.heartbeat = heartbeat
-        if reconnect:
-            connector.reconnect = reconnect
-        elif reconnect is None:
-            connector.reconnect = Backoff()
-        # use container's default client domain if none specified.  This is
-        # only necessary of the URL specifies the "amqps:" scheme
-        connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
-        conn._session_policy = SessionPerConnection() #todo: make configurable
-        conn.open()
-        return conn
-
-    def _get_id(self, container, remote, local):
-        if local and remote: "%s-%s-%s" % (container, remote, local)
-        elif local: return "%s-%s" % (container, local)
-        elif remote: return "%s-%s" % (container, remote)
-        else: return "%s-%s" % (container, str(generate_uuid()))
-
-    def _get_session(self, context):
-        if isinstance(context, Url):
-            return self._get_session(self.connect(url=context))
-        elif isinstance(context, Session):
-            return context
-        elif isinstance(context, Connection):
-            if hasattr(context, '_session_policy'):
-                return context._session_policy.session(context)
-            else:
-                return _create_session(context)
-        else:
-            return context.session()
-
-    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
-        """
-        Initiates the establishment of a link over which messages can
-        be sent. Returns an instance of proton.Sender.
-
-        There are two patterns of use. (1) A connection can be passed
-        as the first argument, in which case the link is established
-        on that connection. In this case the target address can be
-        specified as the second argument (or as a keyword
-        argument). The source address can also be specified if
-        desired. (2) Alternatively a URL can be passed as the first
-        argument. In this case a new connection will be establised on
-        which the link will be attached. If a path is specified and
-        the target is not, then the path of the URL is used as the
-        target address.
-
-        The name of the link may be specified if desired, otherwise a
-        unique name will be generated.
-
-        Various LinkOptions can be specified to further control the
-        attachment.
-        """
-        if isinstance(context, _compat.STRING_TYPES):
-            context = Url(context)
-        if isinstance(context, Url) and not target:
-            target = context.path
-        session = self._get_session(context)
-        snd = session.sender(name or self._get_id(session.connection.container, target, source))
-        if source:
-            snd.source.address = source
-        if target:
-            snd.target.address = target
-        if handler != None:
-            snd.handler = handler
-        if tags:
-            snd.tag_generator = tags
-        _apply_link_options(options, snd)
-        snd.open()
-        return snd
-
-    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
-        """
-        Initiates the establishment of a link over which messages can
-        be received (aka a subscription). Returns an instance of
-        proton.Receiver.
-
-        There are two patterns of use. (1) A connection can be passed
-        as the first argument, in which case the link is established
-        on that connection. In this case the source address can be
-        specified as the second argument (or as a keyword
-        argument). The target address can also be specified if
-        desired. (2) Alternatively a URL can be passed as the first
-        argument. In this case a new connection will be establised on
-        which the link will be attached. If a path is specified and
-        the source is not, then the path of the URL is used as the
-        target address.
-
-        The name of the link may be specified if desired, otherwise a
-        unique name will be generated.
-
-        Various LinkOptions can be specified to further control the
-        attachment.
-        """
-        if isinstance(context, _compat.STRING_TYPES):
-            context = Url(context)
-        if isinstance(context, Url) and not source:
-            source = context.path
-        session = self._get_session(context)
-        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
-        if source:
-            rcv.source.address = source
-        if dynamic:
-            rcv.source.dynamic = True
-        if target:
-            rcv.target.address = target
-        if handler != None:
-            rcv.handler = handler
-        _apply_link_options(options, rcv)
-        rcv.open()
-        return rcv
-
-    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
-        if not _get_attr(context, '_txn_ctrl'):
-            class InternalTransactionHandler(OutgoingMessageHandler):
-                def __init__(self):
-                    super(InternalTransactionHandler, self).__init__(auto_settle=True)
-
-                def on_settled(self, event):
-                    if hasattr(event.delivery, "transaction"):
-                        event.transaction = event.delivery.transaction
-                        event.delivery.transaction.handle_outcome(event)
-            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
-            context._txn_ctrl.target.type = Terminus.COORDINATOR
-            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
-        return Transaction(context._txn_ctrl, handler, settle_before_discharge)
-
-    def listen(self, url, ssl_domain=None):
-        """
-        Initiates a server socket, accepting incoming AMQP connections
-        on the interface and port specified.
-        """
-        url = Url(url)
-        acceptor = self.acceptor(url.host, url.port)
-        ssl_config = ssl_domain
-        if not ssl_config and url.scheme == 'amqps':
-            # use container's default server domain
-            if self.ssl:
-                ssl_config = self.ssl.server
-            else:
-                raise SSLUnavailable("amqps: SSL libraries not found")
-        if ssl_config:
-            acceptor.set_ssl_domain(ssl_config)
-        return acceptor
-
-    def do_work(self, timeout=None):
-        if timeout:
-            self.timeout = timeout
-        return self.process()

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
deleted file mode 100644
index 9cd7cf3..0000000
--- a/proton-c/bindings/python/proton/utils.py
+++ /dev/null
@@ -1,356 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import collections, socket, time, threading
-
-from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message
-from proton import ProtonException, Timeout, Url
-from proton.reactor import Container
-from proton.handlers import MessagingHandler, IncomingMessageHandler
-
-
-class BlockingLink(object):
-    def __init__(self, connection, link):
-        self.connection = connection
-        self.link = link
-        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
-                             msg="Opening link %s" % link.name)
-        self._checkClosed()
-
-    def _waitForClose(self, timeout=1):
-        try:
-            self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
-                                 timeout=timeout,
-                                 msg="Opening link %s" % self.link.name)
-        except Timeout as e: pass
-        self._checkClosed()
-
-    def _checkClosed(self):
-        if self.link.state & Endpoint.REMOTE_CLOSED:
-            self.link.close()
-            raise LinkDetached(self.link)
-
-    def close(self):
-        self.link.close()
-        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
-                             msg="Closing link %s" % self.link.name)
-
-    # Access to other link attributes.
-    def __getattr__(self, name): return getattr(self.link, name)
-
-class SendException(ProtonException):
-    """
-    Exception used to indicate an exceptional state/condition on a send request
-    """
-    def __init__(self, state):
-        self.state = state
-
-def _is_settled(delivery):
-    return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
-
-class BlockingSender(BlockingLink):
-    def __init__(self, connection, sender):
-        super(BlockingSender, self).__init__(connection, sender)
-        if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
-            #this may be followed by a detach, which may contain an error condition, so wait a little...
-            self._waitForClose()
-            #...but close ourselves if peer does not
-            self.link.close()
-            raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
-
-    def send(self, msg, timeout=False, error_states=None):
-        delivery = self.link.send(msg)
-        self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout)
-        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
-            delivery.settle()
-        bad = error_states
-        if bad is None:
-            bad = [Delivery.REJECTED, Delivery.RELEASED]
-        if delivery.remote_state in bad:
-            raise SendException(delivery.remote_state)
-        return delivery
-
-class Fetcher(MessagingHandler):
-    def __init__(self, connection, prefetch):
-        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
-        self.connection = connection
-        self.incoming = collections.deque([])
-        self.unsettled = collections.deque([])
-
-    def on_message(self, event):
-        self.incoming.append((event.message, event.delivery))
-        self.connection.container.yield_() # Wake up the wait() loop to handle the message.
-
-    def on_link_error(self, event):
-        if event.link.state & Endpoint.LOCAL_ACTIVE:
-            event.link.close()
-            raise LinkDetached(event.link)
-
-    def on_connection_error(self, event):
-        raise ConnectionClosed(event.connection)
-
-    @property
-    def has_message(self):
-        return len(self.incoming)
-
-    def pop(self):
-        message, delivery = self.incoming.popleft()
-        if not delivery.settled:
-            self.unsettled.append(delivery)
-        return message
-
-    def settle(self, state=None):
-        delivery = self.unsettled.popleft()
-        if state:
-            delivery.update(state)
-        delivery.settle()
-
-
-class BlockingReceiver(BlockingLink):
-    def __init__(self, connection, receiver, fetcher, credit=1):
-        super(BlockingReceiver, self).__init__(connection, receiver)
-        if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
-            #this may be followed by a detach, which may contain an error condition, so wait a little...
-            self._waitForClose()
-            #...but close ourselves if peer does not
-            self.link.close()
-            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
-        if credit: receiver.flow(credit)
-        self.fetcher = fetcher
-
-    def __del__(self):
-        self.fetcher = None
-        self.link.handler = None
-
-    def receive(self, timeout=False):
-        if not self.fetcher:
-            raise Exception("Can't call receive on this receiver as a handler was provided")
-        if not self.link.credit:
-            self.link.flow(1)
-        self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
-        return self.fetcher.pop()
-
-    def accept(self):
-        self.settle(Delivery.ACCEPTED)
-
-    def reject(self):
-        self.settle(Delivery.REJECTED)
-
-    def release(self, delivered=True):
-        if delivered:
-            self.settle(Delivery.MODIFIED)
-        else:
-            self.settle(Delivery.RELEASED)
-
-    def settle(self, state=None):
-        if not self.fetcher:
-            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
-        self.fetcher.settle(state)
-
-
-class LinkDetached(LinkException):
-    def __init__(self, link):
-        self.link = link
-        if link.is_sender:
-            txt = "sender %s to %s closed" % (link.name, link.target.address)
-        else:
-            txt = "receiver %s from %s closed" % (link.name, link.source.address)
-        if link.remote_condition:
-            txt += " due to: %s" % link.remote_condition
-            self.condition = link.remote_condition.name
-        else:
-            txt += " by peer"
-            self.condition = None
-        super(LinkDetached, self).__init__(txt)
-
-
-class ConnectionClosed(ConnectionException):
-    def __init__(self, connection):
-        self.connection = connection
-        txt = "Connection %s closed" % connection.hostname
-        if connection.remote_condition:
-            txt += " due to: %s" % connection.remote_condition
-            self.condition = connection.remote_condition.name
-        else:
-            txt += " by peer"
-            self.condition = None
-        super(ConnectionClosed, self).__init__(txt)
-
-
-class BlockingConnection(Handler):
-    """
-    A synchronous style connection wrapper.
-    """
-    def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs):
-        self.disconnected = False
-        self.timeout = timeout or 60
-        self.container = container or Container()
-        self.container.timeout = self.timeout
-        self.container.start()
-        self.url = Url(url).defaults()
-        self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
-        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
-                  msg="Opening connection")
-
-    def create_sender(self, address, handler=None, name=None, options=None):
-        return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
-
-    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
-        prefetch = credit
-        if handler:
-            fetcher = None
-            if prefetch is None:
-                prefetch = 1
-        else:
-            fetcher = Fetcher(self, credit)
-        return BlockingReceiver(
-            self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
-
-    def close(self):
-        self.conn.close()
-        try:
-            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
-                      msg="Closing connection")
-        finally:
-            self.conn = None
-            self.container = None
-
-    def _is_closed(self):
-        return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
-
-    def run(self):
-        """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
-        while self.container.process(): pass
-
-    def wait(self, condition, timeout=False, msg=None):
-        """Call process until condition() is true"""
-        if timeout is False:
-            timeout = self.timeout
-        if timeout is None:
-            while not condition() and not self.disconnected:
-                self.container.process()
-        else:
-            container_timeout = self.container.timeout
-            self.container.timeout = timeout
-            try:
-                deadline = time.time() + timeout
-                while not condition() and not self.disconnected:
-                    self.container.process()
-                    if deadline < time.time():
-                        txt = "Connection %s timed out" % self.url
-                        if msg: txt += ": " + msg
-                        raise Timeout(txt)
-            finally:
-                self.container.timeout = container_timeout
-        if self.disconnected or self._is_closed():
-            self.container.stop()
-            self.conn.handler = None # break cyclical reference
-        if self.disconnected and not self._is_closed():
-            raise ConnectionException(
-                "Connection %s disconnected: %s" % (self.url, self.disconnected))
-
-    def on_link_remote_close(self, event):
-        if event.link.state & Endpoint.LOCAL_ACTIVE:
-            event.link.close()
-            raise LinkDetached(event.link)
-
-    def on_connection_remote_close(self, event):
-        if event.connection.state & Endpoint.LOCAL_ACTIVE:
-            event.connection.close()
-            raise ConnectionClosed(event.connection)
-
-    def on_transport_tail_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_head_closed(self, event):
-        self.on_transport_closed(event)
-
-    def on_transport_closed(self, event):
-        self.disconnected = event.transport.condition or "unknown"
-
-class AtomicCount(object):
-    def __init__(self, start=0, step=1):
-        """Thread-safe atomic counter. Start at start, increment by step."""
-        self.count, self.step = start, step
-        self.lock = threading.Lock()
-
-    def next(self):
-        """Get the next value"""
-        self.lock.acquire()
-        self.count += self.step;
-        result = self.count
-        self.lock.release()
-        return result
-
-class SyncRequestResponse(IncomingMessageHandler):
-    """
-    Implementation of the synchronous request-responce (aka RPC) pattern.
-    @ivar address: Address for all requests, may be None.
-    @ivar connection: Connection for requests and responses.
-    """
-
-    correlation_id = AtomicCount()
-
-    def __init__(self, connection, address=None):
-        """
-        Send requests and receive responses. A single instance can send many requests
-        to the same or different addresses.
-
-        @param connection: A L{BlockingConnection}
-        @param address: Address for all requests.
-            If not specified, each request must have the address property set.
-            Sucessive messages may have different addresses.
-        """
-        super(SyncRequestResponse, self).__init__()
-        self.connection = connection
-        self.address = address
-        self.sender = self.connection.create_sender(self.address)
-        # dynamic=true generates a unique address dynamically for this receiver.
-        # credit=1 because we want to receive 1 response message initially.
-        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
-        self.response = None
-
-    def call(self, request):
-        """
-        Send a request message, wait for and return the response message.
-
-        @param request: A L{proton.Message}. If L{self.address} is not set the 
-            L{self.address} must be set and will be used.
-        """
-        if not self.address and not request.address:
-            raise ValueError("Request message has no address: %s" % request)
-        request.reply_to = self.reply_to
-        request.correlation_id = correlation_id = self.correlation_id.next()
-        self.sender.send(request)
-        def wakeup():
-            return self.response and (self.response.correlation_id == correlation_id)
-        self.connection.wait(wakeup, msg="Waiting for response")
-        response = self.response
-        self.response = None    # Ready for next response.
-        self.receiver.flow(1)   # Set up credit for the next response.
-        return response
-
-    @property
-    def reply_to(self):
-        """Return the dynamic address of our receiver."""
-        return self.receiver.remote_source.address
-
-    def on_message(self, event):
-        """Called when we receive a message for our receiver."""
-        self.response = event.message
-        self.connection.container.yield_() # Wake up the wait() loop to handle the message.

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/wrapper.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/wrapper.py b/proton-c/bindings/python/proton/wrapper.py
deleted file mode 100644
index eff917b..0000000
--- a/proton-c/bindings/python/proton/wrapper.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from cproton import *
-
-class EmptyAttrs:
-
-    def __contains__(self, name):
-        return False
-
-    def __getitem__(self, name):
-        raise KeyError(name)
-
-    def __setitem__(self, name, value):
-        raise TypeError("does not support item assignment")
-
-EMPTY_ATTRS = EmptyAttrs()
-
-class Wrapper(object):
-
-    def __init__(self, impl_or_constructor, get_context=None):
-        init = False
-        if callable(impl_or_constructor):
-            # we are constructing a new object
-            impl = impl_or_constructor()
-            if impl is None:
-                from proton import ProtonException
-                raise ProtonException("Wrapper failed to create wrapped object. Check for file descriptor or memory exhaustion.")
-            init = True
-        else:
-            # we are wrapping an existing object
-            impl = impl_or_constructor
-            pn_incref(impl)
-
-        if get_context:
-            record = get_context(impl)
-            attrs = pn_void2py(pn_record_get(record, PYCTX))
-            if attrs is None:
-                attrs = {}
-                pn_record_def(record, PYCTX, PN_PYREF)
-                pn_record_set(record, PYCTX, pn_py2void(attrs))
-                init = True
-        else:
-            attrs = EMPTY_ATTRS
-            init = False
-            record = None
-        self.__dict__["_impl"] = impl
-        self.__dict__["_attrs"] = attrs
-        self.__dict__["_record"] = record
-        if init: self._init()
-
-    def __getattr__(self, name):
-        attrs = self.__dict__["_attrs"]
-        if name in attrs:
-            return attrs[name]
-        else:
-            raise AttributeError(name + " not in _attrs")
-
-    def __setattr__(self, name, value):
-        if hasattr(self.__class__, name):
-            object.__setattr__(self, name, value)
-        else:
-            attrs = self.__dict__["_attrs"]
-            attrs[name] = value
-
-    def __delattr__(self, name):
-        attrs = self.__dict__["_attrs"]
-        if attrs:
-            del attrs[name]
-
-    def __hash__(self):
-        return hash(addressof(self._impl))
-
-    def __eq__(self, other):
-        if isinstance(other, Wrapper):
-            return addressof(self._impl) == addressof(other._impl)
-        return False
-
-    def __ne__(self, other):
-        if isinstance(other, Wrapper):
-            return addressof(self._impl) != addressof(other._impl)
-        return True
-
-    def __del__(self):
-        pn_decref(self._impl)
-
-    def __repr__(self):
-        return '<%s.%s 0x%x ~ 0x%x>' % (self.__class__.__module__,
-                                               self.__class__.__name__,
-                                               id(self), addressof(self._impl))
-
-
-if pn_py2void(Wrapper) is Wrapper:
-    PYCTX = Wrapper
-    import java.lang.System
-    addressof = java.lang.System.identityHashCode
-else:
-    PYCTX = int(pn_py2void(Wrapper))
-    addressof = int


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to