Author: rhs
Date: Wed May 29 13:44:43 2013
New Revision: 1487485

URL: http://svn.apache.org/r1487485
Log:
replaced delivery buffer data structure with delivery map

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1487485&r1=1487484&r2=1487485&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed May 29 13:44:43 2013
@@ -56,18 +56,16 @@ struct pn_endpoint_t {
 };
 
 typedef struct {
-  pn_delivery_t *delivery;
   pn_sequence_t id;
   bool sent;
+  bool init;
 } pn_delivery_state_t;
 
 typedef struct {
-  pn_sequence_t next;
   size_t capacity;
-  size_t head;
-  size_t size;
-  pn_delivery_state_t *deliveries;
-} pn_delivery_buffer_t;
+  pn_sequence_t next;
+  pn_hash_t *deliveries;
+} pn_delivery_map_t;
 
 typedef struct {
   pn_link_t *link;
@@ -84,8 +82,8 @@ typedef struct {
   uint16_t local_channel;
   uint16_t remote_channel;
   bool incoming_init;
-  pn_delivery_buffer_t incoming;
-  pn_delivery_buffer_t outgoing;
+  pn_delivery_map_t incoming;
+  pn_delivery_map_t outgoing;
   pn_sequence_t incoming_transfer_count;
   pn_sequence_t incoming_window;
   pn_sequence_t outgoing_transfer_count;
@@ -258,8 +256,8 @@ struct pn_delivery_t {
   bool tpwork;
   pn_buffer_t *bytes;
   bool done;
-  void *transport_context;
   void *context;
+  pn_delivery_state_t state;
 };
 
 #define PN_SET_LOCAL(OLD, NEW)                                          \

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1487485&r1=1487484&r2=1487485&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Wed May 29 13:44:43 2013
@@ -37,92 +37,60 @@ static ssize_t transport_consume(pn_tran
 
 // delivery buffers
 
-void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next, 
size_t capacity)
+void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next, size_t 
capacity)
 {
-  // XXX: error handling
-  db->deliveries = (pn_delivery_state_t *) malloc(sizeof(pn_delivery_state_t) 
* capacity);
-  db->next = next;
+  db->deliveries = pn_hash(capacity, 0.75, PN_REFCOUNT);
   db->capacity = capacity;
-  db->head = 0;
-  db->size = 0;
-}
-
-void pn_delivery_buffer_free(pn_delivery_buffer_t *db)
-{
-  free(db->deliveries);
-}
-
-size_t pn_delivery_buffer_size(pn_delivery_buffer_t *db)
-{
-  return db->size;
-}
-
-size_t pn_delivery_buffer_available(pn_delivery_buffer_t *db)
-{
-  return db->capacity - db->size;
-}
-
-bool pn_delivery_buffer_empty(pn_delivery_buffer_t *db)
-{
-  return db->size == 0;
-}
-
-pn_delivery_state_t *pn_delivery_buffer_get(pn_delivery_buffer_t *db, size_t 
index)
-{
-  if (index < db->size) return db->deliveries + ((db->head + index) % 
db->capacity);
-  else return NULL;
+  db->next = next;
 }
 
-pn_delivery_state_t *pn_delivery_buffer_head(pn_delivery_buffer_t *db)
+void pn_delivery_map_free(pn_delivery_map_t *db)
 {
-  if (db->size) return db->deliveries + db->head;
-  else return NULL;
+  pn_free(db->deliveries);
 }
 
-pn_delivery_state_t *pn_delivery_buffer_tail(pn_delivery_buffer_t *db)
+size_t pn_delivery_map_available(pn_delivery_map_t *db)
 {
-  if (db->size) return pn_delivery_buffer_get(db, db->size - 1);
-  else return NULL;
+  return db->capacity - pn_hash_size(db->deliveries);
 }
 
-pn_sequence_t pn_delivery_buffer_lwm(pn_delivery_buffer_t *db)
+pn_delivery_t *pn_delivery_map_get(pn_delivery_map_t *db, pn_sequence_t id)
 {
-  if (db->size) return pn_delivery_buffer_head(db)->id;
-  else return db->next;
+  return (pn_delivery_t *) pn_hash_get(db->deliveries, id);
 }
 
 static void pn_delivery_state_init(pn_delivery_state_t *ds, pn_delivery_t 
*delivery, pn_sequence_t id)
 {
-  ds->delivery = delivery;
   ds->id = id;
   ds->sent = false;
+  ds->init = true;
 }
 
-pn_delivery_state_t *pn_delivery_buffer_push(pn_delivery_buffer_t *db, 
pn_delivery_t *delivery)
+pn_delivery_state_t *pn_delivery_map_push(pn_delivery_map_t *db, pn_delivery_t 
*delivery)
 {
-  if (!pn_delivery_buffer_available(db))
+  if (!pn_delivery_map_available(db))
     return NULL;
-  db->size++;
-  pn_delivery_state_t *ds = pn_delivery_buffer_tail(db);
+  pn_delivery_state_t *ds = &delivery->state;
   pn_delivery_state_init(ds, delivery, db->next++);
+  pn_hash_put(db->deliveries, ds->id, delivery);
   return ds;
 }
 
-bool pn_delivery_buffer_pop(pn_delivery_buffer_t *db)
+void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
-  if (db->size) {
-    db->head = (db->head + 1) % db->capacity;
-    db->size--;
-    return true;
-  } else {
-    return false;
-  }
+  pn_hash_del(db->deliveries, delivery->state.id);
+  delivery->state.init = false;
 }
 
-void pn_delivery_buffer_gc(pn_delivery_buffer_t *db)
+void pn_delivery_map_clear(pn_delivery_map_t *dm)
 {
-  while (db->size && !pn_delivery_buffer_head(db)->delivery) {
-    pn_delivery_buffer_pop(db);
+  pn_hash_t *hash = dm->deliveries;
+  for (pn_handle_t entry = pn_hash_head(hash);
+       entry;
+       entry = pn_hash_next(hash, entry))
+  {
+    pn_delivery_t *dlv = (pn_delivery_t *) pn_hash_value(hash, entry);
+    pn_delivery_map_del(dm, dlv);
   }
 }
 
@@ -642,8 +610,8 @@ static void pn_session_finalize(void *ob
   pn_free(session->links);
   pn_endpoint_tini(&session->endpoint);
 
-  pn_delivery_buffer_free(&session->state.incoming);
-  pn_delivery_buffer_free(&session->state.outgoing);
+  pn_delivery_map_free(&session->state.incoming);
+  pn_delivery_map_free(&session->state.outgoing);
   pn_free(session->state.local_handles);
   pn_free(session->state.remote_handles);
 }
@@ -662,9 +630,12 @@ pn_session_t *pn_session(pn_connection_t
   ssn->context = 0;
 
   // begin transport state
-  ssn->state = (pn_session_state_t) {ssn, (uint16_t)-1, (uint16_t)-1};
-  pn_delivery_buffer_init(&ssn->state.incoming, 0, PN_SESSION_WINDOW);
-  pn_delivery_buffer_init(&ssn->state.outgoing, 0, PN_SESSION_WINDOW);
+  memset(&ssn->state, 0, sizeof(ssn->state));
+  ssn->state.session = ssn;
+  ssn->state.local_channel = (uint16_t)-1;
+  ssn->state.remote_channel = (uint16_t)-1;
+  pn_delivery_map_init(&ssn->state.incoming, 0, PN_SESSION_WINDOW);
+  pn_delivery_map_init(&ssn->state.outgoing, 0, PN_SESSION_WINDOW);
   ssn->state.local_handles = pn_hash(0, 0.75, PN_REFCOUNT);
   ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT);
   // end transport state
@@ -838,21 +809,11 @@ int pn_transport_unbind(pn_transport_t *
   transport->connection = NULL;
   conn->transport = NULL;
 
-  pn_link_t *link = pn_link_head(conn, 0);
-  while (link) {
-    pn_delivery_t *dlv = link->unsettled_head;
-    while (dlv) {
-      dlv->transport_context = NULL;
-      dlv = dlv->unsettled_next;
-    }
-
-    dlv = link->settled_head;
-    while (dlv) {
-      dlv->transport_context = NULL;
-      dlv = dlv->settled_next;
-    }
-
-    link = pn_link_next(link, 0);
+  pn_session_t *ssn = pn_session_head(conn, 0);
+  while (ssn) {
+    pn_delivery_map_clear(&ssn->state.incoming);
+    pn_delivery_map_clear(&ssn->state.outgoing);
+    ssn = pn_session_next(ssn, 0);
   }
 
   pn_endpoint_t *endpoint = conn->endpoint_head;
@@ -1190,9 +1151,12 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   delivery->tpwork = false;
   pn_buffer_clear(delivery->bytes);
   delivery->done = false;
-  delivery->transport_context = NULL;
   delivery->context = NULL;
 
+  // begin delivery state
+  delivery->state.init = false;
+  // end delivery state
+
   if (!link->current)
     link->current = delivery;
 
@@ -1332,14 +1296,13 @@ void pn_real_settle(pn_delivery_t *deliv
   delivery->settled = true;
 }
 
-void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
+void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
   assert(!delivery->work);
-  pn_delivery_state_t *state = (pn_delivery_state_t *) 
delivery->transport_context;
-  delivery->transport_context = NULL;
-  if (state) state->delivery = NULL;
+  if (delivery->state.init) {
+    pn_delivery_map_del(db, delivery);
+  }
   pn_real_settle(delivery);
-  if (state) pn_delivery_buffer_gc(db);
   pn_clear_tpwork(delivery);
 }
 
@@ -1615,9 +1578,9 @@ int pn_do_transfer(pn_dispatcher_t *disp
   if (link->unsettled_tail && !link->unsettled_tail->done) {
     delivery = link->unsettled_tail;
   } else {
-    pn_delivery_buffer_t *incoming = &ssn_state->incoming;
+    pn_delivery_map_t *incoming = &ssn_state->incoming;
 
-    if (!pn_delivery_buffer_available(incoming)) {
+    if (!pn_delivery_map_available(incoming)) {
       return pn_do_error(transport, "amqp:session:window-violation", "incoming 
session window exceeded");
     }
 
@@ -1627,8 +1590,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
     }
 
     delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
-    pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
-    delivery->transport_context = state;
+    pn_delivery_state_t *state = pn_delivery_map_push(incoming, delivery);
     if (id_present && id != state->id) {
       int err = pn_do_error(transport, "amqp:session:invalid-field",
                             "sequencing error, expected delivery-id %u, got 
%u",
@@ -1736,19 +1698,15 @@ int pn_do_disposition(pn_dispatcher_t *d
     }
   }
 
-  pn_delivery_buffer_t *deliveries;
+  pn_delivery_map_t *deliveries;
   if (role) {
     deliveries = &ssn_state->outgoing;
   } else {
     deliveries = &ssn_state->incoming;
   }
 
-  pn_sequence_t lwm = pn_delivery_buffer_lwm(deliveries);
-
   for (pn_sequence_t id = first; id <= last; id++) {
-    if (id < lwm) continue;
-    pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
-    pn_delivery_t *delivery = state->delivery;
+    pn_delivery_t *delivery = pn_delivery_map_get(deliveries, id);
     if (delivery) {
       delivery->remote_state = dispo;
       delivery->remote_settled = settled;
@@ -1993,8 +1951,8 @@ bool pn_delivery_buffered(pn_delivery_t 
 {
   if (delivery->settled) return false;
   if (pn_link_is_sender(delivery->link)) {
-    pn_delivery_state_t *state = (pn_delivery_state_t *) 
delivery->transport_context;
-    if (state) {
+    pn_delivery_state_t *state = &delivery->state;
+    if (state->init) {
       return (delivery->done && !state->sent) || 
pn_buffer_size(delivery->bytes) > 0;
     } else {
       return delivery->done;
@@ -2051,8 +2009,8 @@ int pn_process_ssn_setup(pn_transport_t 
       pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
                     ((int16_t) state->remote_channel >= 0), 
state->remote_channel,
                     state->outgoing_transfer_count,
-                    pn_delivery_buffer_available(&state->incoming),
-                    pn_delivery_buffer_available(&state->outgoing));
+                    pn_delivery_map_available(&state->incoming),
+                    pn_delivery_map_available(&state->outgoing));
       state->local_channel = channel;
       pn_hash_put(transport->local_channels, channel, ssn);
     }
@@ -2123,13 +2081,13 @@ int pn_process_link_setup(pn_transport_t
 
 int pn_post_flow(pn_transport_t *transport, pn_session_state_t *ssn_state, 
pn_link_state_t *state)
 {
-  ssn_state->incoming_window = 
pn_delivery_buffer_available(&ssn_state->incoming);
+  ssn_state->incoming_window = pn_delivery_map_available(&ssn_state->incoming);
   bool link = (bool) state;
   return pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[?IIII?I?I?In?o]", FLOW,
                        (int16_t) ssn_state->remote_channel >= 0, 
ssn_state->incoming_transfer_count,
                        ssn_state->incoming_window,
                        ssn_state->outgoing.next,
-                       pn_delivery_buffer_available(&ssn_state->outgoing),
+                       pn_delivery_map_available(&ssn_state->outgoing),
                        link, state ? state->local_handle : 0,
                        link, state ? state->delivery_count : 0,
                        link, state ? state->link_credit : 0,
@@ -2178,8 +2136,8 @@ int pn_post_disp(pn_transport_t *transpo
   pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = pn_session_get_state(transport, 
link->session);
   pn_modified(transport->connection, &link->session->endpoint);
-  // XXX: check for null state
-  pn_delivery_state_t *state = (pn_delivery_state_t *) 
delivery->transport_context;
+  pn_delivery_state_t *state = &delivery->state;
+  assert(state->init);
   uint64_t code;
   switch(delivery->local_state) {
   case PN_ACCEPTED:
@@ -2233,10 +2191,9 @@ int pn_process_tpwork_sender(pn_transpor
   pn_session_state_t *ssn_state = pn_session_get_state(transport, 
link->session);
   pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
   if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) 
link_state->local_handle >= 0) {
-    pn_delivery_state_t *state = (pn_delivery_state_t *) 
delivery->transport_context;
-    if (!(*allocation_blocked) && !state && 
pn_delivery_buffer_available(&ssn_state->outgoing)) {
-      state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
-      delivery->transport_context = state;
+    pn_delivery_state_t *state = delivery->state.init ? &delivery->state : 
NULL;
+    if (!(*allocation_blocked) && !state && 
pn_delivery_map_available(&ssn_state->outgoing)) {
+      state = pn_delivery_map_push(&ssn_state->outgoing, delivery);
     } else {
       *allocation_blocked = true;
     }
@@ -2266,7 +2223,7 @@ int pn_process_tpwork_sender(pn_transpor
     }
   }
 
-  pn_delivery_state_t *state = (pn_delivery_state_t *) 
delivery->transport_context;
+  pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
   // XXX: need to prevent duplicate disposition sending
   if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
       && state && state->sent) {
@@ -2286,16 +2243,16 @@ int pn_process_tpwork_receiver(pn_transp
   pn_link_t *link = delivery->link;
   // XXX: need to prevent duplicate disposition sending
   pn_session_state_t *ssn_state = pn_session_get_state(transport, 
link->session);
-  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && 
delivery->transport_context) {
+  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && 
delivery->state.init) {
     int err = pn_post_disp(transport, delivery);
     if (err) return err;
   }
 
   if (delivery->local_settled) {
-    size_t available = pn_delivery_buffer_available(&ssn_state->incoming);
+    size_t available = pn_delivery_map_available(&ssn_state->incoming);
     pn_full_settle(&ssn_state->incoming, delivery);
     if (!ssn_state->incoming_window &&
-        pn_delivery_buffer_available(&ssn_state->incoming) > available) {
+        pn_delivery_map_available(&ssn_state->incoming) > available) {
       int err = pn_post_flow(transport, ssn_state, NULL);
       if (err) return err;
     }
@@ -2313,7 +2270,7 @@ int pn_process_tpwork(pn_transport_t *tr
     bool allocation_blocked = false;
     while (delivery)
     {
-      if (!delivery->transport_context && transport->disp->available > 0) {
+      if (!delivery->state.init && transport->disp->available > 0) {
         break;
       }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to