http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6a78d2f7/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py new file mode 100644 index 0000000..fce3255 --- /dev/null +++ b/proton-c/bindings/python/proton/__init__.py @@ -0,0 +1,3891 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +The proton module defines a suite of APIs that implement the AMQP 1.0 +protocol. + +The proton APIs consist of the following classes: + + - L{Messenger} -- A messaging endpoint. + - L{Message} -- A class for creating and/or accessing AMQP message content. + - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded + data. + +""" + +from cproton import * + +import weakref, re, socket +try: + import uuid +except ImportError: + """ + No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. + """ + import struct + class uuid: + class UUID: + def __init__(self, hex=None, bytes=None): + if [hex, bytes].count(None) != 1: + raise TypeError("need one of hex or bytes") + if bytes is not None: + self.bytes = bytes + elif hex is not None: + fields=hex.split("-") + fields[4:5] = [fields[4][:4], fields[4][4:]] + self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields]) + + def __cmp__(self, other): + if isinstance(other, uuid.UUID): + return cmp(self.bytes, other.bytes) + else: + return -1 + + def __str__(self): + return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes) + + def __repr__(self): + return "UUID(%r)" % str(self) + + def __hash__(self): + return self.bytes.__hash__() + + import os, random, socket, time + rand = random.Random() + rand.seed((os.getpid(), time.time(), socket.gethostname())) + def random_uuid(): + bytes = [rand.randint(0, 255) for i in xrange(16)] + + # From RFC4122, the version bits are set to 0100 + bytes[7] &= 0x0F + bytes[7] |= 0x40 + + # From RFC4122, the top two bits of byte 8 get set to 01 + bytes[8] &= 0x3F + bytes[8] |= 0x80 + return "".join(map(chr, bytes)) + + def uuid4(): + return uuid.UUID(bytes=random_uuid()) + +try: + bytes() +except NameError: + bytes = str + +VERSION_MAJOR = PN_VERSION_MAJOR +VERSION_MINOR = PN_VERSION_MINOR +API_LANGUAGE = "C" +IMPLEMENTATION_LANGUAGE = "C" + +class Constant(object): + + def __init__(self, name): + self.name = name + + def __repr__(self): + return self.name + +class ProtonException(Exception): + """ + The root of the proton exception hierarchy. All proton exception + classes derive from this exception. + """ + pass + +class Timeout(ProtonException): + """ + A timeout exception indicates that a blocking operation has timed + out. + """ + pass + +class Interrupt(ProtonException): + """ + An interrupt exception indicaes that a blocking operation was interrupted. + """ + pass + +class MessengerException(ProtonException): + """ + The root of the messenger exception hierarchy. All exceptions + generated by the messenger class derive from this exception. + """ + pass + +class MessageException(ProtonException): + """ + The MessageException class is the root of the message exception + hierarhcy. All exceptions generated by the Message class derive from + this exception. + """ + pass + +EXCEPTIONS = { + PN_TIMEOUT: Timeout, + PN_INTR: Interrupt + } + +PENDING = Constant("PENDING") +ACCEPTED = Constant("ACCEPTED") +REJECTED = Constant("REJECTED") +RELEASED = Constant("RELEASED") +ABORTED = Constant("ABORTED") +SETTLED = Constant("SETTLED") + +STATUSES = { + PN_STATUS_ABORTED: ABORTED, + PN_STATUS_ACCEPTED: ACCEPTED, + PN_STATUS_REJECTED: REJECTED, + PN_STATUS_RELEASED: RELEASED, + PN_STATUS_PENDING: PENDING, + PN_STATUS_SETTLED: SETTLED, + PN_STATUS_UNKNOWN: None + } + +AUTOMATIC = Constant("AUTOMATIC") +MANUAL = Constant("MANUAL") + +class Messenger(object): + """ + The L{Messenger} class defines a high level interface for sending + and receiving L{Messages<Message>}. Every L{Messenger} contains a + single logical queue of incoming messages and a single logical queue + of outgoing messages. These messages in these queues may be destined + for, or originate from, a variety of addresses. + + The messenger interface is single-threaded. All methods + except one (L{interrupt}) are intended to be used from within + the messenger thread. + + + Address Syntax + ============== + + An address has the following form:: + + [ amqp[s]:// ] [user[:password]@] domain [/[name]] + + Where domain can be one of:: + + host | host:port | ip | ip:port | name + + The following are valid examples of addresses: + + - example.org + - example.org:1234 + - amqp://example.org + - amqps://example.org + - example.org/incoming + - amqps://example.org/outgoing + - amqps://fred:[email protected] + - 127.0.0.1:1234 + - amqps://127.0.0.1:1234 + + Sending & Receiving Messages + ============================ + + The L{Messenger} class works in conjuction with the L{Message} class. The + L{Message} class is a mutable holder of message content. + + The L{put} method copies its L{Message} to the outgoing queue, and may + send queued messages if it can do so without blocking. The L{send} + method blocks until it has sent the requested number of messages, + or until a timeout interrupts the attempt. + + + >>> message = Message() + >>> for i in range(3): + ... message.address = "amqp://host/queue" + ... message.subject = "Hello World %i" % i + ... messenger.put(message) + >>> messenger.send() + + Similarly, the L{recv} method receives messages into the incoming + queue, and may block as it attempts to receive the requested number + of messages, or until timeout is reached. It may receive fewer + than the requested number. The L{get} method pops the + eldest L{Message} off the incoming queue and copies it into the L{Message} + object that you supply. It will not block. + + + >>> message = Message() + >>> messenger.recv(10): + >>> while messenger.incoming > 0: + ... messenger.get(message) + ... print message.subject + Hello World 0 + Hello World 1 + Hello World 2 + + The blocking flag allows you to turn off blocking behavior entirely, + in which case L{send} and L{recv} will do whatever they can without + blocking, and then return. You can then look at the number + of incoming and outgoing messages to see how much outstanding work + still remains. + """ + + def __init__(self, name=None): + """ + Construct a new L{Messenger} with the given name. The name has + global scope. If a NULL name is supplied, a UUID based name will + be chosen. + + @type name: string + @param name: the name of the messenger or None + + """ + self._mng = pn_messenger(name) + self._selectables = {} + + def __del__(self): + """ + Destroy the L{Messenger}. This will close all connections that + are managed by the L{Messenger}. Call the L{stop} method before + destroying the L{Messenger}. + """ + if hasattr(self, "_mng"): + pn_messenger_free(self._mng) + del self._mng + + def _check(self, err): + if err < 0: + if (err == PN_INPROGRESS): + return + exc = EXCEPTIONS.get(err, MessengerException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) + else: + return err + + @property + def name(self): + """ + The name of the L{Messenger}. + """ + return pn_messenger_name(self._mng) + + def _get_certificate(self): + return pn_messenger_get_certificate(self._mng) + + def _set_certificate(self, value): + self._check(pn_messenger_set_certificate(self._mng, value)) + + certificate = property(_get_certificate, _set_certificate, + doc=""" +Path to a certificate file for the L{Messenger}. This certificate is +used when the L{Messenger} accepts or establishes SSL/TLS connections. +This property must be specified for the L{Messenger} to accept +incoming SSL/TLS connections and to establish client authenticated +outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS +connections do not require this property. +""") + + def _get_private_key(self): + return pn_messenger_get_private_key(self._mng) + + def _set_private_key(self, value): + self._check(pn_messenger_set_private_key(self._mng, value)) + + private_key = property(_get_private_key, _set_private_key, + doc=""" +Path to a private key file for the L{Messenger's<Messenger>} +certificate. This property must be specified for the L{Messenger} to +accept incoming SSL/TLS connections and to establish client +authenticated outgoing SSL/TLS connection. Non client authenticated +SSL/TLS connections do not require this property. +""") + + def _get_password(self): + return pn_messenger_get_password(self._mng) + + def _set_password(self, value): + self._check(pn_messenger_set_password(self._mng, value)) + + password = property(_get_password, _set_password, + doc=""" +This property contains the password for the L{Messenger.private_key} +file, or None if the file is not encrypted. +""") + + def _get_trusted_certificates(self): + return pn_messenger_get_trusted_certificates(self._mng) + + def _set_trusted_certificates(self, value): + self._check(pn_messenger_set_trusted_certificates(self._mng, value)) + + trusted_certificates = property(_get_trusted_certificates, + _set_trusted_certificates, + doc=""" +A path to a database of trusted certificates for use in verifying the +peer on an SSL/TLS connection. If this property is None, then the peer +will not be verified. +""") + + def _get_timeout(self): + t = pn_messenger_get_timeout(self._mng) + if t == -1: + return None + else: + return millis2secs(t) + + def _set_timeout(self, value): + if value is None: + t = -1 + else: + t = secs2millis(value) + self._check(pn_messenger_set_timeout(self._mng, t)) + + timeout = property(_get_timeout, _set_timeout, + doc=""" +The timeout property contains the default timeout for blocking +operations performed by the L{Messenger}. +""") + + def _is_blocking(self): + return pn_messenger_is_blocking(self._mng) + + def _set_blocking(self, b): + self._check(pn_messenger_set_blocking(self._mng, b)) + + blocking = property(_is_blocking, _set_blocking, + doc=""" +Enable or disable blocking behavior during L{Message} sending +and receiving. This affects every blocking call, with the +exception of L{work}. Currently, the affected calls are +L{send}, L{recv}, and L{stop}. +""") + + def _is_passive(self): + return pn_messenger_is_passive(self._mng) + + def _set_passive(self, b): + self._check(pn_messenger_set_passive(self._mng, b)) + + passive = property(_is_passive, _set_passive, + doc=""" +When passive is set to true, Messenger will not attempt to perform I/O +internally. In this mode it is necessary to use the selectables API to +drive any I/O needed to perform requested actions. In this mode +Messenger will never block. +""") + + def _get_incoming_window(self): + return pn_messenger_get_incoming_window(self._mng) + + def _set_incoming_window(self, window): + self._check(pn_messenger_set_incoming_window(self._mng, window)) + + incoming_window = property(_get_incoming_window, _set_incoming_window, + doc=""" +The incoming tracking window for the messenger. The messenger will +track the remote status of this many incoming deliveries after they +have been accepted or rejected. Defaults to zero. + +L{Messages<Message>} enter this window only when you take them into your application +using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} +without explicitly accepting or rejecting the oldest message, then the +message that passes beyond the edge of the incoming window will be assigned +the default disposition of its link. +""") + + def _get_outgoing_window(self): + return pn_messenger_get_outgoing_window(self._mng) + + def _set_outgoing_window(self, window): + self._check(pn_messenger_set_outgoing_window(self._mng, window)) + + outgoing_window = property(_get_outgoing_window, _set_outgoing_window, + doc=""" +The outgoing tracking window for the messenger. The messenger will +track the remote status of this many outgoing deliveries after calling +send. Defaults to zero. + +A L{Message} enters this window when you call the put() method with the +message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 +times, status information will no longer be available for the +first message. +""") + + def start(self): + """ + Currently a no-op placeholder. + For future compatibility, do not L{send} or L{recv} messages + before starting the L{Messenger}. + """ + self._check(pn_messenger_start(self._mng)) + + def stop(self): + """ + Transitions the L{Messenger} to an inactive state. An inactive + L{Messenger} will not send or receive messages from its internal + queues. A L{Messenger} should be stopped before being discarded to + ensure a clean shutdown handshake occurs on any internally managed + connections. + """ + self._check(pn_messenger_stop(self._mng)) + + @property + def stopped(self): + """ + Returns true iff a L{Messenger} is in the stopped state. + This function does not block. + """ + return pn_messenger_stopped(self._mng) + + def subscribe(self, source): + """ + Subscribes the L{Messenger} to messages originating from the + specified source. The source is an address as specified in the + L{Messenger} introduction with the following addition. If the + domain portion of the address begins with the '~' character, the + L{Messenger} will interpret the domain as host/port, bind to it, + and listen for incoming messages. For example "~0.0.0.0", + "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any + local interface and listen for incoming messages with the last + variant only permitting incoming SSL connections. + + @type source: string + @param source: the source of messages to subscribe to + """ + sub_impl = pn_messenger_subscribe(self._mng, source) + if not sub_impl: + self._check(pn_error_code(pn_messenger_error(self._mng))) + raise MessengerException("Cannot subscribe to %s"%source) + return Subscription(sub_impl) + + def put(self, message): + """ + Places the content contained in the message onto the outgoing + queue of the L{Messenger}. This method will never block, however + it will send any unblocked L{Messages<Message>} in the outgoing + queue immediately and leave any blocked L{Messages<Message>} + remaining in the outgoing queue. The L{send} call may be used to + block until the outgoing queue is empty. The L{outgoing} property + may be used to check the depth of the outgoing queue. + + When the content in a given L{Message} object is copied to the outgoing + message queue, you may then modify or discard the L{Message} object + without having any impact on the content in the outgoing queue. + + This method returns an outgoing tracker for the L{Message}. The tracker + can be used to determine the delivery status of the L{Message}. + + @type message: Message + @param message: the message to place in the outgoing queue + @return: a tracker + """ + message._pre_encode() + self._check(pn_messenger_put(self._mng, message._msg)) + return pn_messenger_outgoing_tracker(self._mng) + + def status(self, tracker): + """ + Gets the last known remote state of the delivery associated with + the given tracker. + + @type tracker: tracker + @param tracker: the tracker whose status is to be retrieved + + @return: one of None, PENDING, REJECTED, or ACCEPTED + """ + disp = pn_messenger_status(self._mng, tracker); + return STATUSES.get(disp, disp) + + def buffered(self, tracker): + """ + Checks if the delivery associated with the given tracker is still + waiting to be sent. + + @type tracker: tracker + @param tracker: the tracker whose status is to be retrieved + + @return: true if delivery is still buffered + """ + return pn_messenger_buffered(self._mng, tracker); + + def settle(self, tracker=None): + """ + Frees a L{Messenger} from tracking the status associated with a given + tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up + to the most recent will be settled. + """ + if tracker is None: + tracker = pn_messenger_outgoing_tracker(self._mng) + flags = PN_CUMULATIVE + else: + flags = 0 + self._check(pn_messenger_settle(self._mng, tracker, flags)) + + def send(self, n=-1): + """ + This call will block until the indicated number of L{messages<Message>} + have been sent, or until the operation times out. If n is -1 this call will + block until all outgoing L{messages<Message>} have been sent. If n is 0 then + this call will send whatever it can without blocking. + """ + self._check(pn_messenger_send(self._mng, n)) + + def recv(self, n=None): + """ + Receives up to I{n} L{messages<Message>} into the incoming queue. If no value + for I{n} is supplied, this call will receive as many L{messages<Message>} as it + can buffer internally. If the L{Messenger} is in blocking mode, this + call will block until at least one L{Message} is available in the + incoming queue. + """ + if n is None: + n = -1 + self._check(pn_messenger_recv(self._mng, n)) + + def work(self, timeout=None): + """ + Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. + This will block for the indicated timeout. + This method may also do I/O work other than sending and receiving + L{messages<Message>}. For example, closing connections after messenger.L{stop}() + has been called. + """ + if timeout is None: + t = -1 + else: + t = secs2millis(timeout) + err = pn_messenger_work(self._mng, t) + if (err == PN_TIMEOUT): + return False + else: + self._check(err) + return True + + @property + def receiving(self): + return pn_messenger_receiving(self._mng) + + def interrupt(self): + """ + The L{Messenger} interface is single-threaded. + This is the only L{Messenger} function intended to be called + from outside of the L{Messenger} thread. + Call this from a non-messenger thread to interrupt + a L{Messenger} that is blocking. + This will cause any in-progress blocking call to throw + the L{Interrupt} exception. If there is no currently blocking + call, then the next blocking call will be affected, even if it + is within the same thread that interrupt was called from. + """ + self._check(pn_messenger_interrupt(self._mng)) + + def get(self, message=None): + """ + Moves the message from the head of the incoming message queue into + the supplied message object. Any content in the message will be + overwritten. + + A tracker for the incoming L{Message} is returned. The tracker can + later be used to communicate your acceptance or rejection of the + L{Message}. + + If None is passed in for the L{Message} object, the L{Message} + popped from the head of the queue is discarded. + + @type message: Message + @param message: the destination message object + @return: a tracker + """ + if message is None: + impl = None + else: + impl = message._msg + self._check(pn_messenger_get(self._mng, impl)) + if message is not None: + message._post_decode() + return pn_messenger_incoming_tracker(self._mng) + + def accept(self, tracker=None): + """ + Signal the sender that you have acted on the L{Message} + pointed to by the tracker. If no tracker is supplied, + then all messages that have been returned by the L{get} + method are accepted, except those that have already been + auto-settled by passing beyond your incoming window size. + + @type tracker: tracker + @param tracker: a tracker as returned by get + """ + if tracker is None: + tracker = pn_messenger_incoming_tracker(self._mng) + flags = PN_CUMULATIVE + else: + flags = 0 + self._check(pn_messenger_accept(self._mng, tracker, flags)) + + def reject(self, tracker=None): + """ + Rejects the L{Message} indicated by the tracker. If no tracker + is supplied, all messages that have been returned by the L{get} + method are rejected, except those that have already been auto-settled + by passing beyond your outgoing window size. + + @type tracker: tracker + @param tracker: a tracker as returned by get + """ + if tracker is None: + tracker = pn_messenger_incoming_tracker(self._mng) + flags = PN_CUMULATIVE + else: + flags = 0 + self._check(pn_messenger_reject(self._mng, tracker, flags)) + + @property + def outgoing(self): + """ + The outgoing queue depth. + """ + return pn_messenger_outgoing(self._mng) + + @property + def incoming(self): + """ + The incoming queue depth. + """ + return pn_messenger_incoming(self._mng) + + def route(self, pattern, address): + """ + Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. + + The route procedure may be used to influence how a L{Messenger} will + internally treat a given address or class of addresses. Every call + to the route procedure will result in L{Messenger} appending a routing + rule to its internal routing table. + + Whenever a L{Message} is presented to a L{Messenger} for delivery, it + will match the address of this message against the set of routing + rules in order. The first rule to match will be triggered, and + instead of routing based on the address presented in the message, + the L{Messenger} will route based on the address supplied in the rule. + + The pattern matching syntax supports two types of matches, a '%' + will match any character except a '/', and a '*' will match any + character including a '/'. + + A routing address is specified as a normal AMQP address, however it + may additionally use substitution variables from the pattern match + that triggered the rule. + + Any message sent to "foo" will be routed to "amqp://foo.com": + + >>> messenger.route("foo", "amqp://foo.com"); + + Any message sent to "foobar" will be routed to + "amqp://foo.com/bar": + + >>> messenger.route("foobar", "amqp://foo.com/bar"); + + Any message sent to bar/<path> will be routed to the corresponding + path within the amqp://bar.com domain: + + >>> messenger.route("bar/*", "amqp://bar.com/$1"); + + Route all L{messages<Message>} over TLS: + + >>> messenger.route("amqp:*", "amqps:$1") + + Supply credentials for foo.com: + + >>> messenger.route("amqp://foo.com/*", "amqp://user:[email protected]/$1"); + + Supply credentials for all domains: + + >>> messenger.route("amqp://*", "amqp://user:password@$1"); + + Route all addresses through a single proxy while preserving the + original destination: + + >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); + + Route any address through a single broker: + + >>> messenger.route("*", "amqp://user:password@broker/$1"); + """ + self._check(pn_messenger_route(self._mng, pattern, address)) + + def rewrite(self, pattern, address): + """ + Similar to route(), except that the destination of + the L{Message} is determined before the message address is rewritten. + + The outgoing address is only rewritten after routing has been + finalized. If a message has an outgoing address of + "amqp://0.0.0.0:5678", and a rewriting rule that changes its + outgoing address to "foo", it will still arrive at the peer that + is listening on "amqp://0.0.0.0:5678", but when it arrives there, + the receiver will see its outgoing address as "foo". + + The default rewrite rule removes username and password from addresses + before they are transmitted. + """ + self._check(pn_messenger_rewrite(self._mng, pattern, address)) + + def selectable(self): + impl = pn_messenger_selectable(self._mng) + if impl: + fd = pn_selectable_fd(impl) + sel = self._selectables.get(fd, None) + if sel is None: + sel = Selectable(self, impl) + self._selectables[fd] = sel + return sel + else: + return None + + @property + def deadline(self): + tstamp = pn_messenger_deadline(self._mng) + if tstamp: + return millis2secs(tstamp) + else: + return None + +class Message(object): + """The L{Message} class is a mutable holder of message content. + + @ivar instructions: delivery instructions for the message + @type instructions: dict + @ivar annotations: infrastructure defined message annotations + @type annotations: dict + @ivar properties: application defined message properties + @type properties: dict + @ivar body: message body + @type body: bytes | unicode | dict | list | int | long | float | UUID + """ + + DATA = PN_DATA + TEXT = PN_TEXT + AMQP = PN_AMQP + JSON = PN_JSON + + DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY + + def __init__(self, **kwargs): + """ + @param kwargs: Message property name/value pairs to initialise the Message + """ + self._msg = pn_message() + self._id = Data(pn_message_id(self._msg)) + self._correlation_id = Data(pn_message_correlation_id(self._msg)) + self.instructions = None + self.annotations = None + self.properties = None + self.body = None + for k,v in kwargs.iteritems(): + getattr(self, k) # Raise exception if it's not a valid attribute. + setattr(self, k, v) + + def __del__(self): + if hasattr(self, "_msg"): + pn_message_free(self._msg) + del self._msg + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, MessageException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) + else: + return err + + def _pre_encode(self): + inst = Data(pn_message_instructions(self._msg)) + ann = Data(pn_message_annotations(self._msg)) + props = Data(pn_message_properties(self._msg)) + body = Data(pn_message_body(self._msg)) + + inst.clear() + if self.instructions is not None: + inst.put_object(self.instructions) + ann.clear() + if self.annotations is not None: + ann.put_object(self.annotations) + props.clear() + if self.properties is not None: + props.put_object(self.properties) + body.clear() + if self.body is not None: + body.put_object(self.body) + + def _post_decode(self): + inst = Data(pn_message_instructions(self._msg)) + ann = Data(pn_message_annotations(self._msg)) + props = Data(pn_message_properties(self._msg)) + body = Data(pn_message_body(self._msg)) + + if inst.next(): + self.instructions = inst.get_object() + else: + self.instructions = None + if ann.next(): + self.annotations = ann.get_object() + else: + self.annotations = None + if props.next(): + self.properties = props.get_object() + else: + self.properties = None + if body.next(): + self.body = body.get_object() + else: + self.body = None + + def clear(self): + """ + Clears the contents of the L{Message}. All fields will be reset to + their default values. + """ + pn_message_clear(self._msg) + self.instructions = None + self.annotations = None + self.properties = None + self.body = None + + def _is_inferred(self): + return pn_message_is_inferred(self._msg) + + def _set_inferred(self, value): + self._check(pn_message_set_inferred(self._msg, bool(value))) + + inferred = property(_is_inferred, _set_inferred, doc=""" +The inferred flag for a message indicates how the message content +is encoded into AMQP sections. If inferred is true then binary and +list values in the body of the message will be encoded as AMQP DATA +and AMQP SEQUENCE sections, respectively. If inferred is false, +then all values in the body of the message will be encoded as AMQP +VALUE sections regardless of their type. +""") + + def _is_durable(self): + return pn_message_is_durable(self._msg) + + def _set_durable(self, value): + self._check(pn_message_set_durable(self._msg, bool(value))) + + durable = property(_is_durable, _set_durable, + doc=""" +The durable property indicates that the message should be held durably +by any intermediaries taking responsibility for the message. +""") + + def _get_priority(self): + return pn_message_get_priority(self._msg) + + def _set_priority(self, value): + self._check(pn_message_set_priority(self._msg, value)) + + priority = property(_get_priority, _set_priority, + doc=""" +The priority of the message. +""") + + def _get_ttl(self): + return millis2secs(pn_message_get_ttl(self._msg)) + + def _set_ttl(self, value): + self._check(pn_message_set_ttl(self._msg, secs2millis(value))) + + ttl = property(_get_ttl, _set_ttl, + doc=""" +The time to live of the message measured in seconds. Expired messages +may be dropped. +""") + + def _is_first_acquirer(self): + return pn_message_is_first_acquirer(self._msg) + + def _set_first_acquirer(self, value): + self._check(pn_message_set_first_acquirer(self._msg, bool(value))) + + first_acquirer = property(_is_first_acquirer, _set_first_acquirer, + doc=""" +True iff the recipient is the first to acquire the message. +""") + + def _get_delivery_count(self): + return pn_message_get_delivery_count(self._msg) + + def _set_delivery_count(self, value): + self._check(pn_message_set_delivery_count(self._msg, value)) + + delivery_count = property(_get_delivery_count, _set_delivery_count, + doc=""" +The number of delivery attempts made for this message. +""") + + + def _get_id(self): + return self._id.get_object() + def _set_id(self, value): + if type(value) in (int, long): + value = ulong(value) + self._id.rewind() + self._id.put_object(value) + id = property(_get_id, _set_id, + doc=""" +The id of the message. +""") + + def _get_user_id(self): + return pn_message_get_user_id(self._msg) + + def _set_user_id(self, value): + self._check(pn_message_set_user_id(self._msg, value)) + + user_id = property(_get_user_id, _set_user_id, + doc=""" +The user id of the message creator. +""") + + def _get_address(self): + return pn_message_get_address(self._msg) + + def _set_address(self, value): + self._check(pn_message_set_address(self._msg, value)) + + address = property(_get_address, _set_address, + doc=""" +The address of the message. +""") + + def _get_subject(self): + return pn_message_get_subject(self._msg) + + def _set_subject(self, value): + self._check(pn_message_set_subject(self._msg, value)) + + subject = property(_get_subject, _set_subject, + doc=""" +The subject of the message. +""") + + def _get_reply_to(self): + return pn_message_get_reply_to(self._msg) + + def _set_reply_to(self, value): + self._check(pn_message_set_reply_to(self._msg, value)) + + reply_to = property(_get_reply_to, _set_reply_to, + doc=""" +The reply-to address for the message. +""") + + def _get_correlation_id(self): + return self._correlation_id.get_object() + def _set_correlation_id(self, value): + if type(value) in (int, long): + value = ulong(value) + self._correlation_id.rewind() + self._correlation_id.put_object(value) + + correlation_id = property(_get_correlation_id, _set_correlation_id, + doc=""" +The correlation-id for the message. +""") + + def _get_content_type(self): + return pn_message_get_content_type(self._msg) + + def _set_content_type(self, value): + self._check(pn_message_set_content_type(self._msg, value)) + + content_type = property(_get_content_type, _set_content_type, + doc=""" +The content-type of the message. +""") + + def _get_content_encoding(self): + return pn_message_get_content_encoding(self._msg) + + def _set_content_encoding(self, value): + self._check(pn_message_set_content_encoding(self._msg, value)) + + content_encoding = property(_get_content_encoding, _set_content_encoding, + doc=""" +The content-encoding of the message. +""") + + def _get_expiry_time(self): + return millis2secs(pn_message_get_expiry_time(self._msg)) + + def _set_expiry_time(self, value): + self._check(pn_message_set_expiry_time(self._msg, secs2millis(value))) + + expiry_time = property(_get_expiry_time, _set_expiry_time, + doc=""" +The expiry time of the message. +""") + + def _get_creation_time(self): + return millis2secs(pn_message_get_creation_time(self._msg)) + + def _set_creation_time(self, value): + self._check(pn_message_set_creation_time(self._msg, secs2millis(value))) + + creation_time = property(_get_creation_time, _set_creation_time, + doc=""" +The creation time of the message. +""") + + def _get_group_id(self): + return pn_message_get_group_id(self._msg) + + def _set_group_id(self, value): + self._check(pn_message_set_group_id(self._msg, value)) + + group_id = property(_get_group_id, _set_group_id, + doc=""" +The group id of the message. +""") + + def _get_group_sequence(self): + return pn_message_get_group_sequence(self._msg) + + def _set_group_sequence(self, value): + self._check(pn_message_set_group_sequence(self._msg, value)) + + group_sequence = property(_get_group_sequence, _set_group_sequence, + doc=""" +The sequence of the message within its group. +""") + + def _get_reply_to_group_id(self): + return pn_message_get_reply_to_group_id(self._msg) + + def _set_reply_to_group_id(self, value): + self._check(pn_message_set_reply_to_group_id(self._msg, value)) + + reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, + doc=""" +The group-id for any replies. +""") + + # XXX + def _get_format(self): + return pn_message_get_format(self._msg) + + def _set_format(self, value): + self._check(pn_message_set_format(self._msg, value)) + + format = property(_get_format, _set_format, + doc=""" +The format of the message. +""") + + def encode(self): + self._pre_encode() + sz = 16 + while True: + err, data = pn_message_encode(self._msg, sz) + if err == PN_OVERFLOW: + sz *= 2 + continue + else: + self._check(err) + return data + + def decode(self, data): + self._check(pn_message_decode(self._msg, data, len(data))) + self._post_decode() + + def load(self, data): + self._check(pn_message_load(self._msg, data)) + + def save(self): + sz = 16 + while True: + err, data = pn_message_save(self._msg, sz) + if err == PN_OVERFLOW: + sz *= 2 + continue + else: + self._check(err) + return data + + def __repr2__(self): + props = [] + for attr in ("inferred", "address", "reply_to", "durable", "ttl", + "priority", "first_acquirer", "delivery_count", "id", + "correlation_id", "user_id", "group_id", "group_sequence", + "reply_to_group_id", "instructions", "annotations", + "properties", "body"): + value = getattr(self, attr) + if value: props.append("%s=%r" % (attr, value)) + return "Message(%s)" % ", ".join(props) + + def __repr__(self): + tmp = pn_string(None) + err = pn_inspect(self._msg, tmp) + result = pn_string_get(tmp) + pn_free(tmp) + self._check(err) + return result + +class Subscription(object): + + def __init__(self, impl): + self._impl = impl + + @property + def address(self): + return pn_subscription_address(self._impl) + +class Selectable(object): + + def __init__(self, messenger, impl): + self.messenger = messenger + self._impl = impl + + def fileno(self): + if not self._impl: raise ValueError("selectable freed") + return pn_selectable_fd(self._impl) + + @property + def capacity(self): + if not self._impl: raise ValueError("selectable freed") + return pn_selectable_capacity(self._impl) + + @property + def pending(self): + if not self._impl: raise ValueError("selectable freed") + return pn_selectable_pending(self._impl) + + @property + def deadline(self): + if not self._impl: raise ValueError("selectable freed") + tstamp = pn_selectable_deadline(self._impl) + if tstamp: + return millis2secs(tstamp) + else: + return None + + def readable(self): + if not self._impl: raise ValueError("selectable freed") + pn_selectable_readable(self._impl) + + def writable(self): + if not self._impl: raise ValueError("selectable freed") + pn_selectable_writable(self._impl) + + def expired(self): + if not self._impl: raise ValueError("selectable freed") + pn_selectable_expired(self._impl) + + def _is_registered(self): + if not self._impl: raise ValueError("selectable freed") + return pn_selectable_is_registered(self._impl) + + def _set_registered(self, registered): + if not self._impl: raise ValueError("selectable freed") + pn_selectable_set_registered(self._impl, registered) + + registered = property(_is_registered, _set_registered, + doc=""" +The registered property may be get/set by an I/O polling system to +indicate whether the fd has been registered or not. +""") + + @property + def is_terminal(self): + if not self._impl: return True + return pn_selectable_is_terminal(self._impl) + + def free(self): + if self._impl: + del self.messenger._selectables[self.fileno()] + pn_selectable_free(self._impl) + self._impl = None + + def __del__(self): + self.free() + +class DataException(ProtonException): + """ + The DataException class is the root of the Data exception hierarchy. + All exceptions raised by the Data class extend this exception. + """ + pass + +class UnmappedType: + + def __init__(self, msg): + self.msg = msg + + def __repr__(self): + return "UnmappedType(%s)" % self.msg + +class ulong(long): + + def __repr__(self): + return "ulong(%s)" % long.__repr__(self) + +class timestamp(long): + + def __repr__(self): + return "timestamp(%s)" % long.__repr__(self) + +class symbol(unicode): + + def __repr__(self): + return "symbol(%s)" % unicode.__repr__(self) + +class char(unicode): + + def __repr__(self): + return "char(%s)" % unicode.__repr__(self) + +class Described(object): + + def __init__(self, descriptor, value): + self.descriptor = descriptor + self.value = value + + def __repr__(self): + return "Described(%r, %r)" % (self.descriptor, self.value) + + def __eq__(self, o): + if isinstance(o, Described): + return self.descriptor == o.descriptor and self.value == o.value + else: + return False + +UNDESCRIBED = Constant("UNDESCRIBED") + +class Array(object): + + def __init__(self, descriptor, type, *elements): + self.descriptor = descriptor + self.type = type + self.elements = elements + + def __repr__(self): + if self.elements: + els = ", %s" % (", ".join(map(repr, self.elements))) + else: + els = "" + return "Array(%r, %r%s)" % (self.descriptor, self.type, els) + + def __eq__(self, o): + if isinstance(o, Array): + return self.descriptor == o.descriptor and \ + self.type == o.type and self.elements == o.elements + else: + return False + +class Data: + """ + The L{Data} class provides an interface for decoding, extracting, + creating, and encoding arbitrary AMQP data. A L{Data} object + contains a tree of AMQP values. Leaf nodes in this tree correspond + to scalars in the AMQP type system such as L{ints<INT>} or + L{strings<STRING>}. Non-leaf nodes in this tree correspond to + compound values in the AMQP type system such as L{lists<LIST>}, + L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. + The root node of the tree is the L{Data} object itself and can have + an arbitrary number of children. + + A L{Data} object maintains the notion of the current sibling node + and a current parent node. Siblings are ordered within their parent. + Values are accessed and/or added by using the L{next}, L{prev}, + L{enter}, and L{exit} methods to navigate to the desired location in + the tree and using the supplied variety of put_*/get_* methods to + access or add a value of the desired type. + + The put_* methods will always add a value I{after} the current node + in the tree. If the current node has a next sibling the put_* method + will overwrite the value on this node. If there is no current node + or the current node has no next sibling then one will be added. The + put_* methods always set the added/modified node to the current + node. The get_* methods read the value of the current node and do + not change which node is current. + + The following types of scalar values are supported: + + - L{NULL} + - L{BOOL} + - L{UBYTE} + - L{USHORT} + - L{SHORT} + - L{UINT} + - L{INT} + - L{ULONG} + - L{LONG} + - L{FLOAT} + - L{DOUBLE} + - L{BINARY} + - L{STRING} + - L{SYMBOL} + + The following types of compound values are supported: + + - L{DESCRIBED} + - L{ARRAY} + - L{LIST} + - L{MAP} + """ + + NULL = PN_NULL; "A null value." + BOOL = PN_BOOL; "A boolean value." + UBYTE = PN_UBYTE; "An unsigned byte value." + BYTE = PN_BYTE; "A signed byte value." + USHORT = PN_USHORT; "An unsigned short value." + SHORT = PN_SHORT; "A short value." + UINT = PN_UINT; "An unsigned int value." + INT = PN_INT; "A signed int value." + CHAR = PN_CHAR; "A character value." + ULONG = PN_ULONG; "An unsigned long value." + LONG = PN_LONG; "A signed long value." + TIMESTAMP = PN_TIMESTAMP; "A timestamp value." + FLOAT = PN_FLOAT; "A float value." + DOUBLE = PN_DOUBLE; "A double value." + DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." + DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." + DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." + UUID = PN_UUID; "A UUID value." + BINARY = PN_BINARY; "A binary string." + STRING = PN_STRING; "A unicode string." + SYMBOL = PN_SYMBOL; "A symbolic string." + DESCRIBED = PN_DESCRIBED; "A described value." + ARRAY = PN_ARRAY; "An array value." + LIST = PN_LIST; "A list value." + MAP = PN_MAP; "A map value." + + type_names = { + NULL: "null", + BOOL: "bool", + BYTE: "byte", + UBYTE: "ubyte", + SHORT: "short", + USHORT: "ushort", + INT: "int", + UINT: "uint", + CHAR: "char", + LONG: "long", + ULONG: "ulong", + TIMESTAMP: "timestamp", + FLOAT: "float", + DOUBLE: "double", + DECIMAL32: "decimal32", + DECIMAL64: "decimal64", + DECIMAL128: "decimal128", + UUID: "uuid", + BINARY: "binary", + STRING: "string", + SYMBOL: "symbol", + DESCRIBED: "described", + ARRAY: "array", + LIST: "list", + MAP: "map" + } + + @classmethod + def type_name(type): return Data.type_names[type] + + def __init__(self, capacity=16): + if type(capacity) in (int, long): + self._data = pn_data(capacity) + self._free = True + else: + self._data = capacity + self._free = False + + def __del__(self): + if self._free and hasattr(self, "_data"): + pn_data_free(self._data) + del self._data + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, DataException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) + else: + return err + + def clear(self): + """ + Clears the data object. + """ + pn_data_clear(self._data) + + def rewind(self): + """ + Clears current node and sets the parent to the root node. Clearing the + current node sets it _before_ the first node, calling next() will advance to + the first node. + """ + assert self._data is not None + pn_data_rewind(self._data) + + def next(self): + """ + Advances the current node to its next sibling and returns its + type. If there is no next sibling the current node remains + unchanged and None is returned. + """ + found = pn_data_next(self._data) + if found: + return self.type() + else: + return None + + def prev(self): + """ + Advances the current node to its previous sibling and returns its + type. If there is no previous sibling the current node remains + unchanged and None is returned. + """ + found = pn_data_prev(self._data) + if found: + return self.type() + else: + return None + + def enter(self): + """ + Sets the parent node to the current node and clears the current node. + Clearing the current node sets it _before_ the first child, + call next() advances to the first child. + """ + return pn_data_enter(self._data) + + def exit(self): + """ + Sets the current node to the parent node and the parent node to + its own parent. + """ + return pn_data_exit(self._data) + + def lookup(self, name): + return pn_data_lookup(self._data, name) + + def narrow(self): + pn_data_narrow(self._data) + + def widen(self): + pn_data_widen(self._data) + + def type(self): + """ + Returns the type of the current node. + """ + dtype = pn_data_type(self._data) + if dtype == -1: + return None + else: + return dtype + + def encode(self): + """ + Returns a representation of the data encoded in AMQP format. + """ + size = 1024 + while True: + cd, enc = pn_data_encode(self._data, size) + if cd == PN_OVERFLOW: + size *= 2 + elif cd >= 0: + return enc + else: + self._check(cd) + + def decode(self, encoded): + """ + Decodes the first value from supplied AMQP data and returns the + number of bytes consumed. + + @type encoded: binary + @param encoded: AMQP encoded binary data + """ + return self._check(pn_data_decode(self._data, encoded)) + + def put_list(self): + """ + Puts a list value. Elements may be filled by entering the list + node and putting element values. + + >>> data = Data() + >>> data.put_list() + >>> data.enter() + >>> data.put_int(1) + >>> data.put_int(2) + >>> data.put_int(3) + >>> data.exit() + """ + self._check(pn_data_put_list(self._data)) + + def put_map(self): + """ + Puts a map value. Elements may be filled by entering the map node + and putting alternating key value pairs. + + >>> data = Data() + >>> data.put_map() + >>> data.enter() + >>> data.put_string("key") + >>> data.put_string("value") + >>> data.exit() + """ + self._check(pn_data_put_map(self._data)) + + def put_array(self, described, element_type): + """ + Puts an array value. Elements may be filled by entering the array + node and putting the element values. The values must all be of the + specified array element type. If an array is described then the + first child value of the array is the descriptor and may be of any + type. + + >>> data = Data() + >>> + >>> data.put_array(False, Data.INT) + >>> data.enter() + >>> data.put_int(1) + >>> data.put_int(2) + >>> data.put_int(3) + >>> data.exit() + >>> + >>> data.put_array(True, Data.DOUBLE) + >>> data.enter() + >>> data.put_symbol("array-descriptor") + >>> data.put_double(1.1) + >>> data.put_double(1.2) + >>> data.put_double(1.3) + >>> data.exit() + + @type described: bool + @param described: specifies whether the array is described + @type element_type: int + @param element_type: the type of the array elements + """ + self._check(pn_data_put_array(self._data, described, element_type)) + + def put_described(self): + """ + Puts a described value. A described node has two children, the + descriptor and the value. These are specified by entering the node + and putting the desired values. + + >>> data = Data() + >>> data.put_described() + >>> data.enter() + >>> data.put_symbol("value-descriptor") + >>> data.put_string("the value") + >>> data.exit() + """ + self._check(pn_data_put_described(self._data)) + + def put_null(self): + """ + Puts a null value. + """ + self._check(pn_data_put_null(self._data)) + + def put_bool(self, b): + """ + Puts a boolean value. + + @param b: a boolean value + """ + self._check(pn_data_put_bool(self._data, b)) + + def put_ubyte(self, ub): + """ + Puts an unsigned byte value. + + @param ub: an integral value + """ + self._check(pn_data_put_ubyte(self._data, ub)) + + def put_byte(self, b): + """ + Puts a signed byte value. + + @param b: an integral value + """ + self._check(pn_data_put_byte(self._data, b)) + + def put_ushort(self, us): + """ + Puts an unsigned short value. + + @param us: an integral value. + """ + self._check(pn_data_put_ushort(self._data, us)) + + def put_short(self, s): + """ + Puts a signed short value. + + @param s: an integral value + """ + self._check(pn_data_put_short(self._data, s)) + + def put_uint(self, ui): + """ + Puts an unsigned int value. + + @param ui: an integral value + """ + self._check(pn_data_put_uint(self._data, ui)) + + def put_int(self, i): + """ + Puts a signed int value. + + @param i: an integral value + """ + self._check(pn_data_put_int(self._data, i)) + + def put_char(self, c): + """ + Puts a char value. + + @param c: a single character + """ + self._check(pn_data_put_char(self._data, ord(c))) + + def put_ulong(self, ul): + """ + Puts an unsigned long value. + + @param ul: an integral value + """ + self._check(pn_data_put_ulong(self._data, ul)) + + def put_long(self, l): + """ + Puts a signed long value. + + @param l: an integral value + """ + self._check(pn_data_put_long(self._data, l)) + + def put_timestamp(self, t): + """ + Puts a timestamp value. + + @param t: an integral value + """ + self._check(pn_data_put_timestamp(self._data, t)) + + def put_float(self, f): + """ + Puts a float value. + + @param f: a floating point value + """ + self._check(pn_data_put_float(self._data, f)) + + def put_double(self, d): + """ + Puts a double value. + + @param d: a floating point value. + """ + self._check(pn_data_put_double(self._data, d)) + + def put_decimal32(self, d): + """ + Puts a decimal32 value. + + @param d: a decimal32 value + """ + self._check(pn_data_put_decimal32(self._data, d)) + + def put_decimal64(self, d): + """ + Puts a decimal64 value. + + @param d: a decimal64 value + """ + self._check(pn_data_put_decimal64(self._data, d)) + + def put_decimal128(self, d): + """ + Puts a decimal128 value. + + @param d: a decimal128 value + """ + self._check(pn_data_put_decimal128(self._data, d)) + + def put_uuid(self, u): + """ + Puts a UUID value. + + @param u: a uuid value + """ + self._check(pn_data_put_uuid(self._data, u.bytes)) + + def put_binary(self, b): + """ + Puts a binary value. + + @type b: binary + @param b: a binary value + """ + self._check(pn_data_put_binary(self._data, b)) + + def put_string(self, s): + """ + Puts a unicode value. + + @type s: unicode + @param s: a unicode value + """ + self._check(pn_data_put_string(self._data, s.encode("utf8"))) + + def put_symbol(self, s): + """ + Puts a symbolic value. + + @type s: string + @param s: the symbol name + """ + self._check(pn_data_put_symbol(self._data, s)) + + def get_list(self): + """ + If the current node is a list, return the number of elements, + otherwise return zero. List elements can be accessed by entering + the list. + + >>> count = data.get_list() + >>> data.enter() + >>> for i in range(count): + ... type = data.next() + ... if type == Data.STRING: + ... print data.get_string() + ... elif type == ...: + ... ... + >>> data.exit() + """ + return pn_data_get_list(self._data) + + def get_map(self): + """ + If the current node is a map, return the number of child elements, + otherwise return zero. Key value pairs can be accessed by entering + the map. + + >>> count = data.get_map() + >>> data.enter() + >>> for i in range(count/2): + ... type = data.next() + ... if type == Data.STRING: + ... print data.get_string() + ... elif type == ...: + ... ... + >>> data.exit() + """ + return pn_data_get_map(self._data) + + def get_array(self): + """ + If the current node is an array, return a tuple of the element + count, a boolean indicating whether the array is described, and + the type of each element, otherwise return (0, False, None). Array + data can be accessed by entering the array. + + >>> # read an array of strings with a symbolic descriptor + >>> count, described, type = data.get_array() + >>> data.enter() + >>> data.next() + >>> print "Descriptor:", data.get_symbol() + >>> for i in range(count): + ... data.next() + ... print "Element:", data.get_string() + >>> data.exit() + """ + count = pn_data_get_array(self._data) + described = pn_data_is_array_described(self._data) + type = pn_data_get_array_type(self._data) + if type == -1: + type = None + return count, described, type + + def is_described(self): + """ + Checks if the current node is a described value. The descriptor + and value may be accessed by entering the described value. + + >>> # read a symbolically described string + >>> assert data.is_described() # will error if the current node is not described + >>> data.enter() + >>> print data.get_symbol() + >>> print data.get_string() + >>> data.exit() + """ + return pn_data_is_described(self._data) + + def is_null(self): + """ + Checks if the current node is a null. + """ + return pn_data_is_null(self._data) + + def get_bool(self): + """ + If the current node is a boolean, returns its value, returns False + otherwise. + """ + return pn_data_get_bool(self._data) + + def get_ubyte(self): + """ + If the current node is an unsigned byte, returns its value, + returns 0 otherwise. + """ + return pn_data_get_ubyte(self._data) + + def get_byte(self): + """ + If the current node is a signed byte, returns its value, returns 0 + otherwise. + """ + return pn_data_get_byte(self._data) + + def get_ushort(self): + """ + If the current node is an unsigned short, returns its value, + returns 0 otherwise. + """ + return pn_data_get_ushort(self._data) + + def get_short(self): + """ + If the current node is a signed short, returns its value, returns + 0 otherwise. + """ + return pn_data_get_short(self._data) + + def get_uint(self): + """ + If the current node is an unsigned int, returns its value, returns + 0 otherwise. + """ + return pn_data_get_uint(self._data) + + def get_int(self): + """ + If the current node is a signed int, returns its value, returns 0 + otherwise. + """ + return pn_data_get_int(self._data) + + def get_char(self): + """ + If the current node is a char, returns its value, returns 0 + otherwise. + """ + return char(unichr(pn_data_get_char(self._data))) + + def get_ulong(self): + """ + If the current node is an unsigned long, returns its value, + returns 0 otherwise. + """ + return ulong(pn_data_get_ulong(self._data)) + + def get_long(self): + """ + If the current node is an signed long, returns its value, returns + 0 otherwise. + """ + return pn_data_get_long(self._data) + + def get_timestamp(self): + """ + If the current node is a timestamp, returns its value, returns 0 + otherwise. + """ + return timestamp(pn_data_get_timestamp(self._data)) + + def get_float(self): + """ + If the current node is a float, returns its value, raises 0 + otherwise. + """ + return pn_data_get_float(self._data) + + def get_double(self): + """ + If the current node is a double, returns its value, returns 0 + otherwise. + """ + return pn_data_get_double(self._data) + + # XXX: need to convert + def get_decimal32(self): + """ + If the current node is a decimal32, returns its value, returns 0 + otherwise. + """ + return pn_data_get_decimal32(self._data) + + # XXX: need to convert + def get_decimal64(self): + """ + If the current node is a decimal64, returns its value, returns 0 + otherwise. + """ + return pn_data_get_decimal64(self._data) + + # XXX: need to convert + def get_decimal128(self): + """ + If the current node is a decimal128, returns its value, returns 0 + otherwise. + """ + return pn_data_get_decimal128(self._data) + + def get_uuid(self): + """ + If the current node is a UUID, returns its value, returns None + otherwise. + """ + if pn_data_type(self._data) == Data.UUID: + return uuid.UUID(bytes=pn_data_get_uuid(self._data)) + else: + return None + + def get_binary(self): + """ + If the current node is binary, returns its value, returns "" + otherwise. + """ + return pn_data_get_binary(self._data) + + def get_string(self): + """ + If the current node is a string, returns its value, returns "" + otherwise. + """ + return pn_data_get_string(self._data).decode("utf8") + + def get_symbol(self): + """ + If the current node is a symbol, returns its value, returns "" + otherwise. + """ + return symbol(pn_data_get_symbol(self._data)) + + def copy(self, src): + self._check(pn_data_copy(self._data, src._data)) + + def format(self): + sz = 16 + while True: + err, result = pn_data_format(self._data, sz) + if err == PN_OVERFLOW: + sz *= 2 + continue + else: + self._check(err) + return result + + def dump(self): + pn_data_dump(self._data) + + def put_dict(self, d): + self.put_map() + self.enter() + try: + for k, v in d.items(): + self.put_object(k) + self.put_object(v) + finally: + self.exit() + + def get_dict(self): + if self.enter(): + try: + result = {} + while self.next(): + k = self.get_object() + if self.next(): + v = self.get_object() + else: + v = None + result[k] = v + finally: + self.exit() + return result + + def put_sequence(self, s): + self.put_list() + self.enter() + try: + for o in s: + self.put_object(o) + finally: + self.exit() + + def get_sequence(self): + if self.enter(): + try: + result = [] + while self.next(): + result.append(self.get_object()) + finally: + self.exit() + return result + + def get_py_described(self): + if self.enter(): + try: + self.next() + descriptor = self.get_object() + self.next() + value = self.get_object() + finally: + self.exit() + return Described(descriptor, value) + + def put_py_described(self, d): + self.put_described() + self.enter() + try: + self.put_object(d.descriptor) + self.put_object(d.value) + finally: + self.exit() + + def get_py_array(self): + """ + If the current node is an array, return an Array object + representing the array and its contents. Otherwise return None. + This is a convenience wrapper around get_array, enter, etc. + """ + + count, described, type = self.get_array() + if type is None: return None + if self.enter(): + try: + if described: + self.next() + descriptor = self.get_object() + else: + descriptor = UNDESCRIBED + elements = [] + while self.next(): + elements.append(self.get_object()) + finally: + self.exit() + return Array(descriptor, type, *elements) + + def put_py_array(self, a): + described = a.descriptor != UNDESCRIBED + self.put_array(described, a.type) + self.enter() + try: + if described: + self.put_object(a.descriptor) + for e in a.elements: + self.put_object(e) + finally: + self.exit() + + put_mappings = { + None.__class__: lambda s, _: s.put_null(), + bool: put_bool, + dict: put_dict, + list: put_sequence, + tuple: put_sequence, + unicode: put_string, + bytes: put_binary, + symbol: put_symbol, + int: put_long, + char: put_char, + long: put_long, + ulong: put_ulong, + timestamp: put_timestamp, + float: put_double, + uuid.UUID: put_uuid, + Described: put_py_described, + Array: put_py_array + } + get_mappings = { + NULL: lambda s: None, + BOOL: get_bool, + BYTE: get_byte, + UBYTE: get_ubyte, + SHORT: get_short, + USHORT: get_ushort, + INT: get_int, + UINT: get_uint, + CHAR: get_char, + LONG: get_long, + ULONG: get_ulong, + TIMESTAMP: get_timestamp, + FLOAT: get_float, + DOUBLE: get_double, + DECIMAL32: get_decimal32, + DECIMAL64: get_decimal64, + DECIMAL128: get_decimal128, + UUID: get_uuid, + BINARY: get_binary, + STRING: get_string, + SYMBOL: get_symbol, + DESCRIBED: get_py_described, + ARRAY: get_py_array, + LIST: get_sequence, + MAP: get_dict + } + + + def put_object(self, obj): + putter = self.put_mappings[obj.__class__] + putter(self, obj) + + def get_object(self): + type = self.type() + if type is None: return None + getter = self.get_mappings.get(type) + if getter: + return getter(self) + else: + return UnmappedType(str(type)) + +class ConnectionException(ProtonException): + pass + +class Endpoint(object): + + LOCAL_UNINIT = PN_LOCAL_UNINIT + REMOTE_UNINIT = PN_REMOTE_UNINIT + LOCAL_ACTIVE = PN_LOCAL_ACTIVE + REMOTE_ACTIVE = PN_REMOTE_ACTIVE + LOCAL_CLOSED = PN_LOCAL_CLOSED + REMOTE_CLOSED = PN_REMOTE_CLOSED + + def __init__(self): + self.condition = None + self._release_invoked = False + + def _release(self): + """Release the underlying C Engine resource.""" + if not self._release_invoked: + for c in self._children: + c._release() + self._free_resource() + self.connection._releasing(self) + self._release_invoked = True + + def _update_cond(self): + obj2cond(self.condition, self._get_cond_impl()) + + @property + def remote_condition(self): + return cond2obj(self._get_remote_cond_impl()) + + # the following must be provided by subclasses + def _get_cond_impl(self): + assert False, "Subclass must override this!" + + def _get_remote_cond_impl(self): + assert False, "Subclass must override this!" + +class Condition: + + def __init__(self, name, description=None, info=None): + self.name = name + self.description = description + self.info = info + + def __repr__(self): + return "Condition(%s)" % ", ".join([repr(x) for x in + (self.name, self.description, self.info) + if x]) + + def __eq__(self, o): + if not isinstance(o, Condition): return False + return self.name == o.name and \ + self.description == o.description and \ + self.info == o.info + +def obj2cond(obj, cond): + pn_condition_clear(cond) + if obj: + pn_condition_set_name(cond, str(obj.name)) + pn_condition_set_description(cond, obj.description) + info = Data(pn_condition_info(cond)) + if obj.info: + info.put_object(obj.info) + +def cond2obj(cond): + if pn_condition_is_set(cond): + return Condition(pn_condition_get_name(cond), + pn_condition_get_description(cond), + dat2obj(pn_condition_info(cond))) + else: + return None + +def dat2obj(dimpl): + if dimpl: + d = Data(dimpl) + d.rewind() + d.next() + obj = d.get_object() + d.rewind() + return obj + +def obj2dat(obj, dimpl): + if obj is not None: + d = Data(dimpl) + d.put_object(obj) + +def secs2millis(secs): + return long(secs*1000) + +def millis2secs(millis): + return float(millis)/1000.0 + +class Connection(Endpoint): + + @staticmethod + def _wrap_connection(c_conn): + """Maintain only a single instance of this class for each Connection + object that exists in the the C Engine. This is done by storing a (weak) + reference to the python instance in the context field of the C object. + """ + if not c_conn: return None + py_conn = pn_void2py(pn_connection_get_context(c_conn)) + if py_conn: return py_conn + wrapper = Connection(_conn=c_conn) + return wrapper + + def __init__(self, _conn=None): + Endpoint.__init__(self) + if _conn: + self._conn = _conn + else: + self._conn = pn_connection() + pn_connection_set_context(self._conn, pn_py2void(self)) + self.offered_capabilities = None + self.desired_capabilities = None + self.properties = None + self._sessions = set() + + def __del__(self): + if hasattr(self, "_conn") and self._conn: + self._release() + + def free(self): + self._release() + + @property + def _children(self): + return self._sessions + + @property + def connection(self): + return self + + def _free_resource(self): + pn_connection_free(self._conn) + + def _released(self): + self._conn = None + + def _releasing(self, child): + coll = getattr(self, "_collector", None) + if coll: coll = coll() + if coll: + coll._contexts.add(child) + else: + child._released() + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, ConnectionException) + raise exc("[%s]: %s" % (err, pn_connection_error(self._conn))) + else: + return err + + def _get_cond_impl(self): + return pn_connection_condition(self._conn) + + def _get_remote_cond_impl(self): + return pn_connection_remote_condition(self._conn) + + def collect(self, collector): + if collector is None: + pn_connection_collect(self._conn, None) + else: + pn_connection_collect(self._conn, collector._impl) + self._collector = weakref.ref(collector) + + def _get_container(self): + return pn_connection_get_container(self._conn) + def _set_container(self, name): + return pn_connection_set_container(self._conn, name) + + container = property(_get_container, _set_container) + + def _get_hostname(self): + return pn_connection_get_hostname(self._conn) + def _set_hostname(self, name): + return pn_connection_set_hostname(self._conn, name) + + hostname = property(_get_hostname, _set_hostname) + + @property + def remote_container(self): + return pn_connection_remote_container(self._conn) + + @property + def remote_hostname(self): + return pn_connection_remote_hostname(self._conn) + + @property + def remote_offered_capabilities(self): + return dat2obj(pn_connection_remote_offered_capabilities(self._conn)) + + @property + def remote_desired_capabilities(self): + return dat2obj(pn_connection_remote_desired_capabilities(self._conn)) + + @property + def remote_properties(self): + return dat2obj(pn_connection_remote_properties(self._conn)) + + def open(self): + obj2dat(self.offered_capabilities, + pn_connection_offered_capabilities(self._conn)) + obj2dat(self.desired_capabilities, + pn_connection_desired_capabilities(self._conn)) + obj2dat(self.properties, pn_connection_properties(self._conn)) + pn_connection_open(self._conn) + + def close(self): + self._update_cond() + pn_connection_close(self._conn) + + @property + def state(self): + return pn_connection_state(self._conn) + + def session(self): + return Session._wrap_session(pn_session(self._conn)) + + def session_head(self, mask): + return Session._wrap_session(pn_session_head(self._conn, mask)) + + def link_head(self, mask): + return Link._wrap_link(pn_link_head(self._conn, mask)) + + @property + def work_head(self): + return Delivery._wrap_delivery(pn_work_head(self._conn)) + + @property + def error(self): + return pn_error_code(pn_connection_error(self._conn)) + +class SessionException(ProtonException): + pass + +class Session(Endpoint): + + @staticmethod + def _wrap_session(c_ssn): + """Maintain only a single instance of this class for each Session object that + exists in the C Engine. + """ + if c_ssn is None: return None + py_ssn = pn_void2py(pn_session_get_context(c_ssn)) + if py_ssn: return py_ssn + wrapper = Session(c_ssn) + return wrapper + + def __init__(self, ssn): + Endpoint.__init__(self) + self._ssn = ssn + pn_session_set_context(self._ssn, pn_py2void(self)) + self._links = set() + self.connection._sessions.add(self) + + @property + def _children(self): + return self._links + + def _free_resource(self): + pn_session_free(self._ssn) + + def _released(self): + self._ssn = None + + def free(self): + """Release the Session, freeing its resources. + + Call this when you no longer need the session. This will allow the + session's resources to be reclaimed. Once called, you should no longer + reference the session. + + """ + self.connection._sessions.remove(self) + self._release() + + def _get_cond_impl(self): + return pn_session_condition(self._ssn) + + def _get_remote_cond_impl(self): + return pn_session_remote_condition(self._ssn) + + def _get_incoming_capacity(self): + return pn_session_get_incoming_capacity(self._ssn) + + def _set_incoming_capacity(self, capacity): + pn_session_set_incoming_capacity(self._ssn, capacity) + + incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) + + @property + def outgoing_bytes(self): + return pn_session_outgoing_bytes(self._ssn) + + @property + def incoming_bytes(self): + return pn_session_incoming_bytes(self._ssn) + + def open(self): + pn_session_open(self._ssn) + + def close(self): + self._update_cond() + pn_session_close(self._ssn) + + def next(self, mask): + return Session._wrap_session(pn_session_next(self._ssn, mask)) + + @property + def state(self): + return pn_session_state(self._ssn) + + @property + def connection(self): + return Connection._wrap_connection(pn_session_connection(self._ssn)) + + def sender(self, name): + return Link._wrap_link(pn_sender(self._ssn, name)) + + def receiver(self, name): + return Link._wrap_link(pn_receiver(self._ssn, name)) + +class LinkException(ProtonException): + pass + +class Link(Endpoint): + + SND_UNSETTLED = PN_SND_UNSETTLED + SND_SETTLED = PN_SND_SETTLED + SND_MIXED = PN_SND_MIXED + + RCV_FIRST = PN_RCV_FIRST + RCV_SECOND = PN_RCV_SECOND + + @staticmethod + def _wrap_link(c_link): + """Maintain only a single instance of this class for each Session object that + exists in the C Engine. + """ + if c_link is None: return None + py_link = pn_void2py(pn_link_get_context(c_link)) + if py_link: return py_link + if pn_link_is_sender(c_link): + wrapper = Sender(c_link) + else: + wrapper = Receiver(c_link) + return wrapper + + def __init__(self, c_link): + Endpoint.__init__(self) + self._link = c_link + pn_link_set_context(self._link, pn_py2void(self)) + self._deliveries = set() + self.session._links.add(self) + + @property + def _children(self): + return self._deliveries + + def _free_resource(self): + pn_link_free(self._link) + + def _released(self): + self._link = None + + def free(self): + """Release the Link, freeing its resources""" + self.session._links.remove(self) + self._release() + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, LinkException) + raise exc("[%s]: %s" % (err, pn_link_error(self._link))) + else: + return err + + def _get_cond_impl(self): + return pn_link_condition(self._link) + + def _get_remote_cond_impl(self): + return pn_link_remote_condition(self._link) + + def open(self): + pn_link_open(self._link) + + def close(self): + self._update_cond() + pn_link_close(self._link) + + @property + def state(self): + return pn_link_state(self._link) + + @property + def source(self): + return Terminus(pn_link_source(self._link)) + + @property + def target(self): + return Terminus(pn_link_target(self._link)) + + @property + def remote_source(self): + return Terminus(pn_link_remote_source(self._link)) + @property + def remote_target(self): + return Terminus(pn_link_remote_target(self._link)) + + @property + def session(self): + return Session._wrap_session(pn_link_session(self._link)) + + @property + def connection(self): + return self.session.connection + + def delivery(self, tag): + return Delivery._wrap_delivery(pn_delivery(self._link, tag)) + + @property + def current(self): + return Delivery._wrap_delivery(pn_link_current(self._link)) + + def advance(self): + return pn_link_advance(self._link) + + @property + def unsettled(self): + return pn_link_unsettled(self._link) + + @property + def credit(self): + return pn_link_credit(self._link) + + @property + def available(self): + return pn_link_available(self._link) + + @property + def queued(self): + return pn_link_queued(self._link) + + def next(self, mask): + return Link._wrap_link(pn_link_next(self._link, mask)) + + @property + def name(self): + return pn_link_name(self._link) + + @property + def is_sender(self): + return pn_link_is_sender(self._link) + + @property + def is_receiver(self): + return pn_link_is_receiver(self._link) + + @property + def remote_snd_settle_mode(self): + return pn_link_remote_snd_settle_mode(self._link) + + @property + def remote_rcv_settle_mode(self): + return pn_link_remote_rcv_settle_mode(self._link) + + def _get_snd_settle_mode(self): + return pn_link_snd_settle_mode(self._link) + def _set_snd_settle_mode(self, mode): + pn_link_set_snd_settle_mode(self._link, mode) + snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode) + + def _get_rcv_settle_mode(self): + return pn_link_rcv_settle_mode(self._link) + def _set_rcv_settle_mode(self, mode): + pn_link_set_rcv_settle_mode(self._link, mode) + rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode) + + def drained(self): + return pn_link_drained(self._link) + + def detach(self): + return pn_link_detach(self._link) + +class Terminus(object): + + UNSPECIFIED = PN_UNSPECIFIED + SOURCE = PN_SOURCE + TARGET = PN_TARGET + COORDINATOR = PN_COORDINATOR + + NONDURABLE = PN_NONDURABLE + CONFIGURATION = PN_CONFIGURATION + DELIVERIES = PN_DELIVERIES + + DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED + DIST_MODE_COPY = PN_DIST_MODE_COPY + DIST_MODE_MOVE = PN_DIST_MODE_MOVE + + def __init__(self, impl): + self._impl = impl + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, LinkException) + raise exc("[%s]" % err) + else: + return err + + def _get_type(self): + return pn_terminus_get_type(self._impl) + def _set_type(self, type): + self._check(pn_terminus_set_type(self._impl, type)) + type = property(_get_type, _set_type) + + def _get_address(self): + return pn_terminus_get_address(self._impl) + def _set_address(self, address): + self._check(pn_terminus_set_address(self._impl, address)) + address = property(_get_address, _set_address) + + def _get_durability(self): + return pn_terminus_get_durability(self._impl) + def _set_durability(self, seconds): + self._check(pn_terminus_set_durability(self._impl, seconds)) + durability = property(_get_durability, _set_durability) + + def _get_expiry_policy(self): + return pn_terminus_get_expiry_policy(self._impl) + def _set_expiry_policy(self, seconds): + self._check(pn_terminus_set_expiry_policy(self._impl, seconds)) + expiry_policy = property(_get_expiry_policy, _set_expiry_policy) + + def _get_timeout(self): + return pn_terminus_get_timeout(self._impl) + def _set_timeout(self, seconds): + self._check(pn_terminus_set_timeout(self._impl, seconds)) + timeout = property(_get_timeout, _set_timeout) + + def _is_dynamic(self): + return pn_terminus_is_dynamic(self._impl) + def _set_dynamic(self, dynamic): + self._check(pn_terminus_set_dynamic(self._impl, dynamic)) + dynamic = property(_is_dynamic, _set_dynamic) + + def _get_distribution_mode(self): + return pn_terminus_get_distribution_mode(self._impl) + def _set_distribution_mode(self, mode): + self._check(pn_terminus_set_distribution_mode(self._impl, mode)) + distribution_mode = property(_get_distribution_mode, _set_distribution_mode) + + @property + def properties(self): + return Data(pn_terminus_properties(self._impl)) + + @property + def capabilities(self): + return Data(pn_terminus_capabilities(self._impl)) + + @property + def outcomes(self): + return Data(pn_terminus_outcomes(self._impl)) + + @property + def filter(self): + return Data(pn_terminus_filter(self._impl)) + + def copy(self, src): + self._check(pn_terminus_copy(self._impl, src._impl)) + +class Sender(Link): + + def __init__(self, c_link): + super(Sender, self).__init__(c_link) + + def offered(self, n): + pn_link_offered(self._link, n) + + def send(self, bytes): + return self._check(pn_link_send(self._link, bytes)) + +class Receiver(Link): + + def __init__(self, c_link): + super(Receiver, self).__init__(c_link) + + def flow(self, n): + pn_link_flow(self._link, n) + + def recv(self, limit): + n, bytes = pn_link_recv(self._link, limit) + if n == PN_EOS: + return None + else: + self._check(n) + return bytes + + def drain(self, n): + pn_link_drain(self._link, n) + + def draining(self): + return pn_link_draining(self._link) + +class NamedInt(int): + + values = {} + + def __new__(cls, i, name): + ni = super(NamedInt, cls).__new__(cls, i) + cls.values[i] = ni + return ni + + def __init__(self, i, name): + self.name = name + + def __repr__(self): + return self.name + + def __str__(self): + return self.name + + @classmethod + def get(cls, i): + return cls.values.get(i, i) + +class DispositionType(NamedInt): + values = {} + +class Disposition(object): + + RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") + ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") + REJECTED = DispositionType(PN_REJECTED, "REJECTED") + RELEASED = DispositionType(PN_RELEASED, "RELEASED") + MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") + + def __init__(self, impl, local): + self._impl = impl + self.local = local + self._data = None + self._condition = None + self._annotations = None + + @property + def type(self): + return DispositionType.get(pn_disposition_type(self._impl)) + + def _get_section_number(self): + return pn_disposition_get_section_number(self._impl) + def _set_section_number(self, n): + pn_disposition_set_section_number(self._impl, n) + section_number = property(_get_section_number, _set_section_number) + + def _get_section_offset(self): + return pn_disposition_get_section_offset(self._impl) + def _set_section_offset(self, n): + pn_disposition_set_section_offset(self._impl, n) + section_offset = property(_get_section_offset, _set_section_offset) + + def _get_failed(self): + return pn_disposition_is_failed(self._impl) + def _set_failed(self, b): + pn_disposition_set_failed(self._impl, b) + failed = property(_get_failed, _set_failed) + + def _get_undeliverable(self): + return pn_disposition_is_undeliverable(self._impl) + def _set_undeliverable(self, b): + pn_disposition_set_undeliverable(self._impl, b) + undeliverable = property(_get_undeliverable, _set_undeliverable) + + def _get_data(self): + if self.local: + return self._data + else: + return dat2obj(pn_disposition_data(self._impl)) + def _set_data(self, obj): + if self.local: + self._data = obj + else: + raise AttributeError("data attribute is read-only") + data = property(_get_data, _set_data) + + def _get_annotations(self): + if self.local: + return self._annotations + else: + return dat2obj(pn_disposition_annotations(self._impl)) + def _set_annotations(self, obj): + if self.local: + self._annotations = obj + else: + raise AttributeError("annotations attribute is read-only") + annotations = property(_get_annotations, _set_annotations) + + def _get_condition(self): + if self.local: + return self._condition + else: + return cond2obj(pn_disposition_condition(self._impl)) + def _set_condition(self, obj): + if self.local: + self._condition = obj + else: + raise AttributeError("condition attribute is read-only") + condition = property(_get_condition, _set_condition) + +class Delivery(object): + + RECEIVED = Disposition.RECEIVED + ACCEPTED = Disposition.ACCEPTED + REJECTED = Disposition.REJECTED + RELEASED = Disposition.RELEASED + MODIFIED = Disposition.MODIFIED + + @staticmethod + def _wrap_delivery(c_dlv): + """Maintain only a single instance of this class for each Delivery object that + exists in the C Engine. + """ + if not c_dlv: return None + py_dlv = pn_void2py(pn_delivery_get_context(c_dlv)) + if py_dlv: return py_dlv + wrapper = Delivery(c_dlv) + return wrapper + + def __init__(self, dlv): + self._dlv = dlv + pn_delivery_set_context(self._dlv, pn_py2void(self)) + self.local = Disposition(pn_delivery_local(self._dlv), True) + self.remote = Disposition(pn_delivery_remote(self._dlv), False) + self.link._deliveries.add(self) + + def __del__(self): + self._release() + + def _release(self): + """Release the underlying C Engine resource.""" + if self._dlv: + pn_delivery_set_context(self._dlv, pn_py2void(None)) + pn_delivery_settle(self._dlv) + self._dlv = None + + @property + def released(self): + return self._dlv is None + + @property + def tag(self): + return pn_delivery_tag(self._dlv) + + @property + def writable(self): + return pn_delivery_writable(self._dlv) + + @property + def readable(self): + return pn_delivery_readable(self._dlv) + + @property + def updated(self): + return pn_delivery_updated(self._dlv) + + def update(self, state): + obj2dat(self.local._data, pn_disposition_data(self.local._impl)) + obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) + obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) + pn_delivery_update(self._dlv, state) + + @property + def pending(self): + return pn_delivery_pending(self._dlv) + + @property + def partial(self): + return pn_delivery_partial(self._dlv) + + @property + def local_state(self): + return DispositionType.get(pn_delivery_local_state(self._dlv)) + + @property + def remote_state(self): + return DispositionType.get(pn_delivery_remote_state(self._dlv)) + + @property + def settled(self): + return pn_delivery_settled(self._dlv) + + def settle(self): + """Release the delivery""" + self.link._deliveries.remove(self) + self._release() + + @property + def work_next(self): + return Delivery._wrap_delivery(pn_work_next(self._dlv)) + + @property + def link(self): + return Link._wrap_link(pn_delivery_link(self._dlv)) + +class TransportException(ProtonException): + pass + +class Transport(object): + + TRACE_OFF = PN_TRACE_OFF + TRACE_DRV = PN_TRACE_DRV + TRACE_FRM = PN_TRACE_FRM + TRACE_RAW = PN_TRACE_RAW + + CLIENT = 1 + SERVER = 2 + + @staticmethod + def _wrap_transport(c_trans): + if not c_trans: return None + wrapper = Transport(_trans=c_trans) + return wrapper + + def __init__(self, mode=None, _trans=None): + if not mode and not _trans: + self._trans = pn_transport() + elif not mode: + self._shared_trans = True + self._trans = _trans + elif mode==Transport.CLIENT: + self._trans = pn_transport() + elif mode==Transport.SERVER: + self._trans = pn_transport() + pn_transport_set_server(self._trans) + else: + raise TransportException("Cannot initialise Transport from mode: %s" % str(mode)) + self._sasl = None + self._ssl = None + + def __del__(self): + if hasattr(self, "_trans"): + if not hasattr(self, "_shared_trans"): + pn_transport_free(self._trans) + if hasattr(self, "_sasl") and self._sasl: + # pn_transport_free deallocs the C sasl associated with the + # transport, so erase the reference if a SASL object was used. + self._sasl._sasl = None + self._sasl = None + if hasattr(self, "_ssl") and self._ssl: + # ditto the owned c SSL object + self._ssl._ssl = None + self._ssl = None + del self._trans + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, TransportException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans)))) + else: + return err + + def bind(self, connection): + """Assign a connection to the transport""" + self._check(pn_transport_bind(self._trans, connection._conn)) + # keep python connection from being garbage collected: + self._connection = connection + + def unbind(self): + """Release the connection""" + self._check(pn_transport_unbind(self._trans)) + self._connection = None + + def trace(self, n): + pn_transport_trace(self._trans, n) + + def tick(self, now): + """Process any timed events (like heartbeat generation). + now = seconds since epoch (float). + """ + return millis2secs(pn_transport_tick(self._trans, secs2millis(now))) + + def capacity(self): + c = pn_transport_capacity(self._trans) + if c >= PN_EOS: + return c + else: + return self._check(c) + + def push(self, bytes): + n = self._check(pn_transport_push(self._trans, bytes)) + if n != len(bytes): + raise OverflowError("unable to process all bytes") + + def close_tail(self): + self._check(pn_transport_close_tail(self._trans)) + + def pending(self): + p = pn_transport_pending(self._trans) + if p >= PN_EOS: + return p + else: + return self._check(p) + + def peek(self, size): + cd, out = pn_transport_peek(self._trans, size) + if cd == PN_EOS: + return None + else: + self._check(cd) + return out + + def pop(self, size): + pn_transport_pop(self._trans, size) + + def close_head(self): + self._check(pn_transport_close_head(self._trans)) + + @property + def closed(self): + return pn_transport_closed(self._trans) + + # AMQP 1.0 max-frame-size + def _get_max_frame_size(self): + return pn_transport_get_max_frame(self._tra
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
