Author: rhs
Date: Sun Nov  4 23:19:27 2012
New Revision: 1405664

URL: http://svn.apache.org/viewvc?rev=1405664&view=rev
Log:
fixed deadlock with acks, removed some redundant gc of the tracker history

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger.c

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1405664&r1=1405663&r2=1405664&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Sun Nov  4 23:19:27 2012
@@ -1669,10 +1669,12 @@ int pn_do_disposition(pn_dispatcher_t *d
     if (id < lwm) continue;
     pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
     pn_delivery_t *delivery = state->delivery;
-    delivery->remote_state = dispo;
-    delivery->remote_settled = settled;
-    delivery->updated = true;
-    pn_work_update(transport->connection, delivery);
+    if (delivery) {
+      delivery->remote_state = dispo;
+      delivery->remote_settled = settled;
+      delivery->updated = true;
+      pn_work_update(transport->connection, delivery);
+    }
   }
 
   return 0;
@@ -2146,6 +2148,10 @@ int pn_process_tpwork(pn_transport_t *tr
     pn_delivery_t *delivery = conn->tpwork_head;
     while (delivery)
     {
+      if (!delivery->transport_context && transport->disp->available > 0) {
+        break;
+      }
+
       pn_link_t *link = delivery->link;
       if (pn_link_is_sender(link)) {
         int err = pn_process_tpwork_sender(transport, delivery);

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1405664&r1=1405663&r2=1405664&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Sun Nov  4 23:19:27 2012
@@ -141,13 +141,11 @@ void pn_queue_del(pn_queue_t *queue, pn_
     pn_connection_t *conn =
       pn_session_connection(pn_link_session(pn_delivery_link(delivery)));
     pn_decref(conn);
-    pn_queue_gc(queue);
   }
 }
 
 pn_sequence_t pn_queue_add(pn_queue_t *queue, pn_delivery_t *delivery)
 {
-  pn_queue_gc(queue);
   pn_sequence_t id = queue->hwm++;
   size_t offset = id - queue->lwm;
   PN_ENSUREZ(queue->deliveries, queue->capacity, offset + 1);
@@ -160,6 +158,22 @@ pn_sequence_t pn_queue_add(pn_queue_t *q
   return id;
 }
 
+void pn_queue_slide(pn_queue_t *queue)
+{
+  if (queue->window >= 0) {
+    while (queue->hwm - queue->mwm > queue->window) {
+      pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
+      if (d) {
+        pn_delivery_settle(d);
+        pn_queue_del(queue, d);
+      } else {
+        pn_queue_gc(queue);
+      }
+    }
+  }
+  pn_queue_gc(queue);
+}
+
 int pn_queue_update(pn_queue_t *queue, pn_sequence_t id, pn_status_t status,
                     int flags, bool settle, bool match)
 {
@@ -201,13 +215,7 @@ int pn_queue_update(pn_queue_t *queue, p
     }
   }
 
-  if (queue->window >= 0) {
-    while (queue->hwm - queue->mwm > queue->window) {
-      pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
-      pn_delivery_settle(d);
-      pn_queue_del(queue, d);
-    }
-  }
+  pn_queue_slide(queue);
 
   return 0;
 }
@@ -397,10 +405,16 @@ void pn_messenger_endpoints(pn_messenger
 
   pn_delivery_t *d = pn_work_head(conn);
   while (d) {
+    pn_link_t *link = pn_delivery_link(d);
+    if (pn_delivery_updated(d) && pn_link_is_sender(link)) {
+      pn_delivery_update(d, pn_delivery_remote_state(d));
+    }
     pn_delivery_clear(d);
     d = pn_work_next(d);
   }
 
+  pn_queue_slide(&messenger->outgoing);
+
   if (pn_work_head(conn)) {
     return;
   }
@@ -780,7 +794,7 @@ int pn_messenger_get_outgoing_window(pn_
 
 int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window)
 {
-  if (window > PN_SESSION_WINDOW) {
+  if (window >= PN_SESSION_WINDOW) {
     return pn_error_format(messenger->error, PN_ARG_ERR,
                            "specified window (%i) exceeds max (%i)",
                            window, PN_SESSION_WINDOW);
@@ -797,7 +811,7 @@ int pn_messenger_get_incoming_window(pn_
 
 int pn_messenger_set_incoming_window(pn_messenger_t *messenger, int window)
 {
-  if (window > PN_SESSION_WINDOW) {
+  if (window >= PN_SESSION_WINDOW) {
     return pn_error_format(messenger->error, PN_ARG_ERR,
                            "specified window (%i) exceeds max (%i)",
                            window, PN_SESSION_WINDOW);
@@ -973,11 +987,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
 
 int pn_messenger_send(pn_messenger_t *messenger)
 {
-  int err = pn_messenger_sync(messenger, pn_messenger_sent);
-  if (err) return err;
-  return pn_queue_update(&messenger->outgoing,
-                         messenger->outgoing.hwm - 1,
-                         0, PN_CUMULATIVE, false, true);
+  return pn_messenger_sync(messenger, pn_messenger_sent);
 }
 
 int pn_messenger_recv(pn_messenger_t *messenger, int n)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to