Author: rhs
Date: Sat Jun 23 18:46:28 2012
New Revision: 1353174
URL: http://svn.apache.org/viewvc?rev=1353174&view=rev
Log:
added support for data and text messages
Modified:
qpid/proton/trunk/examples/messenger/recv.py
qpid/proton/trunk/examples/messenger/send.py
qpid/proton/trunk/proton-c/bindings/python/python.i
qpid/proton/trunk/proton-c/include/proton/codec.h
qpid/proton/trunk/proton-c/include/proton/message.h
qpid/proton/trunk/proton-c/src/codec/codec.c
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/tests/proton_tests/message.py
Modified: qpid/proton/trunk/examples/messenger/recv.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/recv.py?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/recv.py (original)
+++ qpid/proton/trunk/examples/messenger/recv.py Sat Jun 23 18:46:28 2012
@@ -20,7 +20,7 @@
import sys, optparse
from xproton import *
-parser = optparse.OptionParser(usage="usage: %prog [options] addr_1 ...
addr_n",
+parser = optparse.OptionParser(usage="usage: %prog [options] <addr_1> ...
<addr_n>",
description="simple message receiver")
opts, args = parser.parse_args()
@@ -45,7 +45,10 @@ while True:
if pn_messenger_get(mng, msg):
print pn_messenger_error(mng)
else:
- print "%s: %s" % (pn_message_get_address(msg),
pn_message_get_subject(msg))
+ cd, body = pn_message_save(msg, 1024)
+ print pn_message_get_address(msg), \
+ pn_message_get_subject(msg) or "(no subject)", \
+ body
pn_messenger_stop(mng)
pn_messenger_free(mng)
Modified: qpid/proton/trunk/examples/messenger/send.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/send.py?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/send.py (original)
+++ qpid/proton/trunk/examples/messenger/send.py Sat Jun 23 18:46:28 2012
@@ -20,7 +20,7 @@
import sys, optparse
from xproton import *
-parser = optparse.OptionParser(usage="usage: %prog [options] msg_1 ... msg_n",
+parser = optparse.OptionParser(usage="usage: %prog [options] <msg_1> ...
<msg_n>",
description="simple message sender")
parser.add_option("-a", "--address", default="//0.0.0.0",
help="address: //<domain>[/<name>] (default %default)")
@@ -35,7 +35,7 @@ pn_messenger_start(mng)
msg = pn_message()
for m in args:
pn_message_set_address(msg, opts.address)
- pn_message_set_subject(msg, m)
+ pn_message_load(msg, m)
if pn_messenger_put(mng, msg):
print pn_messenger_error(mng)
break
Modified: qpid/proton/trunk/proton-c/bindings/python/python.i
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/python.i?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/python.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/python.i Sat Jun 23 18:46:28 2012
@@ -37,12 +37,39 @@ typedef int int32_t;
$result = PyString_FromStringAndSize($1.start, $1.size);
}
+int pn_message_load(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load;
+
+int pn_message_load_data(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_data;
+
+int pn_message_load_text(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_text;
+
+int pn_message_load_amqp(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_amqp;
+
+int pn_message_load_json(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_json;
+
int pn_message_encode(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
%ignore pn_message_encode;
int pn_message_save(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
%ignore pn_message_save;
+int pn_message_save_data(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_data;
+
+int pn_message_save_text(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_text;
+
+int pn_message_save_amqp(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_amqp;
+
+int pn_message_save_json(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_json;
+
ssize_t pn_send(pn_link_t *transport, char *STRING, size_t LENGTH);
%ignore pn_send;
Modified: qpid/proton/trunk/proton-c/include/proton/codec.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/codec.h?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/codec.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/codec.h Sat Jun 23 18:46:28 2012
@@ -106,6 +106,7 @@ int pn_data_clear(pn_data_t *data);
int pn_data_grow(pn_data_t *data);
int pn_data_decode(pn_data_t *data, char *bytes, size_t *size);
int pn_data_encode(pn_data_t *data, char *bytes, size_t *size);
+int pn_data_intern(pn_data_t *data);
int pn_data_vfill(pn_data_t *data, const char *fmt, va_list ap);
int pn_data_fill(pn_data_t *data, const char *fmt, ...);
int pn_data_vscan(pn_data_t *data, const char *fmt, va_list ap);
Modified: qpid/proton/trunk/proton-c/include/proton/message.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/message.h?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/message.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/message.h Sat Jun 23 18:46:28 2012
@@ -29,6 +29,8 @@
typedef struct pn_message_t pn_message_t;
typedef enum {
+ PN_DATA,
+ PN_TEXT,
PN_AMQP,
PN_JSON
} pn_format_t;
@@ -116,8 +118,17 @@ int pn_message_set_json
pn_format_t pn_message_get_format(pn_message_t *message);
int pn_message_set_format(pn_message_t *message, pn_format_t format);
-int pn_message_load(pn_message_t *message, const char *data);
+int pn_message_load(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_data(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_text(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_amqp(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_json(pn_message_t *message, const char *data, size_t size);
+
int pn_message_save(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_data(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_text(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_amqp(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_json(pn_message_t *message, char *data, size_t *size);
// TODO:
// bind vars
Modified: qpid/proton/trunk/proton-c/src/codec/codec.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/codec/codec.c?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/codec/codec.c (original)
+++ qpid/proton/trunk/proton-c/src/codec/codec.c Sat Jun 23 18:46:28 2012
@@ -1701,6 +1701,45 @@ int pn_data_encode(pn_data_t *data, char
return 0;
}
+int pn_data_intern_bytes(pn_data_t *data, pn_bytes_t *bytes)
+{
+ pn_bytes_t prev = pn_buffer_bytes(data->buf);
+ int err = pn_buffer_append(data->buf, bytes->start, bytes->size);
+ if (err) return err;
+ bytes->start = prev.start + prev.size;
+ return 0;
+}
+
+int pn_data_intern(pn_data_t *data)
+{
+ if (!data) return PN_ARG_ERR;
+ if (!data->buf) {
+ data->buf = pn_buffer(64);
+ }
+
+ for (int i = 0; i < data->size; i++) {
+ pn_atom_t *atom = data->atoms + i;
+ pn_bytes_t *bytes;
+ switch (atom->type) {
+ case PN_BINARY:
+ bytes = &atom->u.as_binary;
+ break;
+ case PN_STRING:
+ bytes = &atom->u.as_string;
+ break;
+ case PN_SYMBOL:
+ bytes = &atom->u.as_symbol;
+ break;
+ default:
+ continue;
+ }
+ int err = pn_data_intern_bytes(data, bytes);
+ if (err) return err;
+ }
+
+ return 0;
+}
+
int pn_data_vfill(pn_data_t *data, const char *fmt, va_list ap)
{
pn_atoms_t atoms;
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Sat Jun 23 18:46:28 2012
@@ -95,7 +95,7 @@ pn_message_t *pn_message()
msg->reply_to_group_id = NULL;
msg->data = NULL;
msg->body = NULL;
- msg->format = PN_AMQP;
+ msg->format = PN_DATA;
msg->parser = NULL;
return msg;
}
@@ -475,6 +475,8 @@ int pn_message_decode(pn_message_t *msg,
pn_data_t *data = msg->body;
msg->body = msg->data;
msg->data = data;
+ err = pn_data_intern(msg->body);
+ if (err) return err;
}
break;
}
@@ -548,7 +550,45 @@ int pn_message_set_format(pn_message_t *
return 0;
}
-int pn_message_load(pn_message_t *msg, const char *data)
+int pn_message_load(pn_message_t *msg, const char *data, size_t size)
+{
+ if (!msg) return PN_ARG_ERR;
+
+ switch (msg->format) {
+ case PN_DATA: return pn_message_load_data(msg, data, size);
+ case PN_TEXT: return pn_message_load_text(msg, data, size);
+ case PN_AMQP: return pn_message_load_amqp(msg, data, size);
+ case PN_JSON: return pn_message_load_json(msg, data, size);
+ }
+
+ return PN_STATE_ERR;
+}
+
+int pn_message_load_data(pn_message_t *msg, const char *data, size_t size)
+{
+ if (!msg) return PN_ARG_ERR;
+ if (!msg->body) {
+ msg->body = pn_data(64);
+ }
+
+ int err = pn_data_fill(msg->body, "DLz", DATA, size, data);
+ if (err) return err;
+ return pn_data_intern(msg->body);
+}
+
+int pn_message_load_text(pn_message_t *msg, const char *data, size_t size)
+{
+ if (!msg) return PN_ARG_ERR;
+ if (!msg->body) {
+ msg->body = pn_data(64);
+ }
+
+ int err = pn_data_fill(msg->body, "DLS", AMQP_VALUE, data);
+ if (err) return err;
+ return pn_data_intern(msg->body);
+}
+
+int pn_message_load_amqp(pn_message_t *msg, const char *data, size_t size)
{
if (!msg) return PN_ARG_ERR;
@@ -574,10 +614,88 @@ int pn_message_load(pn_message_t *msg, c
}
}
+int pn_message_load_json(pn_message_t *msg, const char *data, size_t size)
+{
+ if (!msg) return PN_ARG_ERR;
+
+ // XXX: unsupported format
+
+ return PN_ERR;
+}
+
int pn_message_save(pn_message_t *msg, char *data, size_t *size)
{
if (!msg) return PN_ARG_ERR;
+ switch (msg->format) {
+ case PN_DATA: return pn_message_save_data(msg, data, size);
+ case PN_TEXT: return pn_message_save_text(msg, data, size);
+ case PN_AMQP: return pn_message_save_amqp(msg, data, size);
+ case PN_JSON: return pn_message_save_json(msg, data, size);
+ }
+
+ return PN_STATE_ERR;
+}
+
+int pn_message_save_data(pn_message_t *msg, char *data, size_t *size)
+{
+ if (!msg) return PN_ARG_ERR;
+
+ if (!msg->body) {
+ *size = 0;
+ return 0;
+ }
+
+ uint64_t desc;
+ pn_bytes_t bytes;
+ bool scanned;
+ int err = pn_data_scan(msg->body, "DL?z", &desc, &scanned, &bytes);
+ if (err) return err;
+ if (desc == DATA && scanned) {
+ if (bytes.size > *size) {
+ return PN_OVERFLOW;
+ } else {
+ memcpy(data, bytes.start, bytes.size);
+ *size = bytes.size;
+ return 0;
+ }
+ } else {
+ return PN_STATE_ERR;
+ }
+}
+
+int pn_message_save_text(pn_message_t *msg, char *data, size_t *size)
+{
+ if (!msg) return PN_ARG_ERR;
+
+ if (!msg->body) {
+ *size = 0;
+ return 0;
+ }
+
+ uint64_t desc;
+ pn_bytes_t str;
+ bool scanned;
+ int err = pn_data_scan(msg->body, "DL?S", &desc, &scanned, &str);
+ if (err) return err;
+ if (desc == AMQP_VALUE && scanned) {
+ if (str.size >= *size) {
+ return PN_OVERFLOW;
+ } else {
+ memcpy(data, str.start, str.size);
+ data[str.size] = '\0';
+ *size = str.size;
+ return 0;
+ }
+ } else {
+ return PN_STATE_ERR;
+ }
+}
+
+int pn_message_save_amqp(pn_message_t *msg, char *data, size_t *size)
+{
+ if (!msg) return PN_ARG_ERR;
+
if (!msg->body) {
*size = 0;
return 0;
@@ -585,3 +703,12 @@ int pn_message_save(pn_message_t *msg, c
return pn_data_format(msg->body, data, size);
}
+
+int pn_message_save_json(pn_message_t *msg, char *data, size_t *size)
+{
+ if (!msg) return PN_ARG_ERR;
+
+ // XXX: unsupported format
+
+ return PN_ERR;
+}
Modified: qpid/proton/trunk/tests/proton_tests/message.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/message.py?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/message.py (original)
+++ qpid/proton/trunk/tests/proton_tests/message.py Sat Jun 23 18:46:28 2012
@@ -108,7 +108,7 @@ class CodecTest(Test):
assert not pn_message_set_priority(self.msg, 100)
assert not pn_message_set_address(self.msg, "address")
assert not pn_message_set_subject(self.msg, "subject")
- body = '"test body"'
+ body = 'Hello World!'
rc = pn_message_load(self.msg, body)
assert not rc, rc
@@ -127,9 +127,11 @@ class CodecTest(Test):
assert not cd, cd
assert saved == body, (body, saved)
-class ParserTest(Test):
- def _test(self, *bodies):
+class LoadSaveTest(Test):
+
+ def _test(self, fmt, *bodies):
+ pn_message_set_format(self.msg, fmt)
for body in bodies:
pn_message_clear(self.msg)
cd, saved = pn_message_save(self.msg, 1024)
@@ -141,26 +143,32 @@ class ParserTest(Test):
assert saved == body, (body, saved)
def testIntegral(self):
- self._test("0", "1", "-1", "9223372036854775807")
+ self._test(PN_AMQP, "0", "1", "-1", "9223372036854775807")
def testFloating(self):
- self._test("1.1", "3.14159", "-3.14159", "-1.1")
+ self._test(PN_AMQP, "1.1", "3.14159", "-3.14159", "-1.1")
def testSymbol(self):
- self._test(':symbol', ':"quoted symbol"')
+ self._test(PN_AMQP, ':symbol', ':"quoted symbol"')
def testString(self):
- self._test('"string"', '"string with spaces"')
+ self._test(PN_AMQP, '"string"', '"string with spaces"')
def testBinary(self):
- self._test('b"binary"', 'b"binary with spaces and special values:
\\x00\\x01\\x02"')
+ self._test(PN_AMQP, 'b"binary"', 'b"binary with spaces and special values:
\\x00\\x01\\x02"')
def testMap(self):
- self._test('{"one"=1, :two=2, :pi=3.14159}', '{[1, 2, 3]=[3, 2, 1],
{1=2}={3=4}}')
+ self._test(PN_AMQP, '{"one"=1, :two=2, :pi=3.14159}', '{[1, 2, 3]=[3, 2,
1], {1=2}={3=4}}')
def testList(self):
- self._test('[1, 2, 3]', '["one", "two", "three"]', '[:one, 2, 3.14159]',
+ self._test(PN_AMQP, '[1, 2, 3]', '["one", "two", "three"]', '[:one, 2,
3.14159]',
'[{1=2}, {3=4}, {5=6}]')
def testDescriptor(self):
- self._test('@21 ["one", 2, "three", @:url "http://example.org"]')
+ self._test(PN_AMQP, '@21 ["one", 2, "three", @:url "http://example.org"]')
+
+ def testData(self):
+ self._test(PN_DATA, "this is data\x00\x01\x02 blah blah")
+
+ def testText(self):
+ self._test(PN_TEXT, "this is a text string")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]