Repository: qpid-dispatch Updated Branches: refs/heads/master 73e238660 -> 8475aca8f
DISPATCH-57 - Implemented "balanced" delivery based on lowest number of unsettled deliveries. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8475aca8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8475aca8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8475aca8 Branch: refs/heads/master Commit: 8475aca8f61e52128df3f932db1d880129196ea9 Parents: 73e2386 Author: Ted Ross <tr...@redhat.com> Authored: Tue Mar 29 09:19:46 2016 -0400 Committer: Ted Ross <tr...@redhat.com> Committed: Tue Mar 29 09:19:46 2016 -0400 ---------------------------------------------------------------------- src/router_core/forwarder.c | 70 ++++++++++++++++++++++++++++-- src/router_core/router_core_private.h | 7 --- 2 files changed, 67 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8475aca8/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 8e41995..52789b1 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -340,8 +340,6 @@ int qdr_forward_closest_CT(qdr_core_t *core, } } - - return 0; } @@ -353,7 +351,73 @@ int qdr_forward_balanced_CT(qdr_core_t *core, bool exclude_inprocess, bool control) { - return qdr_forward_closest_CT(core, addr, msg, in_delivery, exclude_inprocess, control); + qdr_link_t *out_link = 0; + uint32_t link_backlog; + bool transit = false; + + // + // Find all the possible outbound links for this delivery, searching for the one with the + // smallest backlog. + // + + // + // Start with the local links + // + qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); + while (link_ref) { + qdr_link_t *link = link_ref->link; + uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled); + + if (!out_link || link_backlog > backlog) { + out_link = link; + link_backlog = backlog; + } + + link_ref = DEQ_NEXT(link_ref); + } + + if (!out_link || link_backlog > 0) { + // + // If we haven't already found a link with zero backlog, check the + // remotes as well. + // + int router_bit; + int c; + qdr_node_t *next_node; + + for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) { + qdr_node_t *rnode = core->routers_by_mask_bit[router_bit]; + if (rnode) { + if (rnode->next_hop) + next_node = rnode->next_hop; + else + next_node = rnode; + + qdr_link_t *link = control ? next_node->peer_control_link : next_node->peer_data_link; + if (link) { + uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled); + if (backlog < link_backlog) { + out_link = link; + link_backlog = backlog; + transit = true; + } + } + } + } + } + + if (out_link) { + qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); + qdr_forward_deliver_CT(core, out_link, out_delivery); + + if (transit) + addr->deliveries_transit++; + else + addr->deliveries_egress++; + return 1; + } + + return 0; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8475aca8/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 29de0f6..1a61620 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -314,7 +314,6 @@ struct qdr_address_t { qd_address_treatment_t treatment; qdr_forwarder_t *forwarder; int ref_count; ///< Number of link-routes + auto-links referencing this address - bool toggle; bool block_deletion; bool local; @@ -557,12 +556,6 @@ struct qdr_core_t { qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1]; }; -typedef enum { - PASSTHROUGH, - TAP, - BYPASS -} qdr_waypoint_mode_t; - void *router_core_thread(void *arg); uint64_t qdr_identifier(qdr_core_t* core); void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org