Author: rhs
Date: Sat Jun 23 18:46:28 2012
New Revision: 1353174

URL: http://svn.apache.org/viewvc?rev=1353174&view=rev
Log:
added support for data and text messages

Modified:
    qpid/proton/trunk/examples/messenger/recv.py
    qpid/proton/trunk/examples/messenger/send.py
    qpid/proton/trunk/proton-c/bindings/python/python.i
    qpid/proton/trunk/proton-c/include/proton/codec.h
    qpid/proton/trunk/proton-c/include/proton/message.h
    qpid/proton/trunk/proton-c/src/codec/codec.c
    qpid/proton/trunk/proton-c/src/message/message.c
    qpid/proton/trunk/tests/proton_tests/message.py

Modified: qpid/proton/trunk/examples/messenger/recv.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/recv.py?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/recv.py (original)
+++ qpid/proton/trunk/examples/messenger/recv.py Sat Jun 23 18:46:28 2012
@@ -20,7 +20,7 @@
 import sys, optparse
 from xproton import *
 
-parser = optparse.OptionParser(usage="usage: %prog [options] addr_1 ... 
addr_n",
+parser = optparse.OptionParser(usage="usage: %prog [options] <addr_1> ... 
<addr_n>",
                                description="simple message receiver")
 
 opts, args = parser.parse_args()
@@ -45,7 +45,10 @@ while True:
     if pn_messenger_get(mng, msg):
       print pn_messenger_error(mng)
     else:
-      print "%s: %s" % (pn_message_get_address(msg), 
pn_message_get_subject(msg))
+      cd, body = pn_message_save(msg, 1024)
+      print pn_message_get_address(msg), \
+          pn_message_get_subject(msg) or "(no subject)", \
+          body
 
 pn_messenger_stop(mng)
 pn_messenger_free(mng)

Modified: qpid/proton/trunk/examples/messenger/send.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/send.py?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/send.py (original)
+++ qpid/proton/trunk/examples/messenger/send.py Sat Jun 23 18:46:28 2012
@@ -20,7 +20,7 @@
 import sys, optparse
 from xproton import *
 
-parser = optparse.OptionParser(usage="usage: %prog [options] msg_1 ... msg_n",
+parser = optparse.OptionParser(usage="usage: %prog [options] <msg_1> ... 
<msg_n>",
                                description="simple message sender")
 parser.add_option("-a", "--address", default="//0.0.0.0",
                   help="address: //<domain>[/<name>] (default %default)")
@@ -35,7 +35,7 @@ pn_messenger_start(mng)
 msg = pn_message()
 for m in args:
   pn_message_set_address(msg, opts.address)
-  pn_message_set_subject(msg, m)
+  pn_message_load(msg, m)
   if pn_messenger_put(mng, msg):
     print pn_messenger_error(mng)
     break

Modified: qpid/proton/trunk/proton-c/bindings/python/python.i
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/python.i?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/python.i (original)
+++ qpid/proton/trunk/proton-c/bindings/python/python.i Sat Jun 23 18:46:28 2012
@@ -37,12 +37,39 @@ typedef int int32_t;
   $result = PyString_FromStringAndSize($1.start, $1.size);
 }
 
+int pn_message_load(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load;
+
+int pn_message_load_data(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_data;
+
+int pn_message_load_text(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_text;
+
+int pn_message_load_amqp(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_amqp;
+
+int pn_message_load_json(pn_message_t *msg, char *STRING, size_t LENGTH);
+%ignore pn_message_load_json;
+
 int pn_message_encode(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
 %ignore pn_message_encode;
 
 int pn_message_save(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
 %ignore pn_message_save;
 
+int pn_message_save_data(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_data;
+
+int pn_message_save_text(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_text;
+
+int pn_message_save_amqp(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_amqp;
+
+int pn_message_save_json(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE);
+%ignore pn_message_save_json;
+
 ssize_t pn_send(pn_link_t *transport, char *STRING, size_t LENGTH);
 %ignore pn_send;
 

Modified: qpid/proton/trunk/proton-c/include/proton/codec.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/codec.h?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/codec.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/codec.h Sat Jun 23 18:46:28 2012
@@ -106,6 +106,7 @@ int pn_data_clear(pn_data_t *data);
 int pn_data_grow(pn_data_t *data);
 int pn_data_decode(pn_data_t *data, char *bytes, size_t *size);
 int pn_data_encode(pn_data_t *data, char *bytes, size_t *size);
+int pn_data_intern(pn_data_t *data);
 int pn_data_vfill(pn_data_t *data, const char *fmt, va_list ap);
 int pn_data_fill(pn_data_t *data, const char *fmt, ...);
 int pn_data_vscan(pn_data_t *data, const char *fmt, va_list ap);

Modified: qpid/proton/trunk/proton-c/include/proton/message.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/message.h?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/message.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/message.h Sat Jun 23 18:46:28 2012
@@ -29,6 +29,8 @@
 
 typedef struct pn_message_t pn_message_t;
 typedef enum {
+  PN_DATA,
+  PN_TEXT,
   PN_AMQP,
   PN_JSON
 } pn_format_t;
@@ -116,8 +118,17 @@ int            pn_message_set_json      
 pn_format_t pn_message_get_format(pn_message_t *message);
 int pn_message_set_format(pn_message_t *message, pn_format_t format);
 
-int pn_message_load(pn_message_t *message, const char *data);
+int pn_message_load(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_data(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_text(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_amqp(pn_message_t *message, const char *data, size_t size);
+int pn_message_load_json(pn_message_t *message, const char *data, size_t size);
+
 int pn_message_save(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_data(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_text(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_amqp(pn_message_t *message, char *data, size_t *size);
+int pn_message_save_json(pn_message_t *message, char *data, size_t *size);
 
 // TODO:
 // bind vars

Modified: qpid/proton/trunk/proton-c/src/codec/codec.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/codec/codec.c?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/codec/codec.c (original)
+++ qpid/proton/trunk/proton-c/src/codec/codec.c Sat Jun 23 18:46:28 2012
@@ -1701,6 +1701,45 @@ int pn_data_encode(pn_data_t *data, char
   return 0;
 }
 
+int pn_data_intern_bytes(pn_data_t *data, pn_bytes_t *bytes)
+{
+  pn_bytes_t prev = pn_buffer_bytes(data->buf);
+  int err = pn_buffer_append(data->buf, bytes->start, bytes->size);
+  if (err) return err;
+  bytes->start = prev.start + prev.size;
+  return 0;
+}
+
+int pn_data_intern(pn_data_t *data)
+{
+  if (!data) return PN_ARG_ERR;
+  if (!data->buf) {
+    data->buf = pn_buffer(64);
+  }
+
+  for (int i = 0; i < data->size; i++) {
+    pn_atom_t *atom = data->atoms + i;
+    pn_bytes_t *bytes;
+    switch (atom->type) {
+    case PN_BINARY:
+      bytes = &atom->u.as_binary;
+      break;
+    case PN_STRING:
+      bytes = &atom->u.as_string;
+      break;
+    case PN_SYMBOL:
+      bytes = &atom->u.as_symbol;
+      break;
+    default:
+      continue;
+    }
+    int err = pn_data_intern_bytes(data, bytes);
+    if (err) return err;
+  }
+
+  return 0;
+}
+
 int pn_data_vfill(pn_data_t *data, const char *fmt, va_list ap)
 {
   pn_atoms_t atoms;

Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Sat Jun 23 18:46:28 2012
@@ -95,7 +95,7 @@ pn_message_t *pn_message()
   msg->reply_to_group_id = NULL;
   msg->data = NULL;
   msg->body = NULL;
-  msg->format = PN_AMQP;
+  msg->format = PN_DATA;
   msg->parser = NULL;
   return msg;
 }
@@ -475,6 +475,8 @@ int pn_message_decode(pn_message_t *msg,
         pn_data_t *data = msg->body;
         msg->body = msg->data;
         msg->data = data;
+        err = pn_data_intern(msg->body);
+        if (err) return err;
       }
       break;
     }
@@ -548,7 +550,45 @@ int pn_message_set_format(pn_message_t *
   return 0;
 }
 
-int pn_message_load(pn_message_t *msg, const char *data)
+int pn_message_load(pn_message_t *msg, const char *data, size_t size)
+{
+  if (!msg) return PN_ARG_ERR;
+
+  switch (msg->format) {
+  case PN_DATA: return pn_message_load_data(msg, data, size);
+  case PN_TEXT: return pn_message_load_text(msg, data, size);
+  case PN_AMQP: return pn_message_load_amqp(msg, data, size);
+  case PN_JSON: return pn_message_load_json(msg, data, size);
+  }
+
+  return PN_STATE_ERR;
+}
+
+int pn_message_load_data(pn_message_t *msg, const char *data, size_t size)
+{
+  if (!msg) return PN_ARG_ERR;
+  if (!msg->body) {
+    msg->body = pn_data(64);
+  }
+
+  int err = pn_data_fill(msg->body, "DLz", DATA, size, data);
+  if (err) return err;
+  return pn_data_intern(msg->body);
+}
+
+int pn_message_load_text(pn_message_t *msg, const char *data, size_t size)
+{
+  if (!msg) return PN_ARG_ERR;
+  if (!msg->body) {
+    msg->body = pn_data(64);
+  }
+
+  int err = pn_data_fill(msg->body, "DLS", AMQP_VALUE, data);
+  if (err) return err;
+  return pn_data_intern(msg->body);
+}
+
+int pn_message_load_amqp(pn_message_t *msg, const char *data, size_t size)
 {
   if (!msg) return PN_ARG_ERR;
 
@@ -574,10 +614,88 @@ int pn_message_load(pn_message_t *msg, c
   }
 }
 
+int pn_message_load_json(pn_message_t *msg, const char *data, size_t size)
+{
+  if (!msg) return PN_ARG_ERR;
+
+  // XXX: unsupported format
+
+  return PN_ERR;
+}
+
 int pn_message_save(pn_message_t *msg, char *data, size_t *size)
 {
   if (!msg) return PN_ARG_ERR;
 
+  switch (msg->format) {
+  case PN_DATA: return pn_message_save_data(msg, data, size);
+  case PN_TEXT: return pn_message_save_text(msg, data, size);
+  case PN_AMQP: return pn_message_save_amqp(msg, data, size);
+  case PN_JSON: return pn_message_save_json(msg, data, size);
+  }
+
+  return PN_STATE_ERR;
+}
+
+int pn_message_save_data(pn_message_t *msg, char *data, size_t *size)
+{
+  if (!msg) return PN_ARG_ERR;
+
+  if (!msg->body) {
+    *size = 0;
+    return 0;
+  }
+
+  uint64_t desc;
+  pn_bytes_t bytes;
+  bool scanned;
+  int err = pn_data_scan(msg->body, "DL?z", &desc, &scanned, &bytes);
+  if (err) return err;
+  if (desc == DATA && scanned) {
+    if (bytes.size > *size) {
+      return PN_OVERFLOW;
+    } else {
+      memcpy(data, bytes.start, bytes.size);
+      *size = bytes.size;
+      return 0;
+    }
+  } else {
+    return PN_STATE_ERR;
+  }
+}
+
+int pn_message_save_text(pn_message_t *msg, char *data, size_t *size)
+{
+  if (!msg) return PN_ARG_ERR;
+
+  if (!msg->body) {
+    *size = 0;
+    return 0;
+  }
+
+  uint64_t desc;
+  pn_bytes_t str;
+  bool scanned;
+  int err = pn_data_scan(msg->body, "DL?S", &desc, &scanned, &str);
+  if (err) return err;
+  if (desc == AMQP_VALUE && scanned) {
+    if (str.size >= *size) {
+      return PN_OVERFLOW;
+    } else {
+      memcpy(data, str.start, str.size);
+      data[str.size] = '\0';
+      *size = str.size;
+      return 0;
+    }
+  } else {
+    return PN_STATE_ERR;
+  }
+}
+
+int pn_message_save_amqp(pn_message_t *msg, char *data, size_t *size)
+{
+  if (!msg) return PN_ARG_ERR;
+
   if (!msg->body) {
     *size = 0;
     return 0;
@@ -585,3 +703,12 @@ int pn_message_save(pn_message_t *msg, c
 
   return pn_data_format(msg->body, data, size);
 }
+
+int pn_message_save_json(pn_message_t *msg, char *data, size_t *size)
+{
+  if (!msg) return PN_ARG_ERR;
+
+  // XXX: unsupported format
+
+  return PN_ERR;
+}

Modified: qpid/proton/trunk/tests/proton_tests/message.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/message.py?rev=1353174&r1=1353173&r2=1353174&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/message.py (original)
+++ qpid/proton/trunk/tests/proton_tests/message.py Sat Jun 23 18:46:28 2012
@@ -108,7 +108,7 @@ class CodecTest(Test):
     assert not pn_message_set_priority(self.msg, 100)
     assert not pn_message_set_address(self.msg, "address")
     assert not pn_message_set_subject(self.msg, "subject")
-    body = '"test body"'
+    body = 'Hello World!'
     rc = pn_message_load(self.msg, body)
     assert not rc, rc
 
@@ -127,9 +127,11 @@ class CodecTest(Test):
     assert not cd, cd
     assert saved == body, (body, saved)
 
-class ParserTest(Test):
 
-  def _test(self, *bodies):
+class LoadSaveTest(Test):
+
+  def _test(self, fmt, *bodies):
+    pn_message_set_format(self.msg, fmt)
     for body in bodies:
       pn_message_clear(self.msg)
       cd, saved = pn_message_save(self.msg, 1024)
@@ -141,26 +143,32 @@ class ParserTest(Test):
       assert saved == body, (body, saved)
 
   def testIntegral(self):
-    self._test("0", "1", "-1", "9223372036854775807")
+    self._test(PN_AMQP, "0", "1", "-1", "9223372036854775807")
 
   def testFloating(self):
-    self._test("1.1", "3.14159", "-3.14159", "-1.1")
+    self._test(PN_AMQP, "1.1", "3.14159", "-3.14159", "-1.1")
 
   def testSymbol(self):
-    self._test(':symbol', ':"quoted symbol"')
+    self._test(PN_AMQP, ':symbol', ':"quoted symbol"')
 
   def testString(self):
-    self._test('"string"', '"string with spaces"')
+    self._test(PN_AMQP, '"string"', '"string with spaces"')
 
   def testBinary(self):
-    self._test('b"binary"', 'b"binary with spaces and special values: 
\\x00\\x01\\x02"')
+    self._test(PN_AMQP, 'b"binary"', 'b"binary with spaces and special values: 
\\x00\\x01\\x02"')
 
   def testMap(self):
-    self._test('{"one"=1, :two=2, :pi=3.14159}', '{[1, 2, 3]=[3, 2, 1], 
{1=2}={3=4}}')
+    self._test(PN_AMQP, '{"one"=1, :two=2, :pi=3.14159}', '{[1, 2, 3]=[3, 2, 
1], {1=2}={3=4}}')
 
   def testList(self):
-    self._test('[1, 2, 3]', '["one", "two", "three"]', '[:one, 2, 3.14159]',
+    self._test(PN_AMQP, '[1, 2, 3]', '["one", "two", "three"]', '[:one, 2, 
3.14159]',
                '[{1=2}, {3=4}, {5=6}]')
 
   def testDescriptor(self):
-    self._test('@21 ["one", 2, "three", @:url "http://example.org";]')
+    self._test(PN_AMQP, '@21 ["one", 2, "three", @:url "http://example.org";]')
+
+  def testData(self):
+    self._test(PN_DATA, "this is data\x00\x01\x02 blah blah")
+
+  def testText(self):
+    self._test(PN_TEXT, "this is a text string")



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to