Author: rhs
Date: Tue Oct  2 17:59:57 2012
New Revision: 1393063

URL: http://svn.apache.org/viewvc?rev=1393063&view=rev
Log:
added tests for larger message sizes; fixed various hard coded limits

Modified:
    qpid/proton/trunk/proton-c/include/proton/buffer.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/tests/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/include/proton/buffer.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/buffer.h?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/buffer.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/buffer.h Tue Oct  2 17:59:57 2012
@@ -34,6 +34,8 @@ pn_buffer_t *pn_buffer(size_t capacity);
 void pn_buffer_free(pn_buffer_t *buf);
 size_t pn_buffer_size(pn_buffer_t *buf);
 size_t pn_buffer_capacity(pn_buffer_t *buf);
+size_t pn_buffer_available(pn_buffer_t *buf);
+int pn_buffer_ensure(pn_buffer_t *buf, size_t size);
 int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size);
 int pn_buffer_prepend(pn_buffer_t *buf, const char *bytes, size_t size);
 size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst);

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Tue Oct  2 17:59:57 2012
@@ -35,6 +35,8 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
   disp->context = context;
   disp->trace = PN_TRACE_OFF;
 
+  disp->input = pn_buffer(1024);
+
   disp->channel = 0;
   disp->code = 0;
   disp->args = pn_data(16);
@@ -48,7 +50,6 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
   disp->available = 0;
 
   disp->halt = false;
-  disp->batch = true;
 
   return disp;
 }
@@ -56,6 +57,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
 void pn_dispatcher_free(pn_dispatcher_t *disp)
 {
   if (disp) {
+    pn_buffer_free(disp->input);
     pn_data_free(disp->args);
     pn_data_free(disp->output_args);
     free(disp->output);
@@ -105,64 +107,84 @@ void pn_dispatcher_trace(pn_dispatcher_t
   va_end(ap);
 }
 
+int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
+{
+  ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
+  if (dsize < 0) {
+    fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
+            pn_data_error(disp->args));
+    pn_fprint_data(stderr, frame.payload, frame.size);
+    fprintf(stderr, "\n");
+    return dsize;
+  }
+
+  disp->channel = frame.channel;
+  // XXX: assuming numeric
+  uint64_t lcode;
+  bool scanned;
+  int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
+  if (e) {
+    fprintf(stderr, "Scan error\n");
+    return e;
+  }
+  if (!scanned) {
+    fprintf(stderr, "Error dispatching frame\n");
+    return PN_ERR;
+  }
+  uint8_t code = lcode;
+  disp->code = code;
+  disp->size = frame.size - dsize;
+  if (disp->size)
+    disp->payload = frame.payload + dsize;
+
+  pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
+
+  pn_action_t *action = disp->actions[code];
+  int err = action(disp);
+
+  disp->channel = 0;
+  disp->code = 0;
+  pn_data_clear(disp->args);
+  disp->size = 0;
+  disp->payload = NULL;
+
+  return err;
+}
+
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
 {
+  size_t leftover = pn_buffer_size(disp->input);
+  if (leftover) {
+    int e = pn_buffer_append(disp->input, bytes, available);
+    if (e) return e;
+    pn_bytes_t b = pn_buffer_bytes(disp->input);
+    bytes = b.start;
+    available = b.size;
+  }
+
   size_t read = 0;
+
   while (!disp->halt) {
     pn_frame_t frame;
-    size_t n = pn_read_frame(&frame, bytes + read, available);
-    if (n) {
-      ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
-      if (dsize < 0) {
-        fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
-                pn_data_error(disp->args));
-        pn_fprint_data(stderr, frame.payload, frame.size);
-        fprintf(stderr, "\n");
-        return dsize;
-      }
-
-      disp->channel = frame.channel;
-      // XXX: assuming numeric
-      uint64_t lcode;
-      bool scanned;
-      int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
-      if (e) {
-        fprintf(stderr, "Scan error\n");
-        return e;
-      }
-      if (!scanned) {
-        fprintf(stderr, "Error dispatching frame\n");
-        return PN_ERR;
-      }
-      uint8_t code = lcode;
-      disp->code = code;
-      disp->size = frame.size - dsize;
-      if (disp->size)
-        disp->payload = frame.payload + dsize;
-
-      pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
-
-      pn_action_t *action = disp->actions[code];
-      int err = action(disp);
-
-      disp->channel = 0;
-      disp->code = 0;
-      pn_data_clear(disp->args);
-      disp->size = 0;
-      disp->payload = NULL;
-
-      if (err) return err;
 
-      available -= n;
+    size_t n = pn_read_frame(&frame, bytes + read, available - read);
+    if (n) {
+      int e = pn_dispatch_frame(disp, frame);
+      if (e) return e;
       read += n;
-
-      if (!disp->batch) break;
     } else {
+      if (leftover) {
+        pn_buffer_trim(disp->input, read, 0);
+      } else {
+        int e = pn_buffer_append(disp->input, bytes + read, available - read);
+        if (e) return e;
+      }
+      read = available;
       break;
     }
   }
 
-  return read;
+  return read - leftover;
 }
 
 int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Tue Oct  2 17:59:57 2012
@@ -24,6 +24,7 @@
 
 #include <sys/types.h>
 #include <stdbool.h>
+#include <proton/buffer.h>
 #include <proton/codec.h>
 
 typedef struct pn_dispatcher_t pn_dispatcher_t;
@@ -38,6 +39,7 @@ struct pn_dispatcher_t {
   const char *names[256];
   uint8_t frame_type;
   pn_trace_t trace;
+  pn_buffer_t *input;
   uint16_t channel;
   uint8_t code;
   pn_data_t *args;

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Tue Oct  2 17:59:57 2012
@@ -23,6 +23,7 @@
 #include <proton/driver.h>
 #include <proton/util.h>
 #include <proton/ssl.h>
+#include <proton/buffer.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
@@ -39,6 +40,7 @@ struct pn_messenger_t {
   pn_driver_t *driver;
   int credit;
   uint64_t next_tag;
+  pn_buffer_t *buffer;
   pn_error_t *error;
 };
 
@@ -69,6 +71,7 @@ pn_messenger_t *pn_messenger(const char 
     m->driver = pn_driver();
     m->credit = 0;
     m->next_tag = 0;
+    m->buffer = pn_buffer(1024);
     m->error = pn_error();
   }
 
@@ -149,6 +152,7 @@ void pn_messenger_free(pn_messenger_t *m
     free(messenger->password);
     free(messenger->trusted_certificates);
     pn_driver_free(messenger->driver);
+    pn_buffer_free(messenger->buffer);
     pn_error_free(messenger->error);
     free(messenger);
   }
@@ -548,25 +552,30 @@ int pn_messenger_put(pn_messenger_t *mes
     return pn_error_format(messenger->error, PN_ERR,
                            "unable to send to address: %s (%s)", address,
                            pn_driver_error(messenger->driver));
-  // XXX: proper tag
-  char tag[8];
-  void *ptr = &tag;
-  uint64_t next = messenger->next_tag++;
-  *((uint32_t *) ptr) = next;
-  pn_delivery(sender, pn_dtag(tag, 8));
-  size_t size = 1024;
-  // XXX: max message size
-  while (size < 16*1024) {
-    char encoded[size];
+
+  pn_buffer_t *buf = messenger->buffer;
+
+  while (true) {
+    char *encoded = pn_buffer_bytes(buf).start;
+    size_t size = pn_buffer_capacity(buf);
     int err = pn_message_encode(msg, encoded, &size);
     if (err == PN_OVERFLOW) {
-      size *= 2;
+      err = pn_buffer_ensure(buf, 2*pn_buffer_capacity(buf));
+      if (err) return pn_error_format(messenger->error, err, "put: error growing buffer");
     } else if (err) {
-      return err;
+      return pn_error_format(messenger->error, err, "encode error: %s",
+                             pn_message_error(msg));
     } else {
+      // XXX: proper tag
+      char tag[8];
+      void *ptr = &tag;
+      uint64_t next = messenger->next_tag++;
+      *((uint32_t *) ptr) = next;
+      pn_delivery(sender, pn_dtag(tag, 8));
       ssize_t n = pn_link_send(sender, encoded, size);
       if (n < 0) {
-        return n;
+        return pn_error_format(messenger->error, n, "send error: %s",
+                               pn_error_text(pn_link_error(sender)));
       } else {
         pn_link_advance(sender);
         pn_messenger_tsync(messenger, false_pred, 0);
@@ -653,12 +662,21 @@ int pn_messenger_get(pn_messenger_t *mes
       if (pn_delivery_readable(d)) {
         pn_link_t *l = pn_delivery_link(d);
         size_t pending = pn_delivery_pending(d);
-        char buf[pending];
-        ssize_t n = pn_link_recv(l, buf, pending);
+        pn_buffer_t *buf = messenger->buffer;
+        int err = pn_buffer_ensure(buf, pending + 1);
+        if (err) return pn_error_format(messenger->error, err, "get: error growing buffer");
+        char *encoded = pn_buffer_bytes(buf).start;
+        ssize_t n = pn_link_recv(l, encoded, pending);
+        if (n != pending) {
+          return pn_error_format(messenger->error, n, "didn't receive pending bytes: %zi", n);
+        }
+        n = pn_link_recv(l, encoded + pending, 1);
         pn_delivery_settle(d);
-        if (n < 0) return pn_error_format(messenger->error, n, "receive error");
+        if (n != PN_EOS) {
+          return pn_error_format(messenger->error, n, "PN_EOS expected");
+        }
         if (msg) {
-          int err = pn_message_decode(msg, buf, n);
+          int err = pn_message_decode(msg, encoded, pending);
           if (err) {
             return pn_error_format(messenger->error, err, "error decoding message: %s",
                                    pn_message_error(msg));

Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Tue Oct  2 17:59:57 2012
@@ -65,11 +65,15 @@ class MessengerTest(Test):
     self.server.stop()
     self.running = False
 
-  def testSendReceive(self):
+  def _testSendReceive(self, size=None):
     msg = Message()
     msg.address="amqp://0.0.0.0:12345"
     msg.subject="Hello World!"
     body = "First the world, then the galaxy!"
+    if size is not None:
+      while len(body) < size:
+        body = 2*body
+      body = body[:size]
     msg.load(body)
     self.client.put(msg)
     self.client.send()
@@ -83,6 +87,27 @@ class MessengerTest(Test):
     rbod = reply.save()
     assert rbod == body, (rbod, body)
 
+  def testSendReceive(self):
+    self._testSendReceive()
+
+  def testSendReceive1K(self):
+    self._testSendReceive(1024)
+
+  def testSendReceive2K(self):
+    self._testSendReceive(2*1024)
+
+  def testSendReceive4K(self):
+    self._testSendReceive(4*1024)
+
+  def testSendReceive10K(self):
+    self._testSendReceive(10*1024)
+
+  def testSendReceive100K(self):
+    self._testSendReceive(100*1024)
+
+  def testSendReceive1M(self):
+    self._testSendReceive(1024*1024)
+
   def testSendBogus(self):
     msg = Message()
     msg.address="totally-bogus-address"



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to