PROTON-1512: c examples: fix thread safety issue in broker. Replace global receiving buffer with buffer-per-link to accumulate messages correctly.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5b1be877 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5b1be877 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5b1be877 Branch: refs/heads/master Commit: 5b1be8771faeafa7401d5f96705d38d266c61206 Parents: ea02b93 Author: Alan Conway <[email protected]> Authored: Wed Sep 27 10:17:28 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Wed Sep 27 14:10:59 2017 -0400 ---------------------------------------------------------------------- examples/c/broker.c | 54 +++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5b1be877/examples/c/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/broker.c b/examples/c/broker.c index 9129c3f..c2e4c16 100644 --- a/examples/c/broker.c +++ b/examples/c/broker.c @@ -119,20 +119,24 @@ static void queue_send(queue_t *q, pn_link_t *s) { } } -/* Data associated with each broker connection */ -typedef struct broker_data_t { - bool check_queues; /* Check senders on the connection for available data in queues. 
*/ -} broker_data_t; - -/* Use the context pointer as a boolean flag to indicate we need to check queues */ -void pn_connection_set_check_queues(pn_connection_t *c, bool check) { +/* Use the connection context pointer as a boolean flag to indicate we need to check queues */ +void set_check_queues(pn_connection_t *c, bool check) { pn_connection_set_context(c, (void*)check); } -bool pn_connection_get_check_queues(pn_connection_t *c) { +bool get_check_queues(pn_connection_t *c) { return (bool)pn_connection_get_context(c); } +/* Use a buffer per link to accumulate message data - message can arrive in multiple deliveries, + and the broker can receive messages on many concurrently. */ +pn_rwbytes_t *message_buffer(pn_link_t *l) { + if (!pn_link_get_context(l)) { + pn_link_set_context(l, calloc(1, sizeof(pn_rwbytes_t))); + } + return (pn_rwbytes_t*)pn_link_get_context(l); +} + /* Put a message on the queue, called in receiver dispatch loop. If the queue was previously empty, notify waiting senders. 
*/ @@ -142,7 +146,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) { if (q->messages.len == 1) { /* Was empty, notify waiting connections */ for (size_t i = 0; i < q->waiting.len; ++i) { pn_connection_t *c = q->waiting.data[i]; - pn_connection_set_check_queues(c, true); + set_check_queues(c, true); pn_connection_wake(c); /* Wake the connection */ } q->waiting.len = 0; @@ -192,7 +196,6 @@ typedef struct broker_t { const char *container_id; /* AMQP container-id */ queues_t queues; bool finished; - pn_rwbytes_t receiving; /* Partially received message data */ } broker_t; void broker_stop(broker_t *b) { @@ -285,8 +288,8 @@ static void handle(broker_t* b, pn_event_t* e) { break; } case PN_CONNECTION_WAKE: { - if (pn_connection_get_check_queues(c)) { - pn_connection_set_check_queues(c, false); + if (get_check_queues(c)) { + set_check_queues(c, false); int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE; for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags)) link_send(b, l); @@ -314,15 +317,24 @@ static void handle(broker_t* b, pn_event_t* e) { link_send(b, pn_event_link(e)); break; } - case PN_DELIVERY: { + case PN_LINK_FINAL: { + pn_rwbytes_t *buf = (pn_rwbytes_t*)pn_link_get_context(pn_event_link(e)); + if (buf) { + free(buf->start); + free(buf); + } + break; + } + case PN_DELIVERY: { /* Incoming message data */ pn_delivery_t *d = pn_event_delivery(e); - pn_link_t *r = pn_delivery_link(d); + pn_link_t *l = pn_delivery_link(d); if (!pn_delivery_readable(d)) break; + pn_rwbytes_t *m = message_buffer(l); for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) { /* Append data to the reeving buffer */ - b->receiving.size += p; - b->receiving.start = (char*)realloc(b->receiving.start, b->receiving.size); - int recv = pn_link_recv(r, b->receiving.start + b->receiving.size - p, p); + m->size += p; + m->start = (char*)realloc(m->start, m->size); + int recv = pn_link_recv(l, m->start + m->size - p, p); if (recv 
< 0 && recv != PN_EOS) { fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv)); break; @@ -330,12 +342,12 @@ static void handle(broker_t* b, pn_event_t* e) { } if (!pn_delivery_partial(d)) { /* The broker does not decode the message, just forwards it. */ - const char *qname = pn_terminus_get_address(pn_link_target(r)); - queue_receive(b->proactor, queues_get(&b->queues, qname), b->receiving); - b->receiving = pn_rwbytes_null; + const char *qname = pn_terminus_get_address(pn_link_target(l)); + queue_receive(b->proactor, queues_get(&b->queues, qname), *m); + *m = pn_rwbytes_null; pn_delivery_update(d, PN_ACCEPTED); pn_delivery_settle(d); - pn_link_flow(r, WINDOW - pn_link_credit(r)); + pn_link_flow(l, WINDOW - pn_link_credit(l)); } break; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
