http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_common.py ---------------------------------------------------------------------- diff --git a/python/proton/_common.py b/python/proton/_common.py new file mode 100644 index 0000000..3715c6a --- /dev/null +++ b/python/proton/_common.py @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# +# Hacks to provide Python2 <---> Python3 compatibility +# +# The results are +# | |long|unicode| +# |python2|long|unicode| +# |python3| int| str| +try: + long() +except NameError: + long = int +try: + unicode() +except NameError: + unicode = str + + +def isinteger(value): + return isinstance(value, (int, long)) + + +def isstring(value): + return isinstance(value, (str, unicode)) + + +class Constant(object): + + def __init__(self, name): + self.name = name + + def __repr__(self): + return self.name + + +def secs2millis(secs): + return long(secs * 1000) + + +def millis2secs(millis): + return float(millis) / 1000.0 + + +def unicode2utf8(string): + """Some Proton APIs expect a null terminated string. Convert python text + types to UTF8 to avoid zero bytes introduced by other multi-byte encodings. + This method will throw if the string cannot be converted. + """ + if string is None: + return None + elif isinstance(string, str): + # Must be py2 or py3 str + # The swig binding converts py3 str -> utf8 char* and back sutomatically + return string + elif isinstance(string, unicode): + # This must be python2 unicode as we already detected py3 str above + return string.encode('utf-8') + # Anything else illegal - specifically python3 bytes + raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string))) + + +def utf82unicode(string): + """Convert C strings returned from proton-c into python unicode""" + if string is None: + return None + elif isinstance(string, unicode): + # py2 unicode, py3 str (via hack definition) + return string + elif isinstance(string, bytes): + # py2 str (via hack definition), py3 bytes + return string.decode('utf8') + raise TypeError("Unrecognized string type")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_compat.py ---------------------------------------------------------------------- diff --git a/python/proton/_compat.py b/python/proton/_compat.py index afd82e3..eae4c84 100644 --- a/python/proton/_compat.py +++ b/python/proton/_compat.py @@ -32,8 +32,6 @@ except ImportError: PY3 = sys.version_info[0] == 3 if PY3: - string_types = (str,) - def raise_(t, v=None, tb=None): """Mimic the old 2.x raise behavior: Raise an exception of type t with value v using optional traceback tb @@ -45,23 +43,22 @@ if PY3: else: raise v.with_traceback(tb) + def iteritems(d, **kw): return iter(d.items(**kw)) + unichr = chr else: - # includes both unicode and non-unicode strings: - string_types = (basestring,) - # the raise syntax will cause a parse error in Py3, so 'sneak' in a # definition that won't cause the parser to barf - exec("""def raise_(t, v=None, tb=None): + exec ("""def raise_(t, v=None, tb=None): raise t, v, tb """) + def iteritems(d, **kw): return d.iteritems(**kw) - unichr = unichr -__all__ = [ 'PY3', 'queue', 'string_types', 'raise_', 'iteritems', 'unichr'] \ No newline at end of file + unichr = unichr http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_condition.py ---------------------------------------------------------------------- diff --git a/python/proton/_condition.py b/python/proton/_condition.py new file mode 100644 index 0000000..e5dbde9 --- /dev/null +++ b/python/proton/_condition.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import + +from cproton import pn_condition_clear, pn_condition_set_name, pn_condition_set_description, pn_condition_info, \ + pn_condition_is_set, pn_condition_get_name, pn_condition_get_description + +from ._data import Data, dat2obj + + +class Condition: + + def __init__(self, name, description=None, info=None): + self.name = name + self.description = description + self.info = info + + def __repr__(self): + return "Condition(%s)" % ", ".join([repr(x) for x in + (self.name, self.description, self.info) + if x]) + + def __eq__(self, o): + if not isinstance(o, Condition): return False + return self.name == o.name and \ + self.description == o.description and \ + self.info == o.info + + +def obj2cond(obj, cond): + pn_condition_clear(cond) + if obj: + pn_condition_set_name(cond, str(obj.name)) + pn_condition_set_description(cond, obj.description) + info = Data(pn_condition_info(cond)) + if obj.info: + info.put_object(obj.info) + + +def cond2obj(cond): + if pn_condition_is_set(cond): + return Condition(pn_condition_get_name(cond), + pn_condition_get_description(cond), + dat2obj(pn_condition_info(cond))) + else: + return None http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_data.py ---------------------------------------------------------------------- diff --git a/python/proton/_data.py b/python/proton/_data.py new file mode 100644 index 0000000..f4ad381 --- /dev/null +++ b/python/proton/_data.py @@ -0,0 +1,1129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import + +import uuid + +from cproton import PN_TIMESTAMP, PN_FLOAT, PN_DESCRIBED, PN_DECIMAL64, PN_UBYTE, PN_UUID, PN_NULL, PN_BINARY, \ + PN_LIST, PN_OVERFLOW, PN_MAP, PN_LONG, PN_SHORT, PN_CHAR, PN_UINT, PN_ULONG, PN_STRING, PN_USHORT, PN_DOUBLE, \ + PN_BYTE, PN_DECIMAL32, PN_DECIMAL128, PN_ARRAY, PN_SYMBOL, PN_BOOL, PN_INT, \ + pn_data_get_binary, pn_data_get_decimal64, pn_data_put_symbol, pn_data_put_float, \ + pn_data_is_array_described, pn_data_exit, pn_data_put_uint, pn_data_put_decimal128, \ + pn_data_lookup, pn_data_put_char, pn_data_encoded_size, pn_data_get_bool, \ + pn_data_get_short, pn_data_prev, pn_data_type, pn_data_widen, pn_data_put_decimal64, \ + pn_data_put_string, pn_data_get_array, pn_data_put_ulong, pn_data_get_byte, pn_data_get_symbol, pn_data_encode, \ + pn_data_rewind, pn_data_put_bool, pn_data_is_null, pn_data_error, \ + pn_data_put_double, pn_data_copy, pn_data_put_int, pn_data_get_ubyte, pn_data_free, pn_data_clear, \ + pn_data_get_double, pn_data_put_byte, pn_data_put_uuid, pn_data_put_ushort, pn_data_is_described, \ + pn_data_get_float, pn_data_get_uint, pn_data_put_described, pn_data_get_decimal128, pn_data, \ + pn_data_get_array_type, pn_data_put_map, pn_data_put_list, pn_data_get_string, pn_data_get_char, \ + pn_data_put_decimal32, pn_data_enter, pn_data_put_short, pn_data_put_timestamp, \ + pn_data_get_long, pn_data_get_map, pn_data_narrow, pn_data_put_array, pn_data_get_ushort, \ + pn_data_get_int, pn_data_get_list, pn_data_get_ulong, pn_data_put_ubyte, \ + pn_data_format, pn_data_dump, pn_data_get_uuid, pn_data_get_decimal32, \ + pn_data_put_binary, pn_data_get_timestamp, pn_data_decode, pn_data_next, pn_data_put_null, pn_data_put_long, \ + pn_error_text + +from ._common import Constant +from ._exceptions import EXCEPTIONS, DataException + +from . import _compat + +# +# Hacks to provide Python2 <---> Python3 compatibility +# +# The results are +# | |long|unicode| +# |python2|long|unicode| +# |python3| int| str| +try: + long() +except NameError: + long = int +try: + unicode() +except NameError: + unicode = str + + +class UnmappedType: + + def __init__(self, msg): + self.msg = msg + + def __repr__(self): + return "UnmappedType(%s)" % self.msg + + +class ulong(long): + + def __repr__(self): + return "ulong(%s)" % long.__repr__(self) + + +class timestamp(long): + + def __repr__(self): + return "timestamp(%s)" % long.__repr__(self) + + +class symbol(unicode): + + def __repr__(self): + return "symbol(%s)" % unicode.__repr__(self) + + +class char(unicode): + + def __repr__(self): + return "char(%s)" % unicode.__repr__(self) + + +class byte(int): + + def __repr__(self): + return "byte(%s)" % int.__repr__(self) + + +class short(int): + + def __repr__(self): + return "short(%s)" % int.__repr__(self) + + +class int32(int): + + def __repr__(self): + return "int32(%s)" % int.__repr__(self) + + +class ubyte(int): + + def __repr__(self): + return "ubyte(%s)" % int.__repr__(self) + + +class ushort(int): + + def __repr__(self): + return "ushort(%s)" % int.__repr__(self) + + +class uint(long): + + def __repr__(self): + return "uint(%s)" % long.__repr__(self) + + +class float32(float): + + def __repr__(self): + return "float32(%s)" % float.__repr__(self) + + +class decimal32(int): + + def __repr__(self): + return "decimal32(%s)" % int.__repr__(self) + + +class decimal64(long): + + def __repr__(self): + return "decimal64(%s)" % long.__repr__(self) + + +class decimal128(bytes): + + def __repr__(self): + return "decimal128(%s)" % bytes.__repr__(self) + + +class Described(object): + + def __init__(self, descriptor, value): + self.descriptor = descriptor + self.value = value + + def __repr__(self): + return "Described(%r, %r)" % (self.descriptor, self.value) + + def __eq__(self, o): + if isinstance(o, Described): + return self.descriptor == o.descriptor and self.value == o.value + else: + return False + + +UNDESCRIBED = Constant("UNDESCRIBED") + + +class Array(object): + + def __init__(self, descriptor, type, *elements): + self.descriptor = descriptor + self.type = type + self.elements = elements + + def __iter__(self): + return iter(self.elements) + + def __repr__(self): + if self.elements: + els = ", %s" % (", ".join(map(repr, self.elements))) + else: + els = "" + return "Array(%r, %r%s)" % (self.descriptor, self.type, els) + + def __eq__(self, o): + if isinstance(o, Array): + return self.descriptor == o.descriptor and \ + self.type == o.type and self.elements == o.elements + else: + return False + + +class Data: + """ + The L{Data} class provides an interface for decoding, extracting, + creating, and encoding arbitrary AMQP data. A L{Data} object + contains a tree of AMQP values. Leaf nodes in this tree correspond + to scalars in the AMQP type system such as L{ints<INT>} or + L{strings<STRING>}. Non-leaf nodes in this tree correspond to + compound values in the AMQP type system such as L{lists<LIST>}, + L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. + The root node of the tree is the L{Data} object itself and can have + an arbitrary number of children. + + A L{Data} object maintains the notion of the current sibling node + and a current parent node. Siblings are ordered within their parent. + Values are accessed and/or added by using the L{next}, L{prev}, + L{enter}, and L{exit} methods to navigate to the desired location in + the tree and using the supplied variety of put_*/get_* methods to + access or add a value of the desired type. + + The put_* methods will always add a value I{after} the current node + in the tree. If the current node has a next sibling the put_* method + will overwrite the value on this node. If there is no current node + or the current node has no next sibling then one will be added. The + put_* methods always set the added/modified node to the current + node. The get_* methods read the value of the current node and do + not change which node is current. + + The following types of scalar values are supported: + + - L{NULL} + - L{BOOL} + - L{UBYTE} + - L{USHORT} + - L{SHORT} + - L{UINT} + - L{INT} + - L{ULONG} + - L{LONG} + - L{FLOAT} + - L{DOUBLE} + - L{BINARY} + - L{STRING} + - L{SYMBOL} + + The following types of compound values are supported: + + - L{DESCRIBED} + - L{ARRAY} + - L{LIST} + - L{MAP} + """ + + NULL = PN_NULL; "A null value." + BOOL = PN_BOOL; "A boolean value." + UBYTE = PN_UBYTE; "An unsigned byte value." + BYTE = PN_BYTE; "A signed byte value." + USHORT = PN_USHORT; "An unsigned short value." + SHORT = PN_SHORT; "A short value." + UINT = PN_UINT; "An unsigned int value." + INT = PN_INT; "A signed int value." + CHAR = PN_CHAR; "A character value." + ULONG = PN_ULONG; "An unsigned long value." + LONG = PN_LONG; "A signed long value." + TIMESTAMP = PN_TIMESTAMP; "A timestamp value." + FLOAT = PN_FLOAT; "A float value." + DOUBLE = PN_DOUBLE; "A double value." + DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." + DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." + DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." + UUID = PN_UUID; "A UUID value." + BINARY = PN_BINARY; "A binary string." + STRING = PN_STRING; "A unicode string." + SYMBOL = PN_SYMBOL; "A symbolic string." + DESCRIBED = PN_DESCRIBED; "A described value." + ARRAY = PN_ARRAY; "An array value." + LIST = PN_LIST; "A list value." + MAP = PN_MAP; "A map value." + + type_names = { + NULL: "null", + BOOL: "bool", + BYTE: "byte", + UBYTE: "ubyte", + SHORT: "short", + USHORT: "ushort", + INT: "int", + UINT: "uint", + CHAR: "char", + LONG: "long", + ULONG: "ulong", + TIMESTAMP: "timestamp", + FLOAT: "float", + DOUBLE: "double", + DECIMAL32: "decimal32", + DECIMAL64: "decimal64", + DECIMAL128: "decimal128", + UUID: "uuid", + BINARY: "binary", + STRING: "string", + SYMBOL: "symbol", + DESCRIBED: "described", + ARRAY: "array", + LIST: "list", + MAP: "map" + } + + @classmethod + def type_name(type): + return Data.type_names[type] + + def __init__(self, capacity=16): + if isinstance(capacity, (int, long)): + self._data = pn_data(capacity) + self._free = True + else: + self._data = capacity + self._free = False + + def __del__(self): + if self._free and hasattr(self, "_data"): + pn_data_free(self._data) + del self._data + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, DataException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) + else: + return err + + def clear(self): + """ + Clears the data object. + """ + pn_data_clear(self._data) + + def rewind(self): + """ + Clears current node and sets the parent to the root node. Clearing the + current node sets it _before_ the first node, calling next() will advance to + the first node. + """ + assert self._data is not None + pn_data_rewind(self._data) + + def next(self): + """ + Advances the current node to its next sibling and returns its + type. If there is no next sibling the current node remains + unchanged and None is returned. + """ + found = pn_data_next(self._data) + if found: + return self.type() + else: + return None + + def prev(self): + """ + Advances the current node to its previous sibling and returns its + type. If there is no previous sibling the current node remains + unchanged and None is returned. + """ + found = pn_data_prev(self._data) + if found: + return self.type() + else: + return None + + def enter(self): + """ + Sets the parent node to the current node and clears the current node. + Clearing the current node sets it _before_ the first child, + call next() advances to the first child. + """ + return pn_data_enter(self._data) + + def exit(self): + """ + Sets the current node to the parent node and the parent node to + its own parent. + """ + return pn_data_exit(self._data) + + def lookup(self, name): + return pn_data_lookup(self._data, name) + + def narrow(self): + pn_data_narrow(self._data) + + def widen(self): + pn_data_widen(self._data) + + def type(self): + """ + Returns the type of the current node. + """ + dtype = pn_data_type(self._data) + if dtype == -1: + return None + else: + return dtype + + def encoded_size(self): + """ + Returns the size in bytes needed to encode the data in AMQP format. + """ + return pn_data_encoded_size(self._data) + + def encode(self): + """ + Returns a representation of the data encoded in AMQP format. + """ + size = 1024 + while True: + cd, enc = pn_data_encode(self._data, size) + if cd == PN_OVERFLOW: + size *= 2 + elif cd >= 0: + return enc + else: + self._check(cd) + + def decode(self, encoded): + """ + Decodes the first value from supplied AMQP data and returns the + number of bytes consumed. + + @type encoded: binary + @param encoded: AMQP encoded binary data + """ + return self._check(pn_data_decode(self._data, encoded)) + + def put_list(self): + """ + Puts a list value. Elements may be filled by entering the list + node and putting element values. + + >>> data = Data() + >>> data.put_list() + >>> data.enter() + >>> data.put_int(1) + >>> data.put_int(2) + >>> data.put_int(3) + >>> data.exit() + """ + self._check(pn_data_put_list(self._data)) + + def put_map(self): + """ + Puts a map value. Elements may be filled by entering the map node + and putting alternating key value pairs. + + >>> data = Data() + >>> data.put_map() + >>> data.enter() + >>> data.put_string("key") + >>> data.put_string("value") + >>> data.exit() + """ + self._check(pn_data_put_map(self._data)) + + def put_array(self, described, element_type): + """ + Puts an array value. Elements may be filled by entering the array + node and putting the element values. The values must all be of the + specified array element type. If an array is described then the + first child value of the array is the descriptor and may be of any + type. + + >>> data = Data() + >>> + >>> data.put_array(False, Data.INT) + >>> data.enter() + >>> data.put_int(1) + >>> data.put_int(2) + >>> data.put_int(3) + >>> data.exit() + >>> + >>> data.put_array(True, Data.DOUBLE) + >>> data.enter() + >>> data.put_symbol("array-descriptor") + >>> data.put_double(1.1) + >>> data.put_double(1.2) + >>> data.put_double(1.3) + >>> data.exit() + + @type described: bool + @param described: specifies whether the array is described + @type element_type: int + @param element_type: the type of the array elements + """ + self._check(pn_data_put_array(self._data, described, element_type)) + + def put_described(self): + """ + Puts a described value. A described node has two children, the + descriptor and the value. These are specified by entering the node + and putting the desired values. + + >>> data = Data() + >>> data.put_described() + >>> data.enter() + >>> data.put_symbol("value-descriptor") + >>> data.put_string("the value") + >>> data.exit() + """ + self._check(pn_data_put_described(self._data)) + + def put_null(self): + """ + Puts a null value. + """ + self._check(pn_data_put_null(self._data)) + + def put_bool(self, b): + """ + Puts a boolean value. + + @param b: a boolean value + """ + self._check(pn_data_put_bool(self._data, b)) + + def put_ubyte(self, ub): + """ + Puts an unsigned byte value. + + @param ub: an integral value + """ + self._check(pn_data_put_ubyte(self._data, ub)) + + def put_byte(self, b): + """ + Puts a signed byte value. + + @param b: an integral value + """ + self._check(pn_data_put_byte(self._data, b)) + + def put_ushort(self, us): + """ + Puts an unsigned short value. + + @param us: an integral value. + """ + self._check(pn_data_put_ushort(self._data, us)) + + def put_short(self, s): + """ + Puts a signed short value. + + @param s: an integral value + """ + self._check(pn_data_put_short(self._data, s)) + + def put_uint(self, ui): + """ + Puts an unsigned int value. + + @param ui: an integral value + """ + self._check(pn_data_put_uint(self._data, ui)) + + def put_int(self, i): + """ + Puts a signed int value. + + @param i: an integral value + """ + self._check(pn_data_put_int(self._data, i)) + + def put_char(self, c): + """ + Puts a char value. + + @param c: a single character + """ + self._check(pn_data_put_char(self._data, ord(c))) + + def put_ulong(self, ul): + """ + Puts an unsigned long value. + + @param ul: an integral value + """ + self._check(pn_data_put_ulong(self._data, ul)) + + def put_long(self, l): + """ + Puts a signed long value. + + @param l: an integral value + """ + self._check(pn_data_put_long(self._data, l)) + + def put_timestamp(self, t): + """ + Puts a timestamp value. + + @param t: an integral value + """ + self._check(pn_data_put_timestamp(self._data, t)) + + def put_float(self, f): + """ + Puts a float value. + + @param f: a floating point value + """ + self._check(pn_data_put_float(self._data, f)) + + def put_double(self, d): + """ + Puts a double value. + + @param d: a floating point value. + """ + self._check(pn_data_put_double(self._data, d)) + + def put_decimal32(self, d): + """ + Puts a decimal32 value. + + @param d: a decimal32 value + """ + self._check(pn_data_put_decimal32(self._data, d)) + + def put_decimal64(self, d): + """ + Puts a decimal64 value. + + @param d: a decimal64 value + """ + self._check(pn_data_put_decimal64(self._data, d)) + + def put_decimal128(self, d): + """ + Puts a decimal128 value. + + @param d: a decimal128 value + """ + self._check(pn_data_put_decimal128(self._data, d)) + + def put_uuid(self, u): + """ + Puts a UUID value. + + @param u: a uuid value + """ + self._check(pn_data_put_uuid(self._data, u.bytes)) + + def put_binary(self, b): + """ + Puts a binary value. + + @type b: binary + @param b: a binary value + """ + self._check(pn_data_put_binary(self._data, b)) + + def put_memoryview(self, mv): + """Put a python memoryview object as an AMQP binary value""" + self.put_binary(mv.tobytes()) + + def put_buffer(self, buff): + """Put a python buffer object as an AMQP binary value""" + self.put_binary(bytes(buff)) + + def put_string(self, s): + """ + Puts a unicode value. + + @type s: unicode + @param s: a unicode value + """ + self._check(pn_data_put_string(self._data, s.encode("utf8"))) + + def put_symbol(self, s): + """ + Puts a symbolic value. + + @type s: string + @param s: the symbol name + """ + self._check(pn_data_put_symbol(self._data, s.encode('ascii'))) + + def get_list(self): + """ + If the current node is a list, return the number of elements, + otherwise return zero. List elements can be accessed by entering + the list. + + >>> count = data.get_list() + >>> data.enter() + >>> for i in range(count): + ... type = data.next() + ... if type == Data.STRING: + ... print data.get_string() + ... elif type == ...: + ... ... + >>> data.exit() + """ + return pn_data_get_list(self._data) + + def get_map(self): + """ + If the current node is a map, return the number of child elements, + otherwise return zero. Key value pairs can be accessed by entering + the map. + + >>> count = data.get_map() + >>> data.enter() + >>> for i in range(count/2): + ... type = data.next() + ... if type == Data.STRING: + ... print data.get_string() + ... elif type == ...: + ... ... + >>> data.exit() + """ + return pn_data_get_map(self._data) + + def get_array(self): + """ + If the current node is an array, return a tuple of the element + count, a boolean indicating whether the array is described, and + the type of each element, otherwise return (0, False, None). Array + data can be accessed by entering the array. + + >>> # read an array of strings with a symbolic descriptor + >>> count, described, type = data.get_array() + >>> data.enter() + >>> data.next() + >>> print "Descriptor:", data.get_symbol() + >>> for i in range(count): + ... data.next() + ... print "Element:", data.get_string() + >>> data.exit() + """ + count = pn_data_get_array(self._data) + described = pn_data_is_array_described(self._data) + type = pn_data_get_array_type(self._data) + if type == -1: + type = None + return count, described, type + + def is_described(self): + """ + Checks if the current node is a described value. The descriptor + and value may be accessed by entering the described value. + + >>> # read a symbolically described string + >>> assert data.is_described() # will error if the current node is not described + >>> data.enter() + >>> data.next() + >>> print data.get_symbol() + >>> data.next() + >>> print data.get_string() + >>> data.exit() + """ + return pn_data_is_described(self._data) + + def is_null(self): + """ + Checks if the current node is a null. + """ + return pn_data_is_null(self._data) + + def get_bool(self): + """ + If the current node is a boolean, returns its value, returns False + otherwise. + """ + return pn_data_get_bool(self._data) + + def get_ubyte(self): + """ + If the current node is an unsigned byte, returns its value, + returns 0 otherwise. + """ + return ubyte(pn_data_get_ubyte(self._data)) + + def get_byte(self): + """ + If the current node is a signed byte, returns its value, returns 0 + otherwise. + """ + return byte(pn_data_get_byte(self._data)) + + def get_ushort(self): + """ + If the current node is an unsigned short, returns its value, + returns 0 otherwise. + """ + return ushort(pn_data_get_ushort(self._data)) + + def get_short(self): + """ + If the current node is a signed short, returns its value, returns + 0 otherwise. + """ + return short(pn_data_get_short(self._data)) + + def get_uint(self): + """ + If the current node is an unsigned int, returns its value, returns + 0 otherwise. + """ + return uint(pn_data_get_uint(self._data)) + + def get_int(self): + """ + If the current node is a signed int, returns its value, returns 0 + otherwise. + """ + return int32(pn_data_get_int(self._data)) + + def get_char(self): + """ + If the current node is a char, returns its value, returns 0 + otherwise. + """ + return char(_compat.unichr(pn_data_get_char(self._data))) + + def get_ulong(self): + """ + If the current node is an unsigned long, returns its value, + returns 0 otherwise. + """ + return ulong(pn_data_get_ulong(self._data)) + + def get_long(self): + """ + If the current node is an signed long, returns its value, returns + 0 otherwise. + """ + return long(pn_data_get_long(self._data)) + + def get_timestamp(self): + """ + If the current node is a timestamp, returns its value, returns 0 + otherwise. + """ + return timestamp(pn_data_get_timestamp(self._data)) + + def get_float(self): + """ + If the current node is a float, returns its value, raises 0 + otherwise. + """ + return float32(pn_data_get_float(self._data)) + + def get_double(self): + """ + If the current node is a double, returns its value, returns 0 + otherwise. + """ + return pn_data_get_double(self._data) + + # XXX: need to convert + def get_decimal32(self): + """ + If the current node is a decimal32, returns its value, returns 0 + otherwise. + """ + return decimal32(pn_data_get_decimal32(self._data)) + + # XXX: need to convert + def get_decimal64(self): + """ + If the current node is a decimal64, returns its value, returns 0 + otherwise. + """ + return decimal64(pn_data_get_decimal64(self._data)) + + # XXX: need to convert + def get_decimal128(self): + """ + If the current node is a decimal128, returns its value, returns 0 + otherwise. + """ + return decimal128(pn_data_get_decimal128(self._data)) + + def get_uuid(self): + """ + If the current node is a UUID, returns its value, returns None + otherwise. + """ + if pn_data_type(self._data) == Data.UUID: + return uuid.UUID(bytes=pn_data_get_uuid(self._data)) + else: + return None + + def get_binary(self): + """ + If the current node is binary, returns its value, returns "" + otherwise. + """ + return pn_data_get_binary(self._data) + + def get_string(self): + """ + If the current node is a string, returns its value, returns "" + otherwise. + """ + return pn_data_get_string(self._data).decode("utf8") + + def get_symbol(self): + """ + If the current node is a symbol, returns its value, returns "" + otherwise. + """ + return symbol(pn_data_get_symbol(self._data).decode('ascii')) + + def copy(self, src): + self._check(pn_data_copy(self._data, src._data)) + + def format(self): + sz = 16 + while True: + err, result = pn_data_format(self._data, sz) + if err == PN_OVERFLOW: + sz *= 2 + continue + else: + self._check(err) + return result + + def dump(self): + pn_data_dump(self._data) + + def put_dict(self, d): + self.put_map() + self.enter() + try: + for k, v in d.items(): + self.put_object(k) + self.put_object(v) + finally: + self.exit() + + def get_dict(self): + if self.enter(): + try: + result = {} + while self.next(): + k = self.get_object() + if self.next(): + v = self.get_object() + else: + v = None + result[k] = v + finally: + self.exit() + return result + + def put_sequence(self, s): + self.put_list() + self.enter() + try: + for o in s: + self.put_object(o) + finally: + self.exit() + + def get_sequence(self): + if self.enter(): + try: + result = [] + while self.next(): + result.append(self.get_object()) + finally: + self.exit() + return result + + def get_py_described(self): + if self.enter(): + try: + self.next() + descriptor = self.get_object() + self.next() + value = self.get_object() + finally: + self.exit() + return Described(descriptor, value) + + def put_py_described(self, d): + self.put_described() + self.enter() + try: + self.put_object(d.descriptor) + self.put_object(d.value) + finally: + self.exit() + + def get_py_array(self): + """ + If the current node is an array, return an Array object + representing the array and its contents. Otherwise return None. + This is a convenience wrapper around get_array, enter, etc. + """ + + count, described, type = self.get_array() + if type is None: return None + if self.enter(): + try: + if described: + self.next() + descriptor = self.get_object() + else: + descriptor = UNDESCRIBED + elements = [] + while self.next(): + elements.append(self.get_object()) + finally: + self.exit() + return Array(descriptor, type, *elements) + + def put_py_array(self, a): + described = a.descriptor != UNDESCRIBED + self.put_array(described, a.type) + self.enter() + try: + if described: + self.put_object(a.descriptor) + for e in a.elements: + self.put_object(e) + finally: + self.exit() + + put_mappings = { + None.__class__: lambda s, _: s.put_null(), + bool: put_bool, + ubyte: put_ubyte, + ushort: put_ushort, + uint: put_uint, + ulong: put_ulong, + byte: put_byte, + short: put_short, + int32: put_int, + long: put_long, + float32: put_float, + float: put_double, + decimal32: put_decimal32, + decimal64: put_decimal64, + decimal128: put_decimal128, + char: put_char, + timestamp: put_timestamp, + uuid.UUID: put_uuid, + bytes: put_binary, + unicode: put_string, + symbol: put_symbol, + list: put_sequence, + tuple: put_sequence, + dict: put_dict, + Described: put_py_described, + Array: put_py_array + } + # for python 3.x, long is merely an alias for int, but for python 2.x + # we need to add an explicit int since it is a different type + if int not in put_mappings: + put_mappings[int] = put_int + # Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both. + try: + put_mappings[memoryview] = put_memoryview + except NameError: + pass + try: + put_mappings[buffer] = put_buffer + except NameError: + pass + get_mappings = { + NULL: lambda s: None, + BOOL: get_bool, + BYTE: get_byte, + UBYTE: get_ubyte, + SHORT: get_short, + USHORT: get_ushort, + INT: get_int, + UINT: get_uint, + CHAR: get_char, + LONG: get_long, + ULONG: get_ulong, + TIMESTAMP: get_timestamp, + FLOAT: get_float, + DOUBLE: get_double, + DECIMAL32: get_decimal32, + DECIMAL64: get_decimal64, + DECIMAL128: get_decimal128, + UUID: get_uuid, + BINARY: get_binary, + STRING: get_string, + SYMBOL: get_symbol, + DESCRIBED: get_py_described, + ARRAY: get_py_array, + LIST: get_sequence, + MAP: get_dict + } + + def put_object(self, obj): + putter = self.put_mappings[obj.__class__] + putter(self, obj) + + def get_object(self): + type = self.type() + if type is None: return None + getter = self.get_mappings.get(type) + if getter: + return getter(self) + else: + return UnmappedType(str(type)) + + +def dat2obj(dimpl): + if dimpl: + d = Data(dimpl) + d.rewind() + d.next() + obj = d.get_object() + d.rewind() + return obj + + +def obj2dat(obj, dimpl): + if obj is not None: + d = Data(dimpl) + d.put_object(obj) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_delivery.py ---------------------------------------------------------------------- diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py new file mode 100644 index 0000000..e609451 --- /dev/null +++ b/python/proton/_delivery.py @@ -0,0 +1,293 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import + +from cproton import PN_REJECTED, PN_RELEASED, PN_MODIFIED, PN_RECEIVED, PN_ACCEPTED, \ + pn_disposition_is_undeliverable, pn_disposition_set_section_number, pn_disposition_get_section_number, \ + pn_disposition_set_undeliverable, pn_disposition_set_failed, pn_disposition_condition, \ + pn_disposition_set_section_offset, pn_disposition_data, pn_disposition_get_section_offset, \ + pn_disposition_is_failed, pn_disposition_annotations, \ + pn_delivery_partial, pn_delivery_aborted, pn_disposition_type, pn_delivery_pending, pn_delivery_updated, \ + pn_delivery_readable, pn_delivery_abort, pn_delivery_remote, pn_delivery_tag, pn_delivery_link, pn_delivery_local, \ + pn_delivery_update, pn_delivery_attachments, pn_delivery_local_state, pn_delivery_settled, pn_delivery_settle, \ + pn_delivery_writable, pn_delivery_remote_state, \ + pn_work_next + +from ._condition import cond2obj, obj2cond +from ._data import dat2obj, obj2dat +from ._wrapper import Wrapper + + +class NamedInt(int): + values = {} # type: Dict[int, str] + + def __new__(cls, i, name): + ni = super(NamedInt, cls).__new__(cls, i) + cls.values[i] = ni + return ni + + def __init__(self, i, name): + self.name = name + + def __repr__(self): + return self.name + + def __str__(self): + return self.name + + @classmethod + def get(cls, i): + return cls.values.get(i, i) + + +class DispositionType(NamedInt): + values = {} + + +class Disposition(object): + RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") + ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") + REJECTED = DispositionType(PN_REJECTED, "REJECTED") + RELEASED = DispositionType(PN_RELEASED, "RELEASED") + MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") + + def __init__(self, impl, local): + self._impl = impl + self.local = local + self._data = None + self._condition = None + self._annotations = None + + @property + def type(self): + return DispositionType.get(pn_disposition_type(self._impl)) + + def _get_section_number(self): + return pn_disposition_get_section_number(self._impl) + + def _set_section_number(self, n): + pn_disposition_set_section_number(self._impl, n) + + section_number = property(_get_section_number, _set_section_number) + + def _get_section_offset(self): + return pn_disposition_get_section_offset(self._impl) + + def _set_section_offset(self, n): + pn_disposition_set_section_offset(self._impl, n) + + section_offset = property(_get_section_offset, _set_section_offset) + + def _get_failed(self): + return pn_disposition_is_failed(self._impl) + + def _set_failed(self, b): + pn_disposition_set_failed(self._impl, b) + + failed = property(_get_failed, _set_failed) + + def _get_undeliverable(self): + return pn_disposition_is_undeliverable(self._impl) + + def _set_undeliverable(self, b): + pn_disposition_set_undeliverable(self._impl, b) + + undeliverable = property(_get_undeliverable, _set_undeliverable) + + def _get_data(self): + if self.local: + return self._data + else: + return dat2obj(pn_disposition_data(self._impl)) + + def _set_data(self, obj): + if self.local: + self._data = obj + else: + raise AttributeError("data attribute is read-only") + + data = property(_get_data, _set_data) + + def _get_annotations(self): + if self.local: + return self._annotations + else: + return dat2obj(pn_disposition_annotations(self._impl)) + + def _set_annotations(self, obj): + if self.local: + self._annotations = obj + else: + raise AttributeError("annotations attribute is read-only") + + annotations = property(_get_annotations, _set_annotations) + + def _get_condition(self): + if self.local: + return self._condition + else: + return cond2obj(pn_disposition_condition(self._impl)) + + def _set_condition(self, obj): + if self.local: + self._condition = obj + else: + raise AttributeError("condition attribute is read-only") + + condition = property(_get_condition, _set_condition) + + +class Delivery(Wrapper): + """ + Tracks and/or records the delivery of a message over a link. + """ + + RECEIVED = Disposition.RECEIVED + ACCEPTED = Disposition.ACCEPTED + REJECTED = Disposition.REJECTED + RELEASED = Disposition.RELEASED + MODIFIED = Disposition.MODIFIED + + @staticmethod + def wrap(impl): + if impl is None: + return None + else: + return Delivery(impl) + + def __init__(self, impl): + Wrapper.__init__(self, impl, pn_delivery_attachments) + + def _init(self): + self.local = Disposition(pn_delivery_local(self._impl), True) + self.remote = Disposition(pn_delivery_remote(self._impl), False) + + @property + def tag(self): + """The identifier for the delivery.""" + return pn_delivery_tag(self._impl) + + @property + def writable(self): + """Returns true for an outgoing delivery to which data can now be written.""" + return pn_delivery_writable(self._impl) + + @property + def readable(self): + """Returns true for an incoming delivery that has data to read.""" + return pn_delivery_readable(self._impl) + + @property + def updated(self): + """Returns true if the state of the delivery has been updated + (e.g. it has been settled and/or accepted, rejected etc).""" + return pn_delivery_updated(self._impl) + + def update(self, state): + """ + Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED. + """ + obj2dat(self.local._data, pn_disposition_data(self.local._impl)) + obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) + obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) + pn_delivery_update(self._impl, state) + + @property + def pending(self): + return pn_delivery_pending(self._impl) + + @property + def partial(self): + """ + Returns true for an incoming delivery if not all the data is + yet available. + """ + return pn_delivery_partial(self._impl) + + @property + def local_state(self): + """Returns the local state of the delivery.""" + return DispositionType.get(pn_delivery_local_state(self._impl)) + + @property + def remote_state(self): + """ + Returns the state of the delivery as indicated by the remote + peer. + """ + return DispositionType.get(pn_delivery_remote_state(self._impl)) + + @property + def settled(self): + """ + Returns true if the delivery has been settled by the remote peer. + """ + return pn_delivery_settled(self._impl) + + def settle(self): + """ + Settles the delivery locally. This indicates the application + considers the delivery complete and does not wish to receive any + further events about it. Every delivery should be settled locally. + """ + pn_delivery_settle(self._impl) + + @property + def aborted(self): + """Returns true if the delivery has been aborted.""" + return pn_delivery_aborted(self._impl) + + def abort(self): + """ + Aborts the delivery. This indicates the application wishes to + invalidate any data that may have already been sent on this delivery. + The delivery cannot be aborted after it has been completely delivered. + """ + pn_delivery_abort(self._impl) + + @property + def work_next(self): + return Delivery.wrap(pn_work_next(self._impl)) + + @property + def link(self): + """ + Returns the link on which the delivery was sent or received. + """ + from . import _endpoints + return _endpoints.Link.wrap(pn_delivery_link(self._impl)) + + @property + def session(self): + """ + Returns the session over which the delivery was sent or received. + """ + return self.link.session + + @property + def connection(self): + """ + Returns the connection over which the delivery was sent or received. + """ + return self.session.connection + + @property + def transport(self): + return self.connection.transport http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_endpoints.py ---------------------------------------------------------------------- diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py new file mode 100644 index 0000000..bfa9880 --- /dev/null +++ b/python/proton/_endpoints.py @@ -0,0 +1,765 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +The proton.endpoints module +""" + +from __future__ import absolute_import + +import weakref + +from cproton import PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \ + PN_REMOTE_CLOSED, \ + pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \ + pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \ + pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \ + pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \ + pn_connection_set_password, pn_connection_remote_container, pn_connection_remote_hostname, \ + pn_connection_remote_offered_capabilities, pn_connection_remote_desired_capabilities, \ + pn_connection_remote_properties, pn_connection_offered_capabilities, pn_connection_desired_capabilities, \ + pn_connection_properties, pn_connection_open, pn_connection_close, pn_connection_state, pn_connection_release, \ + pn_session, pn_session_head, pn_session_attachments, pn_session_condition, pn_session_remote_condition, \ + pn_session_get_incoming_capacity, pn_session_set_incoming_capacity, pn_session_get_outgoing_window, \ + pn_session_set_outgoing_window, pn_session_incoming_bytes, pn_session_outgoing_bytes, pn_session_open, \ + pn_session_close, pn_session_next, pn_session_state, pn_session_connection, pn_session_free, \ + PN_SND_UNSETTLED, PN_SND_SETTLED, PN_SND_MIXED, PN_RCV_FIRST, PN_RCV_SECOND, \ + pn_link_head, pn_link_is_sender, pn_link_attachments, pn_link_error, pn_link_condition, pn_link_remote_condition, \ + pn_link_open, pn_link_close, pn_link_state, pn_link_source, pn_link_target, pn_link_remote_source, \ + pn_link_remote_target, pn_link_session, pn_link_current, pn_link_advance, pn_link_unsettled, pn_link_credit, \ + pn_link_available, pn_link_queued, pn_link_next, pn_link_name, pn_link_is_receiver, pn_link_remote_snd_settle_mode, \ + pn_link_remote_rcv_settle_mode, pn_link_snd_settle_mode, pn_link_set_snd_settle_mode, pn_link_rcv_settle_mode, \ + pn_link_set_rcv_settle_mode, pn_link_get_drain, pn_link_set_drain, pn_link_drained, pn_link_remote_max_message_size, \ + pn_link_max_message_size, pn_link_set_max_message_size, pn_link_detach, pn_link_free, pn_link_offered, pn_link_send, \ + pn_link_flow, pn_link_recv, pn_link_drain, pn_link_draining, \ + pn_sender, pn_receiver, \ + PN_UNSPECIFIED, PN_SOURCE, PN_TARGET, PN_COORDINATOR, PN_NONDURABLE, PN_CONFIGURATION, \ + PN_DELIVERIES, PN_DIST_MODE_UNSPECIFIED, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE, PN_EXPIRE_WITH_LINK, \ + PN_EXPIRE_WITH_SESSION, PN_EXPIRE_WITH_CONNECTION, PN_EXPIRE_NEVER, \ + pn_terminus_set_durability, pn_terminus_set_timeout, pn_terminus_set_dynamic, pn_terminus_get_type, \ + pn_terminus_get_durability, pn_terminus_set_type, pn_terminus_get_address, pn_terminus_capabilities, \ + pn_terminus_set_address, pn_terminus_get_timeout, pn_terminus_filter, pn_terminus_properties, \ + pn_terminus_get_expiry_policy, pn_terminus_set_expiry_policy, pn_terminus_set_distribution_mode, \ + pn_terminus_get_distribution_mode, pn_terminus_copy, pn_terminus_outcomes, pn_terminus_is_dynamic, \ + PN_EOS, \ + pn_delivery, \ + pn_work_head, \ + pn_error_code, pn_error_text + +from ._common import utf82unicode, unicode2utf8 +from ._condition import obj2cond, cond2obj +from ._data import Data, obj2dat, dat2obj +from ._delivery import Delivery +from ._exceptions import EXCEPTIONS, LinkException, SessionException, ConnectionException +from ._transport import Transport +from ._wrapper import Wrapper + + +class Endpoint(object): + LOCAL_UNINIT = PN_LOCAL_UNINIT + REMOTE_UNINIT = PN_REMOTE_UNINIT + LOCAL_ACTIVE = PN_LOCAL_ACTIVE + REMOTE_ACTIVE = PN_REMOTE_ACTIVE + LOCAL_CLOSED = PN_LOCAL_CLOSED + REMOTE_CLOSED = PN_REMOTE_CLOSED + + def _init(self): + self.condition = None + + def _update_cond(self): + obj2cond(self.condition, self._get_cond_impl()) + + @property + def remote_condition(self): + return cond2obj(self._get_remote_cond_impl()) + + # the following must be provided by subclasses + def _get_cond_impl(self): + assert False, "Subclass must override this!" + + def _get_remote_cond_impl(self): + assert False, "Subclass must override this!" + + def _get_handler(self): + from . import reactor + from . import _reactor_impl + ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) + if ractor: + on_error = ractor.on_error_delegate() + else: + on_error = None + record = self._get_attachments() + return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error) + + def _set_handler(self, handler): + from . import reactor + from . import _reactor_impl + ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) + if ractor: + on_error = ractor.on_error_delegate() + else: + on_error = None + impl = _reactor_impl._chandler(handler, on_error) + record = self._get_attachments() + pn_record_set_handler(record, impl) + pn_decref(impl) + + handler = property(_get_handler, _set_handler) + + @property + def transport(self): + return self.connection.transport + + +class Connection(Wrapper, Endpoint): + """ + A representation of an AMQP connection + """ + + @staticmethod + def wrap(impl): + if impl is None: + return None + else: + return Connection(impl) + + def __init__(self, impl=pn_connection): + Wrapper.__init__(self, impl, pn_connection_attachments) + + def _init(self): + Endpoint._init(self) + self.offered_capabilities = None + self.desired_capabilities = None + self.properties = None + + def _get_attachments(self): + return pn_connection_attachments(self._impl) + + @property + def connection(self): + return self + + @property + def transport(self): + return Transport.wrap(pn_connection_transport(self._impl)) + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, ConnectionException) + raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) + else: + return err + + def _get_cond_impl(self): + return pn_connection_condition(self._impl) + + def _get_remote_cond_impl(self): + return pn_connection_remote_condition(self._impl) + + def collect(self, collector): + if collector is None: + pn_connection_collect(self._impl, None) + else: + pn_connection_collect(self._impl, collector._impl) + self._collector = weakref.ref(collector) + + def _get_container(self): + return utf82unicode(pn_connection_get_container(self._impl)) + + def _set_container(self, name): + return pn_connection_set_container(self._impl, unicode2utf8(name)) + + container = property(_get_container, _set_container) + + def _get_hostname(self): + return utf82unicode(pn_connection_get_hostname(self._impl)) + + def _set_hostname(self, name): + return pn_connection_set_hostname(self._impl, unicode2utf8(name)) + + hostname = property(_get_hostname, _set_hostname, + doc=""" +Set the name of the host (either fully qualified or relative) to which this +connection is connecting to. This information may be used by the remote +peer to determine the correct back-end service to connect the client to. +This value will be sent in the Open performative, and will be used by SSL +and SASL layers to identify the peer. +""") + + def _get_user(self): + return utf82unicode(pn_connection_get_user(self._impl)) + + def _set_user(self, name): + return pn_connection_set_user(self._impl, unicode2utf8(name)) + + user = property(_get_user, _set_user) + + def _get_password(self): + return None + + def _set_password(self, name): + return pn_connection_set_password(self._impl, unicode2utf8(name)) + + password = property(_get_password, _set_password) + + @property + def remote_container(self): + """The container identifier specified by the remote peer for this connection.""" + return pn_connection_remote_container(self._impl) + + @property + def remote_hostname(self): + """The hostname specified by the remote peer for this connection.""" + return pn_connection_remote_hostname(self._impl) + + @property + def remote_offered_capabilities(self): + """The capabilities offered by the remote peer for this connection.""" + return dat2obj(pn_connection_remote_offered_capabilities(self._impl)) + + @property + def remote_desired_capabilities(self): + """The capabilities desired by the remote peer for this connection.""" + return dat2obj(pn_connection_remote_desired_capabilities(self._impl)) + + @property + def remote_properties(self): + """The properties specified by the remote peer for this connection.""" + return dat2obj(pn_connection_remote_properties(self._impl)) + + def open(self): + """ + Opens the connection. + + In more detail, this moves the local state of the connection to + the ACTIVE state and triggers an open frame to be sent to the + peer. A connection is fully active once both peers have opened it. + """ + obj2dat(self.offered_capabilities, + pn_connection_offered_capabilities(self._impl)) + obj2dat(self.desired_capabilities, + pn_connection_desired_capabilities(self._impl)) + obj2dat(self.properties, pn_connection_properties(self._impl)) + pn_connection_open(self._impl) + + def close(self): + """ + Closes the connection. + + In more detail, this moves the local state of the connection to + the CLOSED state and triggers a close frame to be sent to the + peer. A connection is fully closed once both peers have closed it. + """ + self._update_cond() + pn_connection_close(self._impl) + if hasattr(self, '_session_policy'): + # break circular ref + del self._session_policy + + @property + def state(self): + """ + The state of the connection as a bit field. The state has a local + and a remote component. Each of these can be in one of three + states: UNINIT, ACTIVE or CLOSED. These can be tested by masking + against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, + REMOTE_ACTIVE and REMOTE_CLOSED. + """ + return pn_connection_state(self._impl) + + def session(self): + """ + Returns a new session on this connection. + """ + ssn = pn_session(self._impl) + if ssn is None: + raise (SessionException("Session allocation failed.")) + else: + return Session(ssn) + + def session_head(self, mask): + return Session.wrap(pn_session_head(self._impl, mask)) + + def link_head(self, mask): + return Link.wrap(pn_link_head(self._impl, mask)) + + @property + def work_head(self): + return Delivery.wrap(pn_work_head(self._impl)) + + @property + def error(self): + return pn_error_code(pn_connection_error(self._impl)) + + def free(self): + pn_connection_release(self._impl) + + +class Session(Wrapper, Endpoint): + + @staticmethod + def wrap(impl): + if impl is None: + return None + else: + return Session(impl) + + def __init__(self, impl): + Wrapper.__init__(self, impl, pn_session_attachments) + + def _get_attachments(self): + return pn_session_attachments(self._impl) + + def _get_cond_impl(self): + return pn_session_condition(self._impl) + + def _get_remote_cond_impl(self): + return pn_session_remote_condition(self._impl) + + def _get_incoming_capacity(self): + return pn_session_get_incoming_capacity(self._impl) + + def _set_incoming_capacity(self, capacity): + pn_session_set_incoming_capacity(self._impl, capacity) + + incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) + + def _get_outgoing_window(self): + return pn_session_get_outgoing_window(self._impl) + + def _set_outgoing_window(self, window): + pn_session_set_outgoing_window(self._impl, window) + + outgoing_window = property(_get_outgoing_window, _set_outgoing_window) + + @property + def outgoing_bytes(self): + return pn_session_outgoing_bytes(self._impl) + + @property + def incoming_bytes(self): + return pn_session_incoming_bytes(self._impl) + + def open(self): + pn_session_open(self._impl) + + def close(self): + self._update_cond() + pn_session_close(self._impl) + + def next(self, mask): + return Session.wrap(pn_session_next(self._impl, mask)) + + @property + def state(self): + return pn_session_state(self._impl) + + @property + def connection(self): + return Connection.wrap(pn_session_connection(self._impl)) + + def sender(self, name): + return Sender(pn_sender(self._impl, unicode2utf8(name))) + + def receiver(self, name): + return Receiver(pn_receiver(self._impl, unicode2utf8(name))) + + def free(self): + pn_session_free(self._impl) + + +class Link(Wrapper, Endpoint): + """ + A representation of an AMQP link, of which there are two concrete + implementations, Sender and Receiver. + """ + + SND_UNSETTLED = PN_SND_UNSETTLED + SND_SETTLED = PN_SND_SETTLED + SND_MIXED = PN_SND_MIXED + + RCV_FIRST = PN_RCV_FIRST + RCV_SECOND = PN_RCV_SECOND + + @staticmethod + def wrap(impl): + if impl is None: return None + if pn_link_is_sender(impl): + return Sender(impl) + else: + return Receiver(impl) + + def __init__(self, impl): + Wrapper.__init__(self, impl, pn_link_attachments) + + def _get_attachments(self): + return pn_link_attachments(self._impl) + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, LinkException) + raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl)))) + else: + return err + + def _get_cond_impl(self): + return pn_link_condition(self._impl) + + def _get_remote_cond_impl(self): + return pn_link_remote_condition(self._impl) + + def open(self): + """ + Opens the link. + + In more detail, this moves the local state of the link to the + ACTIVE state and triggers an attach frame to be sent to the + peer. A link is fully active once both peers have attached it. + """ + pn_link_open(self._impl) + + def close(self): + """ + Closes the link. + + In more detail, this moves the local state of the link to the + CLOSED state and triggers an detach frame (with the closed flag + set) to be sent to the peer. A link is fully closed once both + peers have detached it. + """ + self._update_cond() + pn_link_close(self._impl) + + @property + def state(self): + """ + The state of the link as a bit field. The state has a local + and a remote component. Each of these can be in one of three + states: UNINIT, ACTIVE or CLOSED. These can be tested by masking + against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, + REMOTE_ACTIVE and REMOTE_CLOSED. + """ + return pn_link_state(self._impl) + + @property + def source(self): + """The source of the link as described by the local peer.""" + return Terminus(pn_link_source(self._impl)) + + @property + def target(self): + """The target of the link as described by the local peer.""" + return Terminus(pn_link_target(self._impl)) + + @property + def remote_source(self): + """The source of the link as described by the remote peer.""" + return Terminus(pn_link_remote_source(self._impl)) + + @property + def remote_target(self): + """The target of the link as described by the remote peer.""" + return Terminus(pn_link_remote_target(self._impl)) + + @property + def session(self): + return Session.wrap(pn_link_session(self._impl)) + + @property + def connection(self): + """The connection on which this link was attached.""" + return self.session.connection + + def delivery(self, tag): + return Delivery(pn_delivery(self._impl, tag)) + + @property + def current(self): + return Delivery.wrap(pn_link_current(self._impl)) + + def advance(self): + return pn_link_advance(self._impl) + + @property + def unsettled(self): + return pn_link_unsettled(self._impl) + + @property + def credit(self): + """The amount of outstanding credit on this link.""" + return pn_link_credit(self._impl) + + @property + def available(self): + return pn_link_available(self._impl) + + @property + def queued(self): + return pn_link_queued(self._impl) + + def next(self, mask): + return Link.wrap(pn_link_next(self._impl, mask)) + + @property + def name(self): + """Returns the name of the link""" + return utf82unicode(pn_link_name(self._impl)) + + @property + def is_sender(self): + """Returns true if this link is a sender.""" + return pn_link_is_sender(self._impl) + + @property + def is_receiver(self): + """Returns true if this link is a receiver.""" + return pn_link_is_receiver(self._impl) + + @property + def remote_snd_settle_mode(self): + return pn_link_remote_snd_settle_mode(self._impl) + + @property + def remote_rcv_settle_mode(self): + return pn_link_remote_rcv_settle_mode(self._impl) + + def _get_snd_settle_mode(self): + return pn_link_snd_settle_mode(self._impl) + + def _set_snd_settle_mode(self, mode): + pn_link_set_snd_settle_mode(self._impl, mode) + + snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode) + + def _get_rcv_settle_mode(self): + return pn_link_rcv_settle_mode(self._impl) + + def _set_rcv_settle_mode(self, mode): + pn_link_set_rcv_settle_mode(self._impl, mode) + + rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode) + + def _get_drain(self): + return pn_link_get_drain(self._impl) + + def _set_drain(self, b): + pn_link_set_drain(self._impl, bool(b)) + + drain_mode = property(_get_drain, _set_drain) + + def drained(self): + return pn_link_drained(self._impl) + + @property + def remote_max_message_size(self): + return pn_link_remote_max_message_size(self._impl) + + def _get_max_message_size(self): + return pn_link_max_message_size(self._impl) + + def _set_max_message_size(self, mode): + pn_link_set_max_message_size(self._impl, mode) + + max_message_size = property(_get_max_message_size, _set_max_message_size) + + def detach(self): + return pn_link_detach(self._impl) + + def free(self): + pn_link_free(self._impl) + + +class Sender(Link): + """ + A link over which messages are sent. + """ + + def offered(self, n): + pn_link_offered(self._impl, n) + + def stream(self, data): + """ + Send specified data as part of the current delivery + + @type data: binary + @param data: data to send + """ + return self._check(pn_link_send(self._impl, data)) + + def send(self, obj, tag=None): + """ + Send specified object over this sender; the object is expected to + have a send() method on it that takes the sender and an optional + tag as arguments. + + Where the object is a Message, this will send the message over + this link, creating a new delivery for the purpose. + """ + if hasattr(obj, 'send'): + return obj.send(self, tag=tag) + else: + # treat object as bytes + return self.stream(obj) + + def delivery_tag(self): + if not hasattr(self, 'tag_generator'): + def simple_tags(): + count = 1 + while True: + yield str(count) + count += 1 + + self.tag_generator = simple_tags() + return next(self.tag_generator) + + +class Receiver(Link): + """ + A link over which messages are received. + """ + + def flow(self, n): + """Increases the credit issued to the remote sender by the specified number of messages.""" + pn_link_flow(self._impl, n) + + def recv(self, limit): + n, binary = pn_link_recv(self._impl, limit) + if n == PN_EOS: + return None + else: + self._check(n) + return binary + + def drain(self, n): + pn_link_drain(self._impl, n) + + def draining(self): + return pn_link_draining(self._impl) + + +class Terminus(object): + UNSPECIFIED = PN_UNSPECIFIED + SOURCE = PN_SOURCE + TARGET = PN_TARGET + COORDINATOR = PN_COORDINATOR + + NONDURABLE = PN_NONDURABLE + CONFIGURATION = PN_CONFIGURATION + DELIVERIES = PN_DELIVERIES + + DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED + DIST_MODE_COPY = PN_DIST_MODE_COPY + DIST_MODE_MOVE = PN_DIST_MODE_MOVE + + EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK + EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION + EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION + EXPIRE_NEVER = PN_EXPIRE_NEVER + + def __init__(self, impl): + self._impl = impl + + def _check(self, err): + if err < 0: + exc = EXCEPTIONS.get(err, LinkException) + raise exc("[%s]" % err) + else: + return err + + def _get_type(self): + return pn_terminus_get_type(self._impl) + + def _set_type(self, type): + self._check(pn_terminus_set_type(self._impl, type)) + + type = property(_get_type, _set_type) + + def _get_address(self): + """The address that identifies the source or target node""" + return utf82unicode(pn_terminus_get_address(self._impl)) + + def _set_address(self, address): + self._check(pn_terminus_set_address(self._impl, unicode2utf8(address))) + + address = property(_get_address, _set_address) + + def _get_durability(self): + return pn_terminus_get_durability(self._impl) + + def _set_durability(self, seconds): + self._check(pn_terminus_set_durability(self._impl, seconds)) + + durability = property(_get_durability, _set_durability) + + def _get_expiry_policy(self): + return pn_terminus_get_expiry_policy(self._impl) + + def _set_expiry_policy(self, seconds): + self._check(pn_terminus_set_expiry_policy(self._impl, seconds)) + + expiry_policy = property(_get_expiry_policy, _set_expiry_policy) + + def _get_timeout(self): + return pn_terminus_get_timeout(self._impl) + + def _set_timeout(self, seconds): + self._check(pn_terminus_set_timeout(self._impl, seconds)) + + timeout = property(_get_timeout, _set_timeout) + + def _is_dynamic(self): + """Indicates whether the source or target node was dynamically + created""" + return pn_terminus_is_dynamic(self._impl) + + def _set_dynamic(self, dynamic): + self._check(pn_terminus_set_dynamic(self._impl, dynamic)) + + dynamic = property(_is_dynamic, _set_dynamic) + + def _get_distribution_mode(self): + return pn_terminus_get_distribution_mode(self._impl) + + def _set_distribution_mode(self, mode): + self._check(pn_terminus_set_distribution_mode(self._impl, mode)) + + distribution_mode = property(_get_distribution_mode, _set_distribution_mode) + + @property + def properties(self): + """Properties of a dynamic source or target.""" + return Data(pn_terminus_properties(self._impl)) + + @property + def capabilities(self): + """Capabilities of the source or target.""" + return Data(pn_terminus_capabilities(self._impl)) + + @property + def outcomes(self): + return Data(pn_terminus_outcomes(self._impl)) + + @property + def filter(self): + """A filter on a source allows the set of messages transfered over + the link to be restricted""" + return Data(pn_terminus_filter(self._impl)) + + def copy(self, src): + self._check(pn_terminus_copy(self._impl, src._impl)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/_events.py ---------------------------------------------------------------------- diff --git a/python/proton/_events.py b/python/proton/_events.py new file mode 100644 index 0000000..c8af8e2 --- /dev/null +++ b/python/proton/_events.py @@ -0,0 +1,333 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import + +import threading + +from cproton import PN_SESSION_REMOTE_CLOSE, PN_SESSION_FINAL, pn_event_context, pn_collector_put, \ + PN_SELECTABLE_UPDATED, pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_attachments, pn_event_type, \ + pn_collector_free, pn_handler_dispatch, PN_SELECTABLE_WRITABLE, PN_SELECTABLE_INIT, PN_SESSION_REMOTE_OPEN, \ + pn_collector_peek, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \ + PN_TRANSPORT_ERROR, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_LOCAL_CLOSE, pn_event_delivery, \ + PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, pn_event_reactor, \ + PN_CONNECTION_REMOTE_CLOSE, pn_collector_pop, PN_LINK_INIT, pn_event_link, PN_CONNECTION_UNBOUND, \ + pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, PN_REACTOR_INIT, PN_REACTOR_QUIESCED, \ + PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name, PN_SELECTABLE_READABLE, \ + pn_event_transport, PN_TRANSPORT_TAIL_CLOSED, PN_SELECTABLE_FINAL, PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \ + PN_SESSION_LOCAL_CLOSE, pn_event_copy, PN_REACTOR_FINAL, PN_LINK_LOCAL_OPEN, PN_SELECTABLE_EXPIRED, \ + PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, pn_event_root, PN_SELECTABLE_ERROR, \ + PN_CONNECTION_INIT, pn_event_class, pn_void2py, pn_cast_pn_session, pn_cast_pn_link, pn_cast_pn_delivery, \ + pn_cast_pn_transport, pn_cast_pn_connection, pn_cast_pn_selectable + +from ._common import Constant +from ._delivery import Delivery +from ._endpoints import Connection, Session, Link +from ._reactor_impl import Selectable, WrappedHandler +from ._transport import Transport +from ._wrapper import Wrapper + + +class Collector: + + def __init__(self): + self._impl = pn_collector() + + def put(self, obj, etype): + pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number) + + def peek(self): + return Event.wrap(pn_collector_peek(self._impl)) + + def pop(self): + ev = self.peek() + pn_collector_pop(self._impl) + + def __del__(self): + pn_collector_free(self._impl) + del self._impl + + +if "TypeExtender" not in globals(): + class TypeExtender: + def __init__(self, number): + self.number = number + + def next(self): + try: + return self.number + finally: + self.number += 1 + + +class EventType(object): + _lock = threading.Lock() + _extended = TypeExtender(10000) + TYPES = {} + + def __init__(self, name=None, number=None, method=None): + if name is None and number is None: + raise TypeError("extended events require a name") + try: + self._lock.acquire() + if name is None: + name = pn_event_type_name(number) + + if number is None: + number = self._extended.next() + + if method is None: + method = "on_%s" % name + + self.name = name + self.number = number + self.method = method + + self.TYPES[number] = self + finally: + self._lock.release() + + def __repr__(self): + return self.name + + +def dispatch(handler, method, *args): + m = getattr(handler, method, None) + if m: + return m(*args) + elif hasattr(handler, "on_unhandled"): + return handler.on_unhandled(method, *args) + + +class EventBase(object): + + def __init__(self, clazz, context, type): + self.clazz = clazz + self.context = context + self.type = type + + def dispatch(self, handler): + return dispatch(handler, self.type.method, self) + + +def _none(x): return None + + +DELEGATED = Constant("DELEGATED") + + +def _core(number, method): + return EventType(number=number, method=method) + + +wrappers = { + "pn_void": lambda x: pn_void2py(x), + "pn_pyref": lambda x: pn_void2py(x), + "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)), + "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)), + "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)), + "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)), + "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)), + "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x)) +} + + +class Event(Wrapper, EventBase): + REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init") + REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced") + REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final") + + TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task") + + CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init") + CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound") + CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound") + CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open") + CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close") + CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open") + CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close") + CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final") + + SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init") + SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open") + SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close") + SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open") + SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close") + SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final") + + LINK_INIT = _core(PN_LINK_INIT, "on_link_init") + LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open") + LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close") + LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach") + LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open") + LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close") + LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach") + LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow") + LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final") + + DELIVERY = _core(PN_DELIVERY, "on_delivery") + + TRANSPORT = _core(PN_TRANSPORT, "on_transport") + TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error") + TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed") + TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") + TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed") + + SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init") + SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated") + SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable") + SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable") + SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired") + SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error") + SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final") + + @staticmethod + def wrap(impl, number=None): + if impl is None: + return None + + if number is None: + number = pn_event_type(impl) + + event = Event(impl, number) + + # check for an application defined ApplicationEvent and return that. This + # avoids an expensive wrap operation invoked by event.context + if pn_event_class(impl) == PN_PYREF and \ + isinstance(event.context, EventBase): + return event.context + else: + return event + + def __init__(self, impl, number): + Wrapper.__init__(self, impl, pn_event_attachments) + self.__dict__["type"] = EventType.TYPES[number] + + def _init(self): + pass + + def copy(self): + copy = pn_event_copy(self._impl) + return Event.wrap(copy) + + @property + def clazz(self): + cls = pn_event_class(self._impl) + if cls: + return pn_class_name(cls) + else: + return None + + @property + def root(self): + return WrappedHandler.wrap(pn_event_root(self._impl)) + + @property + def context(self): + """Returns the context object associated with the event. The type of this depend on the type of event.""" + return wrappers[self.clazz](pn_event_context(self._impl)) + + def dispatch(self, handler, type=None): + type = type or self.type + if isinstance(handler, WrappedHandler): + pn_handler_dispatch(handler._impl, self._impl, type.number) + else: + result = dispatch(handler, type.method, self) + if result != DELEGATED and hasattr(handler, "handlers"): + for h in handler.handlers: + self.dispatch(h, type) + + @property + def reactor(self): + """Returns the reactor associated with the event.""" + return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl)) + + def __getattr__(self, name): + r = self.reactor + if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name: + return r + else: + return super(Event, self).__getattr__(name) + + @property + def transport(self): + """Returns the transport associated with the event, or null if none is associated with it.""" + return Transport.wrap(pn_event_transport(self._impl)) + + @property + def connection(self): + """Returns the connection associated with the event, or null if none is associated with it.""" + return Connection.wrap(pn_event_connection(self._impl)) + + @property + def session(self): + """Returns the session associated with the event, or null if none is associated with it.""" + return Session.wrap(pn_event_session(self._impl)) + + @property + def link(self): + """Returns the link associated with the event, or null if none is associated with it.""" + return Link.wrap(pn_event_link(self._impl)) + + @property + def sender(self): + """Returns the sender link associated with the event, or null if + none is associated with it. This is essentially an alias for + link(), that does an additional checkon the type of the + link.""" + l = self.link + if l and l.is_sender: + return l + else: + return None + + @property + def receiver(self): + """Returns the receiver link associated with the event, or null if + none is associated with it. This is essentially an alias for + link(), that does an additional checkon the type of the link.""" + l = self.link + if l and l.is_receiver: + return l + else: + return None + + @property + def delivery(self): + """Returns the delivery associated with the event, or null if none is associated with it.""" + return Delivery.wrap(pn_event_delivery(self._impl)) + + def __repr__(self): + return "%s(%s)" % (self.type, self.context) + + +class LazyHandlers(object): + def __get__(self, obj, clazz): + if obj is None: + return self + ret = [] + obj.__dict__['handlers'] = ret + return ret + + +class Handler(object): + handlers = LazyHandlers() + + def on_unhandled(self, method, *args): + pass --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
