Repository: qpid-dispatch Updated Branches: refs/heads/master db87d5d3e -> 564c5907c
DISPATCH-1194: Link Route address lookup for edge router This closes #423 Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d201deac Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d201deac Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d201deac Branch: refs/heads/master Commit: d201deac4ec8854571dcd3acd1aa5aa39d16de5b Parents: db87d5d Author: Kenneth Giusti <[email protected]> Authored: Wed Nov 28 09:30:09 2018 -0500 Committer: Ted Ross <[email protected]> Committed: Tue Dec 4 09:34:47 2018 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/amqp.h | 5 + src/CMakeLists.txt | 1 + .../modules/address_lookup/address_lookup.c | 454 +++++++++++++++++++ .../modules/address_lookup/address_lookup.h | 75 +++ src/router_core/router_core_private.h | 2 +- src/router_core/transfer.c | 45 +- tests/CMakeLists.txt | 1 + tests/system_test.py | 9 +- tests/system_tests_address_lookup.py | 287 ++++++++++++ 9 files changed, 857 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/include/qpid/dispatch/amqp.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index e62b7f7..99315c7 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -184,5 +184,10 @@ extern const char * const QD_AMQP_COND_ILLEGAL_STATE; extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL; /// @}; +/** @name AMQP link endpoint role. 
*/ +/// @{ +#define QD_AMQP_LINK_ROLE_SENDER false +#define QD_AMQP_LINK_ROLE_RECEIVER true +/// @}; #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ebc0e09..352aec7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -109,6 +109,7 @@ set(qpid_dispatch_SOURCES router_core/modules/edge_router/edge_mgmt.c router_core/modules/test_hooks/core_test_hooks.c router_core/modules/edge_addr_tracking/edge_addr_tracking.c + router_core/modules/address_lookup/address_lookup.c router_node.c router_pynode.c schema_enum.c http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.c ---------------------------------------------------------------------- diff --git a/src/router_core/modules/address_lookup/address_lookup.c b/src/router_core/modules/address_lookup/address_lookup.c new file mode 100644 index 0000000..93297ac --- /dev/null +++ b/src/router_core/modules/address_lookup/address_lookup.c @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "address_lookup.h" +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/amqp.h> +#include "module.h" +#include "core_link_endpoint.h" + +#include "inttypes.h" + +#define PROTOCOL_VERSION 1 + +typedef enum { + // note: keep unit test in sync + OPCODE_INVALID, + OPCODE_LINK_ROUTE_LOOKUP, +} address_lookup_opcode_t; + + +/* create the message application properties and body for the link route lookup + * request message + */ +int qcm_link_route_lookup_msg(qd_iterator_t *address, + qd_direction_t dir, + qd_composed_field_t **properties, + qd_composed_field_t **body) +{ + *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); + if (!*properties) + return -1; + qd_compose_start_map(*properties); + qd_compose_insert_string(*properties, "version"); + qd_compose_insert_uint(*properties, PROTOCOL_VERSION); + qd_compose_insert_string(*properties, "opcode"); + qd_compose_insert_uint(*properties, OPCODE_LINK_ROUTE_LOOKUP); + qd_compose_end_map(*properties); + + *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + if (!*body) { + qd_compose_free(*properties); + *properties = 0; + return -1; + } + qd_compose_start_list(*body); + qd_compose_insert_string_iterator(*body, address); + qd_compose_insert_bool(*body, (dir == QD_INCOMING + ? 
QD_AMQP_LINK_ROLE_RECEIVER + : QD_AMQP_LINK_ROLE_SENDER)); + qd_compose_end_list(*body); + return 0; +} + + +/* parse a reply to the link route lookup request + */ +qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties, + qd_iterator_t *body, + bool *is_link_route, + bool *has_destinations) +{ + qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK; + *is_link_route = false; + *has_destinations = false; + + qd_parsed_field_t *props = qd_parse(properties); + if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) + return QCM_ADDR_LOOKUP_INVALID_REQUEST; + + qd_parsed_field_t *bod = qd_parse(body); + if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) { + qd_parse_free(props); + return QCM_ADDR_LOOKUP_INVALID_REQUEST; + } + + qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status"); + if (!tmp || !qd_parse_is_scalar(tmp)) { + rc = QCM_ADDR_LOOKUP_INVALID_REQUEST; + goto exit; + } else { + int32_t status = qd_parse_as_int(tmp); + if (status != QCM_ADDR_LOOKUP_OK) { + rc = (qcm_address_lookup_status_t) status; + goto exit; + } + } + + // bod[0] == is_link_route (bool) + // bod[1] == has_destinations (bool) + + if (qd_parse_sub_count(bod) < 2) { + rc = QCM_ADDR_LOOKUP_INVALID_REQUEST; + goto exit; + } + + *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0)); + *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1)); + +exit: + qd_parse_free(props); + qd_parse_free(bod); + return rc; +} + + +typedef struct _endpoint_ref { + DEQ_LINKS(struct _endpoint_ref); + qdrc_endpoint_t *endpoint; + const char *container_id; +} _endpoint_ref_t; +DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t); +ALLOC_DEFINE(_endpoint_ref_t); + + +static struct { + qdr_core_t *core; + _endpoint_ref_list_t endpoints; +} _server_state; + + +/* parse out the opcode from the request + */ +static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties) +{ + if (!properties) + return OPCODE_INVALID; + qd_parsed_field_t *oc = 
qd_parse_value_by_key(properties, "opcode"); + if (!oc) + return OPCODE_INVALID; + uint32_t opcode = qd_parse_as_uint(oc); + if (!qd_parse_ok(oc)) + return OPCODE_INVALID; + return (address_lookup_opcode_t)opcode; +} + + +/* send a reply to a lookup request + */ +static uint64_t _send_reply(_endpoint_ref_t *epr, + address_lookup_opcode_t opcode, + qcm_address_lookup_status_t status, + qd_iterator_t *correlation_id, + qd_iterator_t *reply_to, + qd_composed_field_t *body) +{ + if (!correlation_id || !reply_to) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Link route address reply failed - invalid request message properties" + " (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + return PN_REJECTED; + } + + qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); + qd_compose_start_list(fld); + qd_compose_insert_bool(fld, 0); // durable + qd_compose_end_list(fld); + + fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld); + qd_compose_start_list(fld); + qd_compose_insert_null(fld); // message-id + qd_compose_insert_null(fld); // user-id + qd_compose_insert_typed_iterator(fld, reply_to); // to + qd_compose_insert_null(fld); // subject + qd_compose_insert_null(fld); // reply-to + qd_compose_insert_typed_iterator(fld, correlation_id); + qd_compose_end_list(fld); + + fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld); + qd_compose_start_map(fld); + qd_compose_insert_string(fld, "version"); + qd_compose_insert_uint(fld, PROTOCOL_VERSION); + qd_compose_insert_string(fld, "opcode"); + qd_compose_insert_uint(fld, opcode); + qd_compose_insert_string(fld, "status"); + qd_compose_insert_uint(fld, status); + qd_compose_end_map(fld); + + qd_message_t *msg = qd_message(); + + qd_message_compose_3(msg, fld, body); + qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false); + qd_message_free(msg); + qd_compose_free(fld); + + return PN_ACCEPTED; +} + + +/* perform a link route lookup + */ +static uint64_t 
_do_link_route_lookup(_endpoint_ref_t *epr, + qd_parsed_field_t *body, + qd_iterator_t *reply_to, + qd_iterator_t *cid) +{ + if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Link route address lookup failed - invalid request body" + " (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + return PN_REJECTED; + } + + // + // body[0] == fully qualified address (string) + // body[1] == direction (bool, true == receiver) + // + + qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0)); + qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1)) + ? QD_INCOMING : QD_OUTGOING); + + bool is_link_route = false; + bool has_destinations = false; + qdr_address_t *addr = 0; + qd_iterator_reset_view(addr_i, ITER_VIEW_ALL); + qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr); + if (addr) { + is_link_route = true; + has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)); + } + + // out_body[0] == is_link_route (bool) + // out_body[1] == has_destinations (bool) + + qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qd_compose_start_list(out_body); + qd_compose_insert_bool(out_body, is_link_route); + qd_compose_insert_bool(out_body, has_destinations); + qd_compose_end_list(out_body); + + uint64_t rc = _send_reply(epr, + OPCODE_LINK_ROUTE_LOOKUP, + addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND, + cid, + reply_to, + out_body); + qd_compose_free(out_body); + + if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) { + char *as = (char *)qd_iterator_copy(addr_i); + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s" + " (container=%s, endpoint=%p)", + as, + (addr) ? "" : "not ", + is_link_route ? "yes" : "no", + has_destinations ? 
"yes" : "no", + epr->container_id, + (void *)epr->endpoint); + free(as); + } + return rc; +} + + +/* handle lookup request from client + */ +void _on_transfer(void *link_context, + qdr_delivery_t *delivery, + qd_message_t *message) +{ + if (!qd_message_receive_complete(message)) + return; + + _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context; + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Address lookup request received (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + + uint64_t disposition = PN_ACCEPTED; + qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES); + qd_parsed_field_t *props = qd_parse(p_iter); + if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - no properties (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + disposition = PN_REJECTED; + goto exit; + } + + qd_parsed_field_t *v = qd_parse_value_by_key(props, "version"); + if (!v) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - no version (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + disposition = PN_REJECTED; + goto exit; + } + + uint32_t version = qd_parse_as_uint(v); + if (!qd_parse_ok(v)) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - invalid version (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + disposition = PN_REJECTED; + goto exit; + } + + if (version != PROTOCOL_VERSION) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - unknown version" + " (container=%s, endpoint=%p, version=%"PRIu32")", + epr->container_id, (void *)epr->endpoint, version); + disposition = PN_REJECTED; + goto exit; + // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION + } + + address_lookup_opcode_t opcode = _decode_opcode(props); + switch 
(opcode) { + case OPCODE_LINK_ROUTE_LOOKUP: { + qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY); + qd_parsed_field_t *body = qd_parse(b_iter); + qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO); + qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID); + disposition = _do_link_route_lookup(epr, body, reply_to, cid); + qd_iterator_free(cid); + qd_iterator_free(reply_to); + qd_parse_free(body); + qd_iterator_free(b_iter); + break; + } + case OPCODE_INVALID: + default: + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - invalid opcode" + " (container=%s, endpoint=%p, opcode=%d)", + epr->container_id, (void *)epr->endpoint, opcode); + disposition = PN_REJECTED; + } + +exit: + qd_parse_free(props); + qd_iterator_free(p_iter); + qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition); + qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false); + return; +} + + +/* handle incoming attach to address lookup service + */ +static void _on_first_attach(void *bind_context, + qdrc_endpoint_t *endpoint, + void **link_context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target) +{ + // + // Only accept incoming links initiated by the edge router. Detach all + // other links + // + qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint); + if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING || + conn->role != QDR_ROLE_EDGE_CONNECTION) { + *link_context = 0; + qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0); + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Attempt to attach to address lookup server rejected (container=%s)", + (conn->connection_info) ? conn->connection_info->container : "<unknown>"); + return; + } + + _endpoint_ref_t *epr = new__endpoint_ref_t(); + ZERO(epr); + epr->endpoint = endpoint; + epr->container_id = (conn->connection_info) ? 
conn->connection_info->container : "<unknown>"; + DEQ_INSERT_TAIL(_server_state.endpoints, epr); + *link_context = epr; + qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target); + qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false); + + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Client attached to address lookup server (container=%s, endpoint=%p)", + epr->container_id, (void *)endpoint); +} + + +/* handle incoming detach from client + */ +static void _on_first_detach(void *link_context, + qdr_error_t *error) +{ + _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context; + qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0); + DEQ_REMOVE(_server_state.endpoints, epr); + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Client detached from address lookup server (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + free__endpoint_ref_t(epr); +} + + +static qdrc_endpoint_desc_t _endpoint_handlers = +{ + .label = "address lookup", + .on_first_attach = _on_first_attach, + .on_transfer = _on_transfer, + .on_first_detach = _on_first_detach, +}; + + +static void _address_lookup_init_CT(qdr_core_t *core, void **module_context) +{ + // + // Address resolution service is provided by interior routers only + // + if (core->router_mode != QD_ROUTER_MODE_INTERIOR) + return; + + _server_state.core = core; + + // + // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address + // + qdrc_endpoint_bind_mobile_address_CT(core, + QD_TERMINUS_ADDRESS_LOOKUP, + '0', // phase + &_endpoint_handlers, + &_server_state); + *module_context = &_server_state; +} + + +static void _address_lookup_final_CT(void *module_context) +{ + _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints); + while (epr) { + DEQ_REMOVE_HEAD(_server_state.endpoints); + free__endpoint_ref_t(epr); + epr = DEQ_HEAD(_server_state.endpoints); + } +} + + +QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, 
_address_lookup_final_CT) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.h ---------------------------------------------------------------------- diff --git a/src/router_core/modules/address_lookup/address_lookup.h b/src/router_core/modules/address_lookup/address_lookup.h new file mode 100644 index 0000000..9f8b65e --- /dev/null +++ b/src/router_core/modules/address_lookup/address_lookup.h @@ -0,0 +1,75 @@ +#ifndef router_core_address_lookup_h +#define router_core_address_lookup_h 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/compose.h> +// +// API for building address lookup request messages. The message properties +// and body fields are handled separately so they can be passed directly to the +// core client API. +// + +typedef enum { + // note: keep unit test in sync + QCM_ADDR_LOOKUP_OK, + QCM_ADDR_LOOKUP_BAD_VERSION, + QCM_ADDR_LOOKUP_BAD_OPCODE, + QCM_ADDR_LOOKUP_NOT_FOUND, + QCM_ADDR_LOOKUP_INVALID_REQUEST, +} qcm_address_lookup_status_t; + + +/** + * Create the message properties and body for a link route address lookup. 
The + * returned properties and body can be passed directly to + * qdrc_client_request_CT(). + * + * @param address - fully qualified link route address to look up. + * @param dir - QD_INCOMING or QD_OUTGOING + * @param properties - return value for message application properties section + * @param body - return value for message body + * @return zero on success + */ +int qcm_link_route_lookup_request(qd_iterator_t *address, + qd_direction_t dir, + qd_composed_field_t **properties, + qd_composed_field_t **body); + + +/** + * Parse out the payload of the link route lookup reply message. The + * properties and body fields are provided by the on_reply_cb() callback passed + * to the qdrc_client_request_CT() call. + * + * @param properties - application properties as returned in the reply + * @param body - body from reply message + * @param is_link_route - set True if the address is a link route address that + * exists in the route tables of the queried router. + * @param has_destinations - if is_link_route this indicates whether or not the + * queried router has active destinations for this link route. 
+ * @return QCM_ADDR_LOOKUP_OK if query is successful, else an error code + */ +qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties, + qd_iterator_t *body, + bool *is_link_route, + bool *has_destinations); +#endif // router_core_address_lookup_h http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 91d0016..0b70c97 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -857,7 +857,7 @@ void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery); bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label); void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg); - +void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control); /** * Links the in_dlv to the out_dlv and increments ref counts of both deliveries */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 6d9206d..1cce8de 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -1061,27 +1061,36 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - qdr_field_t *addr_field = action->args.io.address; - qd_message_t *msg = action->args.io.message; - if (!discard) { - qdr_address_t *addr = 0; - - qd_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH); - 
qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr); - if (addr) { - // - // Forward the message. We don't care what the fanout count is. - // - (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, - action->args.io.control); - addr->deliveries_from_container++; - } else - qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address"); + qdr_in_process_send_to_CT(core, + qdr_field_iterator(action->args.io.address), + action->args.io.message, + action->args.io.exclude_inprocess, + action->args.io.control); } - qdr_field_free(addr_field); - qd_message_free(msg); + qdr_field_free(action->args.io.address); + qd_message_free(action->args.io.message); +} + + +/** + * forward an in-process message based on the destination address + */ +void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control) +{ + qdr_address_t *addr = 0; + + qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); + qd_hash_retrieve(core->addr_hash, address, (void**) &addr); + if (addr) { + // + // Forward the message. We don't care what the fanout count is. 
+ // + (void) qdr_forward_message_CT(core, addr, msg, 0, exclude_inprocess, control); + addr->deliveries_from_container++; + } else + qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address"); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 51dc438..45206e8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -123,6 +123,7 @@ foreach(py_test_module ${CONSOLE_TEST} system_tests_priority system_tests_core_client + system_tests_address_lookup ) add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module}) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_test.py ---------------------------------------------------------------------- diff --git a/tests/system_test.py b/tests/system_test.py index d268671..d4cd362 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -436,11 +436,12 @@ class Qdrouterd(Process): except: return False - def wait_address(self, address, subscribers=0, remotes=0, **retry_kwargs): + def wait_address(self, address, subscribers=0, remotes=0, containers=0, **retry_kwargs): """ Wait for an address to be visible on the router. 
@keyword subscribers: Wait till subscriberCount >= subscribers @keyword remotes: Wait till remoteCount >= remotes + @keyword containers: Wait till containerCount >= containers @param retry_kwargs: keyword args for L{retry} """ def check(): @@ -449,11 +450,13 @@ class Qdrouterd(Process): # endswith check is because of M0/L/R prefixes addrs = self.management.query( type='org.apache.qpid.dispatch.router.address', - attribute_names=[u'name', u'subscriberCount', u'remoteCount']).get_entities() + attribute_names=[u'name', u'subscriberCount', u'remoteCount', u'containerCount']).get_entities() addrs = [a for a in addrs if a['name'].endswith(address)] - return addrs and addrs[0]['subscriberCount'] >= subscribers and addrs[0]['remoteCount'] >= remotes + return (addrs and addrs[0]['subscriberCount'] >= subscribers + and addrs[0]['remoteCount'] >= remotes + and addrs[0]['containerCount'] >= containers) assert retry(check, **retry_kwargs) def get_host(self, protocol_family): http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_tests_address_lookup.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_address_lookup.py b/tests/system_tests_address_lookup.py new file mode 100644 index 0000000..3538f93 --- /dev/null +++ b/tests/system_tests_address_lookup.py @@ -0,0 +1,287 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +import unittest2 as unittest +import time + +from system_test import TestCase, Qdrouterd, TIMEOUT +from system_test import AsyncTestReceiver +from test_broker import FakeBroker + +from proton import Disposition +from proton import Message +from proton.utils import BlockingConnection +from proton.utils import SyncRequestResponse +from proton.utils import SendException +from proton.utils import LinkDetached + + +class LinkRouteLookupTest(TestCase): + """ + Tests link route address lookup + """ + # hardcoded values from the router's C code + QD_TERMINUS_ADDRESS_LOOKUP = '_$qd.addr_lookup' + PROTOCOL_VERSION = 1 + OPCODE_LINK_ROUTE_LOOKUP = 1 + QCM_ADDR_LOOKUP_OK = 0 + QCM_ADDR_LOOKUP_NOT_FOUND = 3 + + def _check_response(self, message): + self.assertTrue(isinstance(message.properties, dict)) + self.assertEqual(self.PROTOCOL_VERSION, message.properties.get('version')) + self.assertTrue(message.properties.get('opcode') is not None) + self.assertTrue(isinstance(message.body, list)) + self.assertEqual(2, len(message.body)) + return (message.properties.get('status'), + message.body[0], # is link_route? + message.body[1]) # has destinations? 
+ + @classmethod + def setUpClass(cls): + """Start a router""" + super(LinkRouteLookupTest, cls).setUpClass() + + def router(name, mode, extra=None): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}) + ] + + if extra: + config.extend(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=False)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + edge_port_A = cls.tester.get_port() + broker_port_A = cls.tester.get_port() + broker_port_B = cls.tester.get_port() + + router('INT.A', 'interior', + [ + ('listener', {'role': 'edge', + 'port': cls.tester.get_port()}), + ('listener', {'role': 'inter-router', + 'port': inter_router_port}), + ('connector', {'role': 'route-container', + 'port': cls.tester.get_port()}), + + ('linkRoute', {'pattern': 'org.apache.A.#', + 'containerId': 'FakeBrokerA', + 'direction': 'in'}), + ('linkRoute', {'pattern': 'org.apache.A.#', + 'containerId': 'FakeBrokerA', + 'direction': 'out'}) + ]) + cls.INT_A = cls.routers[-1] + cls.INT_A.listener = cls.INT_A.addresses[0] + cls.INT_A.edge_listener = cls.INT_A.addresses[1] + cls.INT_A.inter_router_listener = cls.INT_A.addresses[2] + cls.INT_A.broker_connector = cls.INT_A.connector_addresses[0] + + router('INT.B', 'interior', + [ + ('listener', {'role': 'edge', + 'port': cls.tester.get_port()}), + ('connector', {'role': 'inter-router', + 'name': 'connectorToA', + 'port': inter_router_port}), + ('connector', {'role': 'route-container', + 'port': cls.tester.get_port()}), + + ('linkRoute', {'pattern': 'org.apache.B.#', + 'containerId': 'FakeBrokerB', + 'direction': 'in'}), + ('linkRoute', {'pattern': 'org.apache.B.#', + 'containerId': 'FakeBrokerB', + 'direction': 'out'}) + ]) + cls.INT_B = cls.routers[-1] + cls.INT_B.edge_listener = cls.INT_B.addresses[1] + cls.INT_B.broker_connector = cls.INT_B.connector_addresses[1] + 
+ cls.INT_A.wait_router_connected('INT.B') + cls.INT_B.wait_router_connected('INT.A') + + def _lookup_request(self, lr_address, direction): + """ + Construct a link route lookup request message + """ + return Message(body=[lr_address, + direction], + properties={"version": self.PROTOCOL_VERSION, + "opcode": self.OPCODE_LINK_ROUTE_LOOKUP}) + + def test_link_route_lookup_ok(self): + """ + verify a link route address can be looked up successfully for both + locally attached and remotely attached containers + """ + + # fire up a fake broker attached to the router local to the edge router + fb = FakeBroker(self.INT_A.broker_connector, container_id='FakeBrokerA') + self.INT_A.wait_address("org.apache.A", containers=1) + + # create a fake edge and lookup the target address + bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT) + srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + + for direction in [True, False]: + # True = direction inbound (receiver) False = direction outbound (sender) + rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.foo", direction))) + self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0]) + self.assertTrue(rsp[1]) + self.assertTrue(rsp[2]) + + # shutdown fake router + fb.join() + + # now fire up a fake broker attached to the remote router + fb = FakeBroker(self.INT_B.broker_connector, container_id='FakeBrokerB') + self.INT_A.wait_address("org.apache.B", remotes=1) + + for direction in [True, False]: + rsp = self._check_response(srr.call(self._lookup_request("org.apache.B.foo", direction))) + self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0]) + self.assertTrue(rsp[1]) + self.assertTrue(rsp[2]) + + fb.join() + bc.close() + + def test_link_route_lookup_not_found(self): + """ + verify correct handling of lookup misses + """ + bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT) + srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + + rsp = 
self._check_response(srr.call(self._lookup_request("not.found.address", True))) + self.assertEqual(self.QCM_ADDR_LOOKUP_NOT_FOUND, rsp[0]) + + def test_link_route_lookup_not_link_route(self): + """ + verify correct handling of matches to mobile addresses + """ + addr = "not.a.linkroute" + client = AsyncTestReceiver(self.INT_A.listener, addr) + self.INT_A.wait_address(addr) + + bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT) + srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + + rsp = self._check_response(srr.call(self._lookup_request(addr, True))) + # self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0]) + self.assertEqual(False, rsp[1]) + bc.close() + client.stop() + + def test_link_route_lookup_no_dest(self): + """ + verify correct handling of matches to mobile addresses + """ + bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT) + srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.nope", True))) + self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0]) + self.assertEqual(True, rsp[1]) + self.assertEqual(False, rsp[2]) + bc.close() + + def _invalid_request_test(self, msg, disposition=Disposition.REJECTED): + bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT) + srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + # @TODO(kgiusti) - self.assertRaises does not work here: + try: + srr.call(msg) + self.assertTrue(False) + except SendException as exc: + self.assertEqual(disposition, exc.state) + bc.close() + + def test_link_route_invalid_request(self): + """ + Test various invalid message content + """ + + # empty message + self._invalid_request_test(Message()) + + # missing properties + msg = self._lookup_request("ignore", False) + msg.properties = None + self._invalid_request_test(msg) + + # missing version + msg = self._lookup_request("ignore", False) + del msg.properties['version'] + self._invalid_request_test(msg) + 
+ # invalid version + msg = self._lookup_request("ignore", False) + msg.properties['version'] = "blech" + self._invalid_request_test(msg) + + # unsupported version + msg = self._lookup_request("ignore", False) + msg.properties['version'] = 97387187 + self._invalid_request_test(msg) + + # missing opcode + msg = self._lookup_request("ignore", False) + del msg.properties['opcode'] + self._invalid_request_test(msg) + + # bad opcode + msg = self._lookup_request("ignore", False) + msg.properties['opcode'] = "snarf" + self._invalid_request_test(msg) + + # bad body + msg = self._lookup_request("ignore", False) + msg.body = [71] + self._invalid_request_test(msg) + + def test_lookup_bad_connection(self): + """ + Verify that clients connected via non-edge connections fail + """ + bc = BlockingConnection(self.INT_A.listener, timeout=TIMEOUT) + with self.assertRaises(LinkDetached): + SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + bc.close() + + bc = BlockingConnection(self.INT_A.inter_router_listener, timeout=TIMEOUT) + with self.assertRaises(LinkDetached): + SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP) + bc.close() + + # consuming from the lookup address is forbidden: + bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT) + with self.assertRaises(LinkDetached): + rcv = bc.create_receiver(self.QD_TERMINUS_ADDRESS_LOOKUP) + bc.close() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
