Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 c50da0faf -> 41c6e23e3
DISPATCH-179 - Added code to display links when requested through a management link query (like qdstat -l) Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/41c6e23e Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/41c6e23e Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/41c6e23e Branch: refs/heads/tross-DISPATCH-179-1 Commit: 41c6e23e31227ecf57a3678506071fc4fba5ce2a Parents: c50da0f Author: ganeshmurthy <[email protected]> Authored: Thu Nov 12 13:01:19 2015 -0500 Committer: ganeshmurthy <[email protected]> Committed: Thu Nov 12 13:01:19 2015 -0500 ---------------------------------------------------------------------- src/CMakeLists.txt | 1 + src/router_agent.c | 4 +- src/router_core/agent.c | 107 ++++++++++++++++-- src/router_core/agent_address.c | 71 ------------ src/router_core/agent_address.h | 2 - src/router_core/agent_link.c | 194 ++++++++++++++++++++++++++++++++ src/router_core/agent_link.h | 27 +++++ src/router_core/management_agent.c | 1 + src/router_core/router_core.c | 1 - 9 files changed, 326 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a55b73b..4dd221a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -67,6 +67,7 @@ set(qpid_dispatch_SOURCES router_config.c router_core/agent.c router_core/agent_address.c + router_core/agent_link.c router_core/connections.c router_core/router_core.c router_core/router_core_thread.c http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_agent.c ---------------------------------------------------------------------- diff --git a/src/router_agent.c b/src/router_agent.c index b9632e2..2b006b9 100644 --- 
a/src/router_agent.c +++ b/src/router_agent.c @@ -74,6 +74,7 @@ qd_error_t qd_entity_refresh_router_address(qd_entity_t* entity, void *impl) { return qd_error_code(); } +/* static const char *qd_link_type_names[] = { "endpoint", "waypoint", "inter-router", "inter-area" }; ENUM_DEFINE(qd_link_type, qd_link_type_names); @@ -90,6 +91,7 @@ static const char* qd_router_link_name(qd_router_link_t* link) { return pn_link_name(qd_link_pn(link->link)); } +//TODO - Remove this function and the functions that it calls since this is not used anymore. qd_error_t qd_entity_refresh_router_link(qd_entity_t* entity, void *impl) { qd_router_link_t *link = (qd_router_link_t*) impl; @@ -104,7 +106,7 @@ qd_error_t qd_entity_refresh_router_link(qd_entity_t* entity, void *impl) return QD_ERROR_NONE; return qd_error_code(); } - +*/ void qd_router_build_node_list(qd_dispatch_t *qd, qd_composed_field_t *field) { qd_router_t *router = qd->router; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent.c b/src/router_core/agent.c index 3950283..d3a56cb 100644 --- a/src/router_core/agent.c +++ b/src/router_core/agent.c @@ -19,8 +19,45 @@ #include <qpid/dispatch/amqp.h> #include "agent_address.h" +#include "agent_link.h" +#include "router_core_private.h" #include <stdio.h> + +static const char *qdr_address_columns[] = + {"name", + "identity", + "type", + "key", + "inProcess", + "subscriberCount", + "remoteCount", + "hostRouters", + "deliveriesIngress", + "deliveriesEgress", + "deliveriesTransit", + "deliveriesToContainer", + "deliveriesFromContainer", + 0}; + + +static const char *qdr_link_columns[] = + {"linkType", + "name", + "linkDir", + "msgFifoDepth", + "owningAddr", + "remoteContainer", + "linkName", + "eventFifoDepth", + "type", + "identity", + 0}; + +#define QDR_ADDRESS_COLUMN_COUNT 13 +#define QDR_LINK_COLUMN_COUNT 10 + + 
//================================================================================== // Internal Functions //================================================================================== @@ -64,8 +101,8 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query) static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard); - - +static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count); +static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count); //================================================================================== // Interface Functions //================================================================================== @@ -102,8 +139,8 @@ qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_ switch (query->entity_type) { case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; - case QD_ROUTER_ADDRESS: qdra_address_set_columns(query, attribute_names);break; + case QD_ROUTER_LINK: qdr_agent_set_columns(query, attribute_names, qdr_link_columns, QDR_LINK_COLUMN_COUNT);break; + case QD_ROUTER_ADDRESS: qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT);break; case QD_ROUTER_WAYPOINT: break; case QD_ROUTER_EXCHANGE: break; case QD_ROUTER_BINDING: break; @@ -117,8 +154,8 @@ void qdr_query_add_attribute_names(qdr_query_t *query) { switch (query->entity_type) { case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; - case QD_ROUTER_ADDRESS: qdra_address_emit_columns(query); break; + case QD_ROUTER_LINK: qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT);break; + case QD_ROUTER_ADDRESS: qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break; case QD_ROUTER_WAYPOINT: break; 
case QD_ROUTER_EXCHANGE: break; case QD_ROUTER_BINDING: break; @@ -147,6 +184,62 @@ void qdr_query_free(qdr_query_t *query) { } +static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count) +{ + qd_compose_start_list(query->body); + int i = 0; + while (query->columns[i] >= 0) { + assert(query->columns[i] < column_count); + qd_compose_insert_string(query->body, qdr_columns[query->columns[i]]); + i++; + } + qd_compose_end_list(query->body); +} + +static void qdr_agent_set_columns(qdr_query_t *query, + qd_parsed_field_t *attribute_names, + const char *qdr_columns[], + int column_count) +{ + if (!attribute_names || + (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 && + qd_parse_tag(attribute_names) != QD_AMQP_LIST32) || + qd_parse_sub_count(attribute_names) == 0) { + // + // Either the attribute_names field is absent, it's not a list, or it's an empty list. + // In this case, we will include all available attributes. + // + int i; + for (i = 0; i < column_count; i++) + query->columns[i] = i; + query->columns[i] = -1; + return; + } + + // + // We have a valid, non-empty attribute list. Set the columns appropriately. 
+ // + uint32_t count = qd_parse_sub_count(attribute_names); + uint32_t idx; + + for (idx = 0; idx < count; idx++) { + qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx); + if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8)) + query->columns[idx] = QDR_AGENT_COLUMN_NULL; + else { + int j = 0; + while (qdr_columns[j]) { + qd_field_iterator_t *iter = qd_parse_raw(name); + if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_columns[j])) { + query->columns[idx] = j; + break; + } + } + } + } +} + + void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler) { @@ -174,7 +267,7 @@ static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool if (!discard) switch (query->entity_type) { case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; + case QD_ROUTER_LINK: qdrl_link_get_first_CT(core, query, offset); break; case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break; case QD_ROUTER_WAYPOINT: break; case QD_ROUTER_EXCHANGE: break; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/agent_address.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c index 88a60f7..08af140 100644 --- a/src/router_core/agent_address.c +++ b/src/router_core/agent_address.c @@ -19,22 +19,6 @@ #include "agent_address.h" -static const char *qdr_address_columns[] = - {"name", - "identity", - "type", - "key", - "inProcess", - "subscriberCount", - "remoteCount", - "hostRouters", - "deliveriesIngress", - "deliveriesEgress", - "deliveriesTransit", - "deliveriesToContainer", - "deliveriesFromContainer", - 0}; - #define QDR_ADDRESS_NAME 0 #define QDR_ADDRESS_IDENTITY 1 #define QDR_ADDRESS_TYPE 2 @@ -48,7 +32,6 @@ static const char *qdr_address_columns[] = #define QDR_ADDRESS_DELIVERIES_TRANSIT 10 #define 
QDR_ADDRESS_DELIVERIES_TO_CONTAINER 11 #define QDR_ADDRESS_DELIVERIES_FROM_CONTAINER 12 -#define QDR_ADDRESS_COLUMN_COUNT 13 static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr) { @@ -129,60 +112,6 @@ static void qdr_manage_advance_address_CT(qdr_query_t *query, qdr_address_t *add } -void qdra_address_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names) -{ - if (!attribute_names || - (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 && - qd_parse_tag(attribute_names) != QD_AMQP_LIST32) || - qd_parse_sub_count(attribute_names) == 0) { - // - // Either the attribute_names field is absent, it's not a list, or it's an empty list. - // In this case, we will include all available attributes. - // - int i; - for (i = 0; i < QDR_ADDRESS_COLUMN_COUNT; i++) - query->columns[i] = i; - query->columns[i] = -1; - return; - } - - // - // We have a valid, non-empty attribute list. Set the columns appropriately. - // - uint32_t count = qd_parse_sub_count(attribute_names); - uint32_t idx; - - for (idx = 0; idx < count; idx++) { - qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx); - if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8)) - query->columns[idx] = QDR_AGENT_COLUMN_NULL; - else { - int j = 0; - while (qdr_address_columns[j]) { - qd_field_iterator_t *iter = qd_parse_raw(name); - if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_address_columns[j])) { - query->columns[idx] = j; - break; - } - } - } - } -} - - -void qdra_address_emit_columns(qdr_query_t *query) -{ - qd_compose_start_list(query->body); - int i = 0; - while (query->columns[i] >= 0) { - assert(query->columns[i] < QDR_ADDRESS_COLUMN_COUNT); - qd_compose_insert_string(query->body, qdr_address_columns[query->columns[i]]); - i++; - } - qd_compose_end_list(query->body); -} - - void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) { // 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/agent_address.h ---------------------------------------------------------------------- diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h index 53d383a..753c37d 100644 --- a/src/router_core/agent_address.h +++ b/src/router_core/agent_address.h @@ -21,8 +21,6 @@ #include "router_core_private.h" -void qdra_address_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names); -void qdra_address_emit_columns(qdr_query_t *query); void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/agent_link.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c new file mode 100644 index 0000000..0cced7b --- /dev/null +++ b/src/router_core/agent_link.c @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "agent_link.h" + +#define QDR_LINK_LINK_TYPE 0 +#define QDR_LINK_LINK_NAME 1 +#define QDR_LINK_LINK_DIR 2 //done +#define QDR_LINK_MSG_FIFO_DEPTH 3 //done +#define QDR_LINK_OWNING_ADDR 4 //done +#define QDR_LINK_REMOTE_CONTAINER 5 +#define QDR_LINK_NAME 6 +#define QDR_LINK_EVENT_FIFO_DEPTH 7 //done +#define QDR_LINK_TYPE 8 +#define QDR_LINK_IDENTITY 9 + +static const char *qd_link_type_names[] = { "endpoint", "waypoint", "inter-router", "inter-area" }; +ENUM_DEFINE(qd_link_type, qd_link_type_names); + +static const char *address_key(qdr_address_t *addr) { + return addr && addr->hash_handle ? (const char*) qd_hash_key_by_handle(addr->hash_handle) : NULL; +} + +static const char* qd_router_link_remote_container(qdr_link_t *link) { + if (!link->link || !qd_link_pn(link->link)) + return ""; + return pn_connection_remote_container( + pn_session_connection(qd_link_pn_session(link->link))); +} + +static const char* qd_router_link_name(qdr_link_t *link) { + if (!link->link || !qd_link_pn(link->link)) + return ""; + return pn_link_name(qd_link_pn(link->link)); +} + +static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link ) +{ + qd_composed_field_t *body = query->body; + + qd_compose_start_list(body); + int i = 0; + while (query->columns[i] >= 0) { + switch(query->columns[i]) { + case QDR_LINK_IDENTITY: + case QDR_LINK_NAME: + // TODO - This needs to be fixed (use connection_id + link_name) + qd_compose_insert_string(body, "fix-me-hardcoded-for-now" ); + break; + case QDR_LINK_TYPE: + qd_compose_insert_string(body, "org.apache.qpid.dispatch.router.link"); + break; + + case QDR_LINK_REMOTE_CONTAINER: + qd_compose_insert_string(body, qd_router_link_remote_container(link)); + break; + + case QDR_LINK_LINK_NAME: + qd_compose_insert_string(body, qd_router_link_name(link)); + break; + + case QDR_LINK_LINK_TYPE: + qd_compose_insert_string(body, qd_link_type_name(link->link_type)); + break; + + case QDR_LINK_OWNING_ADDR: + 
qd_compose_insert_string(body, address_key(link->owning_addr)); + break; + + case QDR_LINK_LINK_DIR: + qd_compose_insert_string(body, link->link_direction == QD_INCOMING ? "in" : "out"); + break; + + case QDR_LINK_MSG_FIFO_DEPTH: + qd_compose_insert_ulong(body, DEQ_SIZE(link->msg_fifo)); + break; + + case QDR_LINK_EVENT_FIFO_DEPTH: + qd_compose_insert_ulong(body, DEQ_SIZE(link->event_fifo)); + break; + + default: + qd_compose_insert_null(body); + break; + } + i++; + } + qd_compose_end_list(body); +} + +static void qdr_manage_advance_link_CT(qdr_query_t *query, qdr_link_t *link) +{ + query->next_offset++; + link = DEQ_NEXT(link); + if (link) + query->more = true; + //query->next_key = qdr_field((const char*) qd_hash_key_by_handle(link->owning_addr->hash_handle)); + else + query->more = false; +} + + +void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) +{ + // + // Queries that get this far will always succeed. + // + query->status = &QD_AMQP_OK; + + // + // If the offset goes beyond the set of links, end the query now. + // + if (offset >= DEQ_SIZE(core->links)) { + query->more = false; + qdr_agent_enqueue_response_CT(core, query); + return; + } + + // + // Run to the address at the offset. + // + qdr_link_t *link = DEQ_HEAD(core->links); + for (int i = 0; i < offset && link; i++) + link = DEQ_NEXT(link); + assert(link); + + // + // Write the columns of the link into the response body. + // + qdr_agent_write_link_CT(query, link); + + // + // Advance to the next address + // + query->next_offset = offset; + qdr_manage_advance_link_CT(query, link); + + // + // Enqueue the response. + // + qdr_agent_enqueue_response_CT(core, query); +} + + +void qdrl_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) +{ + qdr_link_t *link = 0; + + if (!link) { + // + // If the address was removed in the time between this get and the previous one, + // we need to use the saved offset, which is less efficient. 
+ // + if (query->next_offset < DEQ_SIZE(core->links)) { + link = DEQ_HEAD(core->links); + for (int i = 0; i < query->next_offset && link; i++) + link = DEQ_NEXT(link); + } + } + + if (link) { + // + // Write the columns of the link entity into the response body. + // + qdr_agent_write_link_CT(query, link); + + // + // Advance to the next link + // + qdr_manage_advance_link_CT(query, link); + } else + query->more = false; + + // + // Enqueue the response. + // + qdr_agent_enqueue_response_CT(core, query); +} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/agent_link.h ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h new file mode 100644 index 0000000..aaafa06 --- /dev/null +++ b/src/router_core/agent_link.h @@ -0,0 +1,27 @@ +#ifndef qdr_agent_link +#define qdr_agent_link 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "router_core_private.h" + +void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); +void qdrl_link_get_next_CT(qdr_core_t *core, qdr_query_t *query); + +#endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/management_agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c index 848ca66..2d93cb5 100644 --- a/src/router_core/management_agent.c +++ b/src/router_core/management_agent.c @@ -199,6 +199,7 @@ static void core_agent_query_handler(qd_dispatch_t *qd, // Set the callback function. qdr_manage_handler(core, manage_response_handler); + // Local local function that creates and returns a qd_management_context_t qd_management_context_t *ctx = qd_management_context(message, field, 0, reply_to, qd, (*count)); ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41c6e23e/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index b7f24e3..a747572 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -210,4 +210,3 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode) ref = DEQ_NEXT(ref); } } - --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
