PROTON-1512: c examples: fix thread safety issue in broker

Replace the single global receiving buffer with a buffer per link so that
partially received messages on different links are accumulated correctly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5b1be877
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5b1be877
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5b1be877

Branch: refs/heads/master
Commit: 5b1be8771faeafa7401d5f96705d38d266c61206
Parents: ea02b93
Author: Alan Conway <[email protected]>
Authored: Wed Sep 27 10:17:28 2017 -0400
Committer: Alan Conway <[email protected]>
Committed: Wed Sep 27 14:10:59 2017 -0400

----------------------------------------------------------------------
 examples/c/broker.c | 54 +++++++++++++++++++++++++++++-------------------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5b1be877/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index 9129c3f..c2e4c16 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -119,20 +119,24 @@ static void queue_send(queue_t *q, pn_link_t *s) {
   }
 }
 
-/* Data associated with each broker connection */
-typedef struct broker_data_t {
-  bool check_queues;          /* Check senders on the connection for available 
data in queues. */
-} broker_data_t;
-
-/* Use the context pointer as a boolean flag to indicate we need to check 
queues */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+/* Use the connection context pointer as a boolean flag to indicate we need to 
check queues */
+void set_check_queues(pn_connection_t *c, bool check) {
   pn_connection_set_context(c, (void*)check);
 }
 
-bool pn_connection_get_check_queues(pn_connection_t *c) {
+bool get_check_queues(pn_connection_t *c) {
   return (bool)pn_connection_get_context(c);
 }
 
+/* Use a buffer per link to accumulate message data - a message can arrive in 
multiple deliveries,
+   and the broker can receive messages on many links concurrently. */
+pn_rwbytes_t *message_buffer(pn_link_t *l) {
+  if (!pn_link_get_context(l)) {
+    pn_link_set_context(l, calloc(1, sizeof(pn_rwbytes_t)));
+  }
+  return (pn_rwbytes_t*)pn_link_get_context(l);
+}
+
 /* Put a message on the queue, called in receiver dispatch loop.
    If the queue was previously empty, notify waiting senders.
 */
@@ -142,7 +146,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, 
pn_rwbytes_t m) {
   if (q->messages.len == 1) { /* Was empty, notify waiting connections */
     for (size_t i = 0; i < q->waiting.len; ++i) {
       pn_connection_t *c = q->waiting.data[i];
-      pn_connection_set_check_queues(c, true);
+      set_check_queues(c, true);
       pn_connection_wake(c); /* Wake the connection */
     }
     q->waiting.len = 0;
@@ -192,7 +196,6 @@ typedef struct broker_t {
   const char *container_id;     /* AMQP container-id */
   queues_t queues;
   bool finished;
-  pn_rwbytes_t receiving;       /* Partially received message data */
 } broker_t;
 
 void broker_stop(broker_t *b) {
@@ -285,8 +288,8 @@ static void handle(broker_t* b, pn_event_t* e) {
      break;
    }
    case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
+     if (get_check_queues(c)) {
+       set_check_queues(c, false);
        int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
        for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = 
pn_link_next(l, flags))
          link_send(b, l);
@@ -314,15 +317,24 @@ static void handle(broker_t* b, pn_event_t* e) {
      link_send(b, pn_event_link(e));
      break;
    }
-   case PN_DELIVERY: {
+   case PN_LINK_FINAL: {
+     pn_rwbytes_t *buf = (pn_rwbytes_t*)pn_link_get_context(pn_event_link(e));
+     if (buf) {
+       free(buf->start);
+       free(buf);
+     }
+     break;
+   }
+   case PN_DELIVERY: {          /* Incoming message data */
      pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
+     pn_link_t *l = pn_delivery_link(d);
      if (!pn_delivery_readable(d)) break;
+     pn_rwbytes_t *m = message_buffer(l);
      for (size_t p = pn_delivery_pending(d); p > 0; p = 
pn_delivery_pending(d)) {
        /* Append data to the receiving buffer */
-       b->receiving.size += p;
-       b->receiving.start = (char*)realloc(b->receiving.start, 
b->receiving.size);
-       int recv = pn_link_recv(r, b->receiving.start + b->receiving.size - p, 
p);
+       m->size += p;
+       m->start = (char*)realloc(m->start, m->size);
+       int recv = pn_link_recv(l, m->start + m->size - p, p);
        if (recv < 0 && recv != PN_EOS) {
          fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", 
pn_code(recv));
          break;
@@ -330,12 +342,12 @@ static void handle(broker_t* b, pn_event_t* e) {
      }
      if (!pn_delivery_partial(d)) {
        /* The broker does not decode the message, just forwards it. */
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), b->receiving);
-       b->receiving = pn_rwbytes_null;
+       const char *qname = pn_terminus_get_address(pn_link_target(l));
+       queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
+       *m = pn_rwbytes_null;
        pn_delivery_update(d, PN_ACCEPTED);
        pn_delivery_settle(d);
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
+       pn_link_flow(l, WINDOW - pn_link_credit(l));
      }
      break;
    }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to