Author: rhs
Date: Fri Oct 19 21:50:32 2012
New Revision: 1400309
URL: http://svn.apache.org/viewvc?rev=1400309&view=rev
Log:
fixed message-id and correlation-id
Modified:
qpid/proton/trunk/proton-c/bindings/php/proton.php
qpid/proton/trunk/proton-c/bindings/php/tests.php
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/bindings/ruby/ruby.i
qpid/proton/trunk/proton-c/include/proton/codec.h
qpid/proton/trunk/proton-c/include/proton/message.h
qpid/proton/trunk/proton-c/src/codec/codec.c
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/tests/proton_tests/message.py
Modified: qpid/proton/trunk/proton-c/bindings/php/proton.php
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/proton.php?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/proton.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/proton.php Fri Oct 19 21:50:32 2012
@@ -153,6 +153,8 @@ class Message {
const DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY;
var $impl;
+ var $_id;
+ var $_correlation_id;
public $instructions = null;
public $annotations = null;
public $properties = null;
@@ -160,6 +162,8 @@ class Message {
public function __construct() {
$this->impl = pn_message();
+ $this->_id = new Data(pn_message_id($this->impl));
+ $this->_correlation_id = new Data(pn_message_correlation_id($this->impl));
}
public function __destruct() {
@@ -273,13 +277,13 @@ class Message {
$this->_check(pn_message_set_delivery_count($this->impl, $value));
}
- # XXX
private function _get_id() {
- return pn_message_get_id($this->impl);
+ return $this->_id->get_object();
}
private function _set_id($value) {
- $this->_check(pn_message_set_id($this->impl, $value));
+ $this->_id->rewind();
+ $this->_id->put_object($value);
}
private function _get_user_id() {
@@ -314,13 +318,13 @@ class Message {
$this->_check(pn_message_set_reply_to($this->impl, $value));
}
- # XXX
private function _get_correlation_id() {
- return pn_message_get_correlation_id($this->impl);
+ return $this->_correlation_id->get_object();
}
private function _set_correlation_id($value) {
- $this->_check(pn_message_set_correlation_id($this->impl, $value));
+ $this->_correlation_id->rewind();
+ $this->_correlation_id->put_object($value);
}
private function _get_content_type() {
Modified: qpid/proton/trunk/proton-c/bindings/php/tests.php
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/tests.php?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/tests.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/tests.php Fri Oct 19 21:50:32 2012
@@ -1,179 +1,16 @@
<?php
- // run from the command line lke this:
- // php -d extension=<path-to>/php/libcproton.so -f <path-to>/php/tests.php
- //
-include("cproton.php");
-function pump($t1, $t2) {
- while (TRUE) {
- $out1 = pn_output($t1, 1024);
- $out2 = pn_output($t2, 1024);
+include("proton.php");
- assert("$out1[0] >= 0");
- assert("$out2[0] >= 0");
- //print("pn_output1[0]= " . $out1[0] . " [1]=" . $out1[1] . "\n");
- //print("pn_output2[0]= " . $out2[0] . " [1]=" . $out2[1] . "\n");
-
- if ($out1[1] or $out2[1]) {
- if ($out1[1]) {
- $cd = pn_input($t2, $out1[1]);
- print("pn_input1= " . $cd . "\n");
- assert('$cd == strlen($out1[1])');
- }
- if ($out2[1]) {
- $cd = pn_input($t1, $out2[1]);
- print("pn_input2= " . $cd . "\n");
- assert('$cd == strlen($out2[1])');
- }
- } else {
- return;
- }
- }
-}
-
-print("BEGIN TEST\n");
-
-// class Test setup
-$c1 = pn_connection();
-$t1 = pn_transport($c1);
-pn_transport_open($t1);
-
-$c2 = pn_connection();
-$t2 = pn_transport($c2);
-pn_transport_open($t2);
-
-
-// class TransferTest setup
-$ssn1 = pn_session($c1);
-pn_connection_open($c1);
-pn_connection_open($c2);
-pn_session_open($ssn1);
-pump($t1, $t2);
-$ssn2 = pn_session_head($c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE);
-pn_session_open($ssn2);
-
-$snd = pn_sender($ssn1, "test-link");
-$rcv = pn_receiver($ssn2, "test-link");
-
-pn_link_open($snd);
-pn_link_open($rcv);
-pump($t1, $t2);
-
-// test_disposition
-
-pn_flow($rcv, 1);
-
-pump($t1, $t2);
-
-$sd = pn_delivery($snd, "tag");
-$xxx = pn_delivery_tag($sd);
-assert('$xxx == "tag"');
-//print("Delivery Tag=" . $xxx . "\n");
-
-
-$msg = "this is a test";
-$n = pn_send($snd, $msg);
-//print("pn_send= " . $n . "\n");
-assert('$n == 14');
-pn_advance($snd);
-
-pump($t1, $t2);
-
-
-$rd = pn_current($rcv);
-assert('pn_delivery_tag($rd) == "tag"');
-assert('pn_delivery_tag($sd) == "tag"');
-//print("pn_delivery_tag= " . . "\n");
-//print("pn_delivery_tag= " . pn_delivery_tag($sd) . "\n");
-
-$rmsg = pn_recv($rcv, 1024);
-//print("pn_recv [0]= " . $rmsg[0] . " [1]= " . $rmsg[1] . "\n" );
-assert('$rmsg[0] == 14');
-assert('$rmsg[1] == "this is a test"');
-pn_disposition($rd, PN_ACCEPTED);
-
-pump($t1, $t2);
-
-// assert pn_remote_disposition(sd) == pn_local_disposition(rd) == PN_ACCEPTED
-// assert pn_updated(sd)
-
-pn_disposition($sd, PN_ACCEPTED);
-pn_settle($sd);
-
-pump($t1, $t2);
-
-// cleanup
-pn_connection_free($c1);
-pn_connection_free($c2);
-
-
-//
-// pn_message_data test
-//
-
-$x = pn_message_data("This is test data!", 1024);
-//print("Msg Len = " . $x[0] . "\n");
-assert("$x[0] > 0");
-//print("Msg data = " . $x[1] . "\n");
-
-
-//
-// pn_listener/pn_connector context handling
-//
-
-$pnd = pn_driver();
-
-// test empty listener & connector
-$pnl = pn_driver_listener($pnd);
-$pnlc = pn_listener_context($pnl);
-//print("Empty listener " . $pnlc . "\n");
-unset($pnlc);
-pn_listener_free($pnl);
-
-$pnc = pn_driver_connector($pnd);
-$pncc = pn_connector_context($pnc);
-//print("Empty connector " . $pncc . "\n");
-//assert("is_null($pncc)");
-unset($pncc);
-
-pn_connector_free($pnc);
-
-
-// manage a listener context
-$x = "listener-context";
-$pnl = pn_listener_fd($pnd, 1, $x);
-unset($x);
-$x = pn_listener_context($pnl);
-assert('$x == "listener-context"');
-//print("Retrieved listener context: " . $x . "\n");
-unset($x);
-$m = pn_listener_context($pnl);
-pn_listener_free($pnl);
-//print("After free: " . $m . "\n");
-assert('$m == "listener-context"');
-
-
-// manage a connector context
-$y = "connector-context";
-$pnc = pn_connector_fd($pnd, 2, $y);
-unset($y);
-$y = pn_connector_context($pnc);
-//print("Retrieved connector context: " . $y . "\n");
-assert('$y == "connector-context"');
-$y = 75;
-pn_connector_set_context($pnc, $y);
-unset($y);
-//print("Updated connector context: " . pn_connector_context($pnc) . "\n");
-assert('pn_connector_context($pnc) == 75');
-$m = pn_connector_context($pnc);
-pn_connector_free($pnc);
-//print("After free: " . $m . "\n");
-assert('$m == 75');
-
-
-pn_driver_free($pnd);
-
-
-print("END TEST\n");
+$msg = new Message();
+$msg->id = 10;
+$msg->correlation_id = "asdf";
+assert($msg->id == 10);
+assert($msg->correlation_id == "asdf");
+
+$copy = new Message();
+$copy->decode($msg->encode());
+assert($copy->id == $msg->id);
+assert($copy->correlation_id == $msg->correlation_id);
?>
\ No newline at end of file
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Fri Oct 19 21:50:32 2012
@@ -344,6 +344,8 @@ class Message(object):
def __init__(self):
self._msg = pn_message()
+ self._id = Data(pn_message_id(self._msg))
+ self._correlation_id = Data(pn_message_correlation_id(self._msg))
self.instructions = None
self.annotations = None
self.properties = None
@@ -362,10 +364,10 @@ class Message(object):
return err
def _pre_encode(self):
- inst = Data(_data = pn_message_instructions(self._msg))
- ann = Data(_data = pn_message_annotations(self._msg))
- props = Data(_data = pn_message_properties(self._msg))
-# body = Data(_data = pn_message_body(self._msg))
+ inst = Data(pn_message_instructions(self._msg))
+ ann = Data(pn_message_annotations(self._msg))
+ props = Data(pn_message_properties(self._msg))
+# body = Data(pn_message_body(self._msg))
inst.clear()
if self.instructions is not None:
@@ -378,10 +380,10 @@ class Message(object):
props.put_object(self.properties)
def _post_decode(self):
- inst = Data(_data = pn_message_instructions(self._msg))
- ann = Data(_data = pn_message_annotations(self._msg))
- props = Data(_data = pn_message_properties(self._msg))
-# body = Data(_data = pn_message_body(self._msg))
+ inst = Data(pn_message_instructions(self._msg))
+ ann = Data(pn_message_annotations(self._msg))
+ props = Data(pn_message_properties(self._msg))
+# body = Data(pn_message_body(self._msg))
if inst.next():
self.instructions = inst.get_object()
@@ -464,13 +466,14 @@ True iff the recipient is the first to a
The number of delivery attempts made for this message.
""")
- # XXX
- def _get_id(self):
- return pn_message_get_id(self._msg)
+ def _get_id(self):
+ return self._id.get_object()
def _set_id(self, value):
- self._check(pn_message_set_id(self._msg, value))
-
+ if type(value) in (int, long):
+ value = ulong(value)
+ self._id.rewind()
+ self._id.put_object(value)
id = property(_get_id, _set_id,
doc="""
The id of the message.
@@ -520,12 +523,13 @@ The subject of the message.
The reply-to address for the message.
""")
- # XXX
def _get_correlation_id(self):
- return pn_message_get_correlation_id(self._msg)
-
+ return self._correlation_id.get_object()
def _set_correlation_id(self, value):
- self._check(pn_message_set_correlation_id(self._msg, value))
+ if type(value) in (int, long):
+ value = ulong(value)
+ self._correlation_id.rewind()
+ self._correlation_id.put_object(value)
correlation_id = property(_get_correlation_id, _set_correlation_id,
doc="""
@@ -666,6 +670,9 @@ class UnmappedType:
def __repr__(self):
return "UnmappedType(%s)" % self.msg
+class ulong(long):
+ pass
+
class Data:
"""
The L{Data} class provides an interface for decoding, extracting,
@@ -739,18 +746,18 @@ class Data:
BINARY = PN_BINARY; "A binary string."
STRING = PN_STRING; "A unicode string."
SYMBOL = PN_SYMBOL; "A symbolic string."
- DESCRIBED = PN_DESCRIPTOR; "A described value."
+ DESCRIBED = PN_DESCRIBED; "A described value."
ARRAY = PN_ARRAY; "An array value."
LIST = PN_LIST; "A list value."
MAP = PN_MAP; "A map value."
- def __init__(self, capacity=16, _data=None):
- if _data:
- self._data = _data or pn_data(capacity)
- self._free = False
- else:
+ def __init__(self, capacity=16):
+ if type(capacity) in (int, long):
self._data = pn_data(capacity)
self._free = True
+ else:
+ self._data = capacity
+ self._free = False
def __del__(self):
if self._free and hasattr(self, "_data"):
@@ -1233,7 +1240,7 @@ class Data:
If the current node is an unsigned long, returns its value,
returns 0 otherwise.
"""
- return pn_data_get_ulong(self._data)
+ return ulong(pn_data_get_ulong(self._data))
def get_long(self):
"""
@@ -1380,7 +1387,7 @@ class Data:
return result
put_mappings = {
- None.__class__: put_null,
+ None.__class__: lambda s, _: s.put_null(),
dict: put_dict,
list: put_sequence,
tuple: put_sequence,
@@ -1388,11 +1395,12 @@ class Data:
bytes: put_binary,
int: put_long,
long: put_long,
+ ulong: put_ulong,
float: put_double,
uuid.UUID: put_uuid
}
get_mappings = {
- NULL: lambda s, _: None,
+ NULL: lambda s: None,
MAP: get_dict,
LIST: get_sequence,
STRING: get_string,
@@ -1489,19 +1497,19 @@ class Connection(Endpoint):
@property
def offered_capabilities(self):
- return Data(_data=pn_connection_offered_capabilities(self._conn))
+ return Data(pn_connection_offered_capabilities(self._conn))
@property
def desired_capabilities(self):
- return Data(_data=pn_connection_desired_capabilities(self._conn))
+ return Data(pn_connection_desired_capabilities(self._conn))
@property
def remote_offered_capabilities(self):
- return Data(_data=pn_connection_remote_offered_capabilities(self._conn))
+ return Data(pn_connection_remote_offered_capabilities(self._conn))
@property
def remote_desired_capabilities(self):
- return Data(_data=pn_connection_remote_desired_capabilities(self._conn))
+ return Data(pn_connection_remote_desired_capabilities(self._conn))
def open(self):
pn_connection_open(self._conn)
@@ -1723,19 +1731,19 @@ class Terminus(object):
@property
def properties(self):
- return Data(_data = pn_terminus_properties(self._impl))
+ return Data(pn_terminus_properties(self._impl))
@property
def capabilities(self):
- return Data(_data = pn_terminus_capabilities(self._impl))
+ return Data(pn_terminus_capabilities(self._impl))
@property
def outcomes(self):
- return Data(_data = pn_terminus_outcomes(self._impl))
+ return Data(pn_terminus_outcomes(self._impl))
@property
def filter(self):
- return Data(_data = pn_terminus_filter(self._impl))
+ return Data(pn_terminus_filter(self._impl))
def copy(self, src):
self._check(pn_terminus_copy(self._impl, src._impl))
Modified: qpid/proton/trunk/proton-c/bindings/ruby/ruby.i
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/ruby.i?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/ruby.i (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/ruby.i Fri Oct 19 21:50:32 2012
@@ -73,14 +73,14 @@ typedef int int32_t;
case T_STRING:
$1.type = PN_STRING;
- $1.u.as_string.start = RSTRING_PTR($input);
- if ($1.u.as_string.start)
+ $1.u.as_bytes.start = RSTRING_PTR($input);
+ if ($1.u.as_bytes.start)
{
- $1.u.as_string.size = RSTRING_LEN($input);
+ $1.u.as_bytes.size = RSTRING_LEN($input);
}
else
{
- $1.u.as_string.size = 0;
+ $1.u.as_bytes.size = 0;
}
break;
@@ -151,7 +151,7 @@ typedef int int32_t;
break;
case PN_STRING:
- $result = rb_str_new($1.u.as_string.start, $1.u.as_string.size);
+ $result = rb_str_new($1.u.as_bytes.start, $1.u.as_bytes.size);
break;
}
}
Modified: qpid/proton/trunk/proton-c/include/proton/codec.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/codec.h?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/codec.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/codec.h Fri Oct 19 21:50:32 2012
@@ -54,11 +54,10 @@ typedef enum {
PN_BINARY,
PN_STRING,
PN_SYMBOL,
- PN_DESCRIPTOR,
+ PN_DESCRIBED,
PN_ARRAY,
PN_LIST,
- PN_MAP,
- PN_TYPE
+ PN_MAP
} pn_type_t;
typedef struct {
@@ -81,11 +80,8 @@ typedef struct {
pn_decimal64_t as_decimal64;
pn_decimal128_t as_decimal128;
pn_uuid_t as_uuid;
- pn_bytes_t as_binary;
- pn_bytes_t as_string;
- pn_bytes_t as_symbol;
- size_t count;
- pn_type_t type;
+ pn_bytes_t as_bytes;
+ size_t as_count;
} u;
} pn_atom_t;
@@ -142,6 +138,7 @@ int pn_data_put_uuid(pn_data_t *data, pn
int pn_data_put_binary(pn_data_t *data, pn_bytes_t bytes);
int pn_data_put_string(pn_data_t *data, pn_bytes_t string);
int pn_data_put_symbol(pn_data_t *data, pn_bytes_t symbol);
+int pn_data_put_atom(pn_data_t *data, pn_atom_t atom);
size_t pn_data_get_list(pn_data_t *data);
size_t pn_data_get_map(pn_data_t *data);
@@ -170,6 +167,8 @@ pn_uuid_t pn_data_get_uuid(pn_data_t *da
pn_bytes_t pn_data_get_binary(pn_data_t *data);
pn_bytes_t pn_data_get_string(pn_data_t *data);
pn_bytes_t pn_data_get_symbol(pn_data_t *data);
+pn_bytes_t pn_data_get_bytes(pn_data_t *data);
+pn_atom_t pn_data_get_atom(pn_data_t *data);
int pn_data_copy(pn_data_t *data, pn_data_t *src);
int pn_data_append(pn_data_t *data, pn_data_t *src);
Modified: qpid/proton/trunk/proton-c/include/proton/message.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/message.h?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/message.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/message.h Fri Oct 19 21:50:32 2012
@@ -64,6 +64,7 @@ int pn_message_set_first_acqu
uint32_t pn_message_get_delivery_count (pn_message_t *msg);
int pn_message_set_delivery_count (pn_message_t *msg, uint32_t count);
+pn_data_t * pn_message_id (pn_message_t *msg);
pn_atom_t pn_message_get_id (pn_message_t *msg);
int pn_message_set_id (pn_message_t *msg, pn_atom_t id);
@@ -79,6 +80,7 @@ int pn_message_set_subject
const char * pn_message_get_reply_to (pn_message_t *msg);
int pn_message_set_reply_to (pn_message_t *msg, const char *reply_to);
+pn_data_t * pn_message_correlation_id (pn_message_t *msg);
pn_atom_t pn_message_get_correlation_id (pn_message_t *msg);
int pn_message_set_correlation_id (pn_message_t *msg, pn_atom_t atom);
@@ -123,14 +125,6 @@ pn_data_t *pn_message_annotations(pn_mes
pn_data_t *pn_message_properties(pn_message_t *msg);
pn_data_t *pn_message_body(pn_message_t *msg);
-// TODO:
-// bind vars
-//int pn_message_set(pn_message_t *message, int i, pn_atom_t *atom);
-//pn_atom_t *pn_message_get(pn_message_t *message, int i);
-// querying
-// could separate query from getting results
-//int pn_message_scan(pn_message_t *message, const char *query, pn_atom_t *atom);
-
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size);
int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size);
Modified: qpid/proton/trunk/proton-c/src/codec/codec.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/codec/codec.c?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/codec/codec.c (original)
+++ qpid/proton/trunk/proton-c/src/codec/codec.c Fri Oct 19 21:50:32 2012
@@ -35,20 +35,51 @@
#include "encodings.h"
#include "../util.h"
+#define PN_DESCRIPTOR (PN_DESCRIBED)
+#define PN_TYPE (64)
+
+typedef struct {
+ pn_type_t type;
+ union {
+ bool as_bool;
+ uint8_t as_ubyte;
+ int8_t as_byte;
+ uint16_t as_ushort;
+ int16_t as_short;
+ uint32_t as_uint;
+ int32_t as_int;
+ pn_char_t as_char;
+ uint64_t as_ulong;
+ int64_t as_long;
+ pn_timestamp_t as_timestamp;
+ float as_float;
+ double as_double;
+ pn_decimal32_t as_decimal32;
+ pn_decimal64_t as_decimal64;
+ pn_decimal128_t as_decimal128;
+ pn_uuid_t as_uuid;
+ pn_bytes_t as_binary;
+ pn_bytes_t as_string;
+ pn_bytes_t as_symbol;
+ size_t count;
+ pn_type_t type;
+ } u;
+} pn_iatom_t;
+
typedef struct {
size_t size;
- pn_atom_t *start;
+ pn_iatom_t *start;
} pn_atoms_t;
int pn_decode_atoms(pn_bytes_t *bytes, pn_atoms_t *atoms);
int pn_encode_atoms(pn_bytes_t *bytes, pn_atoms_t *atoms);
int pn_decode_one(pn_bytes_t *bytes, pn_atoms_t *atoms);
-int pn_print_atom(pn_atom_t atom);
+int pn_print_atom(pn_iatom_t atom);
const char *pn_type_str(pn_type_t type);
int pn_print_atoms(const pn_atoms_t *atoms);
ssize_t pn_format_atoms(char *buf, size_t n, pn_atoms_t atoms);
-int pn_format_atom(pn_bytes_t *bytes, pn_atom_t atom);
+int pn_format_atom(pn_bytes_t *bytes, pn_iatom_t atom);
typedef union {
uint32_t i;
@@ -60,6 +91,8 @@ typedef union {
const char *pn_type_str(pn_type_t type)
{
+ if (type == PN_TYPE) return "PN_TYPE";
+
switch (type)
{
case PN_NULL: return "PN_NULL";
@@ -87,7 +120,6 @@ const char *pn_type_str(pn_type_t type)
case PN_ARRAY: return "PN_ARRAY";
case PN_LIST: return "PN_LIST";
case PN_MAP: return "PN_MAP";
- case PN_TYPE: return "PN_TYPE";
}
return "<UNKNOWN>";
@@ -111,7 +143,7 @@ int pn_bytes_format(pn_bytes_t *bytes, c
}
}
-int pn_print_atom(pn_atom_t atom)
+int pn_print_atom(pn_iatom_t atom)
{
size_t size = 4;
while (true) {
@@ -132,8 +164,12 @@ int pn_print_atom(pn_atom_t atom)
}
}
-int pn_format_atom(pn_bytes_t *bytes, pn_atom_t atom)
+int pn_format_atom(pn_bytes_t *bytes, pn_iatom_t atom)
{
+ if (atom.type == PN_TYPE) {
+ return pn_bytes_format(bytes, "%s", pn_type_str(atom.u.type));
+ }
+
switch (atom.type)
{
case PN_NULL:
@@ -257,8 +293,6 @@ int pn_format_atom(pn_bytes_t *bytes, pn
return pn_bytes_format(bytes, "list[%zu]", atom.u.count);
case PN_MAP:
return pn_bytes_format(bytes, "map[%zu]", atom.u.count);
- case PN_TYPE:
- return pn_bytes_format(bytes, "%s", pn_type_str(atom.u.type));
}
return PN_ARG_ERR;
@@ -296,7 +330,7 @@ size_t pn_atoms_ltrim(pn_atoms_t *atoms,
int pn_format_atoms_one(pn_bytes_t *bytes, pn_atoms_t *atoms, int level)
{
if (!atoms->size) return PN_UNDERFLOW;
- pn_atom_t *atom = atoms->start;
+ pn_iatom_t *atom = atoms->start;
pn_atoms_ltrim(atoms, 1);
int err, count;
@@ -552,7 +586,7 @@ int pn_encode_type(pn_bytes_t *bytes, pn
{
if (!atoms->size) return PN_UNDERFLOW;
- pn_atom_t *atom = atoms->start;
+ pn_iatom_t *atom = atoms->start;
if (atom->type == PN_DESCRIPTOR)
{
pn_atoms_ltrim(atoms, 1);
@@ -572,7 +606,7 @@ int pn_encode_type(pn_bytes_t *bytes, pn
int pn_encode_value(pn_bytes_t *bytes, pn_atoms_t *atoms, pn_type_t type)
{
- pn_atom_t *atom = atoms->start;
+ pn_iatom_t *atom = atoms->start;
int e;
conv_t c;
@@ -668,7 +702,7 @@ int pn_decode_type(pn_bytes_t *bytes, pn
pn_bytes_ltrim(bytes, 1);
return 0;
} else {
- atoms->start[0] = (pn_atom_t) {.type=PN_DESCRIPTOR};
+ atoms->start[0] = (pn_iatom_t) {.type=PN_DESCRIPTOR};
pn_bytes_ltrim(bytes, 1);
pn_atoms_ltrim(atoms, 1);
int e = pn_decode_atom(bytes, atoms);
@@ -761,7 +795,7 @@ int pn_decode_value(pn_bytes_t *bytes, p
if (!atoms->size) return PN_OVERFLOW;
- pn_atom_t atom;
+ pn_iatom_t atom;
switch (code)
{
@@ -983,7 +1017,7 @@ int pn_decode_value(pn_bytes_t *bytes, p
case PNE_ARRAY32:
{
if (!atoms->size) return PN_OVERFLOW;
- atoms->start[0] = (pn_atom_t) {.type=PN_ARRAY, .u.count=count};
+ atoms->start[0] = (pn_iatom_t) {.type=PN_ARRAY, .u.count=count};
pn_atoms_ltrim(atoms, 1);
uint8_t acode;
int e = pn_decode_type(bytes, atoms, &acode);
@@ -991,7 +1025,7 @@ int pn_decode_value(pn_bytes_t *bytes, p
if (!atoms->size) return PN_OVERFLOW;
pn_type_t type = pn_code2type(acode);
if (type < 0) return type;
- atoms->start[0] = (pn_atom_t) {.type=PN_TYPE, .u.type=type};
+ atoms->start[0] = (pn_iatom_t) {.type=PN_TYPE, .u.type=type};
pn_atoms_ltrim(atoms, 1);
for (int i = 0; i < count; i++)
{
@@ -1003,13 +1037,13 @@ int pn_decode_value(pn_bytes_t *bytes, p
case PNE_LIST8:
case PNE_LIST32:
if (!atoms->size) return PN_OVERFLOW;
- atoms->start[0] = (pn_atom_t) {.type=PN_LIST, .u.count=count};
+ atoms->start[0] = (pn_iatom_t) {.type=PN_LIST, .u.count=count};
pn_atoms_ltrim(atoms, 1);
break;
case PNE_MAP8:
case PNE_MAP32:
if (!atoms->size) return PN_OVERFLOW;
- atoms->start[0] = (pn_atom_t) {.type=PN_MAP, .u.count=count};
+ atoms->start[0] = (pn_iatom_t) {.type=PN_MAP, .u.count=count};
pn_atoms_ltrim(atoms, 1);
break;
default:
@@ -1054,7 +1088,7 @@ typedef struct {
size_t down;
size_t parent;
size_t children;
- pn_atom_t atom;
+ pn_iatom_t atom;
// for arrays
bool described;
pn_type_t type;
@@ -1174,6 +1208,7 @@ void pn_data_rebase(pn_data_t *data, cha
int pn_data_intern_node(pn_data_t *data, pn_node_t *node)
{
pn_bytes_t *bytes = pn_data_bytes(data, node);
+ if (!bytes) return 0;
size_t oldcap = pn_buffer_capacity(data->buf);
ssize_t offset = pn_data_intern(data, bytes->start, bytes->size);
if (offset < 0) return offset;
@@ -1804,7 +1839,7 @@ int pn_data_as_atoms(pn_data_t *data, pn
int pn_data_print(pn_data_t *data)
{
- pn_atom_t atoms[data->size + data->extras];
+ pn_iatom_t atoms[data->size + data->extras];
pn_atoms_t latoms = {.size=data->size + data->extras, .start=atoms};
pn_data_as_atoms(data, &latoms);
return pn_print_atoms(&latoms);
@@ -1812,7 +1847,7 @@ int pn_data_print(pn_data_t *data)
int pn_data_format(pn_data_t *data, char *bytes, size_t *size)
{
- pn_atom_t atoms[data->size + data->extras];
+ pn_iatom_t atoms[data->size + data->extras];
pn_atoms_t latoms = {.size=data->size + data->extras, .start=atoms};
pn_data_as_atoms(data, &latoms);
@@ -1911,9 +1946,14 @@ pn_node_t *pn_data_peek(pn_data_t *data)
pn_node_t *current = pn_data_current(data);
if (current) {
return pn_data_node(data, current->next);
- } else {
- return NULL;
}
+
+ pn_node_t *parent = pn_data_node(data, data->parent);
+ if (parent) {
+ return pn_data_node(data, parent->down);
+ }
+
+ return NULL;
}
bool pn_data_next(pn_data_t *data)
@@ -2078,16 +2118,16 @@ int pn_data_as_atoms(pn_data_t *data, pn
if (node->atom.type == PN_ARRAY) {
if (node->described) {
- atoms->start[natoms++] = (pn_atom_t) {.type=PN_DESCRIPTOR};
+ atoms->start[natoms++] = (pn_iatom_t) {.type=PN_DESCRIPTOR};
} else {
- atoms->start[natoms++] = (pn_atom_t) {.type=PN_TYPE, .u.type=node->type};
+ atoms->start[natoms++] = (pn_iatom_t) {.type=PN_TYPE, .u.type=node->type};
}
}
pn_node_t *parent = pn_data_node(data, node->parent);
if (parent && parent->atom.type == PN_ARRAY && parent->described &&
parent->down == pn_data_id(data, node)) {
- atoms->start[natoms++] = (pn_atom_t) {.type=PN_TYPE, .u.type=parent->type};
+ atoms->start[natoms++] = (pn_iatom_t) {.type=PN_TYPE, .u.type=parent->type};
}
size_t next = 0;
@@ -2115,7 +2155,7 @@ int pn_data_as_atoms(pn_data_t *data, pn
ssize_t pn_data_encode(pn_data_t *data, char *bytes, size_t size)
{
- pn_atom_t atoms[data->size + data->extras];
+ pn_iatom_t atoms[data->size + data->extras];
pn_atoms_t latoms = {.size=data->size + data->extras, .start=atoms};
pn_data_as_atoms(data, &latoms);
@@ -2133,7 +2173,8 @@ int pn_data_parse_atoms(pn_data_t *data,
for (i = offset; i < atoms.size; i++) {
if (count == limit) return i - offset;
- pn_atom_t atom = atoms.start[i];
+ pn_iatom_t atom = atoms.start[i];
+ if (atom.type == PN_TYPE) return PN_ERR;
switch (atom.type)
{
case PN_NULL:
@@ -2287,9 +2328,6 @@ int pn_data_parse_atoms(pn_data_t *data,
pn_data_exit(data);
count++;
break;
- case PN_TYPE:
- return PN_ERR;
- break;
}
}
@@ -2303,7 +2341,7 @@ ssize_t pn_data_decode(pn_data_t *data,
pn_bytes_t lbytes;
while (true) {
- pn_atom_t atoms[asize];
+ pn_iatom_t atoms[asize];
latoms.size = asize;
latoms.start = atoms;
lbytes.size = size;
@@ -2360,7 +2398,7 @@ int pn_data_put_described(pn_data_t *dat
int pn_data_put_null(pn_data_t *data)
{
pn_node_t *node = pn_data_add(data);
- node->atom = (pn_atom_t) {.type=PN_NULL};
+ node->atom = (pn_iatom_t) {.type=PN_NULL};
return 0;
}
@@ -2524,6 +2562,13 @@ int pn_data_put_symbol(pn_data_t *data,
return pn_data_intern_node(data, node);
}
+int pn_data_put_atom(pn_data_t *data, pn_atom_t atom)
+{
+ pn_node_t *node = pn_data_add(data);
+ node->atom = *((pn_iatom_t *) (&atom));
+ return pn_data_intern_node(data, node);
+}
+
size_t pn_data_get_list(pn_data_t *data)
{
pn_node_t *node = pn_data_current(data);
@@ -2790,6 +2835,24 @@ pn_bytes_t pn_data_get_symbol(pn_data_t
}
}
+pn_bytes_t pn_data_get_bytes(pn_data_t *data)
+{
+ pn_node_t *node = pn_data_current(data);
+ if (node && (node->atom.type == PN_BINARY ||
+ node->atom.type == PN_STRING ||
+ node->atom.type == PN_SYMBOL)) {
+ return node->atom.u.as_binary;
+ } else {
+ return (pn_bytes_t) {0};
+ }
+}
+
+pn_atom_t pn_data_get_atom(pn_data_t *data)
+{
+ pn_node_t *node = pn_data_current(data);
+ return *((pn_atom_t *) &node->atom);
+}
+
int pn_data_copy(pn_data_t *data, pn_data_t *src)
{
pn_data_clear(data);
@@ -2949,8 +3012,6 @@ int pn_data_appendn(pn_data_t *data, pn_
pn_data_enter(src);
level++;
break;
- case PN_TYPE:
- return PN_ERR;
}
if (err) { pn_data_restore(src, point); return err; }
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Fri Oct 19 21:50:32 2012
@@ -52,12 +52,12 @@ struct pn_message_t {
pn_millis_t ttl;
bool first_acquirer;
uint32_t delivery_count;
- pn_atom_t id;
+ pn_data_t *id;
pn_buffer_t *user_id;
pn_buffer_t *address;
pn_buffer_t *subject;
pn_buffer_t *reply_to;
- pn_atom_t correlation_id;
+ pn_data_t *correlation_id;
pn_buffer_t *content_type;
pn_buffer_t *content_encoding;
pn_timestamp_t expiry_time;
@@ -85,12 +85,12 @@ pn_message_t *pn_message()
msg->ttl = 0;
msg->first_acquirer = false;
msg->delivery_count = 0;
- msg->id.type = PN_NULL;
+ msg->id = pn_data(1);
msg->user_id = NULL;
msg->address = NULL;
msg->subject = NULL;
msg->reply_to = NULL;
- msg->correlation_id.type = PN_NULL;
+ msg->correlation_id = pn_data(1);
msg->content_type = NULL;
msg->content_encoding = NULL;
msg->expiry_time = 0;
@@ -120,6 +120,8 @@ void pn_message_free(pn_message_t *msg)
pn_buffer_free(msg->content_encoding);
pn_buffer_free(msg->group_id);
pn_buffer_free(msg->reply_to_group_id);
+ pn_data_free(msg->id);
+ pn_data_free(msg->correlation_id);
pn_data_free(msg->data);
pn_data_free(msg->instructions);
pn_data_free(msg->annotations);
@@ -138,12 +140,12 @@ void pn_message_clear(pn_message_t *msg)
msg->ttl = 0;
msg->first_acquirer = false;
msg->delivery_count = 0;
- msg->id.type = PN_NULL;
+ pn_data_clear(msg->id);
if (msg->user_id) pn_buffer_clear(msg->user_id);
if (msg->address) pn_buffer_clear(msg->address);
if (msg->subject) pn_buffer_clear(msg->subject);
if (msg->reply_to) pn_buffer_clear(msg->reply_to);
- msg->correlation_id.type = PN_NULL;
+ pn_data_clear(msg->correlation_id);
if (msg->content_type) pn_buffer_clear(msg->content_type);
if (msg->content_encoding) pn_buffer_clear(msg->content_encoding);
msg->expiry_time = 0;
@@ -240,16 +242,20 @@ int pn_message_set_delivery_count(pn_mes
return 0;
}
+pn_data_t *pn_message_id(pn_message_t *msg)
+{
+ return msg ? msg->id : NULL;
+}
pn_atom_t pn_message_get_id(pn_message_t *msg)
{
- return msg ? msg->id : (pn_atom_t) {.type=PN_NULL};
+ return msg ? pn_data_get_atom(msg->id) : (pn_atom_t) {.type=PN_NULL};
}
int pn_message_set_id(pn_message_t *msg, pn_atom_t id)
{
if (!msg) return PN_ARG_ERR;
- msg->id = id;
- return 0;
+ pn_data_rewind(msg->id);
+ return pn_data_put_atom(msg->id, id);
}
static int pn_buffer_set_bytes(pn_buffer_t **buf, pn_bytes_t bytes)
@@ -337,16 +343,20 @@ int pn_message_set_reply_to(pn_message_t
return pn_buffer_set_str(&msg->reply_to, reply_to);
}
+pn_data_t *pn_message_correlation_id(pn_message_t *msg)
+{
+ return msg ? msg->correlation_id : NULL;
+}
pn_atom_t pn_message_get_correlation_id(pn_message_t *msg)
{
- return msg ? msg->correlation_id : (pn_atom_t) {.type=PN_NULL};
+ return msg ? pn_data_get_atom(msg->correlation_id) : (pn_atom_t) {.type=PN_NULL};
}
int pn_message_set_correlation_id(pn_message_t *msg, pn_atom_t atom)
{
if (!msg) return PN_ARG_ERR;
- msg->correlation_id = atom;
- return 0;
+ pn_data_rewind(msg->correlation_id);
+ return pn_data_put_atom(msg->correlation_id, atom);
}
const char *pn_message_get_content_type(pn_message_t *msg)
@@ -458,8 +468,11 @@ int pn_message_decode(pn_message_t *msg,
{
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
- err = pn_data_scan(msg->data, "D.[.zSSS.ssttSIS]", &user_id, &address,
- &subject, &reply_to, &ctype, &cencoding,
+ pn_data_clear(msg->id);
+ pn_data_clear(msg->correlation_id);
+ err = pn_data_scan(msg->data, "D.[CzSSSCssttSIS]", msg->id,
+ &user_id, &address, &subject, &reply_to,
+ msg->correlation_id, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
if (err) return pn_error_format(msg->error, err, "data error: %s",
@@ -553,11 +566,13 @@ int pn_message_encode(pn_message_t *msg,
pn_data_exit(msg->data);
}
- err = pn_data_fill(msg->data, "DL[nzSSSnssttSIS]", PROPERTIES,
+ err = pn_data_fill(msg->data, "DL[CzSSSCssttSIS]", PROPERTIES,
+ msg->id,
pn_buffer_bytes(msg->user_id),
pn_buffer_str(msg->address),
pn_buffer_str(msg->subject),
pn_buffer_str(msg->reply_to),
+ msg->correlation_id,
pn_buffer_str(msg->content_type),
pn_buffer_str(msg->content_encoding),
msg->expiry_time,
Modified: qpid/proton/trunk/tests/proton_tests/message.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/message.py?rev=1400309&r1=1400308&r2=1400309&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/message.py (original)
+++ qpid/proton/trunk/tests/proton_tests/message.py Fri Oct 19 21:50:32 2012
@@ -19,6 +19,7 @@
import os, common
from proton import *
+from uuid import uuid3, NAMESPACE_OID
class Test(common.Test):
@@ -45,6 +46,12 @@ class AccessorsTest(Test):
def _test_time(self, name):
self._test(name, 0, (0, 123456789, 987654321))
+ def testId(self):
+ self._test("id", None, ("bytes", None, 123, u"string", uuid3(NAMESPACE_OID, "blah")))
+
+ def testCorrelationId(self):
+ self._test("correlation_id", None, ("bytes", None, 123, u"string", uuid3(NAMESPACE_OID, "blah")))
+
def testDurable(self):
self._test("durable", False, (True, False))
@@ -96,6 +103,8 @@ class AccessorsTest(Test):
class CodecTest(Test):
def testRoundTrip(self):
+ self.msg.id = "asdf"
+ self.msg.correlation_id = uuid3(NAMESPACE_OID, "bleh")
self.msg.ttl = 3
self.msg.priority = 100
self.msg.address = "address"
@@ -108,6 +117,8 @@ class CodecTest(Test):
msg2 = Message()
msg2.decode(data)
+ assert self.msg.id == msg2.id, (self.msg.id, msg2.id)
+ assert self.msg.correlation_id == msg2.correlation_id, (self.msg.correlation_id, msg2.correlation_id)
assert self.msg.ttl == msg2.ttl, (self.msg.ttl, msg2.ttl)
assert self.msg.priority == msg2.priority, (self.msg.priority, msg2.priority)
assert self.msg.address == msg2.address, (self.msg.address, msg2.address)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]