# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_exceptions.py
# ----------------------------------------------------------------------
# diff --git a/python/proton/_exceptions.py b/python/proton/_exceptions.py
# new file mode 100644
# index 0000000..47420c2
# --- /dev/null
# +++ b/python/proton/_exceptions.py
# @@ -0,0 +1,92 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import

from cproton import PN_TIMEOUT, PN_INTR


class ProtonException(Exception):
    """
    Common ancestor of every exception class defined by proton; catching
    it catches any of the exceptions declared in this module.
    """


class Timeout(ProtonException):
    """
    Signals that a blocking operation ran out of time before it could
    complete.
    """


class Interrupt(ProtonException):
    """
    Signals that a blocking operation was interrupted before it could
    complete.
    """


# Lookup table from the C library's error codes to the exception class
# that represents each of them.
EXCEPTIONS = {
    PN_TIMEOUT: Timeout,
    PN_INTR: Interrupt,
}


class MessageException(ProtonException):
    """
    Base of the message exception hierarchy: every exception produced by
    the Message class is an instance of this class or of a subclass.
    """


class DataException(ProtonException):
    """
    Base of the Data exception hierarchy: every exception raised by the
    Data class is an instance of this class or of a subclass.
    """


class TransportException(ProtonException):
    """
    Base of the transport exception hierarchy; SSLException derives from it.
    """


class SSLException(TransportException):
    """
    Base class for SSL/TLS related transport errors.
    """


class SSLUnavailable(SSLException):
    """
    Specialisation of SSLException; judging by the name it reports that
    SSL support is not available (no raiser is visible in this module).
    """


class ConnectionException(ProtonException):
    """
    Exception category named after connections (no raiser is visible in
    this module).
    """


class SessionException(ProtonException):
    """
    Exception category named after sessions (no raiser is visible in
    this module).
    """


class LinkException(ProtonException):
    """
    Exception category named after links (no raiser is visible in this
    module).
    """
# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_message.py
# ----------------------------------------------------------------------
# diff --git a/python/proton/_message.py b/python/proton/_message.py
# new file mode 100644
# index 0000000..32a8c72
# --- /dev/null
# +++ b/python/proton/_message.py
# @@ -0,0 +1,465 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import

from cproton import PN_STATUS_SETTLED, PN_DEFAULT_PRIORITY, PN_STATUS_MODIFIED, PN_STATUS_RELEASED, PN_STATUS_ABORTED, \
    PN_STATUS_REJECTED, PN_STATUS_PENDING, PN_STATUS_UNKNOWN, PN_STATUS_ACCEPTED, \
    PN_OVERFLOW, \
    pn_message_set_delivery_count, pn_message_set_address, pn_message_properties, \
    pn_message_get_user_id, pn_message_set_content_encoding, pn_message_get_subject, pn_message_get_priority, \
    pn_message_get_content_encoding, pn_message_body, \
    pn_message_correlation_id, pn_message_get_address, pn_message_set_content_type, pn_message_get_group_id, \
    pn_message_set_expiry_time, pn_message_set_creation_time, pn_message_error, \
    pn_message_is_first_acquirer, pn_message_set_priority, \
    pn_message_free, pn_message_get_creation_time, pn_message_is_inferred, pn_message_set_subject, \
    pn_message_set_user_id, pn_message_set_group_id, \
    pn_message_id, pn_message_clear, pn_message_set_durable, \
    pn_message_set_first_acquirer, pn_message_get_delivery_count, \
    pn_message_decode, pn_message_set_reply_to_group_id, \
    pn_message_get_group_sequence, pn_message_set_reply_to, \
    pn_message_set_ttl, pn_message_get_reply_to, pn_message, pn_message_annotations, pn_message_is_durable, \
    pn_message_instructions, pn_message_get_content_type, \
    pn_message_get_reply_to_group_id, pn_message_get_ttl, pn_message_encode, pn_message_get_expiry_time, \
    pn_message_set_group_sequence, pn_message_set_inferred, \
    pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text

from . import _compat
from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
from ._data import Data, ulong, symbol
from ._endpoints import Link
from ._exceptions import EXCEPTIONS, MessageException

# Message._check_property_keys() tests keys against the text type. The name
# ``unicode`` only exists as a builtin on Python 2, so alias it to ``str``
# when it is missing (Python 3) instead of failing with a NameError.
try:
    unicode
except NameError:
    unicode = str

PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Maps the C library's PN_STATUS_* codes to the Constant objects above;
# PN_STATUS_UNKNOWN maps to None.
STATUSES = {
    PN_STATUS_ABORTED: ABORTED,
    PN_STATUS_ACCEPTED: ACCEPTED,
    PN_STATUS_REJECTED: REJECTED,
    PN_STATUS_RELEASED: RELEASED,
    PN_STATUS_MODIFIED: MODIFIED,
    PN_STATUS_PENDING: PENDING,
    PN_STATUS_SETTLED: SETTLED,
    PN_STATUS_UNKNOWN: None
}


class Message(object):
    """The L{Message} class is a mutable holder of message content.

    @ivar instructions: delivery instructions for the message
    @type instructions: dict
    @ivar annotations: infrastructure defined message annotations
    @type annotations: dict
    @ivar properties: application defined message properties
    @type properties: dict
    @ivar body: message body
    @type body: bytes | unicode | dict | list | int | long | float | UUID
    """

    DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY

    def __init__(self, body=None, **kwargs):
        """
        @param body: initial value of the message body
        @param kwargs: Message property name/value pairs to initialise the Message
        """
        self._msg = pn_message()
        self._id = Data(pn_message_id(self._msg))
        self._correlation_id = Data(pn_message_correlation_id(self._msg))
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = body
        for k, v in _compat.iteritems(kwargs):
            getattr(self, k)  # Raise exception if it's not a valid attribute.
            setattr(self, k, v)

    def __del__(self):
        # _msg is missing if __init__ failed before creating it, or after an
        # earlier __del__ already released it.
        if hasattr(self, "_msg"):
            pn_message_free(self._msg)
            del self._msg

    def _check(self, err):
        """
        Turn a negative return code from the C library into an exception.

        @param err: return code of a pn_message_* call
        @return: err unchanged when it is not negative
        @raise MessageException: (or the class registered for the code in
          EXCEPTIONS) when err is negative
        """
        if err < 0:
            exc = EXCEPTIONS.get(err, MessageException)
            raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
        else:
            return err

    def _check_property_keys(self):
        """
        Make sure every key of the application properties is a text string.

        Text keys (py2 unicode, py3 str) are left alone. Python 2 byte
        strings are replaced by their UTF-8 decoded text form. Any other
        key type is rejected.

        @raise MessageException: if a key is neither text nor a py2 str
        """
        # Collect first and rename afterwards: a dict must not be modified
        # while its keys are being iterated.
        binary_keys = []
        for k in self.properties.keys():
            if isinstance(k, unicode):
                # py2 unicode, py3 str (via the module level alias)
                continue
            elif isinstance(k, str):
                # py2 str only: on py3 every str was matched above
                binary_keys.append(k)
            else:
                raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k)))
        for k in binary_keys:
            # decode (not encode) is what turns a py2 byte string into text
            self.properties[k.decode('utf-8')] = self.properties.pop(k)

    def _pre_encode(self):
        """
        Copy the python level instructions, annotations, properties and body
        into the corresponding sections of the underlying C message. A
        section whose python value is None is left cleared.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        inst.clear()
        if self.instructions is not None:
            inst.put_object(self.instructions)
        ann.clear()
        if self.annotations is not None:
            ann.put_object(self.annotations)
        props.clear()
        if self.properties is not None:
            self._check_property_keys()
            props.put_object(self.properties)
        body.clear()
        if self.body is not None:
            body.put_object(self.body)

    def _post_decode(self):
        """
        Copy the sections of the underlying C message back into the python
        level instructions, annotations, properties and body attributes. An
        empty section yields None.
        """
        inst = Data(pn_message_instructions(self._msg))
        ann = Data(pn_message_annotations(self._msg))
        props = Data(pn_message_properties(self._msg))
        body = Data(pn_message_body(self._msg))

        if inst.next():
            self.instructions = inst.get_object()
        else:
            self.instructions = None
        if ann.next():
            self.annotations = ann.get_object()
        else:
            self.annotations = None
        if props.next():
            self.properties = props.get_object()
        else:
            self.properties = None
        if body.next():
            self.body = body.get_object()
        else:
            self.body = None

    def clear(self):
        """
        Clears the contents of the L{Message}. All fields will be reset to
        their default values.
        """
        pn_message_clear(self._msg)
        self.instructions = None
        self.annotations = None
        self.properties = None
        self.body = None

    def _is_inferred(self):
        return pn_message_is_inferred(self._msg)

    def _set_inferred(self, value):
        self._check(pn_message_set_inferred(self._msg, bool(value)))

    inferred = property(_is_inferred, _set_inferred, doc="""
The inferred flag for a message indicates how the message content
is encoded into AMQP sections. If inferred is true then binary and
list values in the body of the message will be encoded as AMQP DATA
and AMQP SEQUENCE sections, respectively. If inferred is false,
then all values in the body of the message will be encoded as AMQP
VALUE sections regardless of their type.
""")

    def _is_durable(self):
        return pn_message_is_durable(self._msg)

    def _set_durable(self, value):
        self._check(pn_message_set_durable(self._msg, bool(value)))

    durable = property(_is_durable, _set_durable,
                       doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")

    def _get_priority(self):
        return pn_message_get_priority(self._msg)

    def _set_priority(self, value):
        self._check(pn_message_set_priority(self._msg, value))

    priority = property(_get_priority, _set_priority,
                        doc="""
The priority of the message.
""")

    def _get_ttl(self):
        return millis2secs(pn_message_get_ttl(self._msg))

    def _set_ttl(self, value):
        self._check(pn_message_set_ttl(self._msg, secs2millis(value)))

    ttl = property(_get_ttl, _set_ttl,
                   doc="""
The time to live of the message measured in seconds. Expired messages
may be dropped.
""")

    def _is_first_acquirer(self):
        return pn_message_is_first_acquirer(self._msg)

    def _set_first_acquirer(self, value):
        self._check(pn_message_set_first_acquirer(self._msg, bool(value)))

    first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                              doc="""
True iff the recipient is the first to acquire the message.
""")

    def _get_delivery_count(self):
        return pn_message_get_delivery_count(self._msg)

    def _set_delivery_count(self, value):
        self._check(pn_message_set_delivery_count(self._msg, value))

    delivery_count = property(_get_delivery_count, _set_delivery_count,
                              doc="""
The number of delivery attempts made for this message.
""")

    def _get_id(self):
        return self._id.get_object()

    def _set_id(self, value):
        # plain integers are stored as AMQP ulong values
        if isinteger(value):
            value = ulong(value)
        self._id.rewind()
        self._id.put_object(value)

    id = property(_get_id, _set_id,
                  doc="""
The id of the message.
""")

    def _get_user_id(self):
        return pn_message_get_user_id(self._msg)

    def _set_user_id(self, value):
        self._check(pn_message_set_user_id(self._msg, value))

    user_id = property(_get_user_id, _set_user_id,
                       doc="""
The user id of the message creator.
""")

    def _get_address(self):
        return utf82unicode(pn_message_get_address(self._msg))

    def _set_address(self, value):
        self._check(pn_message_set_address(self._msg, unicode2utf8(value)))

    address = property(_get_address, _set_address,
                       doc="""
The address of the message.
""")

    def _get_subject(self):
        return utf82unicode(pn_message_get_subject(self._msg))

    def _set_subject(self, value):
        self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))

    subject = property(_get_subject, _set_subject,
                       doc="""
The subject of the message.
""")

    def _get_reply_to(self):
        return utf82unicode(pn_message_get_reply_to(self._msg))

    def _set_reply_to(self, value):
        self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))

    reply_to = property(_get_reply_to, _set_reply_to,
                        doc="""
The reply-to address for the message.
""")

    def _get_correlation_id(self):
        return self._correlation_id.get_object()

    def _set_correlation_id(self, value):
        # plain integers are stored as AMQP ulong values
        if isinteger(value):
            value = ulong(value)
        self._correlation_id.rewind()
        self._correlation_id.put_object(value)

    correlation_id = property(_get_correlation_id, _set_correlation_id,
                              doc="""
The correlation-id for the message.
""")

    def _get_content_type(self):
        return symbol(utf82unicode(pn_message_get_content_type(self._msg)))

    def _set_content_type(self, value):
        self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))

    content_type = property(_get_content_type, _set_content_type,
                            doc="""
The content-type of the message.
""")

    def _get_content_encoding(self):
        return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))

    def _set_content_encoding(self, value):
        self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))

    content_encoding = property(_get_content_encoding, _set_content_encoding,
                                doc="""
The content-encoding of the message.
""")

    def _get_expiry_time(self):
        return millis2secs(pn_message_get_expiry_time(self._msg))

    def _set_expiry_time(self, value):
        self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))

    expiry_time = property(_get_expiry_time, _set_expiry_time,
                           doc="""
The expiry time of the message.
""")

    def _get_creation_time(self):
        return millis2secs(pn_message_get_creation_time(self._msg))

    def _set_creation_time(self, value):
        self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))

    creation_time = property(_get_creation_time, _set_creation_time,
                             doc="""
The creation time of the message.
""")

    def _get_group_id(self):
        return utf82unicode(pn_message_get_group_id(self._msg))

    def _set_group_id(self, value):
        self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))

    group_id = property(_get_group_id, _set_group_id,
                        doc="""
The group id of the message.
""")

    def _get_group_sequence(self):
        return pn_message_get_group_sequence(self._msg)

    def _set_group_sequence(self, value):
        self._check(pn_message_set_group_sequence(self._msg, value))

    group_sequence = property(_get_group_sequence, _set_group_sequence,
                              doc="""
The sequence of the message within its group.
""")

    def _get_reply_to_group_id(self):
        return utf82unicode(pn_message_get_reply_to_group_id(self._msg))

    def _set_reply_to_group_id(self, value):
        self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))

    reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
                                 doc="""
The group-id for any replies.
""")

    def encode(self):
        """
        Encode the message and return the encoded data.

        The python level sections are first copied into the C message. The
        encode buffer starts at 16 bytes and is doubled for as long as the C
        library reports PN_OVERFLOW.

        @return: the data produced by pn_message_encode
        @raise MessageException: if encoding fails for another reason
        """
        self._pre_encode()
        sz = 16
        while True:
            err, data = pn_message_encode(self._msg, sz)
            if err == PN_OVERFLOW:
                sz *= 2
                continue
            else:
                self._check(err)
                return data

    def decode(self, data):
        """
        Decode data into this message and refresh the python level
        instructions, annotations, properties and body from the result.

        @param data: encoded message data
        @raise MessageException: if decoding fails
        """
        self._check(pn_message_decode(self._msg, data))
        self._post_decode()

    def send(self, sender, tag=None):
        """
        Encode the message and send it as one delivery on the given sender.

        The delivery is settled immediately when the sender's
        snd_settle_mode is Link.SND_SETTLED.

        @param sender: the sending link
        @param tag: delivery tag; when false-ish, sender.delivery_tag() is used
        @return: the delivery created for the message
        """
        dlv = sender.delivery(tag or sender.delivery_tag())
        encoded = self.encode()
        sender.stream(encoded)
        sender.advance()
        if sender.snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        return dlv

    def recv(self, link):
        """
        Receives and decodes the message content for the current delivery
        from the link. Upon success it will return the current delivery
        for the link. If there is no current delivery, or if the current
        delivery is incomplete, or if the link is not a receiver, it will
        return None.

        @type link: Link
        @param link: the link to receive a message from
        @return the delivery associated with the decoded message (or None)

        """
        if link.is_sender: return None
        dlv = link.current
        if not dlv or dlv.partial: return None
        dlv.encoded = link.recv(dlv.pending)
        link.advance()
        # the sender has already forgotten about the delivery, so we might
        # as well too
        if link.remote_snd_settle_mode == Link.SND_SETTLED:
            dlv.settle()
        self.decode(dlv.encoded)
        return dlv

    def __repr2__(self):
        """
        Build a "Message(name=value, ...)" string from the python level
        attributes, listing only those whose value is truthy.
        """
        props = []
        for attr in ("inferred", "address", "reply_to", "durable", "ttl",
                     "priority", "first_acquirer", "delivery_count", "id",
                     "correlation_id", "user_id", "group_id", "group_sequence",
                     "reply_to_group_id", "instructions", "annotations",
                     "properties", "body"):
            value = getattr(self, attr)
            if value: props.append("%s=%r" % (attr, value))
        return "Message(%s)" % ", ".join(props)

    def __repr__(self):
        """
        Return the text pn_inspect produces for the underlying C message.
        """
        tmp = pn_string(None)
        err = pn_inspect(self._msg, tmp)
        result = pn_string_get(tmp)
        # release the temporary string before _check can raise
        pn_free(tmp)
        self._check(err)
        return result

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_reactor_impl.py
# ----------------------------------------------------------------------
# diff --git a/python/proton/_reactor_impl.py b/python/proton/_reactor_impl.py
# new file mode 100644
# index 0000000..39986ff
# --- /dev/null
# +++ b/python/proton/_reactor_impl.py
# @@ -0,0 +1,217 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
# The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import

import weakref

from cproton import PN_INVALID_SOCKET, \
    pn_incref, pn_decref, \
    pn_handler_add, pn_handler_clear, pn_pyhandler, \
    pn_selectable_is_reading, pn_selectable_attachments, pn_selectable_set_reading, \
    pn_selectable_expired, pn_selectable_set_fd, pn_selectable_set_registered, pn_selectable_writable, \
    pn_selectable_is_writing, pn_selectable_set_deadline, pn_selectable_is_registered, pn_selectable_terminate, \
    pn_selectable_get_deadline, pn_selectable_is_terminal, pn_selectable_readable, \
    pn_selectable_release, pn_selectable_set_writing, pn_selectable_get_fd

from ._common import millis2secs, secs2millis
from ._wrapper import Wrapper

from . import _compat

# Sentinel telling "argument omitted" apart from an explicit None.
_DEFAULT = object()


class Selectable(Wrapper):
    """
    Python wrapper around a C pn_selectable object.
    """

    @staticmethod
    def wrap(impl):
        """
        Wrap a C selectable.

        @param impl: the C selectable, or None
        @return: a Selectable for impl, or None when impl is None
        """
        if impl is None:
            return None
        else:
            return Selectable(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_selectable_attachments)

    def _init(self):
        # nothing to initialise
        pass

    def fileno(self, fd=_DEFAULT):
        """
        Get or set the file descriptor of the selectable.

        @param fd: omit to read the descriptor; None stores
          PN_INVALID_SOCKET; any other value is stored as given
        @return: the current descriptor when called without argument,
          otherwise None
        """
        if fd is _DEFAULT:
            return pn_selectable_get_fd(self._impl)
        elif fd is None:
            pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
        else:
            pn_selectable_set_fd(self._impl, fd)

    def _is_reading(self):
        return pn_selectable_is_reading(self._impl)

    def _set_reading(self, val):
        pn_selectable_set_reading(self._impl, bool(val))

    reading = property(_is_reading, _set_reading)

    def _is_writing(self):
        return pn_selectable_is_writing(self._impl)

    def _set_writing(self, val):
        pn_selectable_set_writing(self._impl, bool(val))

    writing = property(_is_writing, _set_writing)

    def _get_deadline(self):
        # a false-ish (zero) timestamp is reported as None, anything else is
        # converted from milliseconds to seconds
        tstamp = pn_selectable_get_deadline(self._impl)
        if tstamp:
            return millis2secs(tstamp)
        else:
            return None

    def _set_deadline(self, deadline):
        # The getter reports a zero timestamp as None, so accept None here and
        # store zero; that lets a value read from the property be written
        # back. NOTE(review): assumes secs2millis cannot handle None and that
        # zero means "no deadline" to the C library - confirm.
        if deadline is None:
            pn_selectable_set_deadline(self._impl, 0)
        else:
            pn_selectable_set_deadline(self._impl, secs2millis(deadline))

    deadline = property(_get_deadline, _set_deadline)

    def readable(self):
        """Forward a readable notification to the C selectable."""
        pn_selectable_readable(self._impl)

    def writable(self):
        """Forward a writable notification to the C selectable."""
        pn_selectable_writable(self._impl)

    def expired(self):
        """Forward an expired notification to the C selectable."""
        pn_selectable_expired(self._impl)

    def _is_registered(self):
        return pn_selectable_is_registered(self._impl)

    def _set_registered(self, registered):
        pn_selectable_set_registered(self._impl, registered)

    registered = property(_is_registered, _set_registered,
                          doc="""
The registered property may be get/set by an I/O polling system to
indicate whether the fd has been registered or not.
""")

    @property
    def is_terminal(self):
        return pn_selectable_is_terminal(self._impl)

    def terminate(self):
        """Call pn_selectable_terminate on the C selectable."""
        pn_selectable_terminate(self._impl)

    def release(self):
        """Call pn_selectable_release on the C selectable."""
        pn_selectable_release(self._impl)


class _cadapter:
    """
    Adapter handed to pn_pyhandler: turns C events into python Event
    objects and dispatches them to the wrapped python handler.
    """

    def __init__(self, handler, on_error=None):
        """
        @param handler: the python handler events are dispatched to
        @param on_error: optional callable receiving (exc, val, tb); when
          None, exceptions are re-raised
        """
        self.handler = handler
        self.on_error = on_error

    def dispatch(self, cevent, ctype):
        """Wrap the C event and dispatch it to the python handler."""
        # imported here rather than at module level
        # NOTE(review): presumably to avoid an import cycle - confirm
        from ._events import Event
        ev = Event.wrap(cevent, ctype)
        ev.dispatch(self.handler)

    def exception(self, exc, val, tb):
        """Re-raise the exception, or pass it to on_error when one is set."""
        if self.on_error is None:
            _compat.raise_(exc, val, tb)
        else:
            self.on_error((exc, val, tb))


class WrappedHandlersChildSurrogate:
    """
    Handler that keeps a list of python child handlers on behalf of a
    WrappedHandler and forwards events it does not handle itself to that
    handler, which is only referenced weakly.
    """

    def __init__(self, delegate):
        self.handlers = []
        self.delegate = weakref.ref(delegate)

    def on_unhandled(self, method, event):
        """Forward the event to the delegate if it is still alive."""
        from ._events import dispatch
        delegate = self.delegate()
        if delegate:
            dispatch(delegate, method, event)


class WrappedHandlersProperty(object):
    """
    Descriptor exposing the handlers list of an object's surrogate as the
    object's ``handlers`` attribute.
    """

    def __get__(self, obj, clazz):
        # accessed on the class rather than on an instance
        if obj is None:
            return None
        return self.surrogate(obj).handlers

    def __set__(self, obj, value):
        self.surrogate(obj).handlers = value

    def surrogate(self, obj):
        """
        Return the surrogate stored in obj.__dict__["_surrogate"]. On first
        use it is created, stored and added to obj as a handler.
        """
        key = "_surrogate"
        objdict = obj.__dict__
        surrogate = objdict.get(key, None)
        if surrogate is None:
            objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
            obj.add(surrogate)
        return surrogate


class WrappedHandler(Wrapper):
    """
    Python wrapper around a C handler to which further handlers can be added.
    """

    handlers = WrappedHandlersProperty()

    @classmethod
    def wrap(cls, impl, on_error=None):
        """
        Wrap a C handler.

        @param impl: the C handler, or None
        @param on_error: optional error callback stored on the new wrapper
        @return: an instance of cls for impl, or None when impl is None
        """
        if impl is None:
            return None
        else:
            handler = cls(impl)
            handler.__dict__["on_error"] = on_error
            return handler

    def __init__(self, impl_or_constructor):
        Wrapper.__init__(self, impl_or_constructor)
        # true when WrappedHandler sits further than one step up the MRO of
        # the instance's class
        if list(self.__class__.__mro__).index(WrappedHandler) > 1:
            # instantiate the surrogate
            self.handlers.extend([])

    def _on_error(self, info):
        """
        Default error callback: delegate to the on_error attribute when one
        is set, otherwise re-raise the exception described by info.

        @param info: (exc, val, tb) tuple
        """
        on_error = getattr(self, "on_error", None)
        if on_error is None:
            _compat.raise_(info[0], info[1], info[2])
        else:
            on_error(info)

    def add(self, handler, on_error=None):
        """
        Add a child handler; None is ignored.

        @param handler: a WrappedHandler or a plain python handler
        @param on_error: error callback for the child, defaults to
          self._on_error
        """
        if handler is None: return
        if on_error is None: on_error = self._on_error
        impl = _chandler(handler, on_error)
        pn_handler_add(self._impl, impl)
        # drop the reference obtained from _chandler
        pn_decref(impl)

    def clear(self):
        """Call pn_handler_clear on the C handler."""
        pn_handler_clear(self._impl)


def _chandler(obj, on_error=None):
    """
    Return a C handler for obj.

    @param obj: None, a WrappedHandler or a plain python handler
    @param on_error: error callback used when obj is a plain python handler
    @return: None for None; for a WrappedHandler its C handler with the
      reference count incremented; otherwise a new pn_pyhandler around a
      _cadapter for obj
    """
    if obj is None:
        return None
    elif isinstance(obj, WrappedHandler):
        impl = obj._impl
        pn_incref(impl)
        return impl
    else:
        return pn_pyhandler(_cadapter(obj, on_error))

# http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_transport.py
# ----------------------------------------------------------------------
# diff --git a/python/proton/_transport.py b/python/proton/_transport.py
# new file mode 100644
# index 0000000..3db0078
# --- /dev/null
# +++ b/python/proton/_transport.py
# @@ -0,0 +1,524 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# + +from __future__ import absolute_import + +from cproton import PN_SASL_AUTH, PN_SASL_PERM, PN_SASL_SYS, PN_SSL_RESUME_REUSED, PN_SASL_NONE, PN_SSL_SHA1, \ + PN_SSL_CERT_SUBJECT_COUNTRY_NAME, PN_SASL_OK, PN_SSL_RESUME_UNKNOWN, PN_EOS, PN_SSL_ANONYMOUS_PEER, PN_SSL_MD5, \ + PN_SSL_CERT_SUBJECT_COMMON_NAME, PN_SSL_VERIFY_PEER, PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY, PN_SSL_MODE_SERVER, \ + PN_TRACE_DRV, PN_TRACE_RAW, pn_transport, PN_SSL_SHA256, PN_TRACE_FRM, PN_SSL_MODE_CLIENT, PN_SASL_TEMP, \ + PN_SSL_SHA512, PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT, PN_OK, PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE, \ + PN_SSL_VERIFY_PEER_NAME, PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME, PN_SSL_RESUME_NEW, PN_TRACE_OFF, \ + pn_transport_get_channel_max, pn_transport_capacity, pn_transport_push, pn_transport_get_user, pn_transport_tick, \ + pn_transport_set_max_frame, pn_transport_attachments, pn_transport_unbind, pn_transport_peek, \ + pn_transport_set_channel_max, pn_transport_close_tail, pn_transport_condition, pn_transport_is_encrypted, \ + pn_transport_get_frames_input, pn_transport_bind, pn_transport_closed, pn_transport_get_idle_timeout, \ + pn_transport_get_remote_idle_timeout, pn_transport_get_frames_output, pn_transport_pending, \ + pn_transport_set_pytracer, pn_transport_close_head, pn_transport_get_remote_max_frame, \ + pn_transport_is_authenticated, pn_transport_set_idle_timeout, pn_transport_log, pn_transport_get_pytracer, \ + pn_transport_require_auth, pn_transport_get_max_frame, pn_transport_set_server, pn_transport_remote_channel_max, \ + pn_transport_require_encryption, pn_transport_pop, pn_transport_connection, \ + pn_sasl, pn_sasl_set_allow_insecure_mechs, pn_sasl_outcome, pn_transport_error, pn_sasl_get_user, \ + pn_sasl_extended, pn_sasl_done, pn_sasl_get_allow_insecure_mechs, pn_sasl_allowed_mechs, \ + pn_sasl_config_name, pn_sasl_config_path, \ + pn_ssl, pn_ssl_init, pn_ssl_domain_allow_unsecured_client, pn_ssl_domain_free, \ + pn_ssl_domain, pn_transport_trace, 
pn_ssl_resume_status, pn_sasl_get_mech, \ + pn_ssl_domain_set_trusted_ca_db, pn_ssl_get_remote_subject_subfield, pn_ssl_present, \ + pn_ssl_get_remote_subject, pn_ssl_domain_set_credentials, pn_ssl_domain_set_peer_authentication, \ + pn_ssl_get_peer_hostname, pn_ssl_set_peer_hostname, pn_ssl_get_cipher_name, pn_ssl_get_cert_fingerprint, \ + pn_ssl_get_protocol_name, \ + pn_error_text + +from ._common import millis2secs, secs2millis, unicode2utf8, utf82unicode +from ._condition import cond2obj +from ._exceptions import EXCEPTIONS, TransportException, SessionException, SSLException, SSLUnavailable +from ._wrapper import Wrapper + + +class TraceAdapter: + + def __init__(self, tracer): + self.tracer = tracer + + def __call__(self, trans_impl, message): + self.tracer(Transport.wrap(trans_impl), message) + + +class Transport(Wrapper): + TRACE_OFF = PN_TRACE_OFF + TRACE_DRV = PN_TRACE_DRV + TRACE_FRM = PN_TRACE_FRM + TRACE_RAW = PN_TRACE_RAW + + CLIENT = 1 + SERVER = 2 + + @staticmethod + def wrap(impl): + if impl is None: + return None + else: + return Transport(_impl=impl) + + def __init__(self, mode=None, _impl=pn_transport): + Wrapper.__init__(self, _impl, pn_transport_attachments) + if mode == Transport.SERVER: + pn_transport_set_server(self._impl) + elif mode is None or mode == Transport.CLIENT: + pass + else: + raise TransportException("Cannot initialise Transport from mode: %s" % str(mode)) + + def _init(self): + self._sasl = None + self._ssl = None + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, TransportException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) + else: + return err + + def _set_tracer(self, tracer): + pn_transport_set_pytracer(self._impl, TraceAdapter(tracer)) + + def _get_tracer(self): + adapter = pn_transport_get_pytracer(self._impl) + if adapter: + return adapter.tracer + else: + return None + + tracer = property(_get_tracer, _set_tracer, + doc=""" +A callback for trace logging. 
The callback is passed the transport and log message. +""") + + def log(self, message): + pn_transport_log(self._impl, message) + + def require_auth(self, bool): + pn_transport_require_auth(self._impl, bool) + + @property + def authenticated(self): + return pn_transport_is_authenticated(self._impl) + + def require_encryption(self, bool): + pn_transport_require_encryption(self._impl, bool) + + @property + def encrypted(self): + return pn_transport_is_encrypted(self._impl) + + @property + def user(self): + return pn_transport_get_user(self._impl) + + def bind(self, connection): + """Assign a connection to the transport""" + self._check(pn_transport_bind(self._impl, connection._impl)) + + def unbind(self): + """Release the connection""" + self._check(pn_transport_unbind(self._impl)) + + def trace(self, n): + pn_transport_trace(self._impl, n) + + def tick(self, now): + """Process any timed events (like heartbeat generation). + now = seconds since epoch (float). + """ + return millis2secs(pn_transport_tick(self._impl, secs2millis(now))) + + def capacity(self): + c = pn_transport_capacity(self._impl) + if c >= PN_EOS: + return c + else: + return self._check(c) + + def push(self, binary): + n = self._check(pn_transport_push(self._impl, binary)) + if n != len(binary): + raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary))) + + def close_tail(self): + self._check(pn_transport_close_tail(self._impl)) + + def pending(self): + p = pn_transport_pending(self._impl) + if p >= PN_EOS: + return p + else: + return self._check(p) + + def peek(self, size): + cd, out = pn_transport_peek(self._impl, size) + if cd == PN_EOS: + return None + else: + self._check(cd) + return out + + def pop(self, size): + pn_transport_pop(self._impl, size) + + def close_head(self): + self._check(pn_transport_close_head(self._impl)) + + @property + def closed(self): + return pn_transport_closed(self._impl) + + # AMQP 1.0 max-frame-size + def _get_max_frame_size(self): + return 
pn_transport_get_max_frame(self._impl) + + def _set_max_frame_size(self, value): + pn_transport_set_max_frame(self._impl, value) + + max_frame_size = property(_get_max_frame_size, _set_max_frame_size, + doc=""" +Sets the maximum size for received frames (in bytes). +""") + + @property + def remote_max_frame_size(self): + return pn_transport_get_remote_max_frame(self._impl) + + def _get_channel_max(self): + return pn_transport_get_channel_max(self._impl) + + def _set_channel_max(self, value): + if pn_transport_set_channel_max(self._impl, value): + raise SessionException("Too late to change channel max.") + + channel_max = property(_get_channel_max, _set_channel_max, + doc=""" +Sets the maximum channel that may be used on the transport. +""") + + @property + def remote_channel_max(self): + return pn_transport_remote_channel_max(self._impl) + + # AMQP 1.0 idle-time-out + def _get_idle_timeout(self): + return millis2secs(pn_transport_get_idle_timeout(self._impl)) + + def _set_idle_timeout(self, sec): + pn_transport_set_idle_timeout(self._impl, secs2millis(sec)) + + idle_timeout = property(_get_idle_timeout, _set_idle_timeout, + doc=""" +The idle timeout of the connection (float, in seconds). +""") + + @property + def remote_idle_timeout(self): + return millis2secs(pn_transport_get_remote_idle_timeout(self._impl)) + + @property + def frames_output(self): + return pn_transport_get_frames_output(self._impl) + + @property + def frames_input(self): + return pn_transport_get_frames_input(self._impl) + + def sasl(self): + return SASL(self) + + def ssl(self, domain=None, session_details=None): + # SSL factory (singleton for this transport) + if not self._ssl: + self._ssl = SSL(self, domain, session_details) + return self._ssl + + @property + def condition(self): + return cond2obj(pn_transport_condition(self._impl)) + + @property + def connection(self): + from . 
class SASLException(TransportException):
    """
    Default exception type raised by SASL._check() for negative error
    codes that have no entry in EXCEPTIONS.
    """
    pass


class SASL(Wrapper):
    """
    Wrapper giving access to the SASL layer of a transport.

    The underlying handle is obtained with pn_sasl() from the transport's
    implementation object and kept in ``_sasl``.
    """

    # Outcome codes re-exported from the C library.
    OK = PN_SASL_OK
    AUTH = PN_SASL_AUTH
    SYS = PN_SASL_SYS
    PERM = PN_SASL_PERM
    TEMP = PN_SASL_TEMP

    @staticmethod
    def extended():
        """Return whatever pn_sasl_extended() reports."""
        return pn_sasl_extended()

    def __init__(self, transport):
        impl = transport._impl
        Wrapper.__init__(self, impl, pn_transport_attachments)
        self._sasl = pn_sasl(impl)

    def _check(self, err):
        """
        Return ``err`` unchanged unless it is negative; a negative code is
        raised as the matching EXCEPTIONS entry, or as SASLException.
        """
        if err < 0:
            raise EXCEPTIONS.get(err, SASLException)("[%s]" % err)
        return err

    @property
    def user(self):
        """The value of pn_sasl_get_user() for this SASL layer."""
        return pn_sasl_get_user(self._sasl)

    @property
    def mech(self):
        """The value of pn_sasl_get_mech() for this SASL layer."""
        return pn_sasl_get_mech(self._sasl)

    @property
    def outcome(self):
        """The SASL outcome code, or None while it is still PN_SASL_NONE."""
        result = pn_sasl_outcome(self._sasl)
        return None if result == PN_SASL_NONE else result

    def allowed_mechs(self, mechs):
        """Pass ``mechs`` (converted with unicode2utf8) to pn_sasl_allowed_mechs()."""
        pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))

    def _get_allow_insecure_mechs(self):
        return pn_sasl_get_allow_insecure_mechs(self._sasl)

    def _set_allow_insecure_mechs(self, insecure):
        pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)

    allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
                                    doc="""
Allow unencrypted cleartext passwords (PLAIN mech)
""")

    def done(self, outcome):
        """Forward ``outcome`` to pn_sasl_done()."""
        pn_sasl_done(self._sasl, outcome)

    def config_name(self, name):
        """Forward ``name`` to pn_sasl_config_name()."""
        pn_sasl_config_name(self._sasl, name)

    def config_path(self, path):
        """Forward ``path`` to pn_sasl_config_path()."""
        pn_sasl_config_path(self._sasl, path)
class SSLDomain(object):
    """
    Owner of an SSL domain handle created by pn_ssl_domain().

    The handle is released with pn_ssl_domain_free() when the object is
    finalized.
    """

    MODE_CLIENT = PN_SSL_MODE_CLIENT
    MODE_SERVER = PN_SSL_MODE_SERVER
    VERIFY_PEER = PN_SSL_VERIFY_PEER
    VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
    ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

    def __init__(self, mode):
        """Create the domain for ``mode``; raise SSLUnavailable if none is returned."""
        handle = pn_ssl_domain(mode)
        self._domain = handle
        if handle is None:
            raise SSLUnavailable()

    def _check(self, err):
        """
        Return ``err`` unchanged unless it is negative; a negative code is
        raised as the matching EXCEPTIONS entry, or as SSLException.
        """
        if err < 0:
            raise EXCEPTIONS.get(err, SSLException)("SSL failure.")
        return err

    def set_credentials(self, cert_file, key_file, password):
        """Checked call to pn_ssl_domain_set_credentials()."""
        status = pn_ssl_domain_set_credentials(self._domain, cert_file, key_file, password)
        return self._check(status)

    def set_trusted_ca_db(self, certificate_db):
        """Checked call to pn_ssl_domain_set_trusted_ca_db()."""
        status = pn_ssl_domain_set_trusted_ca_db(self._domain, certificate_db)
        return self._check(status)

    def set_peer_authentication(self, verify_mode, trusted_CAs=None):
        """Checked call to pn_ssl_domain_set_peer_authentication()."""
        status = pn_ssl_domain_set_peer_authentication(self._domain, verify_mode, trusted_CAs)
        return self._check(status)

    def allow_unsecured_client(self):
        """Checked call to pn_ssl_domain_allow_unsecured_client()."""
        status = pn_ssl_domain_allow_unsecured_client(self._domain)
        return self._check(status)

    def __del__(self):
        pn_ssl_domain_free(self._domain)


class SSL(object):
    """
    Per-transport SSL object.

    Construction goes through __new__, which hands back the instance already
    stored on the transport when there is one.
    """

    # Digest selectors accepted by get_cert_fingerprint().
    SHA1 = PN_SSL_SHA1
    SHA256 = PN_SSL_SHA256
    SHA512 = PN_SSL_SHA512
    MD5 = PN_SSL_MD5

    # Sub-field selectors accepted by get_cert_subject_subfield().
    CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
    CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
    CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
    CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
    CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
    CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME

    # Values returned by resume_status().
    RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
    RESUME_NEW = PN_SSL_RESUME_NEW
    RESUME_REUSED = PN_SSL_RESUME_REUSED

    @staticmethod
    def present():
        """Return whatever pn_ssl_present() reports."""
        return pn_ssl_present()

    def _check(self, err):
        """
        Return ``err`` unchanged unless it is negative; a negative code is
        raised as the matching EXCEPTIONS entry, or as SSLException.
        """
        if err < 0:
            raise EXCEPTIONS.get(err, SSLException)("SSL failure.")
        return err

    def __new__(cls, transport, domain, session_details=None):
        """Enforce a singleton SSL object per Transport"""
        existing = transport._ssl
        if existing:
            # Allocation and configuration happen in one step, so a second
            # call may carry a different configuration than the first one;
            # that is rejected rather than silently ignored.
            if ((domain and existing._domain is not domain) or
                    (session_details and existing._session_details is not session_details)):
                raise SSLException("Cannot re-configure existing SSL object!")
            return transport._ssl

        obj = super(SSL, cls).__new__(cls)
        obj._domain = domain
        obj._session_details = session_details
        session_id = session_details.get_session_id() if session_details else None
        obj._ssl = pn_ssl(transport._impl)
        if obj._ssl is None:
            raise SSLUnavailable()
        if domain:
            pn_ssl_init(obj._ssl, domain._domain, session_id)
        transport._ssl = obj
        return transport._ssl

    def cipher_name(self):
        """Return the name from pn_ssl_get_cipher_name(), or None if its status is falsy."""
        ok, text = pn_ssl_get_cipher_name(self._ssl, 128)
        return text if ok else None

    def protocol_name(self):
        """Return the name from pn_ssl_get_protocol_name(), or None if its status is falsy."""
        ok, text = pn_ssl_get_protocol_name(self._ssl, 128)
        return text if ok else None

    def get_cert_subject_subfield(self, subfield_name):
        """Return pn_ssl_get_remote_subject_subfield() for the given selector."""
        return pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)

    def get_cert_subject(self):
        """Return pn_ssl_get_remote_subject() for this SSL object."""
        return pn_ssl_get_remote_subject(self._ssl)

    def _get_cert_subject_unknown_subfield(self):
        # 10 is deliberately not one of the CERT_* selectors (unhandled enum).
        return self.get_cert_subject_subfield(10)

    # Convenience accessors, one per subject sub-field selector.
    def get_cert_common_name(self):
        return self.get_cert_subject_subfield(SSL.CERT_COMMON_NAME)

    def get_cert_organization(self):
        return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_NAME)

    def get_cert_organization_unit(self):
        return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_UNIT)

    def get_cert_locality_or_city(self):
        return self.get_cert_subject_subfield(SSL.CERT_CITY_OR_LOCALITY)

    def get_cert_country(self):
        return self.get_cert_subject_subfield(SSL.CERT_COUNTRY_NAME)

    def get_cert_state_or_province(self):
        return self.get_cert_subject_subfield(SSL.CERT_STATE_OR_PROVINCE)

    def get_cert_fingerprint(self, fingerprint_length, digest_name):
        """
        Return the fingerprint string from pn_ssl_get_cert_fingerprint(),
        or None when the call does not report PN_OK.
        """
        status, text = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
        return text if status == PN_OK else None

    # Convenience accessors, one per hashing algorithm.
    def _get_cert_fingerprint_unknown_hash_alg(self):
        # 10 is deliberately not one of the digest selectors above.
        return self.get_cert_fingerprint(41, 10)

    def get_cert_fingerprint_sha1(self):
        return self.get_cert_fingerprint(41, SSL.SHA1)

    def get_cert_fingerprint_sha256(self):
        # sha256 produces a fingerprint that is 64 characters long
        return self.get_cert_fingerprint(65, SSL.SHA256)

    def get_cert_fingerprint_sha512(self):
        # sha512 produces a fingerprint that is 128 characters long
        return self.get_cert_fingerprint(129, SSL.SHA512)

    def get_cert_fingerprint_md5(self):
        return self.get_cert_fingerprint(33, SSL.MD5)

    @property
    def remote_subject(self):
        """The value of pn_ssl_get_remote_subject() for this SSL object."""
        return pn_ssl_get_remote_subject(self._ssl)

    def resume_status(self):
        """Return whatever pn_ssl_resume_status() reports."""
        return pn_ssl_resume_status(self._ssl)

    def _set_peer_hostname(self, hostname):
        encoded = unicode2utf8(hostname)
        self._check(pn_ssl_set_peer_hostname(self._ssl, encoded))

    def _get_peer_hostname(self):
        status, raw_name = pn_ssl_get_peer_hostname(self._ssl, 1024)
        self._check(status)
        return utf82unicode(raw_name)

    peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
                             doc="""
Manage the expected name of the remote peer. Used to authenticate the remote.
""")


class SSLSessionDetails(object):
    """ Unique identifier for the SSL session. Used to resume previous session on a new
    SSL connection.
    """

    def __init__(self, session_id):
        self._session_id = session_id

    def get_session_id(self):
        """Return the session id given at construction."""
        return self._session_id
from __future__ import absolute_import

import socket

from cproton import pn_url, pn_url_free, pn_url_parse, pn_url_str, pn_url_get_port, pn_url_get_scheme, \
    pn_url_get_host, pn_url_get_username, pn_url_get_password, pn_url_get_path, pn_url_set_scheme, pn_url_set_host, \
    pn_url_set_username, pn_url_set_password, pn_url_set_port, pn_url_set_path

from ._common import unicode2utf8


class Url(object):
    """
    Simple URL parser/constructor, handles URLs of the form:

    <scheme>://<user>:<password>@<host>:<port>/<path>

    All components can be None if not specified in the URL string.

    The port can be specified as a service name, e.g. 'amqp' in the
    URL string but Url.port always gives the integer value.

    Warning: The placement of user and password in URLs is not
    recommended. It can result in credentials leaking out in program
    logs. Use connection configuration attributes instead.

    @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
    @ivar username: Username
    @ivar password: Password
    @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
    @ivar port: Integer port.
    @ivar path: Path part of the URL.
    """

    AMQPS = "amqps"
    AMQP = "amqp"

    class Port(int):
        """An integer port number that can be constructed from a service name string"""

        def __new__(cls, value):
            """@param value: integer port number or string service name."""
            port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
            # Remember the spelling that was given so str() round-trips it.
            port.name = str(value)
            return port

        def __eq__(self, x):
            return str(self) == x or int(self) == x

        def __ne__(self, x):
            return not self == x

        def __hash__(self):
            # Defining __eq__ makes Python 3 set __hash__ to None; restore the
            # integer hash so ports stay usable in sets and as dict keys.
            return hash(int(self))

        def __str__(self):
            return str(self.name)

        @staticmethod
        def _port_int(value):
            """Convert service, an integer or a service name, into an integer port number."""
            try:
                return int(value)
            except ValueError:
                try:
                    return socket.getservbyname(value)
                except socket.error:
                    # Not every system has amqp/amqps defined as a service
                    if value == Url.AMQPS:
                        return 5671
                    elif value == Url.AMQP:
                        return 5672
                    else:
                        raise ValueError("Not a valid port number or service name: '%s'" % value)

    def __init__(self, url=None, defaults=True, **kwargs):
        """
        @param url: URL string to parse.
        @param defaults: If true, fill in missing default values in the URL.
            If false, you can fill them in later by calling self.defaults()
        @param kwargs: scheme, username, password, host, port, path.
            If specified, replaces corresponding part in url string.
        @raise ValueError: if url cannot be parsed.
        @raise AttributeError: if kwargs names something that is not a Url attribute.
        """
        if url:
            self._url = pn_url_parse(unicode2utf8(str(url)))
            if not self._url:
                raise ValueError("Invalid URL '%s'" % url)
        else:
            self._url = pn_url()
        for k in kwargs:  # Let kwargs override values parsed from url
            getattr(self, k)  # Check for invalid kwargs
            setattr(self, k, kwargs[k])
        if defaults:
            self.defaults()

    class PartDescriptor(object):
        """Descriptor mapping an attribute onto the pn_url_get_/pn_url_set_ pair for one URL part."""

        def __init__(self, part):
            # The accessors are module-level names imported from cproton.
            self.getter = globals()["pn_url_get_%s" % part]
            self.setter = globals()["pn_url_set_%s" % part]

        def __get__(self, obj, type=None):
            return self.getter(obj._url)

        def __set__(self, obj, value):
            return self.setter(obj._url, str(value))

    scheme = PartDescriptor('scheme')
    username = PartDescriptor('username')
    password = PartDescriptor('password')
    host = PartDescriptor('host')
    path = PartDescriptor('path')

    def _get_port(self):
        portstr = pn_url_get_port(self._url)
        return portstr and Url.Port(portstr)

    def _set_port(self, value):
        if value is None:
            pn_url_set_port(self._url, None)
        else:
            pn_url_set_port(self._url, str(Url.Port(value)))

    port = property(_get_port, _set_port)

    def __str__(self):
        return pn_url_str(self._url)

    def __repr__(self):
        return "Url(%s://%s/%s)" % (self.scheme, self.host, self.path)

    def __eq__(self, x):
        return str(self) == str(x)

    def __ne__(self, x):
        return not self == x

    def __del__(self):
        # __init__ may have failed before _url was assigned, or left it as
        # None after a parse failure; there is nothing to free in either case.
        url = getattr(self, "_url", None)
        if url is not None:
            pn_url_free(url)
            del self._url

    def defaults(self):
        """
        Fill in missing values (scheme, host or port) with defaults
        @return: self
        """
        self.scheme = self.scheme or self.AMQP
        self.host = self.host or '0.0.0.0'
        self.port = self.port or self.Port(self.scheme)
        return self
from cproton import pn_incref, pn_decref, \
    pn_py2void, pn_void2py, \
    pn_record_get, pn_record_def, pn_record_set, \
    PN_PYREF


class EmptyAttrs:
    """Attribute store that contains nothing and rejects every assignment."""

    def __contains__(self, name):
        return False

    def __getitem__(self, name):
        raise KeyError(name)

    def __setitem__(self, name, value):
        raise TypeError("does not support item assignment")


EMPTY_ATTRS = EmptyAttrs()


class Wrapper(object):
    """
    Base class for Python objects wrapping a C implementation object.

    Three entries are kept directly in the instance ``__dict__``: ``_impl``
    (the wrapped object), ``_attrs`` (the store used for attributes that are
    not defined on the class) and ``_record`` (the record returned by
    ``get_context``, or None).
    """

    def __init__(self, impl_or_constructor, get_context=None):
        """
        @param impl_or_constructor: an existing implementation object to wrap,
            or a callable that creates a new one.
        @param get_context: optional callable returning the record of an
            implementation object; the attribute dict is stored in it.
        """
        state = self.__dict__
        if callable(impl_or_constructor):
            # A constructor was given: build a brand new object.
            impl = impl_or_constructor()
            if impl is None:
                state["_impl"] = impl
                state["_attrs"] = EMPTY_ATTRS
                state["_record"] = None
                from proton import ProtonException
                raise ProtonException(
                    "Wrapper failed to create wrapped object. Check for file descriptor or memory exhaustion.")
            needs_init = True
        else:
            # An existing object was given: take a reference to it.
            impl = impl_or_constructor
            pn_incref(impl)
            needs_init = False

        if get_context:
            record = get_context(impl)
            attrs = pn_void2py(pn_record_get(record, PYCTX))
            if attrs is None:
                # No attribute dict is attached to the record yet: attach one.
                attrs = {}
                pn_record_def(record, PYCTX, PN_PYREF)
                pn_record_set(record, PYCTX, pn_py2void(attrs))
                needs_init = True
        else:
            record = None
            attrs = EMPTY_ATTRS
            needs_init = False

        state["_impl"] = impl
        state["_attrs"] = attrs
        state["_record"] = record
        if needs_init:
            self._init()

    def __getattr__(self, name):
        store = self.__dict__["_attrs"]
        if name not in store:
            raise AttributeError(name + " not in _attrs")
        return store[name]

    def __setattr__(self, name, value):
        # Names defined on the class are set normally; anything else goes
        # into the attribute store.
        if not hasattr(self.__class__, name):
            self.__dict__["_attrs"][name] = value
            return
        object.__setattr__(self, name, value)

    def __delattr__(self, name):
        store = self.__dict__["_attrs"]
        if store:
            del store[name]

    def __hash__(self):
        return hash(addressof(self._impl))

    def __eq__(self, other):
        if not isinstance(other, Wrapper):
            return False
        return addressof(self._impl) == addressof(other._impl)

    def __ne__(self, other):
        if not isinstance(other, Wrapper):
            return True
        return addressof(self._impl) != addressof(other._impl)

    def __del__(self):
        pn_decref(self._impl)

    def __repr__(self):
        cls = self.__class__
        return '<%s.%s 0x%x ~ 0x%x>' % (cls.__module__, cls.__name__,
                                        id(self), addressof(self._impl))


PYCTX = int(pn_py2void(Wrapper))
addressof = int
limitations # under the License. # -import heapq, logging, os, re, socket, time, types, weakref -from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url -from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout -from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException +from __future__ import absolute_import + +import logging +import time +import weakref from select import select +from proton import Delivery, Endpoint +from proton import Message, Handler, ProtonException +from ._events import dispatch + log = logging.getLogger("proton") + class OutgoingMessageHandler(Handler): """ A utility for simpler and more intuitive handling of delivery events related to outgoing i.e. sent messages. """ + def __init__(self, auto_settle=True, delegate=None): self.auto_settle = auto_settle self.delegate = delegate def on_link_flow(self, event): if event.link.is_sender and event.link.credit \ - and event.link.state & Endpoint.LOCAL_ACTIVE \ - and event.link.state & Endpoint.REMOTE_ACTIVE : + and event.link.state & Endpoint.LOCAL_ACTIVE \ + and event.link.state & Endpoint.REMOTE_ACTIVE: self.on_sendable(event) def on_delivery(self, event): @@ -94,23 +101,27 @@ class OutgoingMessageHandler(Handler): if self.delegate != None: dispatch(self.delegate, 'on_settled', event) + def recv_msg(delivery): msg = Message() msg.decode(delivery.link.recv(delivery.pending)) delivery.link.advance() return msg + class Reject(ProtonException): - """ - An exception that indicate a message should be rejected - """ - pass + """ + An exception that indicate a message should be rejected + """ + pass + class Release(ProtonException): - """ - An exception that indicate a message should be rejected - """ - pass + """ + An exception that indicate a message should be rejected + """ + pass + class Acking(object): def accept(self, delivery): @@ -146,6 +157,7 @@ class Acking(object): 
delivery.update(state) delivery.settle() + class IncomingMessageHandler(Handler, Acking): """ A utility for simpler and more intuitive handling of delivery @@ -202,6 +214,7 @@ class IncomingMessageHandler(Handler, Acking): if self.delegate != None: dispatch(self.delegate, 'on_aborted', event) + class EndpointStateHandler(Handler): """ A utility that exposes 'endpoint' events i.e. the open/close for @@ -272,7 +285,7 @@ class EndpointStateHandler(Handler): return self.on_connection_error(event) elif self.is_local_closed(event.connection): - self.on_connection_closed(event) + self.on_connection_closed(event) else: self.on_connection_closing(event) event.connection.close() @@ -391,12 +404,14 @@ class EndpointStateHandler(Handler): if self.delegate != None and event.connection and self.is_local_open(event.connection): dispatch(self.delegate, 'on_disconnected', event) + class MessagingHandler(Handler, Acking): """ A general purpose handler that makes the proton-c events somewhat simpler to deal with and/or avoids repetitive tasks for common use cases. """ + def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False): self.handlers = [] if prefetch: @@ -414,7 +429,8 @@ class MessagingHandler(Handler, Acking): """ if event.transport.condition: if event.transport.condition.info: - log.error("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) + log.error("%s: %s: %s" % ( + event.transport.condition.name, event.transport.condition.description, event.transport.condition.info)) else: log.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description)) if event.transport.condition.name in self.fatal_conditions: @@ -455,36 +471,43 @@ class MessagingHandler(Handler, Acking): Called when the event loop starts. (Just an alias for on_reactor_init) """ pass + def on_connection_closed(self, event): """ Called when the connection is closed. 
""" pass + def on_session_closed(self, event): """ Called when the session is closed. """ pass + def on_link_closed(self, event): """ Called when the link is closed. """ pass + def on_connection_closing(self, event): """ Called when the peer initiates the closing of the connection. """ pass + def on_session_closing(self, event): """ Called when the peer initiates the closing of the session. """ pass + def on_link_closing(self, event): """ Called when the peer initiates the closing of the link. """ pass + def on_disconnected(self, event): """ Called when the socket is disconnected. @@ -525,6 +548,7 @@ class MessagingHandler(Handler, Acking): retransmitted. """ pass + def on_message(self, event): """ Called when a message is received. The message itself can be @@ -535,11 +559,13 @@ class MessagingHandler(Handler, Acking): """ pass + class TransactionHandler(object): """ The interface for transaction handlers, i.e. objects that want to be notified of state changes related to a transaction. 
""" + def on_transaction_declared(self, event): pass @@ -555,6 +581,7 @@ class TransactionHandler(object): def on_transaction_commit_failed(self, event): pass + class TransactionalClientHandler(MessagingHandler, TransactionHandler): """ An extension to the MessagingHandler for applications using @@ -570,24 +597,29 @@ class TransactionalClientHandler(MessagingHandler, TransactionHandler): else: super(TransactionalClientHandler, self).accept(delivery) -from proton import WrappedHandler + +from ._events import WrappedHandler from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler + class CFlowController(WrappedHandler): def __init__(self, window=1024): WrappedHandler.__init__(self, lambda: pn_flowcontroller(window)) + class CHandshaker(WrappedHandler): def __init__(self): WrappedHandler.__init__(self, pn_handshaker) + class IOHandler(WrappedHandler): def __init__(self): WrappedHandler.__init__(self, pn_iohandler) + class PythonIO: def __init__(self): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/reactor.py ---------------------------------------------------------------------- diff --git a/python/proton/reactor.py b/python/proton/reactor.py index d5d5183..ccdbf94 100644 --- a/python/proton/reactor.py +++ b/python/proton/reactor.py @@ -17,22 +17,35 @@ from __future__ import absolute_import # specific language governing permissions and limitations # under the License. 
# -import logging, os, socket, time, types -from heapq import heappush, heappop, nsmallest +from __future__ import absolute_import +import os +import logging import traceback -from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch -from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message -from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol -from proton import Terminus, Timeout, Transport, TransportException, ulong, Url -from select import select +from proton import Connection, Delivery, Described +from proton import Endpoint, EventType, Handler, Link, Message +from proton import Session, SSL, SSLDomain, SSLUnavailable, symbol +from proton import Terminus, Transport, ulong, Url from proton.handlers import OutgoingMessageHandler -from proton import unicode2utf8, utf82unicode -from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable -from .wrapper import Wrapper, PYCTX -from cproton import * +from proton import generate_uuid + +from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode + +from ._events import EventBase +from ._reactor_impl import Selectable, WrappedHandler, _chandler +from ._wrapper import Wrapper, PYCTX + +from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \ + pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \ + pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \ + pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \ + pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \ + pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \ + pn_cast_pn_reactor, 
pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \ + pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \ + pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup from . import _compat @@ -40,6 +53,17 @@ from ._compat import queue log = logging.getLogger("proton") + +def _timeout2millis(secs): + if secs is None: return PN_MILLIS_MAX + return secs2millis(secs) + + +def _millis2timeout(millis): + if millis == PN_MILLIS_MAX: return None + return millis2secs(millis) + + class Task(Wrapper): @staticmethod @@ -58,6 +82,7 @@ class Task(Wrapper): def cancel(self): pn_task_cancel(self._impl) + class Acceptor(Wrapper): def __init__(self, impl): @@ -69,6 +94,7 @@ class Acceptor(Wrapper): def close(self): pn_acceptor_close(self._impl) + class Reactor(Wrapper): @staticmethod @@ -95,11 +121,12 @@ class Reactor(Wrapper): # error will always be generated from a callback from this reactor. # Needed to prevent reference cycles and be compatible with wrappers. 
class ErrorDelegate(object): - def __init__(self, reactor): - self.reactor_impl = reactor._impl - def on_error(self, info): - ractor = Reactor.wrap(self.reactor_impl) - ractor.on_error(info) + def __init__(self, reactor): + self.reactor_impl = reactor._impl + + def on_error(self, info): + ractor = Reactor.wrap(self.reactor_impl) + ractor.on_error(info) def on_error_delegate(self): return Reactor.ErrorDelegate(self).on_error @@ -119,10 +146,10 @@ class Reactor(Wrapper): global_handler = property(_get_global, _set_global) def _get_timeout(self): - return millis2timeout(pn_reactor_get_timeout(self._impl)) + return _millis2timeout(pn_reactor_get_timeout(self._impl)) def _set_timeout(self, secs): - return pn_reactor_set_timeout(self._impl, timeout2millis(secs)) + return pn_reactor_set_timeout(self._impl, _timeout2millis(secs)) timeout = property(_get_timeout, _set_timeout) @@ -244,7 +271,9 @@ class Reactor(Wrapper): def push_event(self, obj, etype): pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number) -from proton import wrappers as _wrappers + +from ._events import wrappers as _wrappers + _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x)) @@ -258,6 +287,7 @@ class EventInjector(object): it. The close() method should be called when it is no longer needed, to allow the event loop to end if needed. 
""" + def __init__(self): self.queue = queue.Queue() self.pipe = os.pipe() @@ -305,6 +335,7 @@ class ApplicationEvent(EventBase): Application defined event, which can optionally be associated with an engine object and or an arbitrary subject """ + def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) self.connection = connection @@ -323,10 +354,12 @@ class ApplicationEvent(EventBase): objects = [self.connection, self.session, self.link, self.delivery, self.subject] return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None])) + class Transaction(object): """ Class to track state of an AMQP 1.0 transaction. """ + def __init__(self, txn_ctrl, handler, settle_before_discharge=False): self.txn_ctrl = txn_ctrl self.handler = handler @@ -397,7 +430,7 @@ class Transaction(object): if event.delivery.remote_state == Delivery.REJECTED: if not self.failed: self.handler.on_transaction_commit_failed(event) - self._release_pending() # make this optional? + self._release_pending() # make this optional? 
else: if self.failed: self.handler.on_transaction_aborted(event) @@ -406,16 +439,19 @@ class Transaction(object): self.handler.on_transaction_committed(event) self._clear_pending() + class LinkOption(object): """ Abstract interface for link configuration options """ + def apply(self, link): """ Subclasses will implement any configuration logic in this method """ pass + def test(self, link): """ Subclasses can override this to selectively apply an option @@ -423,23 +459,30 @@ class LinkOption(object): """ return True + class AtMostOnce(LinkOption): def apply(self, link): link.snd_settle_mode = Link.SND_SETTLED + class AtLeastOnce(LinkOption): def apply(self, link): link.snd_settle_mode = Link.SND_UNSETTLED link.rcv_settle_mode = Link.RCV_FIRST + class SenderOption(LinkOption): def apply(self, sender): pass + def test(self, link): return link.is_sender + class ReceiverOption(LinkOption): def apply(self, receiver): pass + def test(self, link): return link.is_receiver + class DynamicNodeProperties(LinkOption): def __init__(self, props={}): self.properties = {} @@ -455,6 +498,7 @@ class DynamicNodeProperties(LinkOption): else: link.target.properties.put_dict(self.properties) + class Filter(ReceiverOption): def __init__(self, filter_set={}): self.filter_set = filter_set @@ -462,26 +506,32 @@ class Filter(ReceiverOption): def apply(self, receiver): receiver.source.filter.put_dict(self.filter_set) + class Selector(Filter): """ Configures a link with a message selector filter """ + def __init__(self, value, name='selector'): super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)}) + class DurableSubscription(ReceiverOption): def apply(self, receiver): receiver.source.durability = Terminus.DELIVERIES receiver.source.expiry_policy = Terminus.EXPIRE_NEVER + class Move(ReceiverOption): def apply(self, receiver): receiver.source.distribution_mode = Terminus.DIST_MODE_MOVE + class Copy(ReceiverOption): def apply(self, 
receiver): receiver.source.distribution_mode = Terminus.DIST_MODE_COPY + def _apply_link_options(options, link): if options: if isinstance(options, list): @@ -490,6 +540,7 @@ def _apply_link_options(options, link): else: if options.test(link): options.apply(link) + def _create_session(connection, handler=None): session = connection.session() session.open() @@ -502,6 +553,7 @@ def _get_attr(target, name): else: return None + class SessionPerConnection(object): def __init__(self): self._default_session = None @@ -511,11 +563,13 @@ class SessionPerConnection(object): self._default_session = _create_session(connection) return self._default_session + class GlobalOverrides(object): """ Internal handler that triggers the necessary socket connect for an opened connection. """ + def __init__(self, base): self.base = base @@ -527,11 +581,13 @@ class GlobalOverrides(object): conn = event.connection return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides) + class Connector(Handler): """ Internal handler that triggers the necessary socket connect for an opened connection. """ + def __init__(self, connection): self.connection = connection self.address = None @@ -548,7 +604,7 @@ class Connector(Handler): self.max_frame_size = None def _connect(self, connection, reactor): - assert(reactor is not None) + assert (reactor is not None) url = self.address.next() reactor.set_connection_host(connection, url.host, str(url.port)) # if virtual-host not set, use host from address as default @@ -615,11 +671,13 @@ class Connector(Handler): def on_timer_task(self, event): self._connect(self.connection, event.reactor) + class Backoff(object): """ A reconnect strategy involving an increasing delay between retries, up to a maximum or 10 seconds. 
""" + def __init__(self): self.delay = 0 @@ -631,9 +689,10 @@ class Backoff(object): if current == 0: self.delay = 0.1 else: - self.delay = min(10, 2*current) + self.delay = min(10, 2 * current) return current + class Urls(object): def __init__(self, values): self.values = [Url(v) for v in values] @@ -649,6 +708,7 @@ class Urls(object): self.i = iter(self.values) return next(self.i) + class SSLConfig(object): def __init__(self): self.client = SSLDomain(SSLDomain.MODE_CLIENT) @@ -670,6 +730,7 @@ class Container(Reactor): an extension to the Reactor class that adds convenience methods for creating connections and sender- or receiver- links. """ + def __init__(self, *handlers, **kwargs): super(Container, self).__init__(*handlers, **kwargs) if "impl" not in kwargs: @@ -687,7 +748,8 @@ class Container(Reactor): self.password = None Wrapper.__setattr__(self, 'subclass', self.__class__) - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs): + def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, + **kwargs): """ Initiates the establishment of an AMQP connection. Returns an instance of proton.Connection. @@ -748,10 +810,14 @@ class Container(Reactor): connector.max_frame_size = kwargs.get('max_frame_size') conn._overrides = connector - if url: connector.address = Urls([url]) - elif urls: connector.address = Urls(urls) - elif address: connector.address = address - else: raise ValueError("One of url, urls or address required") + if url: + connector.address = Urls([url]) + elif urls: + connector.address = Urls(urls) + elif address: + connector.address = address + else: + raise ValueError("One of url, urls or address required") if heartbeat: connector.heartbeat = heartbeat if reconnect: @@ -761,15 +827,19 @@ class Container(Reactor): # use container's default client domain if none specified. 
This is # only necessary of the URL specifies the "amqps:" scheme connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) - conn._session_policy = SessionPerConnection() #todo: make configurable + conn._session_policy = SessionPerConnection() # todo: make configurable conn.open() return conn def _get_id(self, container, remote, local): - if local and remote: "%s-%s-%s" % (container, remote, local) - elif local: return "%s-%s" % (container, local) - elif remote: return "%s-%s" % (container, remote) - else: return "%s-%s" % (container, str(generate_uuid())) + if local and remote: + "%s-%s-%s" % (container, remote, local) + elif local: + return "%s-%s" % (container, local) + elif remote: + return "%s-%s" % (container, remote) + else: + return "%s-%s" % (container, str(generate_uuid())) def _get_session(self, context): if isinstance(context, Url): @@ -806,7 +876,7 @@ class Container(Reactor): Various LinkOptions can be specified to further control the attachment. """ - if isinstance(context, _compat.string_types): + if isstring(context): context = Url(context) if isinstance(context, Url) and not target: target = context.path @@ -847,7 +917,7 @@ class Container(Reactor): Various LinkOptions can be specified to further control the attachment. 
""" - if isinstance(context, _compat.string_types): + if isstring(context): context = Url(context) if isinstance(context, Url) and not source: source = context.path http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/utils.py ---------------------------------------------------------------------- diff --git a/python/proton/utils.py b/python/proton/utils.py index 1d052d0..c6f8cb4 100644 --- a/python/proton/utils.py +++ b/python/proton/utils.py @@ -38,7 +38,8 @@ class BlockingLink(object): self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, timeout=timeout, msg="Opening link %s" % self.link.name) - except Timeout as e: pass + except Timeout as e: + pass self._checkClosed() def _checkClosed(self): @@ -53,31 +54,37 @@ class BlockingLink(object): msg="Closing link %s" % self.link.name) # Access to other link attributes. - def __getattr__(self, name): return getattr(self.link, name) + def __getattr__(self, name): + return getattr(self.link, name) + class SendException(ProtonException): """ Exception used to indicate an exceptional state/condition on a send request """ + def __init__(self, state): self.state = state + def _is_settled(delivery): return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED + class BlockingSender(BlockingLink): def __init__(self, connection, sender): super(BlockingSender, self).__init__(connection, sender) if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: - #this may be followed by a detach, which may contain an error condition, so wait a little... + # this may be followed by a detach, which may contain an error condition, so wait a little... 
self._waitForClose() - #...but close ourselves if peer does not + # ...but close ourselves if peer does not self.link.close() raise LinkException("Failed to open sender %s, target does not match" % self.link.name) def send(self, msg, timeout=False, error_states=None): delivery = self.link.send(msg) - self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout) + self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, + timeout=timeout) if delivery.link.snd_settle_mode != Link.SND_SETTLED: delivery.settle() bad = error_states @@ -87,6 +94,7 @@ class BlockingSender(BlockingLink): raise SendException(delivery.remote_state) return delivery + class Fetcher(MessagingHandler): def __init__(self, connection, prefetch): super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) @@ -96,7 +104,7 @@ class Fetcher(MessagingHandler): def on_message(self, event): self.incoming.append((event.message, event.delivery)) - self.connection.container.yield_() # Wake up the wait() loop to handle the message. + self.connection.container.yield_() # Wake up the wait() loop to handle the message. def on_link_error(self, event): if event.link.state & Endpoint.LOCAL_ACTIVE: @@ -129,9 +137,9 @@ class BlockingReceiver(BlockingLink): def __init__(self, connection, receiver, fetcher, credit=1): super(BlockingReceiver, self).__init__(connection, receiver) if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: - #this may be followed by a detach, which may contain an error condition, so wait a little... + # this may be followed by a detach, which may contain an error condition, so wait a little... 
self._waitForClose() - #...but close ourselves if peer does not + # ...but close ourselves if peer does not self.link.close() raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) if credit: receiver.flow(credit) @@ -151,7 +159,8 @@ class BlockingReceiver(BlockingLink): raise Exception("Can't call receive on this receiver as a handler was provided") if not self.link.credit: self.link.flow(1) - self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) + self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, + timeout=timeout) return self.fetcher.pop() def accept(self): @@ -210,6 +219,7 @@ class BlockingConnection(Handler): object operations are enclosed in a try block and that close() is always executed on exit. """ + def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None, **kwargs): self.disconnected = False self.timeout = timeout or 60 @@ -221,7 +231,8 @@ class BlockingConnection(Handler): self.closing = False failed = True try: - self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs) + self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, + heartbeat=heartbeat, **kwargs) self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), msg="Opening connection") failed = False @@ -230,7 +241,8 @@ class BlockingConnection(Handler): self.close() def create_sender(self, address, handler=None, name=None, options=None): - return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options)) + return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, + options=options)) def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None): 
prefetch = credit @@ -241,7 +253,9 @@ class BlockingConnection(Handler): else: fetcher = Fetcher(self, credit) return BlockingReceiver( - self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch) + self, + self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, + options=options), fetcher, credit=prefetch) def close(self): # TODO: provide stronger interrupt protection on cleanup. See PEP 419 @@ -259,8 +273,8 @@ class BlockingConnection(Handler): # Nothing left to block on. Allow reactor to clean up. self.run() self.conn = None - self.container.global_handler = None # break circular ref: container to cadapter.on_error - pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive + self.container.global_handler = None # break circular ref: container to cadapter.on_error + pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive self.container = None def _is_closed(self): @@ -294,7 +308,7 @@ class BlockingConnection(Handler): self.container.timeout = container_timeout if self.disconnected or self._is_closed(): self.container.stop() - self.conn.handler = None # break cyclical reference + self.conn.handler = None # break cyclical reference if self.disconnected and not self._is_closed(): raise ConnectionException( "Connection %s disconnected: %s" % (self.url, self.disconnected)) @@ -320,6 +334,7 @@ class BlockingConnection(Handler): def on_transport_closed(self, event): self.disconnected = event.transport.condition or "unknown" + class AtomicCount(object): def __init__(self, start=0, step=1): """Thread-safe atomic counter. 
Start at start, increment by step.""" @@ -334,6 +349,7 @@ class AtomicCount(object): self.lock.release() return result + class SyncRequestResponse(IncomingMessageHandler): """ Implementation of the synchronous request-response (aka RPC) pattern. @@ -374,12 +390,14 @@ class SyncRequestResponse(IncomingMessageHandler): request.reply_to = self.reply_to request.correlation_id = correlation_id = str(self.correlation_id.next()) self.sender.send(request) + def wakeup(): return self.response and (self.response.correlation_id == correlation_id) + self.connection.wait(wakeup, msg="Waiting for response") response = self.response - self.response = None # Ready for next response. - self.receiver.flow(1) # Set up credit for the next response. + self.response = None # Ready for next response. + self.receiver.flow(1) # Set up credit for the next response. return response @property @@ -390,4 +408,4 @@ class SyncRequestResponse(IncomingMessageHandler): def on_message(self, event): """Called when we receive a message for our receiver.""" self.response = event.message - self.connection.container.yield_() # Wake up the wait() loop to handle the message. + self.connection.container.yield_() # Wake up the wait() loop to handle the message. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
