Repository: qpid-proton Updated Branches: refs/heads/master ea02b9337 -> 58dfb4aa4
PROTON-1512: c examples: clean up examples and fix some issues. All examples that receive messages use a consistent code sequence and error reporting, all deal with aborted messages consistently. All receivers accumulate messages in local buffer not in proton link buffers. Fixed issues - broker.c use per-link buffer for thread safety - direct.c - wasn't accumulating messages in separate buffer Added abort tests to example_test.py, using send-abort.c and verifying broker.c and direct.c correctly report and drop aborted messages and continue to function. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/58dfb4aa Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/58dfb4aa Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/58dfb4aa Branch: refs/heads/master Commit: 58dfb4aa4424c0116443533639a42b7ceda774e7 Parents: 5b1be87 Author: Alan Conway <[email protected]> Authored: Wed Sep 27 14:01:48 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Wed Sep 27 14:10:59 2017 -0400 ---------------------------------------------------------------------- examples/c/broker.c | 36 +++++----- examples/c/direct.c | 156 +++++++++++++++++++++------------------- examples/c/example_test.py | 59 +++++++++++---- examples/c/receive.c | 68 +++++++++--------- 4 files changed, 175 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/broker.c b/examples/c/broker.c index c2e4c16..3a4d04a 100644 --- a/examples/c/broker.c +++ b/examples/c/broker.c @@ -327,28 +327,26 @@ static void handle(broker_t* b, pn_event_t* e) { } case PN_DELIVERY: { /* Incoming message data */ pn_delivery_t *d = pn_event_delivery(e); - pn_link_t *l = pn_delivery_link(d); - if (!pn_delivery_readable(d)) break; - pn_rwbytes_t *m = message_buffer(l); - for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) { - /* Append data to the reeving buffer */ - m->size += p; + if (pn_delivery_readable(d)) { + pn_link_t *l = pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = message_buffer(l); /* Append data to incoming message buffer */ + m->size += size; m->start = (char*)realloc(m->start, m->size); - int recv = pn_link_recv(l, m->start + m->size - p, p); - if (recv < 0 && recv != PN_EOS) { - fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv)); - break; + int err = pn_link_recv(l, m->start, m->size); + if (err < 0 && err != PN_EOS) { + fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err)); + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + m->size = 0; /* forget the data we accumulated */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + const char *qname = pn_terminus_get_address(pn_link_target(l)); + queue_receive(b->proactor, queues_get(&b->queues, qname), *m); + *m = pn_rwbytes_null; /* Reset the buffer for the next message*/ + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); + pn_link_flow(l, WINDOW - pn_link_credit(l)); } } - if (!pn_delivery_partial(d)) { - /* The broker does not decode the message, just forwards it. */ - const char *qname = pn_terminus_get_address(pn_link_target(l)); - queue_receive(b->proactor, queues_get(&b->queues, qname), *m); - *m = pn_rwbytes_null; - pn_delivery_update(d, PN_ACCEPTED); - pn_delivery_settle(d); - pn_link_flow(l, WINDOW - pn_link_credit(l)); - } break; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/direct.c ---------------------------------------------------------------------- diff --git a/examples/c/direct.c b/examples/c/direct.c index 15550e6..f55519e 100644 --- a/examples/c/direct.c +++ b/examples/c/direct.c @@ -41,7 +41,7 @@ typedef struct app_data_t { pn_proactor_t *proactor; pn_listener_t *listener; - pn_rwbytes_t message_buffer; + pn_rwbytes_t msgin, msgout; /* Buffers for incoming/outgoing messages */ /* Sender values */ int sent; @@ -56,11 +56,20 @@ static const int BATCH = 1000; /* Batch size for unlimited receive */ static int exit_code = 0; -static void check_condition(pn_event_t *e, pn_condition_t *cond) { +/* Close the connection and the listener so so we will get a + * PN_PROACTOR_INACTIVE event and exit, once all outstanding events + * are processed. + */ +static void close_all(pn_connection_t *c, app_data_t *app) { + if (c) pn_connection_close(c); + if (app->listener) pn_listener_close(app->listener); +} + +static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) { if (pn_condition_is_set(cond)) { fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), pn_condition_get_name(cond), pn_condition_get_description(cond)); - pn_connection_close(pn_event_connection(e)); + close_all(pn_event_connection(e), app); exit_code = 1; } } @@ -78,18 +87,18 @@ static pn_bytes_t encode_message(app_data_t* app) { pn_data_exit(body); /* encode the message, expanding the encode buffer as needed */ - if (app->message_buffer.start == NULL) { + if (app->msgout.start == NULL) { static const size_t initial_size = 128; - app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + app->msgout = pn_rwbytes(initial_size, (char*)malloc(initial_size)); } - /* app->message_buffer is the total buffer space available. */ + /* app->msgout is the total buffer space available. */ /* mbuf wil point at just the portion used by the encoded message */ - pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + pn_rwbytes_t mbuf = pn_rwbytes(app->msgout.size, app->msgout.start); int status = 0; while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { - app->message_buffer.size *= 2; - app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); - mbuf.size = app->message_buffer.size; + app->msgout.size *= 2; + app->msgout.start = (char*)realloc(app->msgout.start, app->msgout.size); + mbuf.size = app->msgout.size; } if (status != 0) { fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); @@ -99,26 +108,21 @@ static pn_bytes_t encode_message(app_data_t* app) { return pn_bytes(mbuf.size, mbuf.start); } -#define MAX_SIZE 1024 - -static void decode_message(pn_delivery_t *dlv) { - static char buffer[MAX_SIZE]; - ssize_t len; - // try to decode the message body - if (pn_delivery_pending(dlv) < MAX_SIZE) { - // read in the raw data - len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); - if (len > 0) { - // decode it into a proton message - pn_message_t *m = pn_message(); - if (PN_OK == pn_message_decode(m, buffer, len)) { - pn_string_t *s = pn_string(NULL); - pn_inspect(pn_message_body(m), s); - printf("%s\n", pn_string_get(s)); - pn_free(s); - } - pn_message_free(m); - } +static void decode_message(pn_rwbytes_t data) { + pn_message_t *m = pn_message(); + int err = pn_message_decode(m, data.start, data.size); + if (!err) { + /* Print the decoded message */ + pn_string_t *s = pn_string(NULL); + pn_inspect(pn_message_body(m), s); + printf("%s\n", pn_string_get(s)); + fflush(stdout); + pn_free(s); + pn_message_free(m); + free(data.start); + } else { + fprintf(stderr, "decode_message: %s\n", pn_code(err)); + exit_code = 1; } } @@ -132,36 +136,37 @@ static void handle_receive(app_data_t* app, pn_event_t* event) { pn_link_flow(l, app->message_count ? app->message_count : BATCH); } break; - case PN_DELIVERY: { - /* A message has been received */ - pn_link_t *link = NULL; - pn_delivery_t *dlv = pn_event_delivery(event); - if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { - link = pn_delivery_link(dlv); - decode_message(dlv); - /* Accept the delivery */ - pn_delivery_update(dlv, PN_ACCEPTED); - /* done with the delivery, move to the next and free it */ - pn_link_advance(link); - pn_delivery_settle(dlv); /* dlv is now freed */ - - if (app->message_count == 0) { - /* receive forever - see if more credit is needed */ - if (pn_link_credit(link) < BATCH/2) { - /* Grant enough credit to bring it up to BATCH: */ - pn_link_flow(link, BATCH - pn_link_credit(link)); + case PN_DELIVERY: { /* Incoming message data */ + pn_delivery_t *d = pn_event_delivery(event); + if (pn_delivery_readable(d)) { + pn_link_t *l = pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */ + m->size += size; + m->start = (char*)realloc(m->start, m->size); + int err = pn_link_recv(l, m->start, m->size); + if (err < 0 && err != PN_EOS) { + fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err)); + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + m->size = 0; /* forget the data we accumulated */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + decode_message(*m); + *m = pn_rwbytes_null; + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); /* settle and free d */ + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(l) < BATCH/2) { + pn_link_flow(l, BATCH - pn_link_credit(l)); + } + } else if (++app->received >= app->message_count) { + printf("%d messages received\n", app->received); + close_all(pn_event_connection(event), app); } - } else if (++app->received >= app->message_count) { - /* done receiving, close the endpoints */ - printf("%d messages received\n", app->received); - pn_session_t *ssn = pn_link_session(link); - pn_link_close(link); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); } } - } break; - + break; + } default: break; } @@ -197,8 +202,7 @@ static void handle_send(app_data_t* app, pn_event_t* event) { if (pn_delivery_remote_state(d) == PN_ACCEPTED) { if (++app->acknowledged == app->message_count) { printf("%d messages sent and acknowledged\n", app->acknowledged); - pn_connection_close(pn_event_connection(event)); - /* Continue handling events till we receive TRANSPORT_CLOSED */ + close_all(pn_event_connection(event), app); } } } break; @@ -244,24 +248,25 @@ static bool handle(app_data_t* app, pn_event_t* event) { } case PN_TRANSPORT_CLOSED: - check_condition(event, pn_transport_condition(pn_event_transport(event))); - pn_listener_close(app->listener); /* Finished */ + check_condition(event, pn_transport_condition(pn_event_transport(event)), app); break; case PN_CONNECTION_REMOTE_CLOSE: - check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); - pn_connection_close(pn_event_connection(event)); + check_condition(event, pn_connection_remote_condition(pn_event_connection(event)), app); + pn_connection_close(pn_event_connection(event)); /* Return the close */ break; case PN_SESSION_REMOTE_CLOSE: - check_condition(event, pn_session_remote_condition(pn_event_session(event))); - pn_connection_close(pn_event_connection(event)); + check_condition(event, pn_session_remote_condition(pn_event_session(event)), app); + pn_session_close(pn_event_session(event)); /* Return the close */ + pn_session_free(pn_event_session(event)); break; case PN_LINK_REMOTE_CLOSE: case PN_LINK_REMOTE_DETACH: - check_condition(event, pn_link_remote_condition(pn_event_link(event))); - pn_connection_close(pn_event_connection(event)); + check_condition(event, pn_link_remote_condition(pn_event_link(event)), app); + pn_link_close(pn_event_link(event)); /* Return the close */ + pn_link_free(pn_event_link(event)); break; case PN_PROACTOR_TIMEOUT: @@ -270,7 +275,8 @@ static bool handle(app_data_t* app, pn_event_t* event) { break; case PN_LISTENER_CLOSE: - check_condition(event, pn_listener_condition(pn_event_listener(event))); + app->listener = NULL; /* Listener is closed */ + check_condition(event, pn_listener_condition(pn_event_listener(event)), app); break; case PN_PROACTOR_INACTIVE: @@ -306,12 +312,11 @@ void run(app_data_t *app) { int main(int argc, char **argv) { struct app_data_t app = {0}; - int i = 0; - app.container_id = argv[i++]; /* Should be unique */ - app.host = (argc > 1) ? argv[i++] : ""; - app.port = (argc > 1) ? argv[i++] : "amqp"; - app.amqp_address = (argc > i) ? argv[i++] : "examples"; - app.message_count = (argc > i) ? atoi(argv[i++]) : 10; + app.container_id = argv[0]; /* Should be unique */ + app.host = (argc > 1) ? argv[1] : ""; + app.port = (argc > 2) ? argv[2] : "amqp"; + app.amqp_address = (argc > 3) ? argv[3] : "examples"; + app.message_count = (argc > 4) ? atoi(argv[4]) : 10; /* Create the proactor and connect */ app.proactor = pn_proactor(); @@ -321,6 +326,7 @@ int main(int argc, char **argv) { pn_proactor_listen(app.proactor, app.listener, addr, 16); run(&app); pn_proactor_free(app.proactor); - free(app.message_buffer.start); + free(app.msgout.start); + free(app.msgin.start); return exit_code; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/example_test.py ---------------------------------------------------------------------- diff --git a/examples/c/example_test.py b/examples/c/example_test.py index 02bb1fd..ee3e4e4 100644 --- a/examples/c/example_test.py +++ b/examples/c/example_test.py @@ -26,8 +26,13 @@ def python_cmd(name): dir = os.path.dirname(__file__) return [sys.executable, os.path.join(dir, "..", "..", "python", name)] -def receive_expect(n): - return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n +def receive_expect_messages(n=10): return ''.join(['{"sequence"=%s}\n'%i for i in xrange(1, n+1)]) +def receive_expect_total(n=10): return "%s messages received\n"%n +def receive_expect(n=10): return receive_expect_messages(n)+receive_expect_total(n) + +def send_expect(n=10): return "%s messages sent and acknowledged\n" % n +def send_abort_expect(n=10): return "%s messages started and aborted\n" % n + class Broker(object): def __init__(self, test): @@ -51,38 +56,64 @@ class Broker(object): class CExampleTest(ProcTestCase): + def runex(self, name, port, *args): + """Run an example with standard arugments, return output""" + return self.proc([name, "", port, "examples"] + list(args)).wait_exit() + def test_send_receive(self): """Send first then receive""" with Broker(self) as b: - s = self.proc(["send", "", b.port]) - self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit()) - r = self.proc(["receive", "", b.port]) - self.assertEqual(receive_expect(10), r.wait_exit()) + self.assertEqual(send_expect(), self.runex("send", b.port)) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port)) def test_receive_send(self): """Start receiving first, then send.""" with Broker(self) as b: - r = self.proc(["receive", "", b.port]); - s = self.proc(["send", "", b.port]); - self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit()) - self.assertEqual(receive_expect(10), r.wait_exit()) + self.assertEqual(send_expect(), self.runex("send", b.port)) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port)) def test_send_direct(self): """Send to direct server""" with TestPort() as tp: d = self.proc(["direct", "", tp.port]) d.wait_re("listening") - self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit()) - self.assertIn(receive_expect(10), d.wait_exit()) + self.assertEqual(send_expect(), self.runex("send", tp.port)) + self.assertMultiLineEqual("listening\n"+receive_expect(), d.wait_exit()) def test_receive_direct(self): """Receive from direct server""" with TestPort() as tp: d = self.proc(["direct", "", tp.port]) d.wait_re("listening") - self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit()) - self.assertIn("10 messages sent and acknowledged\n", d.wait_exit()) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", tp.port)) + self.assertEqual("listening\n10 messages sent and acknowledged\n", d.wait_exit()) + def test_send_abort_broker(self): + """Sending aborted messages to a broker""" + with Broker(self) as b: + self.assertEqual(send_expect(), self.runex("send", b.port)) + self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port)) + b.proc.wait_re("PN_DELIVERY error: PN_ABORTED\n"*10) + self.assertEqual(send_expect(), self.runex("send", b.port)) + expect = receive_expect_messages(10)+receive_expect_messages(10)+receive_expect_total(20) + self.assertMultiLineEqual(expect, self.runex("receive", b.port, "20")) + + def test_send_abort_direct(self): + """Send aborted messages to the direct server""" + with TestPort() as tp: + d = self.proc(["direct", "", tp.port, "examples", "20"]) + expect = "listening\n" + d.wait_re(expect) + self.assertEqual(send_expect(), self.runex("send", tp.port)) + expect += receive_expect_messages() + d.wait_re(expect) + self.assertEqual(send_abort_expect(), self.runex("send-abort", tp.port)) + expect += "PN_DELIVERY error: PN_ABORTED\n"*10 + d.wait_re(expect) + self.assertEqual(send_expect(), self.runex("send", tp.port)) + expect += receive_expect_messages()+receive_expect_total(20) + self.maxDiff = None + self.assertMultiLineEqual(expect, d.wait_exit()) if __name__ == "__main__": unittest.main() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/receive.c b/examples/c/receive.c index 1b3ac1a..7df955a 100644 --- a/examples/c/receive.c +++ b/examples/c/receive.c @@ -40,7 +40,7 @@ typedef struct app_data_t { pn_proactor_t *proactor; int received; bool finished; - pn_rwbytes_t receiving; /* Partially received message */ + pn_rwbytes_t msgin; /* Partially received message */ } app_data_t; static const int BATCH = 1000; /* Batch size for unlimited receive */ @@ -93,44 +93,40 @@ static bool handle(app_data_t* app, pn_event_t* event) { case PN_DELIVERY: { /* A message has been received */ pn_delivery_t *d = pn_event_delivery(event); - pn_link_t *r = pn_delivery_link(d); - if (!pn_delivery_readable(d)) break; - for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) { - /* Append data to the receving buffer */ - app->receiving.size += p; - app->receiving.start = (char*)realloc(app->receiving.start, app->receiving.size); - int recv = pn_link_recv(r, app->receiving.start + app->receiving.size - p, p); - if (recv < 0 && recv != PN_EOS) { - fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv)); - exit_code = 1; - break; - } - } - if (!pn_delivery_partial(d)) { - decode_message(app->receiving); - app->receiving = pn_rwbytes_null; - /* Accept the delivery */ - pn_delivery_update(d, PN_ACCEPTED); - /* done with the delivery, move to the next and free it */ - pn_link_advance(r); - pn_delivery_settle(d); /* d is now freed */ - - if (app->message_count == 0) { - /* receive forever - see if more credit is needed */ - if (pn_link_credit(r) < BATCH/2) { - /* Grant enough credit to bring it up to BATCH: */ - pn_link_flow(r, BATCH - pn_link_credit(r)); + if (pn_delivery_readable(d)) { + pn_link_t *l = pn_delivery_link(d); + size_t size = pn_delivery_pending(d); + pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */ + m->size += size; + m->start = (char*)realloc(m->start, m->size); + int err = pn_link_recv(l, m->start, m->size); + if (err < 0 && err != PN_EOS) { + fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err)); + pn_delivery_settle(d); /* Free the delivery so we can receive the next message */ + m->size = 0; /* forget the data we accumulated */ + } else if (!pn_delivery_partial(d)) { /* Message is complete */ + decode_message(*m); + *m = pn_rwbytes_null; /* Reset the buffer for the next message*/ + /* Accept the delivery */ + pn_delivery_update(d, PN_ACCEPTED); + pn_delivery_settle(d); /* settle and free d */ + if (app->message_count == 0) { + /* receive forever - see if more credit is needed */ + if (pn_link_credit(l) < BATCH/2) { + /* Grant enough credit to bring it up to BATCH: */ + pn_link_flow(l, BATCH - pn_link_credit(l)); + } + } else if (++app->received >= app->message_count) { + printf("%d messages received\n", app->received); + pn_session_t *ssn = pn_link_session(l); + pn_link_close(l); + pn_session_close(ssn); + pn_connection_close(pn_session_connection(ssn)); } - } else if (++app->received >= app->message_count) { - /* done receiving, close the endpoints */ - printf("%d messages received\n", app->received); - pn_session_t *ssn = pn_link_session(r); - pn_link_close(r); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); } } - } break; + break; + } case PN_TRANSPORT_CLOSED: check_condition(event, pn_transport_condition(pn_event_transport(event))); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
