Repository: qpid-dispatch Updated Branches: refs/heads/master 326d09a89 -> eaf81eaab
DISPATCH-179 - Clean up deliveries that are stranded on closed links Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/eaf81eaa Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/eaf81eaa Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/eaf81eaa Branch: refs/heads/master Commit: eaf81eaab7c59ff72f9a04c2d38f8b4be20a2097 Parents: 326d09a Author: Ted Ross <[email protected]> Authored: Sat Mar 19 10:56:08 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Sat Mar 19 10:56:08 2016 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 85 +++++++++++++++++++----------- src/router_core/forwarder.c | 7 +-- src/router_core/route_control.c | 5 +- src/router_core/router_core_private.h | 3 ++ src/router_core/transfer.c | 28 ++++++++-- 5 files changed, 86 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 6a65707..e35f6a1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -424,6 +424,61 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li } // + // Clean up the lists of deliveries on this link + // + qdr_delivery_ref_list_t updated_deliveries; + qdr_delivery_list_t undelivered; + qdr_delivery_list_t unsettled; + + sys_mutex_lock(conn->work_lock); + DEQ_MOVE(link->updated_deliveries, updated_deliveries); + DEQ_MOVE(link->undelivered, undelivered); + DEQ_MOVE(link->unsettled, unsettled); + sys_mutex_unlock(conn->work_lock); + + // + // Free all the 'updated' references + // + qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries); + while (ref) { + 
qdr_del_delivery_ref(&updated_deliveries, ref); + ref = DEQ_HEAD(updated_deliveries); + } + + // + // Free the undelivered deliveries. If this is an incoming link, the + // undelivereds can simply be destroyed. If it's an outgoing link, the + // undelivereds' peer deliveries need to be released. + // + qdr_delivery_t *dlv = DEQ_HEAD(undelivered); + qdr_delivery_t *peer; + while (dlv) { + DEQ_REMOVE_HEAD(undelivered); + peer = dlv->peer; + qdr_delivery_free(dlv); + if (peer) { + peer->peer = 0; + qdr_delivery_release_CT(core, peer); + } + dlv = DEQ_HEAD(undelivered); + } + + // + // Free the unsettled deliveries. + // + dlv = DEQ_HEAD(unsettled); + while (dlv) { + DEQ_REMOVE_HEAD(unsettled); + peer = dlv->peer; + qdr_delivery_free(dlv); + if (peer) { + peer->peer = 0; + qdr_delivery_remove_unsettled_CT(core, peer); + } + dlv = DEQ_HEAD(unsettled); + } + + // + // Remove the reference to this link in the connection's reference lists // qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION); @@ -991,23 +1046,6 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act break; } } - - // - // Cases to be handled: - // - // dir = Incoming or Outgoing: - // Link is an router-control link - // Note the control link on the connection - // Issue a second attach back to the originating node - // - // dir = Incoming: - // Issue credit for the inbound fifo - // - // dir = Outgoing: - // Link is a router-control link - // Associate the link with the router-hello address - // Associate the link with the link-mask-bit being used by the 
router - // Link is link-routed: - // Propagate the second attach back toward the originating node - // Link is Incoming: - // Issue credit for the inbound fifo - // } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 02c0bb5..c9752c5 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -137,8 +137,9 @@ int qdr_forward_multicast_CT(qdr_core_t *core, bool exclude_inprocess, bool control) { - bool bypass_valid_origins = addr->forwarder->bypass_valid_origins; - int fanout = 0; + bool bypass_valid_origins = addr->forwarder->bypass_valid_origins; + int fanout = 0; + qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0; // // Forward to local subscribers @@ -219,7 +220,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, dest_link = control ? core->control_links_by_mask_bit[link_bit] : core->data_links_by_mask_bit[link_bit]; - if (dest_link && (!in_delivery->link_exclusion || qd_bitmask_value(in_delivery->link_exclusion, link_bit) == 0)) { + if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) { qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg); qdr_forward_deliver_CT(core, dest_link, out_delivery); fanout++; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/route_control.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 23bda17..dd9d43f 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -120,10 +120,6 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr qdr_route_log_CT(core, "Auto Link Activated", al->name, al->identity, conn); - // - // Activate the 
link for an auto_link. If this is the first activation for this - // address, notify the router module of the added address. - // if (al->addr) { qdr_terminus_t *source = 0; qdr_terminus_t *target = 0; @@ -147,6 +143,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) { + //qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 458c93f..f6ff913 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -552,6 +552,9 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit); void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv); +void qdr_delivery_free(qdr_delivery_t *delivery); +void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); +void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/eaf81eaa/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 5ab27cf..47b499b 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -203,6 +203,20 @@ void qdr_delivery_free(qdr_delivery_t *delivery) } +void qdr_delivery_release_CT(qdr_core_t *core, 
qdr_delivery_t *delivery) +{ +} + + +void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *delivery) +{ + // + // Remove a delivery from its unsettled list. Side effects include issuing + // replacement credit and visiting the link-quiescence algorithm + // +} + + void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, bool settled) { qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); @@ -309,6 +323,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ } } else if (fanout == 1) { qd_bitmask_free(dlv->link_exclusion); + dlv->link_exclusion = 0; if (dlv->settled) { // // The delivery is settled. Keep it off the unsettled list and issue @@ -330,6 +345,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ // The fanout is greater than one. Do something! TODO // qd_bitmask_free(dlv->link_exclusion); + dlv->link_exclusion = 0; if (presettled) { qdr_link_issue_credit_CT(core, link, 1); @@ -439,20 +455,20 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool push = true; peer->peer = 0; dlv->peer = 0; - if (peer->link && !link_routed) { + if (peer->link) { sys_mutex_lock(peer->link->conn->work_lock); DEQ_REMOVE(peer->link->unsettled, peer); sys_mutex_unlock(peer->link->conn->work_lock); - if (peer->link->link_direction == QD_INCOMING) + if (peer->link->link_direction == QD_INCOMING && !link_routed) qdr_link_issue_credit_CT(core, peer->link, 1); } } - if (dlv->link && !link_routed) { + if (dlv->link) { sys_mutex_lock(dlv->link->conn->work_lock); DEQ_REMOVE(dlv->link->unsettled, dlv); sys_mutex_unlock(dlv->link->conn->work_lock); - if (dlv->link->link_direction == QD_INCOMING) + if (dlv->link->link_direction == QD_INCOMING && !link_routed) qdr_link_issue_credit_CT(core, dlv->link, 1); } @@ -506,6 +522,9 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit) */ void 
qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) { + // + // If there aren't any inlinks, there's no point in proceeding. + // if (DEQ_SIZE(addr->inlinks) == 0) return; @@ -524,7 +543,6 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) // Drain undelivered deliveries via the forwarder // if (DEQ_SIZE(link->undelivered) > 0) { - // // Move all the undelivered to a local list in case not all can be delivered. // We don't want to loop here forever putting the same messages on the undelivered --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
