Author: rhs
Date: Mon Oct 22 21:24:00 2012
New Revision: 1401085
URL: http://svn.apache.org/viewvc?rev=1401085&view=rev
Log:
improved handling of message bodies
Modified:
qpid/proton/trunk/examples/messenger/client.py
qpid/proton/trunk/examples/messenger/recv.py
qpid/proton/trunk/examples/messenger/send.py
qpid/proton/trunk/examples/messenger/server.py
qpid/proton/trunk/proton-c/bindings/php/proton.php
qpid/proton/trunk/proton-c/bindings/php/tests.php
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/message.h
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/proton_tests/message.py
Modified: qpid/proton/trunk/examples/messenger/client.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/client.py?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/client.py (original)
+++ qpid/proton/trunk/examples/messenger/client.py Mon Oct 22 21:24:00 2012
@@ -44,7 +44,7 @@ msg.reply_to = opts.reply_to
mng.put(msg)
mng.send()
-if opts.reply_to[:2] != "~/":
+if opts.reply_to[:2] == "~/":
mng.recv(1)
try:
mng.get(msg)
Modified: qpid/proton/trunk/examples/messenger/recv.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/recv.py?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/recv.py (original)
+++ qpid/proton/trunk/examples/messenger/recv.py Mon Oct 22 21:24:00 2012
@@ -49,11 +49,6 @@ while True:
except Exception, e:
print e
else:
- try:
- body = msg.save()
- except Exception, e:
- print e
- else:
- print msg.address, msg.subject or "(no subject)", msg.properties, body
+ print msg.address, msg.subject or "(no subject)", msg.properties,
msg.body
mng.stop()
Modified: qpid/proton/trunk/examples/messenger/send.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/send.py?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/send.py (original)
+++ qpid/proton/trunk/examples/messenger/send.py Mon Oct 22 21:24:00 2012
@@ -32,10 +32,12 @@ if not args:
mng = Messenger()
mng.start()
+import uuid
+
msg = Message()
for m in args:
msg.address = opts.address
- msg.load(m)
+ msg.body = unicode(m)
mng.put(msg)
mng.send()
Modified: qpid/proton/trunk/examples/messenger/server.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/server.py?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/server.py (original)
+++ qpid/proton/trunk/examples/messenger/server.py Mon Oct 22 21:24:00 2012
@@ -50,9 +50,10 @@ while True:
if mng.incoming > 0:
mng.get(msg)
if msg.reply_to:
+ print msg.reply_to
reply.address = msg.reply_to
reply.correlation_id = msg.correlation_id
- reply.load(msg.save())
+ reply.body = msg.body
dispatch(msg, reply)
mng.put(reply)
mng.send()
Modified: qpid/proton/trunk/proton-c/bindings/php/proton.php
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/proton.php?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/proton.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/proton.php Mon Oct 22 21:24:00 2012
@@ -200,7 +200,7 @@ class Message {
$inst = new Data(pn_message_instructions($this->impl));
$ann = new Data(pn_message_annotations($this->impl));
$props = new Data(pn_message_properties($this->impl));
- //$body = Data(pn_message_body($this->impl));
+ $body = new Data(pn_message_body($this->impl));
$inst->clear();
if ($this->instructions != null)
@@ -211,13 +211,15 @@ class Message {
$props->clear();
if ($this->properties != null)
$props->put_object($this->properties);
+ if ($this->body != null)
+ $body->put_object($this->body);
}
function _post_decode() {
$inst = new Data(pn_message_instructions($this->impl));
$ann = new Data(pn_message_annotations($this->impl));
$props = new Data(pn_message_properties($this->impl));
- //$body = new Data(pn_message_body($this->impl));
+ $body = new Data(pn_message_body($this->impl));
if ($inst->next())
$this->instructions = $inst.get_object();
@@ -231,10 +233,10 @@ class Message {
$this->properties = $props->get_object();
else
$this->properties = null;
- /*if ($body->next())
+ if ($body->next())
$this->body = $body->get_object();
else
- $this->body = null;*/
+ $this->body = null;
}
public function clear() {
@@ -245,12 +247,20 @@ class Message {
$this->body = null;
}
- private function _is_durable() {
+ private function _get_inferred() {
+ return pn_message_is_inferred($this->impl);
+ }
+
+ private function _set_inferred($value) {
+ $this->_check(pn_message_set_inferred($this->impl, $value));
+ }
+
+ private function _get_durable() {
return pn_message_is_durable($this->impl);
}
private function _set_durable($value) {
- $this->_check(pn_message_set_durable($this->impl, bool($value)));
+ $this->_check(pn_message_set_durable($this->impl, $value));
}
private function _get_priority() {
@@ -269,12 +279,12 @@ class Message {
$this->_check(pn_message_set_ttl($this->impl, $value));
}
- private function _is_first_acquirer() {
+ private function _get_first_acquirer() {
return pn_message_is_first_acquirer($this->impl);
}
private function _set_first_acquirer($value) {
- $this->_check(pn_message_set_first_acquirer($this->impl, bool($value)));
+ $this->_check(pn_message_set_first_acquirer($this->impl, $value));
}
private function _get_delivery_count() {
@@ -448,7 +458,7 @@ class Binary {
}
public function __tostring() {
- return $this->bytes;
+ return "Binary($this->bytes)";
}
}
@@ -462,7 +472,7 @@ class Symbol {
}
public function __tostring() {
- return $this->name;
+ return "Symbol($this->name)";
}
}
@@ -484,7 +494,7 @@ class UUID {
}
-class Lst {
+class PList {
public $elements;
@@ -493,7 +503,7 @@ class Lst {
}
public function __tostring() {
- return "Lst(" . implode(", ", $this->elements) . ")";
+ return "PList(" . implode(", ", $this->elements) . ")";
}
}
@@ -508,6 +518,10 @@ class Described {
$this->value = $value;
}
+ public function __tostring() {
+ return "Described($this->descriptor, $this->value)";
+ }
+
}
class DataException extends ProtonException {}
@@ -535,9 +549,9 @@ class Data {
const BINARY = PN_BINARY;
const STRING = PN_STRING;
const SYMBOL = PN_SYMBOL;
- const DESCRIBED = PN_DESCRIPTOR;
- const ARY = PN_ARRAY;
- const LST = PN_LIST;
+ const DESCRIBED = PN_DESCRIBED;
+ const PARRAY = PN_ARRAY;
+ const PLIST = PN_LIST;
const MAP = PN_MAP;
private $impl;
@@ -920,7 +934,7 @@ class Data {
public function get_php_list() {
if ($this->enter()) {
try {
- $result = new Lst();
+ $result = new PList();
while ($this->next()) {
$result->elements[] = $this->get_object();
}
@@ -980,7 +994,7 @@ class Data {
"integer" => "put_long",
"double" => "put_double",
"Described" => "put_php_described",
- "Lst" => "put_php_list",
+ "PList" => "put_php_list",
"array" => "put_php_map"
);
private $get_mappings = array
@@ -1006,8 +1020,8 @@ class Data {
Data::STRING => "get_string",
Data::SYMBOL => "get_symbol",
Data::DESCRIBED => "get_php_described",
- Data::ARY => "get_php_array",
- Data::LST => "get_php_list",
+ Data::PARRAY => "get_php_array",
+ Data::PLIST => "get_php_list",
Data::MAP => "get_php_map"
);
Modified: qpid/proton/trunk/proton-c/bindings/php/tests.php
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/tests.php?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/tests.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/tests.php Mon Oct 22 21:24:00 2012
@@ -2,26 +2,41 @@
include("proton.php");
-$msg = new Message();
-$msg->id = 10;
-$msg->correlation_id = "asdf";
-$msg->properties = array();
-$msg->properties["null"] = null;
-$msg->properties["boolean"] = true;
-$msg->properties["integer"] = 123;
-$msg->properties["double"] = 3.14159;
-$msg->properties["binary"] = new Binary("binary");
-$msg->properties["symbol"] = new Symbol("symbol");
-$msg->properties["uuid"] = new UUID("1234123412341234");
-$msg->properties["list"] = new Lst(1, 2, 3, 4);
-assert($msg->id == 10);
-assert($msg->correlation_id == "asdf");
+function round_trip($body) {
+ $msg = new Message();
+ $msg->inferred = true;
+ $msg->durable = true;
+ $msg->id = 10;
+ $msg->correlation_id = "asdf";
+ $msg->properties = array();
+ $msg->properties["null"] = null;
+ $msg->properties["boolean"] = true;
+ $msg->properties["integer"] = 123;
+ $msg->properties["double"] = 3.14159;
+ $msg->properties["binary"] = new Binary("binary");
+ $msg->properties["symbol"] = new Symbol("symbol");
+ $msg->properties["uuid"] = new UUID("1234123412341234");
+ $msg->properties["list"] = new PList(1, 2, 3, 4);
+ $msg->body = $body;
+ assert($msg->id == 10);
+ assert($msg->correlation_id == "asdf");
-$copy = new Message();
-$copy->decode($msg->encode());
-assert($copy->id == $msg->id);
-assert($copy->correlation_id == $msg->correlation_id);
-$diff = array_diff($msg->properties, $copy->properties);
-assert(count($diff) == 0);
+ $copy = new Message();
+ $copy->decode($msg->encode());
+ assert($copy->id == $msg->id);
+ assert($copy->correlation_id == $msg->correlation_id);
+ $diff = array_diff($msg->properties, $copy->properties);
+ assert($copy->durable == $msg->durable);
+ assert(count($diff) == 0);
+ assert($copy->body == $msg->body);
+}
-?>
\ No newline at end of file
+round_trip("this is a string body");
+round_trip(new Binary("this is a binary body"));
+round_trip(new Symbol("this is a symbol body"));
+round_trip(true);
+round_trip(1234);
+round_trip(3.14159);
+round_trip(array("pi" => 3.14159, "blueberry-pi" => "yummy"));
+
+?>
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Mon Oct 22 21:24:00
2012
@@ -367,7 +367,7 @@ class Message(object):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
-# body = Data(pn_message_body(self._msg))
+ body = Data(pn_message_body(self._msg))
inst.clear()
if self.instructions is not None:
@@ -378,12 +378,14 @@ class Message(object):
props.clear()
if self.properties is not None:
props.put_object(self.properties)
+ if self.body is not None:
+ body.put_object(self.body)
def _post_decode(self):
inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
-# body = Data(pn_message_body(self._msg))
+ body = Data(pn_message_body(self._msg))
if inst.next():
self.instructions = inst.get_object()
@@ -397,6 +399,10 @@ class Message(object):
self.properties = props.get_object()
else:
self.properties = None
+ if body.next():
+ self.body = body.get_object()
+ else:
+ self.body = None
def clear(self):
"""
@@ -409,6 +415,14 @@ class Message(object):
self.properties = None
self.body = None
+ def _is_inferred(self):
+ return pn_message_is_inferred(self._msg)
+
+ def _set_inferred(self, value):
+ self._check(pn_message_set_inferred(self._msg, bool(value)))
+
+ inferred = property(_is_inferred, _set_inferred)
+
def _is_durable(self):
return pn_message_is_durable(self._msg)
@@ -1430,6 +1444,7 @@ class Data:
if getter:
return getter(self)
else:
+ self.dump()
return UnmappedType(str(type))
Modified: qpid/proton/trunk/proton-c/include/proton/message.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/message.h?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/message.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/message.h Mon Oct 22 21:24:00 2012
@@ -32,6 +32,7 @@ extern "C" {
#endif
typedef struct pn_message_t pn_message_t;
+
typedef enum {
PN_DATA,
PN_TEXT,
@@ -48,6 +49,9 @@ void pn_message_clear(pn_messa
int pn_message_errno(pn_message_t *msg);
const char * pn_message_error(pn_message_t *msg);
+bool pn_message_is_inferred(pn_message_t *msg);
+int pn_message_set_inferred(pn_message_t *msg, bool inferred);
+
// standard message headers and properties
bool pn_message_is_durable (pn_message_t *msg);
int pn_message_set_durable (pn_message_t *msg, bool
durable);
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Mon Oct 22 21:24:00 2012
@@ -66,6 +66,7 @@ struct pn_message_t {
pn_sequence_t group_sequence;
pn_buffer_t *reply_to_group_id;
+ bool inferred;
pn_data_t *data;
pn_data_t *instructions;
pn_data_t *annotations;
@@ -98,11 +99,14 @@ pn_message_t *pn_message()
msg->group_id = NULL;
msg->group_sequence = 0;
msg->reply_to_group_id = NULL;
+
+ msg->inferred = false;
msg->data = pn_data(16);
msg->instructions = pn_data(16);
msg->annotations = pn_data(16);
msg->properties = pn_data(16);
msg->body = pn_data(16);
+
msg->format = PN_DATA;
msg->parser = NULL;
msg->error = pn_error();
@@ -153,6 +157,7 @@ void pn_message_clear(pn_message_t *msg)
if (msg->group_id) pn_buffer_clear(msg->group_id);
msg->group_sequence = 0;
if (msg->reply_to_group_id) pn_buffer_clear(msg->reply_to_group_id);
+ msg->inferred = false;
pn_data_clear(msg->data);
pn_data_clear(msg->instructions);
pn_data_clear(msg->annotations);
@@ -178,6 +183,18 @@ const char *pn_message_error(pn_message_
}
}
+bool pn_message_is_inferred(pn_message_t *msg)
+{
+ return msg ? msg->inferred : false;
+}
+
+int pn_message_set_inferred(pn_message_t *msg, bool inferred)
+{
+ if (!msg) return PN_ARG_ERR;
+ msg->inferred = inferred;
+ return 0;
+}
+
pn_parser_t *pn_message_parser(pn_message_t *msg)
{
if (!msg->parser) {
@@ -512,6 +529,13 @@ int pn_message_decode(pn_message_t *msg,
err = pn_data_copy(msg->properties, msg->data);
if (err) return err;
break;
+ case DATA:
+ case AMQP_SEQUENCE:
+ case AMQP_VALUE:
+ pn_data_narrow(msg->data);
+ err = pn_data_copy(msg->body, msg->data);
+ if (err) return err;
+ break;
default:
err = pn_data_copy(msg->body, msg->data);
if (err) return err;
@@ -526,12 +550,6 @@ int pn_message_decode(pn_message_t *msg,
int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size)
{
if (!msg || !bytes || !size || !*size) return PN_ARG_ERR;
- if (!msg->data) {
- msg->data = pn_data(64);
- }
- if (!msg->body) {
- msg->body = pn_data(64);
- }
pn_data_clear(msg->data);
@@ -596,23 +614,41 @@ int pn_message_encode(pn_message_t *msg,
pn_data_exit(msg->data);
}
- size_t remaining = *size;
- ssize_t encoded;
-
- pn_data_t *sections[2] = {
- msg->data, msg->body
- };
-
- for (int i = 0; i < 2; i++) {
- encoded = pn_data_encode(sections[i], bytes, remaining);
- if (encoded < 0)
- return pn_error_format(msg->error, encoded, "data error: %s",
- pn_data_error(sections[i]));
+ if (pn_data_size(msg->body)) {
+ pn_data_rewind(msg->body);
+ pn_data_next(msg->body);
+ pn_type_t body_type = pn_data_type(msg->body);
+ pn_data_rewind(msg->body);
- bytes += encoded;
- remaining -= encoded;
+ pn_data_put_described(msg->data);
+ pn_data_enter(msg->data);
+ if (msg->inferred) {
+ switch (body_type) {
+ case PN_BINARY:
+ pn_data_put_ulong(msg->data, DATA);
+ break;
+ case PN_LIST:
+ pn_data_put_ulong(msg->data, AMQP_SEQUENCE);
+ break;
+ default:
+ pn_data_put_ulong(msg->data, AMQP_VALUE);
+ break;
+ }
+ } else {
+ pn_data_put_ulong(msg->data, AMQP_VALUE);
+ }
+ pn_data_append(msg->data, msg->body);
}
+ size_t remaining = *size;
+ ssize_t encoded = pn_data_encode(msg->data, bytes, remaining);
+ if (encoded < 0)
+ return pn_error_format(msg->error, encoded, "data error: %s",
+ pn_data_error(msg->data));
+
+ bytes += encoded;
+ remaining -= encoded;
+
*size -= remaining;
pn_data_clear(msg->data);
@@ -650,12 +686,9 @@ int pn_message_load(pn_message_t *msg, c
int pn_message_load_data(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
- if (!msg->body) {
- msg->body = pn_data(64);
- }
pn_data_clear(msg->body);
- int err = pn_data_fill(msg->body, "DLz", DATA, size, data);
+ int err = pn_data_fill(msg->body, "z", size, data);
if (err) {
return pn_error_format(msg->error, err, "data error: %s",
pn_data_error(msg->body));
@@ -667,12 +700,9 @@ int pn_message_load_data(pn_message_t *m
int pn_message_load_text(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
- if (!msg->body) {
- msg->body = pn_data(64);
- }
pn_data_clear(msg->body);
- int err = pn_data_fill(msg->body, "DLS", AMQP_VALUE, data);
+ int err = pn_data_fill(msg->body, "S", data);
if (err) {
return pn_error_format(msg->error, err, "data error: %s",
pn_data_error(msg->body));
@@ -685,10 +715,6 @@ int pn_message_load_amqp(pn_message_t *m
{
if (!msg) return PN_ARG_ERR;
- if (!msg->body) {
- msg->body = pn_data(64);
- }
-
pn_parser_t *parser = pn_message_parser(msg);
pn_data_clear(msg->body);
@@ -733,13 +759,12 @@ int pn_message_save_data(pn_message_t *m
return 0;
}
- uint64_t desc;
- pn_bytes_t bytes;
bool scanned;
- int err = pn_data_scan(msg->body, "DL?z", &desc, &scanned, &bytes);
+ pn_bytes_t bytes;
+ int err = pn_data_scan(msg->body, "?z", &scanned, &bytes);
if (err) return pn_error_format(msg->error, err, "data error: %s",
pn_data_error(msg->body));
- if (desc == DATA && scanned) {
+ if (scanned) {
if (bytes.size > *size) {
return PN_OVERFLOW;
} else {
@@ -756,28 +781,31 @@ int pn_message_save_text(pn_message_t *m
{
if (!msg) return PN_ARG_ERR;
- if (pn_data_size(msg->body) == 0) {
- *size = 0;
- return 0;
- }
-
- uint64_t desc;
- pn_bytes_t str = {0,0};
- bool scanned, dscanned;
- int err = pn_data_scan(msg->body, "?DL?S", &dscanned, &desc, &scanned, &str);
- if (err) return pn_error_format(msg->error, err, "data error: %s",
- pn_data_error(msg->body));
- if (dscanned && desc == AMQP_VALUE) {
- if (scanned && str.size >= *size) {
- return PN_OVERFLOW;
- } else {
- memcpy(data, str.start, str.size);
- data[str.size] = '\0';
- *size = str.size;
+ pn_data_rewind(msg->body);
+ if (pn_data_next(msg->body)) {
+ switch (pn_data_type(msg->body)) {
+ case PN_STRING:
+ {
+ pn_bytes_t str = pn_data_get_bytes(msg->body);
+ if (str.size >= *size) {
+ return PN_OVERFLOW;
+ } else {
+ memcpy(data, str.start, str.size);
+ data[str.size] = '\0';
+ *size = str.size;
+ return 0;
+ }
+ }
+ break;
+ case PN_NULL:
+ *size = 0;
return 0;
+ default:
+ return PN_STATE_ERR;
}
} else {
- return PN_STATE_ERR;
+ *size = 0;
+ return 0;
}
}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Mon Oct 22 21:24:00 2012
@@ -540,7 +540,7 @@ static void outward_munge(pn_messenger_t
int len = address ? strlen(address) : 0;
if (len > 1 && address[0] == '~' && address[1] == '/') {
char buf[len + strlen(mng->name) + 9];
- sprintf(buf, "amqp://%s/%s", mng->name, address);
+ sprintf(buf, "amqp://%s/%s", mng->name, address + 2);
pn_message_set_reply_to(msg, buf);
} else if (len == 0) {
char buf[strlen(mng->name) + 8];
Modified: qpid/proton/trunk/tests/proton_tests/message.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/message.py?rev=1401085&r1=1401084&r2=1401085&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/message.py (original)
+++ qpid/proton/trunk/tests/proton_tests/message.py Mon Oct 22 21:24:00 2012
@@ -174,4 +174,4 @@ class LoadSaveTest(Test):
self.msg.clear()
self.msg.load(None)
saved = self.msg.save()
- assert saved == ""
+ assert saved == "", repr(saved)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]