Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 326d09a89 -> eaf81eaab


DISPATCH-179 - Clean up deliveries that are stranded on closed links


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/eaf81eaa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/eaf81eaa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/eaf81eaa

Branch: refs/heads/master
Commit: eaf81eaab7c59ff72f9a04c2d38f8b4be20a2097
Parents: 326d09a
Author: Ted Ross <[email protected]>
Authored: Sat Mar 19 10:56:08 2016 -0400
Committer: Ted Ross <[email protected]>
Committed: Sat Mar 19 10:56:08 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 85 +++++++++++++++++++-----------
 src/router_core/forwarder.c           |  7 +--
 src/router_core/route_control.c       |  5 +-
 src/router_core/router_core_private.h |  3 ++
 src/router_core/transfer.c            | 28 ++++++++--
 5 files changed, 86 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 6a65707..e35f6a1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -424,6 +424,61 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
     }
 
     //
+    // Clean up the lists of deliveries on this link
+    //
+    qdr_delivery_ref_list_t updated_deliveries;
+    qdr_delivery_list_t     undelivered;
+    qdr_delivery_list_t     unsettled;
+
+    sys_mutex_lock(conn->work_lock);
+    DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+    DEQ_MOVE(link->undelivered, undelivered);
+    DEQ_MOVE(link->unsettled, unsettled);
+    sys_mutex_unlock(conn->work_lock);
+
+    //
+    // Free all the 'updated' references
+    //
+    qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
+    while (ref) {
+        qdr_del_delivery_ref(&updated_deliveries, ref);
+        ref = DEQ_HEAD(updated_deliveries);
+    }
+
+    //
+    // Free the undelivered deliveries.  If this is an incoming link, the
+    // undelivereds can simply be destroyed.  If it's an outgoing link, the
+    // undelivereds' peer deliveries need to be released.
+    //
+    qdr_delivery_t *dlv = DEQ_HEAD(undelivered);
+    qdr_delivery_t *peer;
+    while (dlv) {
+        DEQ_REMOVE_HEAD(undelivered);
+        peer = dlv->peer;
+        qdr_delivery_free(dlv);
+        if (peer) {
+            peer->peer = 0;
+            qdr_delivery_release_CT(core, peer);
+        }
+        dlv = DEQ_HEAD(undelivered);
+    }
+
+    //
+    // Free the unsettled deliveries.
+    //
+    dlv = DEQ_HEAD(unsettled);
+    while (dlv) {
+        DEQ_REMOVE_HEAD(unsettled);
+        peer = dlv->peer;
+        qdr_delivery_free(dlv);
+        if (peer) {
+            peer->peer = 0;
+            qdr_delivery_remove_unsettled_CT(core, peer);
+        }
+        dlv = DEQ_HEAD(unsettled);
+    }
+
+    //
     // Remove the reference to this link in the connection's reference lists
     //
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
@@ -991,23 +1046,6 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t 
*core, qdr_action_t *act
             break;
         }
     }
-
-    //
-    // Cases to be handled:
-    //
-    // dir = Incoming or Outgoing:
-    //    Link is an router-control link
-    //       Note the control link on the connection
-    //       Issue a second attach back to the originating node
-    //
-    // dir = Incoming:
-    //    Issue credit for the inbound fifo
-    //
-    // dir = Outgoing:
-    //    Link is a router-control link
-    //       Associate the link with the router-hello address
-    //       Associate the link with the link-mask-bit being used by the router
-    //
 }
 
 
@@ -1098,19 +1136,6 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t 
*core, qdr_action_t *ac
 
     qdr_terminus_free(source);
     qdr_terminus_free(target);
-
-    //
-    // Cases to be handled:
-    //
-    // Link is a router-control link:
-    //    Note the control link on the connection
-    //    Associate the link with the router-hello address
-    //    Associate the link with the link-mask-bit being used by the router
-    // Link is link-routed:
-    //    Propagate the second attach back toward the originating node
-    // Link is Incoming:
-    //    Issue credit for the inbound fifo
-    //
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 02c0bb5..c9752c5 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -137,8 +137,9 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                              bool             exclude_inprocess,
                              bool             control)
 {
-    bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
-    int  fanout = 0;
+    bool          bypass_valid_origins = addr->forwarder->bypass_valid_origins;
+    int           fanout               = 0;
+    qd_bitmask_t *link_exclusion       = !!in_delivery ? 
in_delivery->link_exclusion : 0;
 
     //
     // Forward to local subscribers
@@ -219,7 +220,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             dest_link = control ?
                 core->control_links_by_mask_bit[link_bit] :
                 core->data_links_by_mask_bit[link_bit];
-            if (dest_link && (!in_delivery->link_exclusion || 
qd_bitmask_value(in_delivery->link_exclusion, link_bit) == 0)) {
+            if (dest_link && (!link_exclusion || 
qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = 
qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
                 qdr_forward_deliver_CT(core, dest_link, out_delivery);
                 fanout++;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 23bda17..dd9d43f 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -120,10 +120,6 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, 
qdr_auto_link_t *al, qdr
 
     qdr_route_log_CT(core, "Auto Link Activated", al->name, al->identity, 
conn);
 
-    //
-    // Activate the link for an auto_link.  If this is the first activation 
for this
-    // address, notify the router module of the added address.
-    //
     if (al->addr) {
         qdr_terminus_t *source = 0;
         qdr_terminus_t *target = 0;
@@ -147,6 +143,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, 
qdr_auto_link_t *al, qdr
 
 static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, 
qdr_connection_t *conn)
 {
+    //qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, 
conn);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 458c93f..f6ff913 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -552,6 +552,9 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t 
*action);
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
+void qdr_delivery_free(qdr_delivery_t *delivery);
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t 
*delivery);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
 void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5ab27cf..47b499b 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -203,6 +203,20 @@ void qdr_delivery_free(qdr_delivery_t *delivery)
 }
 
 
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+{
+}
+
+
+void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t 
*delivery)
+{
+    //
+    // Remove a delivery from its unsettled list.  Side effects include issuing
+    // replacement credit and visiting the link-quiescence algorithm
+    //
+}
+
+
 void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disposition, bool settled)
 {
     qdr_action_t *action = qdr_action(qdr_update_delivery_CT, 
"update_delivery");
@@ -309,6 +323,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_delivery_
         }
     } else if (fanout == 1) {
         qd_bitmask_free(dlv->link_exclusion);
+        dlv->link_exclusion = 0;
         if (dlv->settled) {
             //
             // The delivery is settled.  Keep it off the unsettled list and 
issue
@@ -330,6 +345,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_delivery_
         // The fanout is greater than one.  Do something!  TODO
         //
         qd_bitmask_free(dlv->link_exclusion);
+        dlv->link_exclusion = 0;
 
         if (presettled) {
             qdr_link_issue_credit_CT(core, link, 1);
@@ -439,20 +455,20 @@ static void qdr_update_delivery_CT(qdr_core_t *core, 
qdr_action_t *action, bool
             push = true;
             peer->peer = 0;
             dlv->peer  = 0;
-            if (peer->link && !link_routed) {
+            if (peer->link) {
                 sys_mutex_lock(peer->link->conn->work_lock);
                 DEQ_REMOVE(peer->link->unsettled, peer);
                 sys_mutex_unlock(peer->link->conn->work_lock);
-                if (peer->link->link_direction == QD_INCOMING)
+                if (peer->link->link_direction == QD_INCOMING && !link_routed)
                     qdr_link_issue_credit_CT(core, peer->link, 1);
             }
         }
 
-        if (dlv->link && !link_routed) {
+        if (dlv->link) {
             sys_mutex_lock(dlv->link->conn->work_lock);
             DEQ_REMOVE(dlv->link->unsettled, dlv);
             sys_mutex_unlock(dlv->link->conn->work_lock);
-            if (dlv->link->link_direction == QD_INCOMING)
+            if (dlv->link->link_direction == QD_INCOMING && !link_routed)
                 qdr_link_issue_credit_CT(core, dlv->link, 1);
         }
 
@@ -506,6 +522,9 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t 
*link, int credit)
  */
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
 {
+    //
+    // If there aren't any inlinks, there's no point in proceeding.
+    //
     if (DEQ_SIZE(addr->inlinks) == 0)
         return;
 
@@ -524,7 +543,6 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, 
qdr_address_t *addr)
             // Drain undelivered deliveries via the forwarder
             //
             if (DEQ_SIZE(link->undelivered) > 0) {
-
                 //
                 // Move all the undelivered to a local list in case not all 
can be delivered.
                 // We don't want to loop here forever putting the same 
messages on the undelivered


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to