http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/_compat.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/_compat.py b/proton-c/bindings/python/proton/_compat.py deleted file mode 100644 index 4585dfc..0000000 --- a/proton-c/bindings/python/proton/_compat.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Utilities to help Proton support both python2 and python3. -""" - -import sys -import types -IS_PY2 = sys.version_info[0] == 2 -IS_PY3 = sys.version_info[0] == 3 - -if IS_PY3: - INT_TYPES = (int,) - TEXT_TYPES = (str,) - STRING_TYPES = (str,) - BINARY_TYPES = (bytes,) - CLASS_TYPES = (type,) - - def raise_(t, v=None, tb=None): - """Mimic the old 2.x raise behavior: - Raise an exception of type t with value v using optional traceback tb - """ - if v is None: - v = t() - if tb is None: - raise v - else: - raise v.with_traceback(tb) - - def iteritems(d): - return iter(d.items()) - - def unichar(i): - return chr(i) - - def str2bin(s, encoding='latin-1'): - """Convert str to binary type""" - return s.encode(encoding) - - def str2unicode(s): - return s - -else: - INT_TYPES = (int, long) - TEXT_TYPES = (unicode,) - # includes both unicode and non-unicode strings: - STRING_TYPES = (basestring,) - BINARY_TYPES = (str,) - CLASS_TYPES = (type, types.ClassType) - - # the raise syntax will cause a parse error in Py3, so 'sneak' in a - # definition that won't cause the parser to barf - exec("""def raise_(t, v=None, tb=None): - raise t, v, tb -""") - - def iteritems(d, **kw): - return d.iteritems() - - def unichar(i): - return unichr(i) - - def str2bin(s, encoding='latin-1'): - return s - - def str2unicode(s): - return unicode(s, "unicode_escape")
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/handlers.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py deleted file mode 100644 index a689e65..0000000 --- a/proton-c/bindings/python/proton/handlers.py +++ /dev/null @@ -1,637 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import heapq, logging, os, re, socket, time, types - -from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url -from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout -from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException -from select import select - - -class OutgoingMessageHandler(Handler): - """ - A utility for simpler and more intuitive handling of delivery - events related to outgoing i.e. sent messages. - """ - def __init__(self, auto_settle=True, delegate=None): - self.auto_settle = auto_settle - self.delegate = delegate - - def on_link_flow(self, event): - if event.link.is_sender and event.link.credit \ - and event.link.state & Endpoint.LOCAL_ACTIVE \ - and event.link.state & Endpoint.REMOTE_ACTIVE : - self.on_sendable(event) - - def on_delivery(self, event): - dlv = event.delivery - if dlv.link.is_sender and dlv.updated: - if dlv.remote_state == Delivery.ACCEPTED: - self.on_accepted(event) - elif dlv.remote_state == Delivery.REJECTED: - self.on_rejected(event) - elif dlv.remote_state == Delivery.RELEASED or dlv.remote_state == Delivery.MODIFIED: - self.on_released(event) - if dlv.settled: - self.on_settled(event) - if self.auto_settle: - dlv.settle() - - def on_sendable(self, event): - """ - Called when the sender link has credit and messages can - therefore be transferred. - """ - if self.delegate != None: - dispatch(self.delegate, 'on_sendable', event) - - def on_accepted(self, event): - """ - Called when the remote peer accepts an outgoing message. - """ - if self.delegate != None: - dispatch(self.delegate, 'on_accepted', event) - - def on_rejected(self, event): - """ - Called when the remote peer rejects an outgoing message. - """ - if self.delegate != None: - dispatch(self.delegate, 'on_rejected', event) - - def on_released(self, event): - """ - Called when the remote peer releases an outgoing message. Note - that this may be in response to either the RELEASE or MODIFIED - state as defined by the AMQP specification. - """ - if self.delegate != None: - dispatch(self.delegate, 'on_released', event) - - def on_settled(self, event): - """ - Called when the remote peer has settled the outgoing - message. This is the point at which it shouod never be - retransmitted. - """ - if self.delegate != None: - dispatch(self.delegate, 'on_settled', event) - -def recv_msg(delivery): - msg = Message() - msg.decode(delivery.link.recv(delivery.pending)) - delivery.link.advance() - return msg - -class Reject(ProtonException): - """ - An exception that indicate a message should be rejected - """ - pass - -class Release(ProtonException): - """ - An exception that indicate a message should be rejected - """ - pass - -class Acking(object): - def accept(self, delivery): - """ - Accepts a received message. - """ - self.settle(delivery, Delivery.ACCEPTED) - - def reject(self, delivery): - """ - Rejects a received message that is considered invalid or - unprocessable. - """ - self.settle(delivery, Delivery.REJECTED) - - def release(self, delivery, delivered=True): - """ - Releases a received message, making it available at the source - for any (other) interested receiver. The ``delivered`` - parameter indicates whether this should be considered a - delivery attempt (and the delivery count updated) or not. - """ - if delivered: - self.settle(delivery, Delivery.MODIFIED) - else: - self.settle(delivery, Delivery.RELEASED) - - def settle(self, delivery, state=None): - if state: - delivery.update(state) - delivery.settle() - -class IncomingMessageHandler(Handler, Acking): - """ - A utility for simpler and more intuitive handling of delivery - events related to incoming i.e. received messages. - """ - - def __init__(self, auto_accept=True, delegate=None): - self.delegate = delegate - self.auto_accept = auto_accept - - def on_delivery(self, event): - dlv = event.delivery - if not dlv.link.is_receiver: return - if dlv.readable and not dlv.partial: - event.message = recv_msg(dlv) - if event.link.state & Endpoint.LOCAL_CLOSED: - if self.auto_accept: - dlv.update(Delivery.RELEASED) - dlv.settle() - else: - try: - self.on_message(event) - if self.auto_accept: - dlv.update(Delivery.ACCEPTED) - dlv.settle() - except Reject: - dlv.update(Delivery.REJECTED) - dlv.settle() - except Release: - dlv.update(Delivery.MODIFIED) - dlv.settle() - elif dlv.updated and dlv.settled: - self.on_settled(event) - - def on_message(self, event): - """ - Called when a message is received. The message itself can be - obtained as a property on the event. For the purpose of - refering to this message in further actions (e.g. if - explicitly accepting it, the ``delivery`` should be used, also - obtainable via a property on the event. - """ - if self.delegate != None: - dispatch(self.delegate, 'on_message', event) - - def on_settled(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_settled', event) - -class EndpointStateHandler(Handler): - """ - A utility that exposes 'endpoint' events i.e. the open/close for - links, sessions and connections in a more intuitive manner. A - XXX_opened method will be called when both local and remote peers - have opened the link, session or connection. This can be used to - confirm a locally initiated action for example. A XXX_opening - method will be called when the remote peer has requested an open - that was not initiated locally. By default this will simply open - locally, which then triggers the XXX_opened call. The same applies - to close. - """ - - def __init__(self, peer_close_is_error=False, delegate=None): - self.delegate = delegate - self.peer_close_is_error = peer_close_is_error - - @classmethod - def is_local_open(cls, endpoint): - return endpoint.state & Endpoint.LOCAL_ACTIVE - - @classmethod - def is_local_uninitialised(cls, endpoint): - return endpoint.state & Endpoint.LOCAL_UNINIT - - @classmethod - def is_local_closed(cls, endpoint): - return endpoint.state & Endpoint.LOCAL_CLOSED - - @classmethod - def is_remote_open(cls, endpoint): - return endpoint.state & Endpoint.REMOTE_ACTIVE - - @classmethod - def is_remote_closed(cls, endpoint): - return endpoint.state & Endpoint.REMOTE_CLOSED - - @classmethod - def print_error(cls, endpoint, endpoint_type): - if endpoint.remote_condition: - logging.error(endpoint.remote_condition.description or endpoint.remote_condition.name) - elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint): - logging.error("%s closed by peer" % endpoint_type) - - def on_link_remote_close(self, event): - if event.link.remote_condition: - self.on_link_error(event) - elif self.is_local_closed(event.link): - self.on_link_closed(event) - else: - self.on_link_closing(event) - event.link.close() - - def on_session_remote_close(self, event): - if event.session.remote_condition: - self.on_session_error(event) - elif self.is_local_closed(event.session): - self.on_session_closed(event) - else: - self.on_session_closing(event) - event.session.close() - - def on_connection_remote_close(self, event): - if event.connection.remote_condition: - self.on_connection_error(event) - elif self.is_local_closed(event.connection): - self.on_connection_closed(event) - else: - self.on_connection_closing(event) - event.connection.close() - - def on_connection_local_open(self, event): - if self.is_remote_open(event.connection): - self.on_connection_opened(event) - - def on_connection_remote_open(self, event): - if self.is_local_open(event.connection): - self.on_connection_opened(event) - elif self.is_local_uninitialised(event.connection): - self.on_connection_opening(event) - event.connection.open() - - def on_session_local_open(self, event): - if self.is_remote_open(event.session): - self.on_session_opened(event) - - def on_session_remote_open(self, event): - if self.is_local_open(event.session): - self.on_session_opened(event) - elif self.is_local_uninitialised(event.session): - self.on_session_opening(event) - event.session.open() - - def on_link_local_open(self, event): - if self.is_remote_open(event.link): - self.on_link_opened(event) - - def on_link_remote_open(self, event): - if self.is_local_open(event.link): - self.on_link_opened(event) - elif self.is_local_uninitialised(event.link): - self.on_link_opening(event) - event.link.open() - - def on_connection_opened(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_connection_opened', event) - - def on_session_opened(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_session_opened', event) - - def on_link_opened(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_link_opened', event) - - def on_connection_opening(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_connection_opening', event) - - def on_session_opening(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_session_opening', event) - - def on_link_opening(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_link_opening', event) - - def on_connection_error(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_connection_error', event) - else: - self.log_error(event.connection, "connection") - - def on_session_error(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_session_error', event) - else: - self.log_error(event.session, "session") - event.connection.close() - - def on_link_error(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_link_error', event) - else: - self.log_error(event.link, "link") - event.connection.close() - - def on_connection_closed(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_connection_closed', event) - - def on_session_closed(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_session_closed', event) - - def on_link_closed(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_link_closed', event) - - def on_connection_closing(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_connection_closing', event) - elif self.peer_close_is_error: - self.on_connection_error(event) - - def on_session_closing(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_session_closing', event) - elif self.peer_close_is_error: - self.on_session_error(event) - - def on_link_closing(self, event): - if self.delegate != None: - dispatch(self.delegate, 'on_link_closing', event) - elif self.peer_close_is_error: - self.on_link_error(event) - - def on_transport_tail_closed(self, event): - self.on_transport_closed(event) - - def on_transport_closed(self, event): - if self.delegate != None and event.connection and self.is_local_open(event.connection): - dispatch(self.delegate, 'on_disconnected', event) - -class MessagingHandler(Handler, Acking): - """ - A general purpose handler that makes the proton-c events somewhat - simpler to deal with and/or avoids repetitive tasks for common use - cases. - """ - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False): - self.handlers = [] - if prefetch: - self.handlers.append(CFlowController(prefetch)) - self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) - self.handlers.append(IncomingMessageHandler(auto_accept, self)) - self.handlers.append(OutgoingMessageHandler(auto_settle, self)) - self.fatal_conditions = ["amqp:unauthorized-access"] - - def on_transport_error(self, event): - """ - Called when some error is encountered with the transport over - which the AMQP connection is to be established. This includes - authentication errors as well as socket errors. - """ - if event.transport.condition: - if event.transport.condition.info: - logging.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) - else: - logging.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) - if event.transport.condition.name in self.fatal_conditions: - event.connection.close() - else: - logging.error("Unspecified transport error") - - def on_connection_error(self, event): - """ - Called when the peer closes the connection with an error condition. - """ - EndpointStateHandler.print_error(event.connection, "connection") - - def on_session_error(self, event): - """ - Called when the peer closes the session with an error condition. - """ - EndpointStateHandler.print_error(event.session, "session") - event.connection.close() - - def on_link_error(self, event): - """ - Called when the peer closes the link with an error condition. - """ - EndpointStateHandler.print_error(event.link, "link") - event.connection.close() - - def on_reactor_init(self, event): - """ - Called when the event loop - the reactor - starts. - """ - if hasattr(event.reactor, 'subclass'): - setattr(event, event.reactor.subclass.__name__.lower(), event.reactor) - self.on_start(event) - - def on_start(self, event): - """ - Called when the event loop starts. (Just an alias for on_reactor_init) - """ - pass - def on_connection_closed(self, event): - """ - Called when the connection is closed. - """ - pass - def on_session_closed(self, event): - """ - Called when the session is closed. - """ - pass - def on_link_closed(self, event): - """ - Called when the link is closed. - """ - pass - def on_connection_closing(self, event): - """ - Called when the peer initiates the closing of the connection. - """ - pass - def on_session_closing(self, event): - """ - Called when the peer initiates the closing of the session. - """ - pass - def on_link_closing(self, event): - """ - Called when the peer initiates the closing of the link. - """ - pass - def on_disconnected(self, event): - """ - Called when the socket is disconnected. - """ - pass - - def on_sendable(self, event): - """ - Called when the sender link has credit and messages can - therefore be transferred. - """ - pass - - def on_accepted(self, event): - """ - Called when the remote peer accepts an outgoing message. - """ - pass - - def on_rejected(self, event): - """ - Called when the remote peer rejects an outgoing message. - """ - pass - - def on_released(self, event): - """ - Called when the remote peer releases an outgoing message. Note - that this may be in response to either the RELEASE or MODIFIED - state as defined by the AMQP specification. - """ - pass - - def on_settled(self, event): - """ - Called when the remote peer has settled the outgoing - message. This is the point at which it shouod never be - retransmitted. - """ - pass - def on_message(self, event): - """ - Called when a message is received. The message itself can be - obtained as a property on the event. For the purpose of - refering to this message in further actions (e.g. if - explicitly accepting it, the ``delivery`` should be used, also - obtainable via a property on the event. - """ - pass - -class TransactionHandler(object): - """ - The interface for transaction handlers, i.e. objects that want to - be notified of state changes related to a transaction. - """ - def on_transaction_declared(self, event): - pass - - def on_transaction_committed(self, event): - pass - - def on_transaction_aborted(self, event): - pass - - def on_transaction_declare_failed(self, event): - pass - - def on_transaction_commit_failed(self, event): - pass - -class TransactionalClientHandler(MessagingHandler, TransactionHandler): - """ - An extension to the MessagingHandler for applications using - transactions. - """ - - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False): - super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error) - - def accept(self, delivery, transaction=None): - if transaction: - transaction.accept(delivery) - else: - super(TransactionalClientHandler, self).accept(delivery) - -from proton import WrappedHandler -from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler - -class CFlowController(WrappedHandler): - - def __init__(self, window=1024): - WrappedHandler.__init__(self, lambda: pn_flowcontroller(window)) - -class CHandshaker(WrappedHandler): - - def __init__(self): - WrappedHandler.__init__(self, pn_handshaker) - -class IOHandler(WrappedHandler): - - def __init__(self): - WrappedHandler.__init__(self, pn_iohandler) - -class PythonIO: - - def __init__(self): - self.selectables = [] - self.delegate = IOHandler() - - def on_unhandled(self, method, event): - event.dispatch(self.delegate) - - def on_selectable_init(self, event): - self.selectables.append(event.context) - - def on_selectable_updated(self, event): - pass - - def on_selectable_final(self, event): - sel = event.context - if sel.is_terminal: - self.selectables.remove(sel) - sel.release() - - def on_reactor_quiesced(self, event): - reactor = event.reactor - # check if we are still quiesced, other handlers of - # on_reactor_quiesced could have produced events to process - if not reactor.quiesced: return - - reading = [] - writing = [] - deadline = None - for sel in self.selectables: - if sel.reading: - reading.append(sel) - if sel.writing: - writing.append(sel) - if sel.deadline: - if deadline is None: - deadline = sel.deadline - else: - deadline = min(sel.deadline, deadline) - - if deadline is not None: - timeout = deadline - time.time() - else: - timeout = reactor.timeout - if (timeout < 0): timeout = 0 - timeout = min(timeout, reactor.timeout) - readable, writable, _ = select(reading, writing, [], timeout) - - reactor.mark() - - now = time.time() - - for s in readable: - s.readable() - for s in writable: - s.writable() - for s in self.selectables: - if s.deadline and now > s.deadline: - s.expired() - - reactor.yield_() http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/reactor.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py deleted file mode 100644 index e4dab95..0000000 --- a/proton-c/bindings/python/proton/reactor.py +++ /dev/null @@ -1,884 +0,0 @@ -from __future__ import absolute_import -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import logging, os, socket, time, types -from heapq import heappush, heappop, nsmallest -from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch -from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message -from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol -from proton import Terminus, Timeout, Transport, TransportException, ulong, Url -from select import select -from proton.handlers import OutgoingMessageHandler -from proton import unicode2utf8, utf82unicode - -import traceback -from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable -from .wrapper import Wrapper, PYCTX -from cproton import * -from . import _compat - -try: - import Queue -except ImportError: - import queue as Queue - -class Task(Wrapper): - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Task(impl) - - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_task_attachments) - - def _init(self): - pass - - def cancel(self): - pn_task_cancel(self._impl) - -class Acceptor(Wrapper): - - def __init__(self, impl): - Wrapper.__init__(self, impl) - - def set_ssl_domain(self, ssl_domain): - pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain) - - def close(self): - pn_acceptor_close(self._impl) - -class Reactor(Wrapper): - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - record = pn_reactor_attachments(impl) - attrs = pn_void2py(pn_record_get(record, PYCTX)) - if attrs and 'subclass' in attrs: - return attrs['subclass'](impl=impl) - else: - return Reactor(impl=impl) - - def __init__(self, *handlers, **kwargs): - Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) - for h in handlers: - self.handler.add(h) - - def _init(self): - self.errors = [] - - def on_error(self, info): - self.errors.append(info) - self.yield_() - - def _get_global(self): - return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error) - - def _set_global(self, handler): - impl = _chandler(handler, self.on_error) - pn_reactor_set_global_handler(self._impl, impl) - pn_decref(impl) - - global_handler = property(_get_global, _set_global) - - def _get_timeout(self): - return millis2timeout(pn_reactor_get_timeout(self._impl)) - - def _set_timeout(self, secs): - return pn_reactor_set_timeout(self._impl, timeout2millis(secs)) - - timeout = property(_get_timeout, _set_timeout) - - def yield_(self): - pn_reactor_yield(self._impl) - - def mark(self): - return pn_reactor_mark(self._impl) - - def _get_handler(self): - return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error) - - def _set_handler(self, handler): - impl = _chandler(handler, self.on_error) - pn_reactor_set_handler(self._impl, impl) - pn_decref(impl) - - handler = property(_get_handler, _set_handler) - - def run(self): - self.timeout = 3.14159265359 - self.start() - while self.process(): pass - self.stop() - self.process() - self.global_handler = None - self.handler = None - - def wakeup(self): - n = pn_reactor_wakeup(self._impl) - if n: raise IOError(pn_error_text(pn_reactor_error(self._impl))) - - def start(self): - pn_reactor_start(self._impl) - - @property - def quiesced(self): - return pn_reactor_quiesced(self._impl) - - def _check_errors(self): - if self.errors: - for exc, value, tb in self.errors[:-1]: - traceback.print_exception(exc, value, tb) - exc, value, tb = self.errors[-1] - _compat.raise_(exc, value, tb) - - def process(self): - result = pn_reactor_process(self._impl) - self._check_errors() - return result - - def stop(self): - pn_reactor_stop(self._impl) - self._check_errors() - - def schedule(self, delay, task): - impl = _chandler(task, self.on_error) - task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) - pn_decref(impl) - return task - - def acceptor(self, host, port, handler=None): - impl = _chandler(handler, self.on_error) - aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) - pn_decref(impl) - if aimpl: - return Acceptor(aimpl) - else: - raise IOError("%s (%s:%s)" % pn_error_text(pn_reactor_error(self._impl)), host, port) - - def connection(self, handler=None): - """Deprecated: use connection_to_host() instead - """ - impl = _chandler(handler, self.on_error) - result = Connection.wrap(pn_reactor_connection(self._impl, impl)) - if impl: pn_decref(impl) - return result - - def connection_to_host(self, host, port, handler=None): - """Create an outgoing Connection that will be managed by the reactor. - The reator's pn_iohandler will create a socket connection to the host - once the connection is opened. - """ - conn = self.connection(handler) - self.set_connection_host(conn, host, port) - return conn - - def set_connection_host(self, connection, host, port): - """Change the address used by the connection. The address is - used by the reactor's iohandler to create an outgoing socket - connection. This must be set prior to opening the connection. - """ - pn_reactor_set_connection_host(self._impl, - connection._impl, - unicode2utf8(str(host)), - unicode2utf8(str(port))) - - def get_connection_address(self, connection): - """This may be used to retrieve the remote peer address. - @return: string containing the address in URL format or None if no - address is available. Use the proton.Url class to create a Url object - from the returned value. - """ - _url = pn_reactor_get_connection_address(self._impl, connection._impl) - return utf82unicode(_url) - - def selectable(self, handler=None): - impl = _chandler(handler, self.on_error) - result = Selectable.wrap(pn_reactor_selectable(self._impl)) - if impl: - record = pn_selectable_attachments(result._impl) - pn_record_set_handler(record, impl) - pn_decref(impl) - return result - - def update(self, sel): - pn_reactor_update(self._impl, sel._impl) - - def push_event(self, obj, etype): - pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number) - -from proton import wrappers as _wrappers -_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) -_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x)) - - -class EventInjector(object): - """ - Can be added to a reactor to allow events to be triggered by an - external thread but handled on the event thread associated with - the reactor. An instance of this class can be passed to the - Reactor.selectable() method of the reactor in order to activate - it. The close() method should be called when it is no longer - needed, to allow the event loop to end if needed. - """ - def __init__(self): - self.queue = Queue.Queue() - self.pipe = os.pipe() - self._closed = False - - def trigger(self, event): - """ - Request that the given event be dispatched on the event thread - of the reactor to which this EventInjector was added. - """ - self.queue.put(event) - os.write(self.pipe[1], _compat.str2bin("!")) - - def close(self): - """ - Request that this EventInjector be closed. Existing events - will be dispctahed on the reactors event dispactch thread, - then this will be removed from the set of interest. - """ - self._closed = True - os.write(self.pipe[1], _compat.str2bin("!")) - - def fileno(self): - return self.pipe[0] - - def on_selectable_init(self, event): - sel = event.context - sel.fileno(self.fileno()) - sel.reading = True - event.reactor.update(sel) - - def on_selectable_readable(self, event): - os.read(self.pipe[0], 512) - while not self.queue.empty(): - requested = self.queue.get() - event.reactor.push_event(requested.context, requested.type) - if self._closed: - s = event.context - s.terminate() - event.reactor.update(s) - - -class ApplicationEvent(EventBase): - """ - Application defined event, which can optionally be associated with - an engine object and or an arbitrary subject - """ - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): - super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) - self.connection = connection - self.session = session - self.link = link - self.delivery = delivery - if self.delivery: - self.link = self.delivery.link - if self.link: - self.session = self.link.session - if self.session: - self.connection = self.session.connection - self.subject = subject - - def __repr__(self): - objects = [self.connection, self.session, self.link, self.delivery, self.subject] - return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None])) - -class Transaction(object): - """ - Class to track state of an AMQP 1.0 transaction. - """ - def __init__(self, txn_ctrl, handler, settle_before_discharge=False): - self.txn_ctrl = txn_ctrl - self.handler = handler - self.id = None - self._declare = None - self._discharge = None - self.failed = False - self._pending = [] - self.settle_before_discharge = settle_before_discharge - self.declare() - - def commit(self): - self.discharge(False) - - def abort(self): - self.discharge(True) - - def declare(self): - self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None]) - - def discharge(self, failed): - self.failed = failed - self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed]) - - def _send_ctrl(self, descriptor, value): - delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) - delivery.transaction = self - return delivery - - def send(self, sender, msg, tag=None): - dlv = sender.send(msg, tag=tag) - dlv.local.data = [self.id] - dlv.update(0x34) - return dlv - - def accept(self, delivery): - self.update(delivery, PN_ACCEPTED) - if self.settle_before_discharge: - delivery.settle() - else: - self._pending.append(delivery) - - def update(self, delivery, state=None): - if state: - delivery.local.data = [self.id, Described(ulong(state), [])] - delivery.update(0x34) - - def _release_pending(self): - for d in self._pending: - d.update(Delivery.RELEASED) - d.settle() - self._clear_pending() - - def _clear_pending(self): - self._pending = [] - - def handle_outcome(self, event): - if event.delivery == self._declare: - if event.delivery.remote.data: - self.id = event.delivery.remote.data[0] - self.handler.on_transaction_declared(event) - elif event.delivery.remote_state == Delivery.REJECTED: - self.handler.on_transaction_declare_failed(event) - else: - logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) - self.handler.on_transaction_declare_failed(event) - elif event.delivery == self._discharge: - if event.delivery.remote_state == Delivery.REJECTED: - if not self.failed: - self.handler.on_transaction_commit_failed(event) - self._release_pending() # make this optional? - else: - if self.failed: - self.handler.on_transaction_aborted(event) - self._release_pending() - else: - self.handler.on_transaction_committed(event) - self._clear_pending() - -class LinkOption(object): - """ - Abstract interface for link configuration options - """ - def apply(self, link): - """ - Subclasses will implement any configuration logic in this - method - """ - pass - def test(self, link): - """ - Subclasses can override this to selectively apply an option - e.g. based on some link criteria - """ - return True - -class AtMostOnce(LinkOption): - def apply(self, link): - link.snd_settle_mode = Link.SND_SETTLED - -class AtLeastOnce(LinkOption): - def apply(self, link): - link.snd_settle_mode = Link.SND_UNSETTLED - link.rcv_settle_mode = Link.RCV_FIRST - -class SenderOption(LinkOption): - def apply(self, sender): pass - def test(self, link): return link.is_sender - -class ReceiverOption(LinkOption): - def apply(self, receiver): pass - def test(self, link): return link.is_receiver - -class DynamicNodeProperties(LinkOption): - def __init__(self, props={}): - self.properties = {} - for k in props: - if isinstance(k, symbol): - self.properties[k] = props[k] - else: - self.properties[symbol(k)] = props[k] - - def apply(self, link): - if link.is_receiver: - link.source.properties.put_dict(self.properties) - else: - link.target.properties.put_dict(self.properties) - -class Filter(ReceiverOption): - def __init__(self, filter_set={}): - self.filter_set = filter_set - - def apply(self, receiver): - receiver.source.filter.put_dict(self.filter_set) - -class Selector(Filter): - """ - Configures a link with a message selector filter - """ - def __init__(self, value, name='selector'): - super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)}) - -class DurableSubscription(ReceiverOption): - def apply(self, receiver): - receiver.source.durability = Terminus.DELIVERIES - receiver.source.expiry_policy = Terminus.EXPIRE_NEVER - -class Move(ReceiverOption): - def apply(self, receiver): - receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE - -class Copy(ReceiverOption): - def apply(self, receiver): - receiver.source.distribution_mode = Terminus.DIST_MODE_COPY - -def _apply_link_options(options, link): - if options: - if isinstance(options, list): - for o in options: - if o.test(link): o.apply(link) - else: - if options.test(link): options.apply(link) - -def _create_session(connection, handler=None): - session = connection.session() - session.open() - return session - - -def _get_attr(target, name): - if hasattr(target, name): - return getattr(target, name) - else: - return None - -class SessionPerConnection(object): - def __init__(self): - self._default_session = None - - def session(self, connection): - if not self._default_session: - self._default_session = _create_session(connection) - self._default_session.context = self - return self._default_session - - def on_session_remote_close(self, event): - event.connection.close() - self._default_session = None - -class GlobalOverrides(object): - """ - Internal handler that triggers the necessary socket connect for an - opened connection. - """ - def __init__(self, base): - self.base = base - - def on_unhandled(self, name, event): - if not self._override(event): - event.dispatch(self.base) - - def _override(self, event): - conn = event.connection - return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides) - -class Connector(Handler): - """ - Internal handler that triggers the necessary socket connect for an - opened connection. - """ - def __init__(self, connection): - self.connection = connection - self.address = None - self.heartbeat = None - self.reconnect = None - self.ssl_domain = None - self.allow_insecure_mechs = True - self.allowed_mechs = None - self.sasl_enabled = True - self.user = None - self.password = None - self.virtual_host = None - self.ssl_sni = None - - def _connect(self, connection, reactor): - assert(reactor is not None) - url = self.address.next() - reactor.set_connection_host(connection, url.host, str(url.port)) - # if virtual-host not set, use host from address as default - if self.virtual_host is None: - connection.hostname = url.host - logging.debug("connecting to %s..." % url) - - transport = Transport() - if self.sasl_enabled: - sasl = transport.sasl() - sasl.allow_insecure_mechs = self.allow_insecure_mechs - if url.username: - connection.user = url.username - elif self.user: - connection.user = self.user - if url.password: - connection.password = url.password - elif self.password: - connection.password = self.password - if self.allowed_mechs: - sasl.allowed_mechs(self.allowed_mechs) - transport.bind(connection) - if self.heartbeat: - transport.idle_timeout = self.heartbeat - if url.scheme == 'amqps': - if not self.ssl_domain: - raise SSLUnavailable("amqps: SSL libraries not found") - self.ssl = SSL(transport, self.ssl_domain) - self.ssl.peer_hostname = self.ssl_sni or self.virtual_host or url.host - - def on_connection_local_open(self, event): - self._connect(event.connection, event.reactor) - - def on_connection_remote_open(self, event): - logging.debug("connected to %s" % event.connection.hostname) - if self.reconnect: - self.reconnect.reset() - self.transport = None - - def on_transport_tail_closed(self, event): - self.on_transport_closed(event) - - def on_transport_closed(self, event): - if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE: - if self.reconnect: - event.transport.unbind() - delay = self.reconnect.next() - if delay == 0: - logging.info("Disconnected, reconnecting...") - self._connect(self.connection, event.reactor) - else: - logging.info("Disconnected will try to reconnect after %s seconds" % delay) - event.reactor.schedule(delay, self) - else: - logging.debug("Disconnected") - self.connection = None - - def on_timer_task(self, event): - self._connect(self.connection, event.reactor) - - def on_connection_remote_close(self, event): - self.connection = None - -class Backoff(object): - """ - A reconnect strategy involving an increasing delay between - retries, up to a maximum or 10 seconds. - """ - def __init__(self): - self.delay = 0 - - def reset(self): - self.delay = 0 - - def next(self): - current = self.delay - if current == 0: - self.delay = 0.1 - else: - self.delay = min(10, 2*current) - return current - -class Urls(object): - def __init__(self, values): - self.values = [Url(v) for v in values] - self.i = iter(self.values) - - def __iter__(self): - return self - - def next(self): - try: - return next(self.i) - except StopIteration: - self.i = iter(self.values) - return next(self.i) - -class SSLConfig(object): - def __init__(self): - self.client = SSLDomain(SSLDomain.MODE_CLIENT) - self.server = SSLDomain(SSLDomain.MODE_SERVER) - - def set_credentials(self, cert_file, key_file, password): - self.client.set_credentials(cert_file, key_file, password) - self.server.set_credentials(cert_file, key_file, password) - - def set_trusted_ca_db(self, certificate_db): - self.client.set_trusted_ca_db(certificate_db) - self.server.set_trusted_ca_db(certificate_db) - - -class Container(Reactor): - """A representation of the AMQP concept of a 'container', which - lossely speaking is something that establishes links to or from - another container, over which messages are transfered. This is - an extension to the Reactor class that adds convenience methods - for creating connections and sender- or receiver- links. - """ - def __init__(self, *handlers, **kwargs): - super(Container, self).__init__(*handlers, **kwargs) - if "impl" not in kwargs: - try: - self.ssl = SSLConfig() - except SSLUnavailable: - self.ssl = None - self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) - self.trigger = None - self.container_id = str(generate_uuid()) - self.allow_insecure_mechs = True - self.allowed_mechs = None - self.sasl_enabled = True - self.user = None - self.password = None - Wrapper.__setattr__(self, 'subclass', self.__class__) - - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs): - """ - Initiates the establishment of an AMQP connection. Returns an - instance of proton.Connection. - - @param url: URL string of process to connect to - - @param urls: list of URL strings of process to try to connect to - - Only one of url or urls should be specified. - - @param reconnect: A value of False will prevent the library - form automatically trying to reconnect if the underlying - socket is disconnected before the connection has been closed. - - @param heartbeat: A value in milliseconds indicating the - desired frequency of heartbeats used to test the underlying - socket is alive. - - @param ssl_domain: SSL configuration in the form of an - instance of proton.SSLdomain. - - @param handler: a connection scoped handler that will be - called to process any events in the scope of this connection - or its child links - - @param kwargs: sasl_enabled, which determines whether a sasl layer is - used for the connection; allowed_mechs an optional list of SASL - mechanisms to allow if sasl is enabled; allow_insecure_mechs a flag - indicating whether insecure mechanisms, such as PLAIN over a - non-encrypted socket, are allowed; 'virtual_host' the hostname to set - in the Open performative used by peer to determine the correct - back-end service for the client. If 'virtual_host' is not supplied the - host field from the URL is used instead." - - """ - conn = self.connection(handler) - conn.container = self.container_id or str(generate_uuid()) - conn.offered_capabilities = kwargs.get('offered_capabilities') - conn.desired_capabilities = kwargs.get('desired_capabilities') - conn.properties = kwargs.get('properties') - - connector = Connector(conn) - connector.allow_insecure_mechs = kwargs.get('allow_insecure_mechs', self.allow_insecure_mechs) - connector.allowed_mechs = kwargs.get('allowed_mechs', self.allowed_mechs) - connector.sasl_enabled = kwargs.get('sasl_enabled', self.sasl_enabled) - connector.user = kwargs.get('user', self.user) - connector.password = kwargs.get('password', self.password) - connector.virtual_host = kwargs.get('virtual_host') - if connector.virtual_host: - # only set hostname if virtual-host is a non-empty string - conn.hostname = connector.virtual_host - connector.ssl_sni = kwargs.get('sni') - - conn._overrides = connector - if url: connector.address = Urls([url]) - elif urls: connector.address = Urls(urls) - elif address: connector.address = address - else: raise ValueError("One of url, urls or address required") - if heartbeat: - connector.heartbeat = heartbeat - if reconnect: - connector.reconnect = reconnect - elif reconnect is None: - connector.reconnect = Backoff() - # use container's default client domain if none specified. This is - # only necessary of the URL specifies the "amqps:" scheme - connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) - conn._session_policy = SessionPerConnection() #todo: make configurable - conn.open() - return conn - - def _get_id(self, container, remote, local): - if local and remote: "%s-%s-%s" % (container, remote, local) - elif local: return "%s-%s" % (container, local) - elif remote: return "%s-%s" % (container, remote) - else: return "%s-%s" % (container, str(generate_uuid())) - - def _get_session(self, context): - if isinstance(context, Url): - return self._get_session(self.connect(url=context)) - elif isinstance(context, Session): - return context - elif isinstance(context, Connection): - if hasattr(context, '_session_policy'): - return context._session_policy.session(context) - else: - return _create_session(context) - else: - return context.session() - - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None): - """ - Initiates the establishment of a link over which messages can - be sent. Returns an instance of proton.Sender. - - There are two patterns of use. (1) A connection can be passed - as the first argument, in which case the link is established - on that connection. In this case the target address can be - specified as the second argument (or as a keyword - argument). The source address can also be specified if - desired. (2) Alternatively a URL can be passed as the first - argument. In this case a new connection will be establised on - which the link will be attached. If a path is specified and - the target is not, then the path of the URL is used as the - target address. - - The name of the link may be specified if desired, otherwise a - unique name will be generated. - - Various LinkOptions can be specified to further control the - attachment. - """ - if isinstance(context, _compat.STRING_TYPES): - context = Url(context) - if isinstance(context, Url) and not target: - target = context.path - session = self._get_session(context) - snd = session.sender(name or self._get_id(session.connection.container, target, source)) - if source: - snd.source.address = source - if target: - snd.target.address = target - if handler != None: - snd.handler = handler - if tags: - snd.tag_generator = tags - _apply_link_options(options, snd) - snd.open() - return snd - - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None): - """ - Initiates the establishment of a link over which messages can - be received (aka a subscription). Returns an instance of - proton.Receiver. - - There are two patterns of use. (1) A connection can be passed - as the first argument, in which case the link is established - on that connection. In this case the source address can be - specified as the second argument (or as a keyword - argument). The target address can also be specified if - desired. (2) Alternatively a URL can be passed as the first - argument. In this case a new connection will be establised on - which the link will be attached. If a path is specified and - the source is not, then the path of the URL is used as the - target address. - - The name of the link may be specified if desired, otherwise a - unique name will be generated. - - Various LinkOptions can be specified to further control the - attachment. - """ - if isinstance(context, _compat.STRING_TYPES): - context = Url(context) - if isinstance(context, Url) and not source: - source = context.path - session = self._get_session(context) - rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) - if source: - rcv.source.address = source - if dynamic: - rcv.source.dynamic = True - if target: - rcv.target.address = target - if handler != None: - rcv.handler = handler - _apply_link_options(options, rcv) - rcv.open() - return rcv - - def declare_transaction(self, context, handler=None, settle_before_discharge=False): - if not _get_attr(context, '_txn_ctrl'): - class InternalTransactionHandler(OutgoingMessageHandler): - def __init__(self): - super(InternalTransactionHandler, self).__init__(auto_settle=True) - - def on_settled(self, event): - if hasattr(event.delivery, "transaction"): - event.transaction = event.delivery.transaction - event.delivery.transaction.handle_outcome(event) - context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) - context._txn_ctrl.target.type = Terminus.COORDINATOR - context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) - return Transaction(context._txn_ctrl, handler, settle_before_discharge) - - def listen(self, url, ssl_domain=None): - """ - Initiates a server socket, accepting incoming AMQP connections - on the interface and port specified. - """ - url = Url(url) - acceptor = self.acceptor(url.host, url.port) - ssl_config = ssl_domain - if not ssl_config and url.scheme == 'amqps': - # use container's default server domain - if self.ssl: - ssl_config = self.ssl.server - else: - raise SSLUnavailable("amqps: SSL libraries not found") - if ssl_config: - acceptor.set_ssl_domain(ssl_config) - return acceptor - - def do_work(self, timeout=None): - if timeout: - self.timeout = timeout - return self.process() http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/utils.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py deleted file mode 100644 index 9cd7cf3..0000000 --- a/proton-c/bindings/python/proton/utils.py +++ /dev/null @@ -1,356 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import collections, socket, time, threading - -from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message -from proton import ProtonException, Timeout, Url -from proton.reactor import Container -from proton.handlers import MessagingHandler, IncomingMessageHandler - - -class BlockingLink(object): - def __init__(self, connection, link): - self.connection = connection - self.link = link - self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), - msg="Opening link %s" % link.name) - self._checkClosed() - - def _waitForClose(self, timeout=1): - try: - self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, - timeout=timeout, - msg="Opening link %s" % self.link.name) - except Timeout as e: pass - self._checkClosed() - - def _checkClosed(self): - if self.link.state & Endpoint.REMOTE_CLOSED: - self.link.close() - raise LinkDetached(self.link) - - def close(self): - self.link.close() - self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), - msg="Closing link %s" % self.link.name) - - # Access to other link attributes. - def __getattr__(self, name): return getattr(self.link, name) - -class SendException(ProtonException): - """ - Exception used to indicate an exceptional state/condition on a send request - """ - def __init__(self, state): - self.state = state - -def _is_settled(delivery): - return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED - -class BlockingSender(BlockingLink): - def __init__(self, connection, sender): - super(BlockingSender, self).__init__(connection, sender) - if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: - #this may be followed by a detach, which may contain an error condition, so wait a little... - self._waitForClose() - #...but close ourselves if peer does not - self.link.close() - raise LinkException("Failed to open sender %s, target does not match" % self.link.name) - - def send(self, msg, timeout=False, error_states=None): - delivery = self.link.send(msg) - self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) - if delivery.link.snd_settle_mode != Link.SND_SETTLED: - delivery.settle() - bad = error_states - if bad is None: - bad = [Delivery.REJECTED, Delivery.RELEASED] - if delivery.remote_state in bad: - raise SendException(delivery.remote_state) - return delivery - -class Fetcher(MessagingHandler): - def __init__(self, connection, prefetch): - super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) - self.connection = connection - self.incoming = collections.deque([]) - self.unsettled = collections.deque([]) - - def on_message(self, event): - self.incoming.append((event.message, event.delivery)) - self.connection.container.yield_() # Wake up the wait() loop to handle the message. - - def on_link_error(self, event): - if event.link.state & Endpoint.LOCAL_ACTIVE: - event.link.close() - raise LinkDetached(event.link) - - def on_connection_error(self, event): - raise ConnectionClosed(event.connection) - - @property - def has_message(self): - return len(self.incoming) - - def pop(self): - message, delivery = self.incoming.popleft() - if not delivery.settled: - self.unsettled.append(delivery) - return message - - def settle(self, state=None): - delivery = self.unsettled.popleft() - if state: - delivery.update(state) - delivery.settle() - - -class BlockingReceiver(BlockingLink): - def __init__(self, connection, receiver, fetcher, credit=1): - super(BlockingReceiver, self).__init__(connection, receiver) - if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: - #this may be followed by a detach, which may contain an error condition, so wait a little... - self._waitForClose() - #...but close ourselves if peer does not - self.link.close() - raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) - if credit: receiver.flow(credit) - self.fetcher = fetcher - - def __del__(self): - self.fetcher = None - self.link.handler = None - - def receive(self, timeout=False): - if not self.fetcher: - raise Exception("Can't call receive on this receiver as a handler was provided") - if not self.link.credit: - self.link.flow(1) - self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) - return self.fetcher.pop() - - def accept(self): - self.settle(Delivery.ACCEPTED) - - def reject(self): - self.settle(Delivery.REJECTED) - - def release(self, delivered=True): - if delivered: - self.settle(Delivery.MODIFIED) - else: - self.settle(Delivery.RELEASED) - - def settle(self, state=None): - if not self.fetcher: - raise Exception("Can't call accept/reject etc on this receiver as a handler was provided") - self.fetcher.settle(state) - - -class LinkDetached(LinkException): - def __init__(self, link): - self.link = link - if link.is_sender: - txt = "sender %s to %s closed" % (link.name, link.target.address) - else: - txt = "receiver %s from %s closed" % (link.name, link.source.address) - if link.remote_condition: - txt += " due to: %s" % link.remote_condition - self.condition = link.remote_condition.name - else: - txt += " by peer" - self.condition = None - super(LinkDetached, self).__init__(txt) - - -class ConnectionClosed(ConnectionException): - def __init__(self, connection): - self.connection = connection - txt = "Connection %s closed" % connection.hostname - if connection.remote_condition: - txt += " due to: %s" % connection.remote_condition - self.condition = connection.remote_condition.name - else: - txt += " by peer" - self.condition = None - super(ConnectionClosed, self).__init__(txt) - - -class BlockingConnection(Handler): - """ - A synchronous style connection wrapper. - """ - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs): - self.disconnected = False - self.timeout = timeout or 60 - self.container = container or Container() - self.container.timeout = self.timeout - self.container.start() - self.url = Url(url).defaults() - self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) - self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), - msg="Opening connection") - - def create_sender(self, address, handler=None, name=None, options=None): - return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options)) - - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None): - prefetch = credit - if handler: - fetcher = None - if prefetch is None: - prefetch = 1 - else: - fetcher = Fetcher(self, credit) - return BlockingReceiver( - self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch) - - def close(self): - self.conn.close() - try: - self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), - msg="Closing connection") - finally: - self.conn = None - self.container = None - - def _is_closed(self): - return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED) - - def run(self): - """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ - while self.container.process(): pass - - def wait(self, condition, timeout=False, msg=None): - """Call process until condition() is true""" - if timeout is False: - timeout = self.timeout - if timeout is None: - while not condition() and not self.disconnected: - self.container.process() - else: - container_timeout = self.container.timeout - self.container.timeout = timeout - try: - deadline = time.time() + timeout - while not condition() and not self.disconnected: - self.container.process() - if deadline < time.time(): - txt = "Connection %s timed out" % self.url - if msg: txt += ": " + msg - raise Timeout(txt) - finally: - self.container.timeout = container_timeout - if self.disconnected or self._is_closed(): - self.container.stop() - self.conn.handler = None # break cyclical reference - if self.disconnected and not self._is_closed(): - raise ConnectionException( - "Connection %s disconnected: %s" % (self.url, self.disconnected)) - - def on_link_remote_close(self, event): - if event.link.state & Endpoint.LOCAL_ACTIVE: - event.link.close() - raise LinkDetached(event.link) - - def on_connection_remote_close(self, event): - if event.connection.state & Endpoint.LOCAL_ACTIVE: - event.connection.close() - raise ConnectionClosed(event.connection) - - def on_transport_tail_closed(self, event): - self.on_transport_closed(event) - - def on_transport_head_closed(self, event): - self.on_transport_closed(event) - - def on_transport_closed(self, event): - self.disconnected = event.transport.condition or "unknown" - -class AtomicCount(object): - def __init__(self, start=0, step=1): - """Thread-safe atomic counter. Start at start, increment by step.""" - self.count, self.step = start, step - self.lock = threading.Lock() - - def next(self): - """Get the next value""" - self.lock.acquire() - self.count += self.step; - result = self.count - self.lock.release() - return result - -class SyncRequestResponse(IncomingMessageHandler): - """ - Implementation of the synchronous request-responce (aka RPC) pattern. - @ivar address: Address for all requests, may be None. - @ivar connection: Connection for requests and responses. - """ - - correlation_id = AtomicCount() - - def __init__(self, connection, address=None): - """ - Send requests and receive responses. A single instance can send many requests - to the same or different addresses. - - @param connection: A L{BlockingConnection} - @param address: Address for all requests. - If not specified, each request must have the address property set. - Sucessive messages may have different addresses. - """ - super(SyncRequestResponse, self).__init__() - self.connection = connection - self.address = address - self.sender = self.connection.create_sender(self.address) - # dynamic=true generates a unique address dynamically for this receiver. - # credit=1 because we want to receive 1 response message initially. - self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) - self.response = None - - def call(self, request): - """ - Send a request message, wait for and return the response message. - - @param request: A L{proton.Message}. If L{self.address} is not set the - L{self.address} must be set and will be used. - """ - if not self.address and not request.address: - raise ValueError("Request message has no address: %s" % request) - request.reply_to = self.reply_to - request.correlation_id = correlation_id = self.correlation_id.next() - self.sender.send(request) - def wakeup(): - return self.response and (self.response.correlation_id == correlation_id) - self.connection.wait(wakeup, msg="Waiting for response") - response = self.response - self.response = None # Ready for next response. - self.receiver.flow(1) # Set up credit for the next response. - return response - - @property - def reply_to(self): - """Return the dynamic address of our receiver.""" - return self.receiver.remote_source.address - - def on_message(self, event): - """Called when we receive a message for our receiver.""" - self.response = event.message - self.connection.container.yield_() # Wake up the wait() loop to handle the message. http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/python/proton/wrapper.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/wrapper.py b/proton-c/bindings/python/proton/wrapper.py deleted file mode 100644 index eff917b..0000000 --- a/proton-c/bindings/python/proton/wrapper.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from cproton import * - -class EmptyAttrs: - - def __contains__(self, name): - return False - - def __getitem__(self, name): - raise KeyError(name) - - def __setitem__(self, name, value): - raise TypeError("does not support item assignment") - -EMPTY_ATTRS = EmptyAttrs() - -class Wrapper(object): - - def __init__(self, impl_or_constructor, get_context=None): - init = False - if callable(impl_or_constructor): - # we are constructing a new object - impl = impl_or_constructor() - if impl is None: - from proton import ProtonException - raise ProtonException("Wrapper failed to create wrapped object. Check for file descriptor or memory exhaustion.") - init = True - else: - # we are wrapping an existing object - impl = impl_or_constructor - pn_incref(impl) - - if get_context: - record = get_context(impl) - attrs = pn_void2py(pn_record_get(record, PYCTX)) - if attrs is None: - attrs = {} - pn_record_def(record, PYCTX, PN_PYREF) - pn_record_set(record, PYCTX, pn_py2void(attrs)) - init = True - else: - attrs = EMPTY_ATTRS - init = False - record = None - self.__dict__["_impl"] = impl - self.__dict__["_attrs"] = attrs - self.__dict__["_record"] = record - if init: self._init() - - def __getattr__(self, name): - attrs = self.__dict__["_attrs"] - if name in attrs: - return attrs[name] - else: - raise AttributeError(name + " not in _attrs") - - def __setattr__(self, name, value): - if hasattr(self.__class__, name): - object.__setattr__(self, name, value) - else: - attrs = self.__dict__["_attrs"] - attrs[name] = value - - def __delattr__(self, name): - attrs = self.__dict__["_attrs"] - if attrs: - del attrs[name] - - def __hash__(self): - return hash(addressof(self._impl)) - - def __eq__(self, other): - if isinstance(other, Wrapper): - return addressof(self._impl) == addressof(other._impl) - return False - - def __ne__(self, other): - if isinstance(other, Wrapper): - return addressof(self._impl) != addressof(other._impl) - return True - - def __del__(self): - pn_decref(self._impl) - - def __repr__(self): - return '<%s.%s 0x%x ~ 0x%x>' % (self.__class__.__module__, - self.__class__.__name__, - id(self), addressof(self._impl)) - - -if pn_py2void(Wrapper) is Wrapper: - PYCTX = Wrapper - import java.lang.System - addressof = java.lang.System.identityHashCode -else: - PYCTX = int(pn_py2void(Wrapper)) - addressof = int --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
