Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 77281d629 -> 1aa7fece4
DISPATCH-179 - Added multicast forwarder into core. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1aa7fece Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1aa7fece Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1aa7fece Branch: refs/heads/tross-DISPATCH-179-1 Commit: 1aa7fece432fd2572f45f0379c84db2ebb69e88a Parents: 77281d6 Author: Ted Ross <[email protected]> Authored: Thu Jan 7 16:56:08 2016 -0500 Committer: Ted Ross <[email protected]> Committed: Thu Jan 7 16:56:08 2016 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 12 +- src/router_core/agent_link.c | 4 +- src/router_core/connections.c | 4 +- src/router_core/forwarder.c | 174 ++++++++++++++++++++++++----- src/router_core/router_core.c | 1 + src/router_core/router_core_private.h | 46 ++++---- src/router_core/transfer.c | 76 +++++++++++-- src/router_node.c | 12 +- 8 files changed, 259 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 25727c2..20840cd 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -474,11 +474,12 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error); * @param ingress Field iterator referencing the value of the ingress-router header. NOTE: This * iterator is assumed to reference content in the message that will stay valid * through the lifetime of the message. + * @param settled True iff the delivery is pre-settled. * @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link. */ -qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress); +qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled); qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, - qd_field_iterator_t *ingress, qd_field_iterator_t *addr); + qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled); qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg); typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link, @@ -486,13 +487,15 @@ typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error); +typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link); void qdr_connection_handlers(qdr_core_t *core, void *context, qdr_connection_activate_t activate, qdr_link_first_attach_t first_attach, qdr_link_second_attach_t second_attach, - qdr_link_detach_t detach); + qdr_link_detach_t detach, + qdr_link_flow_t flow); /** ****************************************************************************** @@ -501,6 +504,9 @@ void qdr_connection_handlers(qdr_core_t *core, */ void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context); void *qdr_delivery_get_context(qdr_delivery_t *delivery); +uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery); +bool qdr_delivery_is_settled(const qdr_delivery_t *delivery); + void qdr_delivery_update_disposition(qdr_delivery_t *delivery); void qdr_delivery_update_flow(qdr_delivery_t *delivery); void qdr_delivery_process(qdr_delivery_t *delivery); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/agent_link.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index 350a717..e3cc484 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -75,11 +75,11 @@ static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link ) break; case QDR_LINK_MSG_FIFO_DEPTH: - qd_compose_insert_ulong(body, DEQ_SIZE(link->msg_fifo)); + qd_compose_insert_ulong(body, 0); // FIXME break; case QDR_LINK_EVENT_FIFO_DEPTH: - qd_compose_insert_ulong(body, DEQ_SIZE(link->event_fifo)); + qd_compose_insert_ulong(body, 0); // FIXME break; default: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 01b658b..f93051e 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -267,13 +267,15 @@ void qdr_connection_handlers(qdr_core_t *core, qdr_connection_activate_t activate, qdr_link_first_attach_t first_attach, qdr_link_second_attach_t second_attach, - qdr_link_detach_t detach) + qdr_link_detach_t detach, + qdr_link_flow_t flow) { core->user_context = context; core->activate_handler = activate; core->first_attach_handler = first_attach; core->second_attach_handler = second_attach; core->detach_handler = detach; + core->flow_handler = flow; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 29d4c42..a7400af 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -21,12 +21,16 @@ #include <qpid/dispatch/amqp.h> #include <stdio.h> -typedef void (*qdr_forward_message_t) (qdr_core_t *core, - qdr_address_t *addr, - qd_message_t *msg, - qdr_delivery_t *in_delivery, - bool exclude_inprocess, - bool control); +// +// NOTE: If the in_delivery argument is NULL, the resulting out deliveries +// shall be pre-settled. +// +typedef int (*qdr_forward_message_t) (qdr_core_t *core, + qdr_address_t *addr, + qd_message_t *msg, + qdr_delivery_t *in_delivery, + bool exclude_inprocess, + bool control); typedef void (*qdr_forward_attach_t) (qdr_core_t *core, qdr_forwarder_t *forw, @@ -42,34 +46,148 @@ struct qdr_forwarder_t { // Built-in Forwarders //================================================================================== -void qdr_forward_multicast_CT(qdr_core_t *core, - qdr_address_t *addr, - qd_message_t *msg, - qdr_delivery_t *in_delivery, - bool exclude_inprocess, - bool control) + +qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg) { - //bool bypass_valid_origins = addr->forwarder->bypass_valid_origins; -} + qdr_delivery_t *dlv = new_qdr_delivery_t(); + ZERO(dlv); + dlv->link = link; + dlv->peer = peer; + dlv->msg = qd_message_copy(msg); + dlv->settled = !peer || peer->settled; -void qdr_forward_closest_CT(qdr_core_t *core, - qdr_address_t *addr, - qd_message_t *msg, - qdr_delivery_t *in_delivery, - bool exclude_inprocess, - bool control) -{ + if (peer->peer == 0) + peer->peer = dlv; // TODO - make this a back-list for multicast tracking + + return dlv; } -void qdr_forward_balanced_CT(qdr_core_t *core, +int qdr_forward_multicast_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, bool exclude_inprocess, bool control) { + bool bypass_valid_origins = addr->forwarder->bypass_valid_origins; + int fanout = 0; + + // + // Forward to local subscribers + // + qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); + while (link_ref) { + qdr_link_t *out_link = link_ref->link; + qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, out_link, msg); + DEQ_INSERT_TAIL(out_link->undelivered, out_delivery); // TODO - check locking on this list + // TODO activate the connection for the out link + fanout++; + link_ref = DEQ_NEXT(link_ref); + } + + // + // Forward to remote routers with subscribers using the appropriate + // link for the traffic class: control or data + // + // + // Get the mask bit associated with the ingress router for the message. + // This will be compared against the "valid_origin" masks for each + // candidate destination router. + // + int origin = -1; + qd_field_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0; + + if (ingress_iter && !bypass_valid_origins) { + qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH); + qdr_address_t *origin_addr; + qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr); + if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) { + qdr_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes); + origin = rref->router->mask_bit; + } + } else + origin = 0; + + // + // Forward to the next-hops for remote destinations. + // + if (origin >= 0) { + qdr_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + qdr_link_t *dest_link; + qdr_node_t *next_node; + qd_bitmask_t *link_set = qd_bitmask(0); + + // + // Loop over the target nodes for this address. Build a set of outgoing links + // for which there are valid targets. We do this to avoid sending more than one + // message down a given link. It's possible that there are multiple destinations + // for this address that are all reachable over the same link. In this case, we + // will send only one copy of the message over the link and allow a downstream + // router to fan the message out. + // + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + next_node = dest_node_ref->router->next_hop; + else + next_node = dest_node_ref->router; + dest_link = control ? next_node->peer_control_link : next_node->peer_data_link; + if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin)) + qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit); + dest_node_ref = DEQ_NEXT(dest_node_ref); + } + + // + // Send a copy of the message outbound on each identified link. + // + int link_bit; + while (qd_bitmask_first_set(link_set, &link_bit)) { + qd_bitmask_clear_bit(link_set, link_bit); + dest_link = control ? + core->control_links_by_mask_bit[link_bit] : + core->data_links_by_mask_bit[link_bit]; + if (dest_link) { + qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, dest_link, msg); + DEQ_INSERT_TAIL(dest_link->undelivered, out_delivery); // TODO - check locking on this list + fanout++; + addr->deliveries_transit++; + // TODO - Activate link's connection + } + } + + qd_bitmask_free(link_set); + } + + if (!exclude_inprocess) { + // + // Forward to in-process subscribers + // + } + + return fanout; +} + + +int qdr_forward_closest_CT(qdr_core_t *core, + qdr_address_t *addr, + qd_message_t *msg, + qdr_delivery_t *in_delivery, + bool exclude_inprocess, + bool control) +{ + return 0; +} + + +int qdr_forward_balanced_CT(qdr_core_t *core, + qdr_address_t *addr, + qd_message_t *msg, + qdr_delivery_t *in_delivery, + bool exclude_inprocess, + bool control) +{ + return 0; } @@ -121,14 +239,14 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t seman } -void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, - bool exclude_inprocess, bool control) +int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, + bool exclude_inprocess, bool control) { if (addr->forwarder) - addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control); - else { - // TODO - Deal with this delivery's disposition - } + return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control); + + // TODO - Deal with this delivery's disposition + return 0; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index c043e09..359a192 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -23,6 +23,7 @@ ALLOC_DEFINE(qdr_query_t); ALLOC_DEFINE(qdr_address_t); ALLOC_DEFINE(qdr_node_t); +ALLOC_DEFINE(qdr_delivery_t); ALLOC_DEFINE(qdr_link_t); ALLOC_DEFINE(qdr_router_ref_t); ALLOC_DEFINE(qdr_link_ref_t); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index bc94b68..aa9f3ee 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -34,8 +34,8 @@ typedef struct qdr_lrp_ref_t qdr_lrp_ref_t; typedef struct qdr_forwarder_t qdr_forwarder_t; qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics); -void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, - bool exclude_inprocess, bool control); +int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, + bool exclude_inprocess, bool control); void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link); /** @@ -165,11 +165,14 @@ DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t); struct qdr_delivery_t { DEQ_LINKS(qdr_delivery_t); - void *context; - qdr_link_t *link; - qdr_delivery_t *peer; - uint64_t disposition; - bool settled; + void *context; + qdr_link_t *link; + qdr_delivery_t *peer; + qd_message_t *msg; + qd_field_iterator_t *to_addr; + qd_field_iterator_t *origin; + uint64_t disposition; + bool settled; }; ALLOC_DECLARE(qdr_delivery_t); @@ -178,21 +181,19 @@ DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t); struct qdr_link_t { DEQ_LINKS(qdr_link_t); - qdr_core_t *core; - void *user_context; - qdr_connection_t *conn; ///< [ref] Connection that owns this link - qd_link_type_t link_type; - qd_direction_t link_direction; - char *name; - qdr_address_t *owning_addr; ///< [ref] Address record that owns this link - //qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns this link - qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link - qdr_link_ref_t *ref; ///< Pointer to a containing reference object - qd_routed_event_list_t event_fifo; ///< FIFO of outgoing delivery/link events (no messages) - qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or outgoing message deliveries - qd_router_delivery_list_t deliveries; ///< [own] outstanding unsettled deliveries - bool strip_annotations_in; - bool strip_annotations_out; + qdr_core_t *core; + void *user_context; + qdr_connection_t *conn; ///< [ref] Connection that owns this link + qd_link_type_t link_type; + qd_direction_t link_direction; + char *name; + qdr_address_t *owning_addr; ///< [ref] Address record that owns this link + qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link + qdr_link_ref_t *ref; ///< Pointer to a containing reference object (TODO - check this!) + qdr_delivery_list_t undelivered; ///< Deliveries to be forwarded or sent + qdr_delivery_list_t unsettled; ///< Unsettled deliveries + bool strip_annotations_in; + bool strip_annotations_out; }; ALLOC_DECLARE(qdr_link_t); @@ -380,6 +381,7 @@ struct qdr_core_t { qdr_link_first_attach_t first_attach_handler; qdr_link_second_attach_t second_attach_handler; qdr_link_detach_t detach_handler; + qdr_link_flow_t flow_handler; const char *router_area; const char *router_id; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 48d3446..1c56f13 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -21,10 +21,8 @@ #include <qpid/dispatch/amqp.h> #include <stdio.h> -ALLOC_DEFINE(qdr_delivery_t); static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -37,22 +35,38 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) // Interface Functions //================================================================================== -qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress) +qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled) { qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver"); qdr_delivery_t *dlv = new_qdr_delivery_t(); + ZERO(dlv); + dlv->link = link; + dlv->msg = msg; + dlv->to_addr = 0; + dlv->origin = ingress; + dlv->settled = settled; + + action->args.connection.delivery = dlv; qdr_action_enqueue(link->core, action); return dlv; } qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, - qd_field_iterator_t *ingress, qd_field_iterator_t *addr) + qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled) { - qdr_action_t *action = qdr_action(qdr_link_deliver_to_CT, "link_deliver_to"); + qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver"); qdr_delivery_t *dlv = new_qdr_delivery_t(); + ZERO(dlv); + dlv->link = link; + dlv->msg = msg; + dlv->to_addr = addr; + dlv->origin = ingress; + dlv->settled = settled; + + action->args.connection.delivery = dlv; qdr_action_enqueue(link->core, action); return dlv; } @@ -101,6 +115,18 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery) } +uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery) +{ + return delivery->disposition; +} + + +bool qdr_delivery_is_settled(const qdr_delivery_t *delivery) +{ + return delivery->settled; +} + + //================================================================================== // In-Thread Functions //================================================================================== @@ -109,13 +135,41 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis { if (discard) return; -} + qdr_delivery_t *dlv = action->args.connection.delivery; + qdr_link_t *link = dlv->link; + int count = 0; + + if (DEQ_IS_EMPTY(link->undelivered)) { + qdr_address_t *addr = link->owning_addr; + if (!addr && dlv->to_addr) { + qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr); + if (addr) + count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL); + } + } -static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - if (discard) - return; + if (count == 0) { + if (link->owning_addr) + // + // Message was not delivered and the link is not anonymous. + // Queue the message for later delivery (when the address gets + // a valid destination). + // + DEQ_INSERT_TAIL(link->undelivered, dlv); + else { + // + // TODO - Release the delivery + // + } + } else if (count == 1) { + if (qdr_delivery_is_settled(dlv)) + DEQ_INSERT_TAIL(link->unsettled, dlv); + } else { + // + // The count is greater than one. Do something! TODO + // + } } @@ -129,7 +183,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH); qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr); if (addr) - qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control); + (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control); else qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address"); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index c775434..08da120 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -286,9 +286,9 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO); if (addr_iter) - delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter); + delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd)); } else - delivery = qdr_link_deliver(rlink, msg, ingress_iter); + delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd)); if (delivery) { pn_delivery_set_context(pnd, delivery); @@ -612,6 +612,11 @@ static void qd_router_link_detach(void *context, qdr_link_t *link, qdr_error_t * } +static void qd_router_link_flow(void *context, qdr_link_t *link) +{ +} + + void qd_router_setup_late(qd_dispatch_t *qd) { qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id); @@ -620,7 +625,8 @@ void qd_router_setup_late(qd_dispatch_t *qd) qd_router_connection_activate, qd_router_link_first_attach, qd_router_link_second_attach, - qd_router_link_detach); + qd_router_link_detach, + qd_router_link_flow); qd_router_python_setup(qd->router); qd_timer_schedule(qd->router->timer, 1000); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
