Author: rhs
Date: Tue Oct 2 17:59:57 2012
New Revision: 1393063
URL: http://svn.apache.org/viewvc?rev=1393063&view=rev
Log:
added tests for larger message sizes; fixed various hard coded limits
Modified:
qpid/proton/trunk/proton-c/include/proton/buffer.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/include/proton/buffer.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/buffer.h?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/buffer.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/buffer.h Tue Oct 2 17:59:57 2012
@@ -34,6 +34,8 @@ pn_buffer_t *pn_buffer(size_t capacity);
void pn_buffer_free(pn_buffer_t *buf);
size_t pn_buffer_size(pn_buffer_t *buf);
size_t pn_buffer_capacity(pn_buffer_t *buf);
+size_t pn_buffer_available(pn_buffer_t *buf);
+int pn_buffer_ensure(pn_buffer_t *buf, size_t size);
int pn_buffer_append(pn_buffer_t *buf, const char *bytes, size_t size);
int pn_buffer_prepend(pn_buffer_t *buf, const char *bytes, size_t size);
size_t pn_buffer_get(pn_buffer_t *buf, size_t offset, size_t size, char *dst);
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Tue Oct 2 17:59:57 2012
@@ -35,6 +35,8 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
disp->context = context;
disp->trace = PN_TRACE_OFF;
+ disp->input = pn_buffer(1024);
+
disp->channel = 0;
disp->code = 0;
disp->args = pn_data(16);
@@ -48,7 +50,6 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
disp->available = 0;
disp->halt = false;
- disp->batch = true;
return disp;
}
@@ -56,6 +57,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
void pn_dispatcher_free(pn_dispatcher_t *disp)
{
if (disp) {
+ pn_buffer_free(disp->input);
pn_data_free(disp->args);
pn_data_free(disp->output_args);
free(disp->output);
@@ -105,64 +107,84 @@ void pn_dispatcher_trace(pn_dispatcher_t
va_end(ap);
}
+int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
+{
+ ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
+ if (dsize < 0) {
+ fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
+ pn_data_error(disp->args));
+ pn_fprint_data(stderr, frame.payload, frame.size);
+ fprintf(stderr, "\n");
+ return dsize;
+ }
+
+ disp->channel = frame.channel;
+ // XXX: assuming numeric
+ uint64_t lcode;
+ bool scanned;
+ int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
+ if (e) {
+ fprintf(stderr, "Scan error\n");
+ return e;
+ }
+ if (!scanned) {
+ fprintf(stderr, "Error dispatching frame\n");
+ return PN_ERR;
+ }
+ uint8_t code = lcode;
+ disp->code = code;
+ disp->size = frame.size - dsize;
+ if (disp->size)
+ disp->payload = frame.payload + dsize;
+
+ pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
+
+ pn_action_t *action = disp->actions[code];
+ int err = action(disp);
+
+ disp->channel = 0;
+ disp->code = 0;
+ pn_data_clear(disp->args);
+ disp->size = 0;
+ disp->payload = NULL;
+
+ return err;
+}
+
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
{
+ size_t leftover = pn_buffer_size(disp->input);
+ if (leftover) {
+ int e = pn_buffer_append(disp->input, bytes, available);
+ if (e) return e;
+ pn_bytes_t b = pn_buffer_bytes(disp->input);
+ bytes = b.start;
+ available = b.size;
+ }
+
size_t read = 0;
+
while (!disp->halt) {
pn_frame_t frame;
- size_t n = pn_read_frame(&frame, bytes + read, available);
- if (n) {
- ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
- if (dsize < 0) {
- fprintf(stderr, "Error decoding frame: %s %s\n", pn_code(dsize),
- pn_data_error(disp->args));
- pn_fprint_data(stderr, frame.payload, frame.size);
- fprintf(stderr, "\n");
- return dsize;
- }
-
- disp->channel = frame.channel;
- // XXX: assuming numeric
- uint64_t lcode;
- bool scanned;
- int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
- if (e) {
- fprintf(stderr, "Scan error\n");
- return e;
- }
- if (!scanned) {
- fprintf(stderr, "Error dispatching frame\n");
- return PN_ERR;
- }
- uint8_t code = lcode;
- disp->code = code;
- disp->size = frame.size - dsize;
- if (disp->size)
- disp->payload = frame.payload + dsize;
-
- pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
-
- pn_action_t *action = disp->actions[code];
- int err = action(disp);
-
- disp->channel = 0;
- disp->code = 0;
- pn_data_clear(disp->args);
- disp->size = 0;
- disp->payload = NULL;
-
- if (err) return err;
- available -= n;
+ size_t n = pn_read_frame(&frame, bytes + read, available - read);
+ if (n) {
+ int e = pn_dispatch_frame(disp, frame);
+ if (e) return e;
read += n;
-
- if (!disp->batch) break;
} else {
+ if (leftover) {
+ pn_buffer_trim(disp->input, read, 0);
+ } else {
+ int e = pn_buffer_append(disp->input, bytes + read, available - read);
+ if (e) return e;
+ }
+ read = available;
break;
}
}
- return read;
+ return read - leftover;
}
int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Tue Oct 2 17:59:57 2012
@@ -24,6 +24,7 @@
#include <sys/types.h>
#include <stdbool.h>
+#include <proton/buffer.h>
#include <proton/codec.h>
typedef struct pn_dispatcher_t pn_dispatcher_t;
@@ -38,6 +39,7 @@ struct pn_dispatcher_t {
const char *names[256];
uint8_t frame_type;
pn_trace_t trace;
+ pn_buffer_t *input;
uint16_t channel;
uint8_t code;
pn_data_t *args;
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Tue Oct 2 17:59:57 2012
@@ -23,6 +23,7 @@
#include <proton/driver.h>
#include <proton/util.h>
#include <proton/ssl.h>
+#include <proton/buffer.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -39,6 +40,7 @@ struct pn_messenger_t {
pn_driver_t *driver;
int credit;
uint64_t next_tag;
+ pn_buffer_t *buffer;
pn_error_t *error;
};
@@ -69,6 +71,7 @@ pn_messenger_t *pn_messenger(const char
m->driver = pn_driver();
m->credit = 0;
m->next_tag = 0;
+ m->buffer = pn_buffer(1024);
m->error = pn_error();
}
@@ -149,6 +152,7 @@ void pn_messenger_free(pn_messenger_t *m
free(messenger->password);
free(messenger->trusted_certificates);
pn_driver_free(messenger->driver);
+ pn_buffer_free(messenger->buffer);
pn_error_free(messenger->error);
free(messenger);
}
@@ -548,25 +552,30 @@ int pn_messenger_put(pn_messenger_t *mes
return pn_error_format(messenger->error, PN_ERR,
"unable to send to address: %s (%s)", address,
pn_driver_error(messenger->driver));
- // XXX: proper tag
- char tag[8];
- void *ptr = &tag;
- uint64_t next = messenger->next_tag++;
- *((uint32_t *) ptr) = next;
- pn_delivery(sender, pn_dtag(tag, 8));
- size_t size = 1024;
- // XXX: max message size
- while (size < 16*1024) {
- char encoded[size];
+
+ pn_buffer_t *buf = messenger->buffer;
+
+ while (true) {
+ char *encoded = pn_buffer_bytes(buf).start;
+ size_t size = pn_buffer_capacity(buf);
int err = pn_message_encode(msg, encoded, &size);
if (err == PN_OVERFLOW) {
- size *= 2;
+ err = pn_buffer_ensure(buf, 2*pn_buffer_capacity(buf));
+ if (err) return pn_error_format(messenger->error, err, "put: error growing buffer");
} else if (err) {
- return err;
+ return pn_error_format(messenger->error, err, "encode error: %s",
+ pn_message_error(msg));
} else {
+ // XXX: proper tag
+ char tag[8];
+ void *ptr = &tag;
+ uint64_t next = messenger->next_tag++;
+ *((uint32_t *) ptr) = next;
+ pn_delivery(sender, pn_dtag(tag, 8));
ssize_t n = pn_link_send(sender, encoded, size);
if (n < 0) {
- return n;
+ return pn_error_format(messenger->error, n, "send error: %s",
+ pn_error_text(pn_link_error(sender)));
} else {
pn_link_advance(sender);
pn_messenger_tsync(messenger, false_pred, 0);
@@ -653,12 +662,21 @@ int pn_messenger_get(pn_messenger_t *mes
if (pn_delivery_readable(d)) {
pn_link_t *l = pn_delivery_link(d);
size_t pending = pn_delivery_pending(d);
- char buf[pending];
- ssize_t n = pn_link_recv(l, buf, pending);
+ pn_buffer_t *buf = messenger->buffer;
+ int err = pn_buffer_ensure(buf, pending + 1);
+ if (err) return pn_error_format(messenger->error, err, "get: error growing buffer");
+ char *encoded = pn_buffer_bytes(buf).start;
+ ssize_t n = pn_link_recv(l, encoded, pending);
+ if (n != pending) {
+ return pn_error_format(messenger->error, n, "didn't receive pending bytes: %zi", n);
+ }
+ n = pn_link_recv(l, encoded + pending, 1);
pn_delivery_settle(d);
- if (n < 0) return pn_error_format(messenger->error, n, "receive error");
+ if (n != PN_EOS) {
+ return pn_error_format(messenger->error, n, "PN_EOS expected");
+ }
if (msg) {
- int err = pn_message_decode(msg, buf, n);
+ int err = pn_message_decode(msg, encoded, pending);
if (err) {
return pn_error_format(messenger->error, err, "error decoding message: %s",
pn_message_error(msg));
Modified: qpid/proton/trunk/tests/proton_tests/messenger.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1393063&r1=1393062&r2=1393063&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Tue Oct 2 17:59:57 2012
@@ -65,11 +65,15 @@ class MessengerTest(Test):
self.server.stop()
self.running = False
- def testSendReceive(self):
+ def _testSendReceive(self, size=None):
msg = Message()
msg.address="amqp://0.0.0.0:12345"
msg.subject="Hello World!"
body = "First the world, then the galaxy!"
+ if size is not None:
+ while len(body) < size:
+ body = 2*body
+ body = body[:size]
msg.load(body)
self.client.put(msg)
self.client.send()
@@ -83,6 +87,27 @@ class MessengerTest(Test):
rbod = reply.save()
assert rbod == body, (rbod, body)
+ def testSendReceive(self):
+ self._testSendReceive()
+
+ def testSendReceive1K(self):
+ self._testSendReceive(1024)
+
+ def testSendReceive2K(self):
+ self._testSendReceive(2*1024)
+
+ def testSendReceive4K(self):
+ self._testSendReceive(4*1024)
+
+ def testSendReceive10K(self):
+ self._testSendReceive(10*1024)
+
+ def testSendReceive100K(self):
+ self._testSendReceive(100*1024)
+
+ def testSendReceive1M(self):
+ self._testSendReceive(1024*1024)
+
def testSendBogus(self):
msg = Message()
msg.address="totally-bogus-address"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]