DISPATCH-1194 - Refactored the address lookup for attach into a core module with asynchronous capability.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/564c5907 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/564c5907 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/564c5907 Branch: refs/heads/master Commit: 564c5907cebb12653181f2e266dce8b581dc7c32 Parents: d201dea Author: Ted Ross <[email protected]> Authored: Tue Dec 4 09:38:09 2018 -0500 Committer: Ted Ross <[email protected]> Committed: Tue Dec 4 09:38:09 2018 -0500 ---------------------------------------------------------------------- src/CMakeLists.txt | 1 + src/router_core/connections.c | 345 +---------------- src/router_core/core_attach_address_lookup.h | 54 +++ .../address_lookup_client/lookup_client.c | 367 +++++++++++++++++++ src/router_core/router_core_private.h | 3 + 5 files changed, 433 insertions(+), 337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 352aec7..8fc6485 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -110,6 +110,7 @@ set(qpid_dispatch_SOURCES router_core/modules/test_hooks/core_test_hooks.c router_core/modules/edge_addr_tracking/edge_addr_tracking.c router_core/modules/address_lookup/address_lookup.c + router_core/modules/address_lookup_client/lookup_client.c router_node.c router_pynode.c schema_enum.c http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index db271f4..d06bec9 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -607,33 +607,6 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core, /** - * Generate a temporary routable address for a destination connected to this - * router node. - */ -static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length) -{ - char discriminator[QD_DISCRIMINATOR_SIZE]; - qd_generate_discriminator(discriminator); - if (core->router_mode == QD_ROUTER_MODE_EDGE) - snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator); - else - snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator); -} - - -/** - * Generate a temporary mobile address for a producer connected to this - * router node. - */ -static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length) -{ - char discriminator[QD_DISCRIMINATOR_SIZE]; - qd_generate_discriminator(discriminator); - snprintf(buffer, length, "amqp:/_$temp.%s", discriminator); -} - - -/** * Generate a link name */ static void qdr_generate_link_name(const char *label, char *buffer, size_t length) @@ -1087,173 +1060,6 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr) } -/** - * qdr_lookup_terminus_address_CT - * - * Lookup a terminus address in the route table and possibly create a new address - * if no match is found. - * - * @param core Pointer to the core object - * @param dir Direction of the link for the terminus - * @param conn The connection over which the terminus was attached - * @param terminus The terminus containing the addressing information to be looked up - * @param create_if_not_found Iff true, return a pointer to a newly created address record - * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address - * @param [out] link_route True iff the lookup indicates that an attach should be routed - * @param [out] unavailable True iff this address is blocked as unavailable - * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint - * @return Pointer to an address record or 0 if none is found - */ -static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, - qd_direction_t dir, - qdr_connection_t *conn, - qdr_terminus_t *terminus, - bool create_if_not_found, - bool accept_dynamic, - bool *link_route, - bool *unavailable, - bool *core_endpoint) -{ - qdr_address_t *addr = 0; - - // - // Unless expressly stated, link routing is not indicated for this terminus. - // - *link_route = false; - *unavailable = false; - *core_endpoint = false; - - if (qdr_terminus_is_dynamic(terminus)) { - // - // The terminus is dynamic. Check to see if there is an address provided - // in the dynamic node properties. If so, look that address up as a link-routed - // destination. - // - qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus); - if (dnp_address) { - qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE); - if (conn->tenant_space) - qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len); - qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr); - - if (addr && conn->tenant_space) { - // - // If this link is in a tenant space, translate the dnp address to - // the fully-qualified view - // - qdr_terminus_set_dnp_address_iterator(terminus, dnp_address); - } - - qd_iterator_free(dnp_address); - *link_route = true; - return addr; - } - - // - // The dynamic terminus has no address in the dynamic-node-propteries. If we are - // permitted to generate dynamic addresses, create a new address that is local to - // this router and insert it into the address table with a hash index. - // - if (!accept_dynamic) - return 0; - - char temp_addr[200]; - bool generating = true; - while (generating) { - // - // The address-generation process is performed in a loop in case the generated - // address collides with a previously generated address (this should be _highly_ - // unlikely). - // - if (dir == QD_OUTGOING) - qdr_generate_temp_addr(core, temp_addr, 200); - else - qdr_generate_mobile_addr(core, temp_addr, 200); - - qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); - qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr); - if (!addr) { - addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED); - qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle); - DEQ_INSERT_TAIL(core->addrs, addr); - qdr_terminus_set_address(terminus, temp_addr); - generating = false; - } - qd_iterator_free(temp_iter); - } - return addr; - } - - // - // If the terminus is anonymous, there is no address to look up. - // - if (qdr_terminus_is_anonymous(terminus)) - return 0; - - // - // The terminus has a non-dynamic address that we need to look up. First, look for - // a link-route destination for the address. - // - qd_iterator_t *iter = qdr_terminus_get_address(terminus); - qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE); - if (conn->tenant_space) - qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len); - qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr); - if (addr) { - *link_route = true; - - // - // If this link is in a tenant space, translate the terminus address to - // the fully-qualified view - // - if (conn->tenant_space) { - qdr_terminus_set_address_iterator(terminus, iter); - } - return addr; - } - - // - // There was no match for a link-route destination, look for a message-route address. - // - int in_phase; - int out_phase; - int addr_phase; - int priority; - qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority); - - qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override - addr_phase = dir == QD_INCOMING ? in_phase : out_phase; - qd_iterator_annotate_phase(iter, (char) addr_phase + '0'); - - qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); - - if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE) - *unavailable = true; - - if (!addr && create_if_not_found) { - addr = qdr_address_CT(core, treat); - if (addr) { - qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); - DEQ_INSERT_TAIL(core->addrs, addr); - } - - if (!addr && treat == QD_TREATMENT_UNAVAILABLE) - *unavailable = true; - } - - if (qdr_terminus_is_coordinator(terminus)) - *unavailable = false; - - if (!!addr && addr->core_endpoint != 0) - *core_endpoint = true; - - if (addr) - addr->priority = priority; - return addr; -} - - static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { @@ -1412,19 +1218,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo } -static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original) -{ - size_t olen = strlen(original); - size_t clen = strlen(conn->container); - char *name = (char*) malloc(olen + clen + 2); - memset(name, 0, olen + clen + 2); - strcat(name, original); - name[olen] = '@'; - strcat(name + olen + 1, conn->container); - return name; -} - - // // Handle the attachment and detachment of an inter-router control link // @@ -1508,7 +1301,6 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qd_direction_t dir = action->args.connection.dir; qdr_terminus_t *source = action->args.connection.source; qdr_terminus_t *target = action->args.connection.target; - bool success; // // Put the link into the proper lists for tracking. @@ -1561,93 +1353,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // // This link has a target address // - bool link_route; - bool unavailable; - bool core_endpoint; - qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable, &core_endpoint); - - if (core_endpoint) { - qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target); - } - - else if (unavailable) { - qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true); - qdr_terminus_free(source); - qdr_terminus_free(target); - } - - else if (!addr) { - // - // No route to this destination, reject the link - // + if (core->addr_lookup_handler) + core->addr_lookup_handler(core, conn, link, dir, source, target); + else { qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); qdr_terminus_free(source); qdr_terminus_free(target); - } - - else if (link_route) { - // - // This is a link-routed destination, forward the attach to the next hop - // - if (qdr_terminus_survives_disconnect(target) && !core->qd->allow_resumable_link_route) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true); - qdr_terminus_free(source); - qdr_terminus_free(target); - } else { - if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) { - link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name); - } - success = qdr_forward_attach_CT(core, addr, link, source, target); - - if (!success) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); - qdr_terminus_free(source); - qdr_terminus_free(target); - } - } - - } - - else if (qdr_terminus_is_coordinator(target)) { - // - // This target terminus is a coordinator. - // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router). - // The router should reject this link because the router cannot coordinate transactions itself. - // - // The attach response should have a null target to indicate refusal and the immediately coming detach. - qdr_link_outbound_second_attach_CT(core, link, source, 0); - // Now, send back a detach with the error amqp:precondition-failed - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true); - } - else { - // - // Associate the link with the address. With this association, it will be unnecessary - // to do an address lookup for deliveries that arrive on this link. - // - qdr_core_bind_address_link_CT(core, addr, link); - qdr_link_outbound_second_attach_CT(core, link, source, target); - - // - // Issue the initial credit only if one of the following - // holds: - // - there are destinations for the address - // - if the address treatment is multicast - // - the address is that of an exchange (no subscribers allowed) - // - if (DEQ_SIZE(addr->subscriptions) - || DEQ_SIZE(addr->rlinks) - || qd_bitmask_cardinality(addr->rnodes) - || qdr_is_addr_treatment_multicast(addr) - || !!addr->exchange) { - qdr_link_issue_credit_CT(core, link, link->capacity, false); - } - - // - // If this link came through an edge connection, raise a link event to - // herald that fact. - // - if (link->conn->role == QDR_ROLE_EDGE_CONNECTION) - qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link); + return; } } break; @@ -1668,54 +1380,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // switch (link->link_type) { case QD_LINK_ENDPOINT: { - bool link_route; - bool unavailable; - bool core_endpoint; - qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable, &core_endpoint); - - if (core_endpoint) { - qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target); - } - - else if (unavailable) { - qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true); - qdr_terminus_free(source); - qdr_terminus_free(target); - } - - else if (!addr) { - // - // No route to this destination, reject the link - // + if (core->addr_lookup_handler) + core->addr_lookup_handler(core, conn, link, dir, source, target); + else { qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); qdr_terminus_free(source); qdr_terminus_free(target); - } - - else if (link_route) { - // - // This is a link-routed destination, forward the attach to the next hop - // - if (qdr_terminus_survives_disconnect(source) && !core->qd->allow_resumable_link_route) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true); - qdr_terminus_free(source); - qdr_terminus_free(target); - } else { - if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) { - link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name); - } - bool success = qdr_forward_attach_CT(core, addr, link, source, target); - if (!success) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); - qdr_terminus_free(source); - qdr_terminus_free(target); - } - } - } - - else { - qdr_core_bind_address_link_CT(core, addr, link); - qdr_link_outbound_second_attach_CT(core, link, source, target); + return; } break; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/core_attach_address_lookup.h ---------------------------------------------------------------------- diff --git a/src/router_core/core_attach_address_lookup.h b/src/router_core/core_attach_address_lookup.h new file mode 100644 index 0000000..d05b9cb --- /dev/null +++ b/src/router_core/core_attach_address_lookup.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef qd_router_core_attach_addr_lookup_types +#define qd_router_core_attach_addr_lookup_types 1 + +#include "router_core_private.h" +#endif + +#ifndef qd_router_core_attach_addr_lookup +#define qd_router_core_attach_addr_lookup 1 + + +/** + * Handler - Look up the address on a received first-attach + * + * This handler is invoked upon receipt of a first-attach on a normal endpoint link. + * The appropriate address, from source or target will be resolved to and address for + * message or link routing. This operation may be synchronoue (completed before it + * returns) or asynchronous (completed later). + * + * @param core Pointer to the core state. + * @param conn Pointer to the connection over which the attach arrived. + * @param link Pointer to the attaching link. + * @param dir The direction of message flow for the link. + * @param source The source terminus for the attach. + * @param target The target terminus for the attach. + */ +typedef void (*qdrc_attach_addr_lookup_t) (qdr_core_t *core, + qdr_connection_t *conn, + qdr_link_t *link, + qd_direction_t dir, + qdr_terminus_t *source, + qdr_terminus_t *target); + + + +#endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/modules/address_lookup_client/lookup_client.c ---------------------------------------------------------------------- diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c new file mode 100644 index 0000000..924565e --- /dev/null +++ b/src/router_core/modules/address_lookup_client/lookup_client.c @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "module.h" +#include "core_attach_address_lookup.h" +#include "router_core_private.h" +#include <qpid/dispatch/discriminator.h> +#include <stdio.h> + +static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original) +{ + size_t olen = strlen(original); + size_t clen = strlen(conn->container); + char *name = (char*) malloc(olen + clen + 2); + memset(name, 0, olen + clen + 2); + strcat(name, original); + name[olen] = '@'; + strcat(name + olen + 1, conn->container); + return name; +} + + +/** + * Generate a temporary routable address for a destination connected to this + * router node. + */ +static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length) +{ + char discriminator[QD_DISCRIMINATOR_SIZE]; + qd_generate_discriminator(discriminator); + if (core->router_mode == QD_ROUTER_MODE_EDGE) + snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator); + else + snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator); +} + + +/** + * Generate a temporary mobile address for a producer connected to this + * router node. + */ +static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length) +{ + char discriminator[QD_DISCRIMINATOR_SIZE]; + qd_generate_discriminator(discriminator); + snprintf(buffer, length, "amqp:/_$temp.%s", discriminator); +} + + +/** + * qdr_lookup_terminus_address_CT + * + * Lookup a terminus address in the route table and possibly create a new address + * if no match is found. + * + * @param core Pointer to the core object + * @param dir Direction of the link for the terminus + * @param conn The connection over which the terminus was attached + * @param terminus The terminus containing the addressing information to be looked up + * @param create_if_not_found Iff true, return a pointer to a newly created address record + * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address + * @param [out] link_route True iff the lookup indicates that an attach should be routed + * @param [out] unavailable True iff this address is blocked as unavailable + * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint + * @return Pointer to an address record or 0 if none is found + */ +static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, + qd_direction_t dir, + qdr_connection_t *conn, + qdr_terminus_t *terminus, + bool create_if_not_found, + bool accept_dynamic, + bool *link_route, + bool *unavailable, + bool *core_endpoint) +{ + qdr_address_t *addr = 0; + + // + // Unless expressly stated, link routing is not indicated for this terminus. + // + *link_route = false; + *unavailable = false; + *core_endpoint = false; + + if (qdr_terminus_is_dynamic(terminus)) { + // + // The terminus is dynamic. Check to see if there is an address provided + // in the dynamic node properties. If so, look that address up as a link-routed + // destination. + // + qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus); + if (dnp_address) { + qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE); + if (conn->tenant_space) + qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len); + qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr); + + if (addr && conn->tenant_space) { + // + // If this link is in a tenant space, translate the dnp address to + // the fully-qualified view + // + qdr_terminus_set_dnp_address_iterator(terminus, dnp_address); + } + + qd_iterator_free(dnp_address); + *link_route = true; + return addr; + } + + // + // The dynamic terminus has no address in the dynamic-node-propteries. If we are + // permitted to generate dynamic addresses, create a new address that is local to + // this router and insert it into the address table with a hash index. + // + if (!accept_dynamic) + return 0; + + char temp_addr[200]; + bool generating = true; + while (generating) { + // + // The address-generation process is performed in a loop in case the generated + // address collides with a previously generated address (this should be _highly_ + // unlikely). + // + if (dir == QD_OUTGOING) + qdr_generate_temp_addr(core, temp_addr, 200); + else + qdr_generate_mobile_addr(core, temp_addr, 200); + + qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr); + if (!addr) { + addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED); + qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(core->addrs, addr); + qdr_terminus_set_address(terminus, temp_addr); + generating = false; + } + qd_iterator_free(temp_iter); + } + return addr; + } + + // + // If the terminus is anonymous, there is no address to look up. + // + if (qdr_terminus_is_anonymous(terminus)) + return 0; + + // + // The terminus has a non-dynamic address that we need to look up. First, look for + // a link-route destination for the address. + // + qd_iterator_t *iter = qdr_terminus_get_address(terminus); + qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE); + if (conn->tenant_space) + qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len); + qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr); + if (addr) { + *link_route = true; + + // + // If this link is in a tenant space, translate the terminus address to + // the fully-qualified view + // + if (conn->tenant_space) { + qdr_terminus_set_address_iterator(terminus, iter); + } + return addr; + } + + // + // There was no match for a link-route destination, look for a message-route address. + // + int in_phase; + int out_phase; + int addr_phase; + int priority; + qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority); + + qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override + addr_phase = dir == QD_INCOMING ? in_phase : out_phase; + qd_iterator_annotate_phase(iter, (char) addr_phase + '0'); + + qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); + + if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE) + *unavailable = true; + + if (!addr && create_if_not_found) { + addr = qdr_address_CT(core, treat); + if (addr) { + qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(core->addrs, addr); + } + + if (!addr && treat == QD_TREATMENT_UNAVAILABLE) + *unavailable = true; + } + + if (qdr_terminus_is_coordinator(terminus)) + *unavailable = false; + + if (!!addr && addr->core_endpoint != 0) + *core_endpoint = true; + + if (addr) + addr->priority = priority; + return addr; +} + + +static void qdr_link_react_to_first_attach_CT(qdr_core_t *core, + qdr_connection_t *conn, + qdr_address_t *addr, + qdr_link_t *link, + qd_direction_t dir, + qdr_terminus_t *source, + qdr_terminus_t *target, + bool link_route, + bool unavailable, + bool core_endpoint) +{ + if (core_endpoint) { + qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target); + } + + else if (unavailable) { + qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true); + qdr_terminus_free(source); + qdr_terminus_free(target); + } + + else if (!addr) { + // + // No route to this destination, reject the link + // + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_terminus_free(source); + qdr_terminus_free(target); + } + + else if (link_route) { + // + // This is a link-routed destination, forward the attach to the next hop + // + qdr_terminus_t *term = dir == QD_INCOMING ? target : source; + if (qdr_terminus_survives_disconnect(term) && !core->qd->allow_resumable_link_route) { + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true); + qdr_terminus_free(source); + qdr_terminus_free(target); + } else { + if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) { + link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name); + } + bool success = qdr_forward_attach_CT(core, addr, link, source, target); + if (!success) { + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_terminus_free(source); + qdr_terminus_free(target); + } + } + } + + else if (dir == QD_INCOMING && qdr_terminus_is_coordinator(target)) { + // + // This target terminus is a coordinator. + // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router). + // The router should reject this link because the router cannot coordinate transactions itself. + // + // The attach response should have a null target to indicate refusal and the immediately coming detach. + // + qdr_link_outbound_second_attach_CT(core, link, source, 0); + + // + // Now, send back a detach with the error amqp:precondition-failed + // + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true); + } else { + // + // Associate the link with the address. With this association, it will be unnecessary + // to do an address lookup for deliveries that arrive on this link. + // + qdr_core_bind_address_link_CT(core, addr, link); + qdr_link_outbound_second_attach_CT(core, link, source, target); + + // + // Issue the initial credit only if one of the following + // holds: + // - there are destinations for the address + // - if the address treatment is multicast + // - the address is that of an exchange (no subscribers allowed) + // + if (dir == QD_INCOMING + && (DEQ_SIZE(addr->subscriptions) + || DEQ_SIZE(addr->rlinks) + || qd_bitmask_cardinality(addr->rnodes) + || qdr_is_addr_treatment_multicast(addr) + || !!addr->exchange)) { + qdr_link_issue_credit_CT(core, link, link->capacity, false); + } + + // + // If this link came through an edge connection, raise a link event to + // herald that fact. + // + if (link->conn->role == QDR_ROLE_EDGE_CONNECTION) + qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link); + } +} + + +static void qcm_addr_lookup_CT(qdr_core_t *core, + qdr_connection_t *conn, + qdr_link_t *link, + qd_direction_t dir, + qdr_terminus_t *source, + qdr_terminus_t *target) + +{ + bool link_route; + bool unavailable; + bool core_endpoint; + qdr_terminus_t *term = dir == QD_INCOMING ? target : source; + + qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint); + qdr_link_react_to_first_attach_CT(core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint); +} + + +static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_context) +{ + assert(core->addr_lookup_handler == 0); + + core->addr_lookup_handler = qcm_addr_lookup_CT; + *module_context = core; +} + + +static void qcm_addr_lookup_client_final_CT(void *module_context) +{ + qdr_core_t *core = (qdr_core_t*) module_context; + core->addr_lookup_handler = 0; +} + + +QDR_CORE_MODULE_DECLARE("address_lookup_client", qcm_addr_lookup_client_init_CT, qcm_addr_lookup_client_final_CT) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 0b70c97..bb4af57 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -43,6 +43,7 @@ typedef struct qdr_edge_t qdr_edge_t; #include "core_link_endpoint.h" #include "core_events.h" +#include "core_attach_address_lookup.h" qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment); int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery, @@ -727,6 +728,8 @@ struct qdr_core_t { qdr_connection_list_t connections_to_activate; qdr_link_list_t open_links; + qdrc_attach_addr_lookup_t addr_lookup_handler; + // // Agent section // --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
