DISPATCH-57 - Fixed balanced forwarding so that it a) won't involve "full" links, b) will honor the valid-origin for messages when choosing a path to the destination, and c) uses the router cost as a threshold for using inter-router links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8fa96352 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8fa96352 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8fa96352 Branch: refs/heads/master Commit: 8fa9635252abac997266f2388ba5a5844f34e88b Parents: 704003c Author: Ted Ross <[email protected]> Authored: Tue May 3 21:59:48 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Tue May 3 22:05:48 2016 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 10 ++- src/router_core/forwarder.c | 126 ++++++++++++++++++++--------- src/router_core/route_tables.c | 1 - src/router_core/router_core_private.h | 12 ++- src/router_core/transfer.c | 7 ++ 5 files changed, 111 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 69379b1..55638eb 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -276,7 +276,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, link->name = (char*) malloc(strlen(name) + 1); strcpy(link->name, name); link->link_direction = dir; - link->capacity = dir == QD_INCOMING ? conn->link_capacity : 0; + link->capacity = conn->link_capacity; link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; @@ -691,12 +691,16 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local) // deleted. 
// if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 && - qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion) { + qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion && + addr->tracked_deliveries == 0) { qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle); DEQ_REMOVE(core->addrs, addr); qd_hash_handle_free(addr->hash_handle); qd_bitmask_free(addr->rnodes); - qd_bitmask_free(addr->closest_remotes); + if (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST) + qd_bitmask_free(addr->closest_remotes); + else if (addr->treatment == QD_TREATMENT_ANYCAST_BALANCED) + free(addr->outstanding_deliveries); free_qdr_address_t(addr); } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index b82a6e5..836bc02 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -371,18 +371,18 @@ int qdr_forward_closest_CT(qdr_core_t *core, } // - // Forward to remote routers with subscribers using the appropriate - // link for the traffic class: control or data - // - qdr_node_t *next_node; - - // // If the cached list of closest remotes is stale (i.e. cost data has changed), // recompute the closest remote routers. 
// if (addr->cost_epoch != core->cost_epoch) qdr_forward_find_closest_remotes_CT(core, addr); + // + // Forward to remote routers with subscribers using the appropriate + // link for the traffic class: control or data + // + qdr_node_t *next_node; + if (addr->next_remote >= 0) { qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote]; if (rnode) { @@ -416,13 +416,29 @@ int qdr_forward_balanced_CT(qdr_core_t *core, bool exclude_inprocess, bool control) { - qdr_link_t *out_link = 0; - uint32_t link_backlog = UINT32_MAX; - bool transit = false; + // + // Control messages should never use balanced treatment. + // + assert(!control); + + // + // If this is the first time through here, allocate the array for outstanding delivery counts. + // + if (addr->outstanding_deliveries == 0) { + addr->outstanding_deliveries = NEW_ARRAY(int, qd_bitmask_width()); + for (int i = 0; i < qd_bitmask_width(); i++) + addr->outstanding_deliveries[i] = 0; + } + + qdr_link_t *chosen_link = 0; + int chosen_link_bit = -1; + uint32_t link_value = UINT32_MAX; + bool transit = false; // // Find all the possible outbound links for this delivery, searching for the one with the - // smallest backlog. + // smallest eligible value. Value = outstanding_deliveries + minimum_downrange_cost. + // A link is ineligible if the outstanding_deliveries is equal to the link's capacity. // // @@ -430,51 +446,81 @@ int qdr_forward_balanced_CT(qdr_core_t *core, // qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); while (link_ref) { - qdr_link_t *link = link_ref->link; - uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled); + qdr_link_t *link = link_ref->link; + uint32_t value = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled); + bool eligible = link->capacity > value; - if (!out_link || link_backlog > backlog) { - out_link = link; - link_backlog = backlog; + // + // If this is the best eligible link thus far, choose it. 
+ // + if (eligible && link_value > value) { + chosen_link = link; + link_value = value; } link_ref = DEQ_NEXT(link_ref); } - if (!out_link || link_backlog > 0) { + // + // If we haven't already found a link with zero (best possible) value, check the + // inter-router links as well. + // + if (!chosen_link || link_value > 0) { // - // If we haven't already found a link with zero backlog, check the - // remotes as well. + // Get the mask bit associated with the ingress router for the message. + // This will be compared against the "valid_origin" masks for each + // candidate destination router. // - int router_bit; - int c; - qdr_node_t *next_node; - - for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) { - qdr_node_t *rnode = core->routers_by_mask_bit[router_bit]; - if (rnode) { - if (rnode->next_hop) - next_node = rnode->next_hop; - else - next_node = rnode; + int origin = 0; + qd_field_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0; + + if (ingress_iter) { + qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH); + qdr_address_t *origin_addr; + qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr); + if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1) + qd_bitmask_first_set(origin_addr->rnodes, &origin); + } - qdr_link_t *link = control ? next_node->peer_control_link : next_node->peer_data_link; - if (link) { - uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled); - if (backlog < link_backlog) { - out_link = link; - link_backlog = backlog; - transit = true; - } + int c; + int node_bit; + for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) { + qdr_node_t *rnode = core->routers_by_mask_bit[node_bit]; + qdr_node_t *next_node = rnode->next_hop ? 
rnode->next_hop : rnode; + qdr_link_t *link = next_node->peer_data_link; + int link_bit = link->conn->mask_bit; + int value = addr->outstanding_deliveries[link_bit]; + if (value < link->capacity && qd_bitmask_value(rnode->valid_origins, origin)) { + // + // Link is eligible, adjust the value by the bias (node cost). + // + value += rnode->cost; + if (link_value > value) { + chosen_link = link; + chosen_link_bit = link_bit; + link_value = value; + transit = true; } } } } - if (out_link) { - qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); - qdr_forward_deliver_CT(core, out_link, out_delivery); + if (chosen_link) { + qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg); + qdr_forward_deliver_CT(core, chosen_link, out_delivery); + + // + // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery. + // + if (!in_delivery->settled && chosen_link_bit >= 0) { + addr->outstanding_deliveries[chosen_link_bit]++; + out_delivery->tracking_addr = addr; + addr->tracked_deliveries++; + } + // + // Bump the appropriate counter based on where we sent the delivery. 
+ // if (transit) addr->deliveries_transit++; else http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 8f59d39..d8fd860 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -389,7 +389,6 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca DEQ_REMOVE(core->addrs, oaddr); qd_hash_handle_free(oaddr->hash_handle); core->routers_by_mask_bit[router_maskbit] = 0; - qd_bitmask_free(oaddr->closest_remotes); qd_bitmask_free(oaddr->rnodes); free_qdr_address_t(oaddr); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index a3e0b17..36e3ec0 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -210,6 +210,7 @@ struct qdr_delivery_t { uint8_t tag[32]; int tag_length; qd_bitmask_t *link_exclusion; + qdr_address_t *tracking_addr; }; ALLOC_DECLARE(qdr_delivery_t); @@ -320,11 +321,20 @@ struct qdr_address_t { int ref_count; ///< Number of link-routes + auto-links referencing this address bool block_deletion; bool local; + uint32_t tracked_deliveries; + uint64_t cost_epoch; - uint64_t cost_epoch; + // + // State for "closest" treatment + // qd_bitmask_t *closest_remotes; int next_remote; + // + // State for "balanced" treatment + // + int *outstanding_deliveries; + /**@name Statistics */ ///@{ uint64_t deliveries_ingress; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 
ffe11c9..c79b089 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -295,6 +295,13 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) if (link->link_direction == QD_OUTGOING) sys_mutex_unlock(conn->work_lock); + if (dlv->tracking_addr) { + int link_bit = link->conn->mask_bit; + dlv->tracking_addr->outstanding_deliveries[link_bit]--; + dlv->tracking_addr->tracked_deliveries--; + dlv->tracking_addr = 0; + } + // // If this is an incoming link and it is not link-routed, issue // one replacement credit on the link. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
