Repository: qpid-dispatch Updated Branches: refs/heads/master 564c5907c -> d965d9b1d
DISPATCH-1194: re-arrange the address lookup server source code Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d965d9b1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d965d9b1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d965d9b1 Branch: refs/heads/master Commit: d965d9b1df8c83ba8f59512886b50b203666e33f Parents: 564c590 Author: Kenneth Giusti <[email protected]> Authored: Tue Dec 4 11:56:13 2018 -0500 Committer: Kenneth Giusti <[email protected]> Committed: Tue Dec 4 11:56:13 2018 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/address_lookup_server.h | 83 ++++ src/CMakeLists.txt | 3 +- src/address_lookup_utils.c | 110 +++++ .../modules/address_lookup/address_lookup.c | 454 ------------------- .../modules/address_lookup/address_lookup.h | 75 --- .../address_lookup_server.c | 360 +++++++++++++++ 6 files changed, 555 insertions(+), 530 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/include/qpid/dispatch/address_lookup_server.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/address_lookup_server.h b/include/qpid/dispatch/address_lookup_server.h new file mode 100644 index 0000000..33a6782 --- /dev/null +++ b/include/qpid/dispatch/address_lookup_server.h @@ -0,0 +1,83 @@ +#ifndef router_core_address_lookup_server_h +#define router_core_address_lookup__server_h 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/compose.h> +// +// API for building address lookup request messages. The message properties +// and body fields are handled separately so they can be passed directly to the +// core client API. +// + +#define PROTOCOL_VERSION 1 + +typedef enum { + // note: keep unit test in sync + OPCODE_INVALID, + OPCODE_LINK_ROUTE_LOOKUP, +} address_lookup_opcode_t; + +typedef enum { + // note: keep unit test in sync + QCM_ADDR_LOOKUP_OK, + QCM_ADDR_LOOKUP_BAD_VERSION, + QCM_ADDR_LOOKUP_BAD_OPCODE, + QCM_ADDR_LOOKUP_NOT_FOUND, + QCM_ADDR_LOOKUP_INVALID_REQUEST, +} qcm_address_lookup_status_t; + + +/** + * Create the message properties and body for a link route address lookup. The + * returned properties and body can be passed directly to + * qdrc_client_request_CT(). + * + * @param address - fully qualified link route address to look up. + * @param dir - QD_INCOMING or QD_OUTGOING + * @param properties - return value for message application properties section + * @param body - return value for message body + * @return zero on success + */ +int qcm_link_route_lookup_request(qd_iterator_t *address, + qd_direction_t dir, + qd_composed_field_t **properties, + qd_composed_field_t **body); + + +/** + * Parse out the payload of the link route lookup reply message. The + * properties and body fields are provided by the on_reply_cb() callback passed + * to the qdrc_client_request_CT() call. + * + * @param properties - application properties as returned in the reply + * @param body - body from reply message + * @param is_link_route - set True if the address is a link route address that + * exists in the route tables of the queried router. + * @param has_destination - if is_link_route this indicates whether or not the + * queried router has active destinations for this link route. + * @return QCM_ADDR_LOOKUP_OK if query is successful, else and error code + */ +qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties, + qd_iterator_t *body, + bool *is_link_route, + bool *has_destinations); +#endif // router_core_address_lookup_server_h http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8fc6485..1544db9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -78,6 +78,7 @@ set(qpid_dispatch_SOURCES python_embedded.c router_agent.c router_config.c + address_lookup_utils.c router_core/agent.c router_core/agent_address.c router_core/agent_config_address.c @@ -109,7 +110,7 @@ set(qpid_dispatch_SOURCES router_core/modules/edge_router/edge_mgmt.c router_core/modules/test_hooks/core_test_hooks.c router_core/modules/edge_addr_tracking/edge_addr_tracking.c - router_core/modules/address_lookup/address_lookup.c + router_core/modules/address_lookup_server/address_lookup_server.c router_core/modules/address_lookup_client/lookup_client.c router_node.c router_pynode.c http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/address_lookup_utils.c ---------------------------------------------------------------------- diff --git a/src/address_lookup_utils.c b/src/address_lookup_utils.c new file mode 100644 index 0000000..4c92921 --- /dev/null +++ b/src/address_lookup_utils.c @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// +// API for interacting with the core address lookup server +// + +#include <qpid/dispatch/address_lookup_server.h> +#include <qpid/dispatch/amqp.h> +#include <qpid/dispatch/parse.h> + +/* create the message application properties and body for the link route lookup + * request message + */ +int qcm_link_route_lookup_msg(qd_iterator_t *address, + qd_direction_t dir, + qd_composed_field_t **properties, + qd_composed_field_t **body) +{ + *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); + if (!*properties) + return -1; + qd_compose_start_map(*properties); + qd_compose_insert_string(*properties, "version"); + qd_compose_insert_uint(*properties, PROTOCOL_VERSION); + qd_compose_insert_string(*properties, "opcode"); + qd_compose_insert_uint(*properties, OPCODE_LINK_ROUTE_LOOKUP); + qd_compose_end_map(*properties); + + *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + if (!*body) { + qd_compose_free(*properties); + *properties = 0; + return -1; + } + qd_compose_start_list(*body); + qd_compose_insert_string_iterator(*body, address); + qd_compose_insert_bool(*body, (dir == QD_INCOMING + ? QD_AMQP_LINK_ROLE_RECEIVER + : QD_AMQP_LINK_ROLE_SENDER)); + qd_compose_end_list(*body); + return 0; +} + + +/* parse a reply to the link route lookup request + */ +qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties, + qd_iterator_t *body, + bool *is_link_route, + bool *has_destinations) +{ + qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK; + *is_link_route = false; + *has_destinations = false; + + qd_parsed_field_t *props = qd_parse(properties); + if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) + return QCM_ADDR_LOOKUP_INVALID_REQUEST; + + qd_parsed_field_t *bod = qd_parse(body); + if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) { + qd_parse_free(props); + return QCM_ADDR_LOOKUP_INVALID_REQUEST; + } + + qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status"); + if (!tmp || !qd_parse_is_scalar(tmp)) { + rc = QCM_ADDR_LOOKUP_INVALID_REQUEST; + goto exit; + } else { + int32_t status = qd_parse_as_int(tmp); + if (status != QCM_ADDR_LOOKUP_OK) { + rc = (qcm_address_lookup_status_t) status; + goto exit; + } + } + + // bod[0] == is_link_route (bool) + // bod[1] == has_destinations (bool) + + if (qd_parse_sub_count(bod) < 2) { + rc = QCM_ADDR_LOOKUP_INVALID_REQUEST; + goto exit; + } + + *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0)); + *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1)); + +exit: + qd_parse_free(props); + qd_parse_free(bod); + return rc; +} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/router_core/modules/address_lookup/address_lookup.c ---------------------------------------------------------------------- diff --git a/src/router_core/modules/address_lookup/address_lookup.c b/src/router_core/modules/address_lookup/address_lookup.c deleted file mode 100644 index 93297ac..0000000 --- a/src/router_core/modules/address_lookup/address_lookup.c +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "address_lookup.h" -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/amqp.h> -#include "module.h" -#include "core_link_endpoint.h" - -#include "inttypes.h" - -#define PROTOCOL_VERSION 1 - -typedef enum { - // note: keep unit test in sync - OPCODE_INVALID, - OPCODE_LINK_ROUTE_LOOKUP, -} address_lookup_opcode_t; - - -/* create the message application properties and body for the link route lookup - * request message - */ -int qcm_link_route_lookup_msg(qd_iterator_t *address, - qd_direction_t dir, - qd_composed_field_t **properties, - qd_composed_field_t **body) -{ - *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); - if (!*properties) - return -1; - qd_compose_start_map(*properties); - qd_compose_insert_string(*properties, "version"); - qd_compose_insert_uint(*properties, PROTOCOL_VERSION); - qd_compose_insert_string(*properties, "opcode"); - qd_compose_insert_uint(*properties, OPCODE_LINK_ROUTE_LOOKUP); - qd_compose_end_map(*properties); - - *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); - if (!*body) { - qd_compose_free(*properties); - *properties = 0; - return -1; - } - qd_compose_start_list(*body); - qd_compose_insert_string_iterator(*body, address); - qd_compose_insert_bool(*body, (dir == QD_INCOMING - ? QD_AMQP_LINK_ROLE_RECEIVER - : QD_AMQP_LINK_ROLE_SENDER)); - qd_compose_end_list(*body); - return 0; -} - - -/* parse a reply to the link route lookup request - */ -qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties, - qd_iterator_t *body, - bool *is_link_route, - bool *has_destinations) -{ - qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK; - *is_link_route = false; - *has_destinations = false; - - qd_parsed_field_t *props = qd_parse(properties); - if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) - return QCM_ADDR_LOOKUP_INVALID_REQUEST; - - qd_parsed_field_t *bod = qd_parse(body); - if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) { - qd_parse_free(props); - return QCM_ADDR_LOOKUP_INVALID_REQUEST; - } - - qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status"); - if (!tmp || !qd_parse_is_scalar(tmp)) { - rc = QCM_ADDR_LOOKUP_INVALID_REQUEST; - goto exit; - } else { - int32_t status = qd_parse_as_int(tmp); - if (status != QCM_ADDR_LOOKUP_OK) { - rc = (qcm_address_lookup_status_t) status; - goto exit; - } - } - - // bod[0] == is_link_route (bool) - // bod[1] == has_destinations (bool) - - if (qd_parse_sub_count(bod) < 2) { - rc = QCM_ADDR_LOOKUP_INVALID_REQUEST; - goto exit; - } - - *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0)); - *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1)); - -exit: - qd_parse_free(props); - qd_parse_free(bod); - return rc; -} - - -typedef struct _endpoint_ref { - DEQ_LINKS(struct _endpoint_ref); - qdrc_endpoint_t *endpoint; - const char *container_id; -} _endpoint_ref_t; -DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t); -ALLOC_DEFINE(_endpoint_ref_t); - - -static struct { - qdr_core_t *core; - _endpoint_ref_list_t endpoints; -} _server_state; - - -/* parse out the opcode from the request - */ -static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties) -{ - if (!properties) - return OPCODE_INVALID; - qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode"); - if (!oc) - return OPCODE_INVALID; - uint32_t opcode = qd_parse_as_uint(oc); - if (!qd_parse_ok(oc)) - return OPCODE_INVALID; - return (address_lookup_opcode_t)opcode; -} - - -/* send a reply to a lookup request - */ -static uint64_t _send_reply(_endpoint_ref_t *epr, - address_lookup_opcode_t opcode, - qcm_address_lookup_status_t status, - qd_iterator_t *correlation_id, - qd_iterator_t *reply_to, - qd_composed_field_t *body) -{ - if (!correlation_id || !reply_to) { - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Link route address reply failed - invalid request message properties" - " (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - return PN_REJECTED; - } - - qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); - qd_compose_start_list(fld); - qd_compose_insert_bool(fld, 0); // durable - qd_compose_end_list(fld); - - fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld); - qd_compose_start_list(fld); - qd_compose_insert_null(fld); // message-id - qd_compose_insert_null(fld); // user-id - qd_compose_insert_typed_iterator(fld, reply_to); // to - qd_compose_insert_null(fld); // subject - qd_compose_insert_null(fld); // reply-to - qd_compose_insert_typed_iterator(fld, correlation_id); - qd_compose_end_list(fld); - - fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld); - qd_compose_start_map(fld); - qd_compose_insert_string(fld, "version"); - qd_compose_insert_uint(fld, PROTOCOL_VERSION); - qd_compose_insert_string(fld, "opcode"); - qd_compose_insert_uint(fld, opcode); - qd_compose_insert_string(fld, "status"); - qd_compose_insert_uint(fld, status); - qd_compose_end_map(fld); - - qd_message_t *msg = qd_message(); - - qd_message_compose_3(msg, fld, body); - qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false); - qd_message_free(msg); - qd_compose_free(fld); - - return PN_ACCEPTED; -} - - -/* perform a link route lookup - */ -static uint64_t _do_link_route_lookup(_endpoint_ref_t *epr, - qd_parsed_field_t *body, - qd_iterator_t *reply_to, - qd_iterator_t *cid) -{ - if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) { - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Link route address lookup failed - invalid request body" - " (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - return PN_REJECTED; - } - - // - // body[0] == fully qualified address (string) - // body[1] == direction (bool, true == receiver) - // - - qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0)); - qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1)) - ? QD_INCOMING : QD_OUTGOING); - - bool is_link_route = false; - bool has_destinations = false; - qdr_address_t *addr = 0; - qd_iterator_reset_view(addr_i, ITER_VIEW_ALL); - qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr); - if (addr) { - is_link_route = true; - has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)); - } - - // out_body[0] == is_link_route (bool) - // out_body[1] == has_destinations (bool) - - qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); - qd_compose_start_list(out_body); - qd_compose_insert_bool(out_body, is_link_route); - qd_compose_insert_bool(out_body, has_destinations); - qd_compose_end_list(out_body); - - uint64_t rc = _send_reply(epr, - OPCODE_LINK_ROUTE_LOOKUP, - addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND, - cid, - reply_to, - out_body); - qd_compose_free(out_body); - - if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) { - char *as = (char *)qd_iterator_copy(addr_i); - qd_log(_server_state.core->log, QD_LOG_TRACE, - "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s" - " (container=%s, endpoint=%p)", - as, - (addr) ? "" : "not ", - is_link_route ? "yes" : "no", - has_destinations ? "yes" : "no", - epr->container_id, - (void *)epr->endpoint); - free(as); - } - return rc; -} - - -/* handle lookup request from client - */ -void _on_transfer(void *link_context, - qdr_delivery_t *delivery, - qd_message_t *message) -{ - if (!qd_message_receive_complete(message)) - return; - - _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context; - qd_log(_server_state.core->log, QD_LOG_TRACE, - "Address lookup request received (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - - uint64_t disposition = PN_ACCEPTED; - qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES); - qd_parsed_field_t *props = qd_parse(p_iter); - if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) { - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Invalid address lookup request - no properties (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - disposition = PN_REJECTED; - goto exit; - } - - qd_parsed_field_t *v = qd_parse_value_by_key(props, "version"); - if (!v) { - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Invalid address lookup request - no version (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - disposition = PN_REJECTED; - goto exit; - } - - uint32_t version = qd_parse_as_uint(v); - if (!qd_parse_ok(v)) { - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Invalid address lookup request - invalid version (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - disposition = PN_REJECTED; - goto exit; - } - - if (version != PROTOCOL_VERSION) { - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Invalid address lookup request - unknown version" - " (container=%s, endpoint=%p, version=%"PRIu32")", - epr->container_id, (void *)epr->endpoint, version); - disposition = PN_REJECTED; - goto exit; - // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION - } - - address_lookup_opcode_t opcode = _decode_opcode(props); - switch (opcode) { - case OPCODE_LINK_ROUTE_LOOKUP: { - qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY); - qd_parsed_field_t *body = qd_parse(b_iter); - qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO); - qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID); - disposition = _do_link_route_lookup(epr, body, reply_to, cid); - qd_iterator_free(cid); - qd_iterator_free(reply_to); - qd_parse_free(body); - qd_iterator_free(b_iter); - break; - } - case OPCODE_INVALID: - default: - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Invalid address lookup request - invalid opcode" - " (container=%s, endpoint=%p, opcode=%d)", - epr->container_id, (void *)epr->endpoint, opcode); - disposition = PN_REJECTED; - } - -exit: - qd_parse_free(props); - qd_iterator_free(p_iter); - qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition); - qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false); - return; -} - - -/* handle incoming attach to address lookup service - */ -static void _on_first_attach(void *bind_context, - qdrc_endpoint_t *endpoint, - void **link_context, - qdr_terminus_t *remote_source, - qdr_terminus_t *remote_target) -{ - // - // Only accept incoming links initiated by the edge router. Detach all - // other links - // - qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint); - if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING || - conn->role != QDR_ROLE_EDGE_CONNECTION) { - *link_context = 0; - qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0); - qd_log(_server_state.core->log, QD_LOG_ERROR, - "Attempt to attach to address lookup server rejected (container=%s)", - (conn->connection_info) ? conn->connection_info->container : "<unknown>"); - return; - } - - _endpoint_ref_t *epr = new__endpoint_ref_t(); - ZERO(epr); - epr->endpoint = endpoint; - epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>"; - DEQ_INSERT_TAIL(_server_state.endpoints, epr); - *link_context = epr; - qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target); - qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false); - - qd_log(_server_state.core->log, QD_LOG_TRACE, - "Client attached to address lookup server (container=%s, endpoint=%p)", - epr->container_id, (void *)endpoint); -} - - -/* handle incoming detach from client - */ -static void _on_first_detach(void *link_context, - qdr_error_t *error) -{ - _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context; - qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0); - DEQ_REMOVE(_server_state.endpoints, epr); - qd_log(_server_state.core->log, QD_LOG_TRACE, - "Client detached from address lookup server (container=%s, endpoint=%p)", - epr->container_id, (void *)epr->endpoint); - free__endpoint_ref_t(epr); -} - - -static qdrc_endpoint_desc_t _endpoint_handlers = -{ - .label = "address lookup", - .on_first_attach = _on_first_attach, - .on_transfer = _on_transfer, - .on_first_detach = _on_first_detach, -}; - - -static void _address_lookup_init_CT(qdr_core_t *core, void **module_context) -{ - // - // Address resolution service is provided by interior routers only - // - if (core->router_mode != QD_ROUTER_MODE_INTERIOR) - return; - - _server_state.core = core; - - // - // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address - // - qdrc_endpoint_bind_mobile_address_CT(core, - QD_TERMINUS_ADDRESS_LOOKUP, - '0', // phase - &_endpoint_handlers, - &_server_state); - *module_context = &_server_state; -} - - -static void _address_lookup_final_CT(void *module_context) -{ - _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints); - while (epr) { - DEQ_REMOVE_HEAD(_server_state.endpoints); - free__endpoint_ref_t(epr); - epr = DEQ_HEAD(_server_state.endpoints); - } -} - - -QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, _address_lookup_final_CT) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/router_core/modules/address_lookup/address_lookup.h ---------------------------------------------------------------------- diff --git a/src/router_core/modules/address_lookup/address_lookup.h b/src/router_core/modules/address_lookup/address_lookup.h deleted file mode 100644 index 9f8b65e..0000000 --- a/src/router_core/modules/address_lookup/address_lookup.h +++ /dev/null @@ -1,75 +0,0 @@ -#ifndef router_core_address_lookup_h -#define router_core_address_lookup_h 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/iterator.h> -#include <qpid/dispatch/container.h> -#include <qpid/dispatch/compose.h> -// -// API for building address lookup request messages. The message properties -// and body fields are handled separately so they can be passed directly to the -// core client API. -// - -typedef enum { - // note: keep unit test in sync - QCM_ADDR_LOOKUP_OK, - QCM_ADDR_LOOKUP_BAD_VERSION, - QCM_ADDR_LOOKUP_BAD_OPCODE, - QCM_ADDR_LOOKUP_NOT_FOUND, - QCM_ADDR_LOOKUP_INVALID_REQUEST, -} qcm_address_lookup_status_t; - - -/** - * Create the message properties and body for a link route address lookup. The - * returned properties and body can be passed directly to - * qdrc_client_request_CT(). - * - * @param address - fully qualified link route address to look up. - * @param dir - QD_INCOMING or QD_OUTGOING - * @param properties - return value for message application properties section - * @param body - return value for message body - * @return zero on success - */ -int qcm_link_route_lookup_request(qd_iterator_t *address, - qd_direction_t dir, - qd_composed_field_t **properties, - qd_composed_field_t **body); - - -/** - * Parse out the payload of the link route lookup reply message. The - * properties and body fields are provided by the on_reply_cb() callback passed - * to the qdrc_client_request_CT() call. - * - * @param properties - application properties as returned in the reply - * @param body - body from reply message - * @param is_link_route - set True if the address is a link route address that - * exists in the route tables of the queried router. - * @param has_destination - if is_link_route this indicates whether or not the - * queried router has active destinations for this link route. - * @return QCM_ADDR_LOOKUP_OK if query is successful, else and error code - */ -qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties, - qd_iterator_t *body, - bool *is_link_route, - bool *has_destinations); -#endif // router_core_address_lookup_h http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/router_core/modules/address_lookup_server/address_lookup_server.c ---------------------------------------------------------------------- diff --git a/src/router_core/modules/address_lookup_server/address_lookup_server.c b/src/router_core/modules/address_lookup_server/address_lookup_server.c new file mode 100644 index 0000000..9a05c87 --- /dev/null +++ b/src/router_core/modules/address_lookup_server/address_lookup_server.c @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/address_lookup_server.h> +#include <qpid/dispatch/ctools.h> +#include "module.h" +#include "core_link_endpoint.h" + +#include "inttypes.h" + + +typedef struct _endpoint_ref { + DEQ_LINKS(struct _endpoint_ref); + qdrc_endpoint_t *endpoint; + const char *container_id; +} _endpoint_ref_t; +DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t); +ALLOC_DEFINE(_endpoint_ref_t); + + +static struct { + qdr_core_t *core; + _endpoint_ref_list_t endpoints; +} _server_state; + + +/* parse out the opcode from the request + */ +static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties) +{ + if (!properties) + return OPCODE_INVALID; + qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode"); + if (!oc) + return OPCODE_INVALID; + uint32_t opcode = qd_parse_as_uint(oc); + if (!qd_parse_ok(oc)) + return OPCODE_INVALID; + return (address_lookup_opcode_t)opcode; +} + + +/* send a reply to a lookup request + */ +static uint64_t _send_reply(_endpoint_ref_t *epr, + address_lookup_opcode_t opcode, + qcm_address_lookup_status_t status, + qd_iterator_t *correlation_id, + qd_iterator_t *reply_to, + qd_composed_field_t *body) +{ + if (!correlation_id || !reply_to) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Link route address reply failed - invalid request message properties" + " (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + return PN_REJECTED; + } + + qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); + qd_compose_start_list(fld); + qd_compose_insert_bool(fld, 0); // durable + qd_compose_end_list(fld); + + fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld); + qd_compose_start_list(fld); + qd_compose_insert_null(fld); // message-id + qd_compose_insert_null(fld); // user-id + qd_compose_insert_typed_iterator(fld, reply_to); // to + qd_compose_insert_null(fld); // subject + qd_compose_insert_null(fld); // reply-to + qd_compose_insert_typed_iterator(fld, correlation_id); + qd_compose_end_list(fld); + + fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld); + qd_compose_start_map(fld); + qd_compose_insert_string(fld, "version"); + qd_compose_insert_uint(fld, PROTOCOL_VERSION); + qd_compose_insert_string(fld, "opcode"); + qd_compose_insert_uint(fld, opcode); + qd_compose_insert_string(fld, "status"); + qd_compose_insert_uint(fld, status); + qd_compose_end_map(fld); + + qd_message_t *msg = qd_message(); + + qd_message_compose_3(msg, fld, body); + qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false); + qd_message_free(msg); + qd_compose_free(fld); + + return PN_ACCEPTED; +} + + +/* perform a link route lookup + */ +static uint64_t _do_link_route_lookup(_endpoint_ref_t *epr, + qd_parsed_field_t *body, + qd_iterator_t *reply_to, + qd_iterator_t *cid) +{ + if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Link route address lookup failed - invalid request body" + " (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + return PN_REJECTED; + } + + // + // body[0] == fully qualified address (string) + // body[1] == direction (bool, true == receiver) + // + + qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0)); + qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1)) + ? QD_INCOMING : QD_OUTGOING); + + bool is_link_route = false; + bool has_destinations = false; + qdr_address_t *addr = 0; + qd_iterator_reset_view(addr_i, ITER_VIEW_ALL); + qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr); + if (addr) { + is_link_route = true; + has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)); + } + + // out_body[0] == is_link_route (bool) + // out_body[1] == has_destinations (bool) + + qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qd_compose_start_list(out_body); + qd_compose_insert_bool(out_body, is_link_route); + qd_compose_insert_bool(out_body, has_destinations); + qd_compose_end_list(out_body); + + uint64_t rc = _send_reply(epr, + OPCODE_LINK_ROUTE_LOOKUP, + addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND, + cid, + reply_to, + out_body); + qd_compose_free(out_body); + + if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) { + char *as = (char *)qd_iterator_copy(addr_i); + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s" + " (container=%s, endpoint=%p)", + as, + (addr) ? "" : "not ", + is_link_route ? "yes" : "no", + has_destinations ? "yes" : "no", + epr->container_id, + (void *)epr->endpoint); + free(as); + } + return rc; +} + + +/* handle lookup request from client + */ +void _on_transfer(void *link_context, + qdr_delivery_t *delivery, + qd_message_t *message) +{ + if (!qd_message_receive_complete(message)) + return; + + _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context; + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Address lookup request received (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + + uint64_t disposition = PN_ACCEPTED; + qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES); + qd_parsed_field_t *props = qd_parse(p_iter); + if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - no properties (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + disposition = PN_REJECTED; + goto exit; + } + + qd_parsed_field_t *v = qd_parse_value_by_key(props, "version"); + if (!v) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - no version (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + disposition = PN_REJECTED; + goto exit; + } + + uint32_t version = qd_parse_as_uint(v); + if (!qd_parse_ok(v)) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - invalid version (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + disposition = PN_REJECTED; + goto exit; + } + + if (version != PROTOCOL_VERSION) { + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - unknown version" + " (container=%s, endpoint=%p, version=%"PRIu32")", + epr->container_id, (void *)epr->endpoint, version); + disposition = PN_REJECTED; + goto exit; + // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION + } + + address_lookup_opcode_t opcode = _decode_opcode(props); + switch (opcode) { + case OPCODE_LINK_ROUTE_LOOKUP: { + qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY); + qd_parsed_field_t *body = qd_parse(b_iter); + qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO); + qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID); + disposition = _do_link_route_lookup(epr, body, reply_to, cid); + qd_iterator_free(cid); + qd_iterator_free(reply_to); + qd_parse_free(body); + qd_iterator_free(b_iter); + break; + } + case OPCODE_INVALID: + default: + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Invalid address lookup request - invalid opcode" + " (container=%s, endpoint=%p, opcode=%d)", + epr->container_id, (void *)epr->endpoint, opcode); + disposition = PN_REJECTED; + } + +exit: + qd_parse_free(props); + qd_iterator_free(p_iter); + qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition); + qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false); + return; +} + + +/* handle incoming attach to address lookup service + */ +static void _on_first_attach(void *bind_context, + qdrc_endpoint_t *endpoint, + void **link_context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target) +{ + // + // Only accept incoming links initiated by the edge router. Detach all + // other links + // + qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint); + if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING || + conn->role != QDR_ROLE_EDGE_CONNECTION) { + *link_context = 0; + qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0); + qd_log(_server_state.core->log, QD_LOG_ERROR, + "Attempt to attach to address lookup server rejected (container=%s)", + (conn->connection_info) ? conn->connection_info->container : "<unknown>"); + return; + } + + _endpoint_ref_t *epr = new__endpoint_ref_t(); + ZERO(epr); + epr->endpoint = endpoint; + epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>"; + DEQ_INSERT_TAIL(_server_state.endpoints, epr); + *link_context = epr; + qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target); + qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false); + + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Client attached to address lookup server (container=%s, endpoint=%p)", + epr->container_id, (void *)endpoint); +} + + +/* handle incoming detach from client + */ +static void _on_first_detach(void *link_context, + qdr_error_t *error) +{ + _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context; + qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0); + DEQ_REMOVE(_server_state.endpoints, epr); + qd_log(_server_state.core->log, QD_LOG_TRACE, + "Client detached from address lookup server (container=%s, endpoint=%p)", + epr->container_id, (void *)epr->endpoint); + free__endpoint_ref_t(epr); +} + + +static qdrc_endpoint_desc_t _endpoint_handlers = +{ + .label = "address lookup", + .on_first_attach = _on_first_attach, + .on_transfer = _on_transfer, + .on_first_detach = _on_first_detach, +}; + + +static void _address_lookup_init_CT(qdr_core_t *core, void **module_context) +{ + // + // Address resolution service is provided by interior routers only + // + if (core->router_mode != QD_ROUTER_MODE_INTERIOR) + return; + + _server_state.core = core; + + // + // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address + // + qdrc_endpoint_bind_mobile_address_CT(core, + QD_TERMINUS_ADDRESS_LOOKUP, + '0', // phase + &_endpoint_handlers, + &_server_state); + *module_context = &_server_state; +} + + +static void _address_lookup_final_CT(void *module_context) +{ + _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints); + while (epr) { + DEQ_REMOVE_HEAD(_server_state.endpoints); + free__endpoint_ref_t(epr); + epr = DEQ_HEAD(_server_state.endpoints); + } +} + + +QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, _address_lookup_final_CT) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
