Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 af4fef5af -> 72678b250
DISPATCH-179 - WIP, added central address lookup and semantics lookup Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/72678b25 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/72678b25 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/72678b25 Branch: refs/heads/tross-DISPATCH-179-1 Commit: 72678b25053aa1fb03a6f8b816a83d09669658a5 Parents: af4fef5 Author: Ted Ross <[email protected]> Authored: Wed Dec 9 16:37:38 2015 -0500 Committer: Ted Ross <[email protected]> Committed: Wed Dec 9 16:37:38 2015 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 45 ++++- src/router_core/connections.c | 259 ++++++++++++++++++++++++++++- src/router_core/router_core_private.h | 1 + src/router_core/terminus.c | 50 +++++- src/router_node.c | 5 + 5 files changed, 348 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 7dc3fac..f83e4cc 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -94,8 +94,7 @@ typedef enum { QD_LINK_ENDPOINT, ///< A link to a connected endpoint QD_LINK_WAYPOINT, ///< A link to a configured waypoint QD_LINK_CONTROL, ///< A link to a peer router for control messages - QD_LINK_ROUTER, ///< A link to a peer router for routed messages - QD_LINK_AREA ///< A link to a peer router in a different area (area boundary) + QD_LINK_ROUTER ///< A link to a peer router for routed messages } qd_link_type_t; ENUM_DECLARE(qd_link_type); @@ -233,6 +232,48 @@ void qdr_terminus_add_capability(qdr_terminus_t *term, const char *capability); */ bool 
qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability); +/** + * qdr_terminus_is_anonymous + * + * Indicate whether this terminus represents an anonymous endpoint. + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @return true iff the terminus is anonymous + */ +bool qdr_terminus_is_anonymous(qdr_terminus_t *term); + +/** + * qdr_terminus_is_dynamic + * + * Indicate whether this terminus represents a dynamic endpoint. + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @return true iff the terminus is dynamic + */ +bool qdr_terminus_is_dynamic(qdr_terminus_t *term); + +/** + * qdr_terminus_get_address + * + * Return the address of the terminus in the form of an iterator. + * The iterator is borrowed, the caller must not free the iterator. + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @return A pointer to an iterator or 0 if the terminus is anonymous. + */ +qd_field_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term); + +/** + * qdr_terminus_dnp_address + * + * Return the address field in the dynamic-node-properties if it is there. + * This iterator is given, the caller must free it when it is no longer needed. + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @return A pointer to an iterator or 0 if there is no such field. 
+ */ +qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term); + /** ****************************************************************************** http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 3e8b888..5836bf0 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -19,6 +19,7 @@ #include "router_core_private.h" #include <qpid/dispatch/amqp.h> +#include <stdio.h> static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -29,6 +30,15 @@ static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool disc ALLOC_DEFINE(qdr_connection_t); ALLOC_DEFINE(qdr_connection_work_t); +static qd_address_semantics_t qdr_dynamic_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE; +static qd_address_semantics_t qdr_default_semantics = QD_FANOUT_SINGLE | QD_BIAS_SPREAD | QD_CONGESTION_BACKPRESSURE; + +typedef enum { + QDR_CONDITION_NO_ROUTE_TO_DESTINATION, + QDR_CONDITION_ROUTED_LINK_LOST, + QDR_CONDITION_FORBIDDEN +} qdr_condition_t; + //================================================================================== // Internal Functions //================================================================================== @@ -260,6 +270,171 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t *core, } +static void qdr_link_reject_CT(qdr_core_t *core, qdr_link_t *link, qdr_condition_t condition) +{ +} + + +static void qdr_link_accept_CT(qdr_core_t *core, qdr_link_t *link) +{ +} + + +static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr) +{ +} + + +/** + * Generate a temporary routable address for a destination connected to this + * router 
node. + */ +static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + char discriminator[16]; + long int rnd1 = random(); + long int rnd2 = random(); + long int rnd3 = random(); + int idx; + int cursor = 0; + + for (idx = 0; idx < 5; idx++) { + discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63]; + discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63]; + discriminator[cursor++] = table[(rnd3 >> (idx * 6)) & 63]; + } + discriminator[cursor] = '\0'; + + snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator); +} + + +static char qdr_prefix_for_dir(qd_direction_t dir) +{ + return (dir == QD_INCOMING) ? 'C' : 'D'; +} + + +static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_field_iterator_t *iter) +{ + qdr_address_t *addr = 0; + + // + // Question: Should we use a new prefix for configuration? + // + qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) addr); + return addr ? addr->semantics : qdr_default_semantics; +} + + +/** + * qdr_lookup_terminus_address_CT + * + * Lookup a terminus address in the route table and possibly create a new address + * if no match is found. 
+ * + * @param core Pointer to the core object + * @param dir Direction of the link for the terminus + * @param terminus The terminus containing the addressing information to be looked up + * @param create_if_not_found Iff true, return a pointer to a newly created address record + * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address + * @param [out] link_route True iff the lookup indicates that an attach should be routed + * @return Pointer to an address record or 0 if none is found + */ +static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, + qd_direction_t dir, + qdr_terminus_t *terminus, + bool create_if_not_found, + bool accept_dynamic, + bool *link_route) +{ + qdr_address_t *addr = 0; + + // + // Unless expressly stated, link routing is not indicated for this terminus. + // + *link_route = false; + + if (qdr_terminus_is_dynamic(terminus)) { + // + // The terminus is dynamic. Check to see if there is an address provided + // in the dynamic node properties. If so, look that address up as a link-routed + // destination. + // + qd_field_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus); + if (dnp_address) { + qd_address_iterator_override_prefix(dnp_address, qdr_prefix_for_dir(dir)); + qd_hash_retrieve_prefix(core->addr_hash, dnp_address, (void**) &addr); + qd_field_iterator_free(dnp_address); + *link_route = true; + return addr; + } + + // + // The dynamic terminus has no address in the dynamic-node-properties. If we are + // permitted to generate dynamic addresses, create a new address that is local to + // this router and insert it into the address table with a hash index. + // + if (!accept_dynamic) + return 0; + + char temp_addr[200]; + bool generating = true; + while (generating) { + // + // The address-generation process is performed in a loop in case the generated + // address collides with a previously generated address (this should be _highly_ + // unlikely).
+ // + qdr_generate_temp_addr(core, temp_addr, 200); + qd_field_iterator_t *temp_iter = qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr); + if (!addr) { + addr = qdr_address(qdr_dynamic_semantics); + qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(core->addrs, addr); + generating = false; + } + qd_field_iterator_free(temp_iter); + } + return addr; + } + + // + // If the terminus is anonymous, there is no address to look up. + // + if (qdr_terminus_is_anonymous(terminus)) + return 0; + + // + // The terminus has a non-dynamic address that we need to look up. First, look for + // a link-route destination for the address. + // + qd_field_iterator_t *iter = qdr_terminus_get_address(terminus); + qd_address_iterator_override_prefix(iter, qdr_prefix_for_dir(dir)); + qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr); + if (addr) { + *link_route = true; + return addr; + } + + // + // There was no match for a link-route destination, look for a message-route address. 
+ // + qd_address_iterator_override_prefix(iter, '\0'); // Cancel previous override + qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); + if (!addr && create_if_not_found) { + qd_address_semantics_t sem = qdr_semantics_for_address(core, iter); + addr = qdr_address(sem); + qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(core->addrs, addr); + } + + return addr; +} + + static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) @@ -335,11 +510,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo } -static void qdr_link_reject_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) -{ -} - - static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) @@ -347,15 +517,86 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo qdr_connection_t *conn = action->args.connection.conn; qdr_link_t *link = action->args.connection.link; - //qd_direction_t dir = action->args.connection.dir; + qd_direction_t dir = action->args.connection.dir; //qdr_terminus_t *source = action->args.connection.source; - //qdr_terminus_t *target = action->args.connection.target; + qdr_terminus_t *target = action->args.connection.target; // - // Check for some rejected-attach cases + // Reject any attaches of inter-router links that arrive on connections that are not inter-router. // if ((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER) - qdr_link_reject_CT(core, conn, link); + qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN); + + // + // Reject any waypoint links. Waypoint links are always initiated by a router, not the remote container. 
+ // + if (link->link_type == QD_LINK_WAYPOINT) + qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN); + + if (dir == QD_INCOMING) { + // + // Handle incoming link cases + // + switch (link->link_type) { + case QD_LINK_ENDPOINT: + if (qdr_terminus_is_anonymous(target)) { + link->addr = 0; + qdr_link_accept_CT(core, link); + } else { + // + // This link has a target address + // + bool link_route = false; + qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, target, false, false, &link_route); + if (!addr) + // + // No route to this destination, reject the link + // + qdr_link_reject_CT(core, link, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); + + else if (link_route) + // + // This is a link-routed destination, forward the attach to the next hop + // + qdr_forward_first_attach_CT(core, link, addr); + + else { + // + // Associate the link with the address. With this association, it will be unnecessary + // to do an address lookup for deliveries that arrive on this link. + // + link->addr = addr; + qdr_link_accept_CT(core, link); + } + } + break; + + case QD_LINK_WAYPOINT: + // No action, waypoint links are rejected above. + break; + + case QD_LINK_CONTROL: + break; + + case QD_LINK_ROUTER: + break; + } + } else { + // + // Handle outgoing link cases + // + switch (link->link_type) { + case QD_LINK_ENDPOINT: + break; + case QD_LINK_WAYPOINT: + // No action, waypoint links are rejected above. 
+ break; + case QD_LINK_CONTROL: + break; + case QD_LINK_ROUTER: + break; + } + } // // Cases to be handled: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index e628d73..11176a6 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -162,6 +162,7 @@ struct qdr_link_t { qdr_connection_t *conn; ///< [ref] Connection that owns this link qd_link_type_t link_type; qd_direction_t link_direction; + qdr_address_t *addr; ///< [ref] Associated address record qdr_address_t *owning_addr; ///< [ref] Address record that owns this link //qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns this link qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_core/terminus.c ---------------------------------------------------------------------- diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index d78b8f9..175acf4 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -46,7 +46,10 @@ qdr_terminus_t *qdr_terminus(pn_terminus_t *pn) term->capabilities = pn_data(0); if (pn) { - term->address = qdr_field(pn_terminus_get_address(pn)); + const char *addr = pn_terminus_get_address(pn); + if (addr && *addr) + term->address = qdr_field(addr); + term->durability = pn_terminus_get_durability(pn); term->expiry_policy = pn_terminus_get_expiry_policy(pn); term->timeout = pn_terminus_get_timeout(pn); @@ -119,3 +122,48 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability) return false; } + +bool qdr_terminus_is_anonymous(qdr_terminus_t *term) +{ + return term == 0 || term->address == 0; +} + + +bool qdr_terminus_is_dynamic(qdr_terminus_t *term) +{ + return 
term->dynamic; +} + + +qd_field_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term) +{ + if (qdr_terminus_is_anonymous(term)) + return 0; + + return term->address->iterator; +} + + +qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term) +{ + pn_data_t *props = term->properties; + + if (!props) + return 0; + + pn_data_rewind(props); + if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props)) { + pn_bytes_t sym = pn_data_get_symbol(props); + if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start) == 0) { + if (pn_data_next(props)) { + pn_bytes_t val = pn_data_get_string(props); + if (val.start && *val.start != '\0') + return qd_field_iterator_string(val.start); + } + } + } + + return 0; +} + + http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 071845b..0f2c32b 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -296,6 +296,7 @@ static int qd_router_terminus_is_router(pn_terminus_t *term) /** * If the terminus has a dynamic-node-property for a node address, * return an iterator for the content of that property. + * DEPRECATE */ static const char *qd_router_terminus_dnp_address(pn_terminus_t *term) { @@ -323,6 +324,7 @@ static const char *qd_router_terminus_dnp_address(pn_terminus_t *term) /** * Generate a temporary routable address for a destination connected to this * router node. + * DEPRECATE */ static void qd_router_generate_temp_addr(qd_router_t *router, char *buffer, size_t length) { @@ -373,6 +375,9 @@ static int qd_router_find_mask_bit_LH(qd_router_t *router, qd_link_t *link) } +/** + * DEPRECATE + */ static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr, qd_direction_t dir) { char addr_prefix = (dir == QD_INCOMING) ? 
'C' : 'D'; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
