Repository: qpid-dispatch Updated Branches: refs/heads/master 49891577c -> 8a17ef367
DISPATCH_179 - Added Delete/Deactivate implementation for link routes and auto links. Added -L option to qdstat to display auto links. Added a new assertion to DEQ_REMOVE to protect against removing the same item twice. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8a17ef36 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8a17ef36 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8a17ef36 Branch: refs/heads/master Commit: 8a17ef367d55d1b0a68ed139380f4fc0dc15c833 Parents: 4989157 Author: Ted Ross <[email protected]> Authored: Mon Mar 28 12:38:38 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon Mar 28 12:38:38 2016 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/ctools.h | 1 + include/qpid/dispatch/router_core.h | 1 + python/qpid_dispatch/management/qdrouter.json | 2 +- src/router_core/agent_link.c | 24 +++++++- src/router_core/connections.c | 23 +++---- src/router_core/error.c | 12 ++++ src/router_core/forwarder.c | 7 ++- src/router_core/route_control.c | 70 +++++++++++++++++++++- src/router_core/router_core_private.h | 20 ++++++- tools/qdstat | 16 +++-- 10 files changed, 151 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/include/qpid/dispatch/ctools.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/ctools.h b/include/qpid/dispatch/ctools.h index 9ab1c70..fc159a6 100644 --- a/include/qpid/dispatch/ctools.h +++ b/include/qpid/dispatch/ctools.h @@ -162,6 +162,7 @@ do { \ (i)->prev##n->next##n = (i)->next##n; \ else \ (d).head = (i)->next##n; \ + CT_ASSERT((d).size > 0); \ (d).size--; \ (i)->next##n = 0; \ (i)->prev##n = 0; \ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 6e5baa5..6e26b41 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -339,6 +339,7 @@ qdr_error_t *qdr_error_from_pn(pn_condition_t *pn); qdr_error_t *qdr_error(const char *name, const char *description); void qdr_error_free(qdr_error_t *error); void qdr_error_copy(qdr_error_t *from, pn_condition_t *to); +char *qdr_error_description(qdr_error_t *err); /** ****************************************************************************** http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index ee5fe8b..4112b89 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -990,7 +990,7 @@ "update": true }, "operStatus": { - "type": ["up", "down", "quiescing"] + "type": ["up", "down", "quiescing", "idle"] }, "linkName": { "type": "string", http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/src/router_core/agent_link.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index af06f6d..29419cf 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -48,8 +48,8 @@ const char *qdr_link_columns[] = "undeliveredCount", "unsettledCount", "deliveryCount", - "adminState", - "operState", + "adminStatus", + "operStatus", 0}; static const char *qd_link_type_name(qd_link_type_t lt) @@ -71,8 +71,9 @@ static const char *address_key(qdr_address_t *addr) static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_link_t *link) { - switch(col) { + char *text = 0; + switch(col) { case QDR_LINK_NAME: { if (link->name) qd_compose_insert_string(body, link->name); @@ -138,7 +139,24 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li break; case QDR_LINK_ADMIN_STATE: + text = link->admin_enabled ? "enabled" : "disabled"; + qd_compose_insert_string(body, text); + break; + case QDR_LINK_OPER_STATE: + switch (link->oper_status) { + case QDR_LINK_OPER_UP: text = "up"; break; + case QDR_LINK_OPER_DOWN: text = "down"; break; + case QDR_LINK_OPER_QUIESCING: text = "quiescing"; break; + case QDR_LINK_OPER_IDLE: text = "idle"; break; + default: + text = 0; + } + if (!!text) + qd_compose_insert_string(body, text); + else + qd_compose_insert_null(body); + break; default: qd_compose_insert_null(body); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index e35f6a1..ee9ec04 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -31,13 +31,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b ALLOC_DEFINE(qdr_connection_t); ALLOC_DEFINE(qdr_connection_work_t); -typedef enum { - QDR_CONDITION_NO_ROUTE_TO_DESTINATION, - QDR_CONDITION_ROUTED_LINK_LOST, - QDR_CONDITION_FORBIDDEN, - QDR_CONDITION_NONE -} qdr_condition_t; - //================================================================================== // Internal Functions //================================================================================== @@ -272,6 +265,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, strcpy(link->name, name); link->link_direction = dir; link->capacity = 32; // TODO - make this configurable + link->admin_enabled = true; + link->oper_status = QDR_LINK_OPER_DOWN; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -511,6 +506,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->capacity = 32; // TODO - make this configurable link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8); qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8); + link->admin_enabled = true; + link->oper_status = QDR_LINK_OPER_DOWN; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -530,7 +527,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, } -static void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition) +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition) { qdr_connection_work_t *work = new_qdr_connection_work_t(); ZERO(work); @@ -575,6 +572,8 @@ static void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *lin work->source = source; work->target = target; + link->oper_status = QDR_LINK_OPER_UP; + qdr_connection_enqueue_work_CT(core, link->conn, work); } @@ -673,7 +672,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local) // deleted. // if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 && - qd_bitmask_cardinality(addr->rnodes) == 0 && !addr->waypoint && !addr->block_deletion) { + qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion) { qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle); DEQ_REMOVE(core->addrs, addr); qd_hash_handle_free(addr->hash_handle); @@ -1059,6 +1058,8 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac qdr_terminus_t *source = action->args.connection.source; qdr_terminus_t *target = action->args.connection.target; + link->oper_status = QDR_LINK_OPER_UP; + // // Handle attach-routed links // @@ -1168,8 +1169,10 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b // For auto links, switch the auto link to failed state and record the error // if (link->auto_link) { + link->auto_link->link = 0; link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED; - // TODO - last_error + free(link->auto_link->last_error); + link->auto_link->last_error = qdr_error_description(error); } link->owning_addr = 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/src/router_core/error.c ---------------------------------------------------------------------- diff --git a/src/router_core/error.c b/src/router_core/error.c index 7bc1cae..46a8c97 100644 --- a/src/router_core/error.c +++ b/src/router_core/error.c @@ -18,6 +18,7 @@ */ #include "router_core_private.h" +#include <stdio.h> struct qdr_error_t { qdr_field_t *name; @@ -95,3 +96,14 @@ void qdr_error_copy(qdr_error_t *from, pn_condition_t *to) } +char *qdr_error_description(qdr_error_t *err) +{ + if (!err || !err->description || !err->description->iterator) + return 0; + int length = qd_field_iterator_length(err->description->iterator); + char *text = (char*) malloc(length + 1); + qd_field_iterator_ncopy(err->description->iterator, (unsigned char*) text, length); + text[length] = '\0'; + return text; +} + http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index c9752c5..0b305d2 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -85,9 +85,10 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe // Create peer linkage only if the delivery is not settled // if (!dlv->settled) { - dlv->peer = peer; - if (peer && peer->peer == 0) + if (peer && peer->peer == 0) { + dlv->peer = peer; peer->peer = dlv; // TODO - make this a back-list for multicast tracking + } } return dlv; @@ -408,6 +409,8 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core, out_link->link_type = QD_LINK_ENDPOINT; out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING; out_link->name = in_link->name; + out_link->admin_enabled = true; + out_link->oper_status = QDR_LINK_OPER_DOWN; out_link->connected_link = in_link; in_link->connected_link = out_link; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/src/router_core/route_control.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index dd9d43f..3343153 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -143,7 +143,15 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) { - //qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); + qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); + + if (al->link) { + qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_ROUTED_LINK_LOST); + al->link->auto_link = 0; + al->link = 0; + } + + al->state = QDR_AUTO_LINK_STATE_INACTIVE; } @@ -180,6 +188,8 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, qd_hash_insert(core->addr_hash, iter, lr->addr, &lr->addr->hash_handle); } + lr->addr->ref_count++; + // // Find or create a connection identifier structure for this link route // @@ -201,6 +211,32 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr) { + // + // Disassociate from the connection identifier. Check to see if the identifier + // should be removed. + // + qdr_conn_identifier_t *cid = lr->conn_id; + if (cid) { + if (!!cid->open_connection) + qdr_link_route_deactivate_CT(core, lr, cid->open_connection); + DEQ_REMOVE_N(REF, cid->link_route_refs, lr); + qdr_route_check_id_for_deletion_CT(core, cid); + } + + // + // Disassociate the link route from its address. Check to see if the address + // should be removed. + // + qdr_address_t *addr = lr->addr; + if (addr && --addr->ref_count == 0) + qdr_check_addr_CT(core, addr, false); + + // + // Remove the link route from the core list. + // + DEQ_REMOVE(core->link_routes, lr); + free(lr->name); + free_qdr_link_route_t(lr); } @@ -215,7 +251,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al = new_qdr_auto_link_t(); // - // Set up the link_route structure + // Set up the auto_link structure // ZERO(al); al->identity = qdr_identifier(core); @@ -238,6 +274,8 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qd_hash_insert(core->addr_hash, iter, al->addr, &al->addr->hash_handle); } + al->addr->ref_count++; + // // Find or create a connection identifier structure for this auto_link // @@ -257,8 +295,34 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, } -void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *auto_link) +void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al) { + // + // Disassociate from the connection identifier. Check to see if the identifier + // should be removed. + // + qdr_conn_identifier_t *cid = al->conn_id; + if (cid) { + if (!!cid->open_connection) + qdr_auto_link_deactivate_CT(core, al, cid->open_connection); + DEQ_REMOVE_N(REF, cid->auto_link_refs, al); + qdr_route_check_id_for_deletion_CT(core, cid); + } + + // + // Disassociate the auto link from its address. Check to see if the address + // should be removed. + // + qdr_address_t *addr = al->addr; + if (addr && --addr->ref_count == 0) + qdr_check_addr_CT(core, addr, false); + + // + // Remove the auto link from the core list. + // + DEQ_REMOVE(core->auto_links, al); + free(al->name); + free_qdr_auto_link_t(al); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index f6ff913..dbf3f58 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -42,6 +42,13 @@ int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t * bool qdr_forward_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *in_link, qdr_terminus_t *source, qdr_terminus_t *target); +typedef enum { + QDR_CONDITION_NO_ROUTE_TO_DESTINATION, + QDR_CONDITION_ROUTED_LINK_LOST, + QDR_CONDITION_FORBIDDEN, + QDR_CONDITION_NONE +} qdr_condition_t; + /** * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into * and out of the router-core thread. @@ -215,6 +222,13 @@ void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref #define QDR_LINK_LIST_CLASS_CONNECTION 3 #define QDR_LINK_LIST_CLASSES 4 +typedef enum { + QDR_LINK_OPER_UP, + QDR_LINK_OPER_DOWN, + QDR_LINK_OPER_QUIESCING, + QDR_LINK_OPER_IDLE +} qdr_link_oper_status_t; + struct qdr_link_t { DEQ_LINKS(qdr_link_t); qdr_core_t *core; @@ -232,6 +246,8 @@ struct qdr_link_t { qdr_delivery_list_t undelivered; ///< Deliveries to be forwarded or sent qdr_delivery_list_t unsettled; ///< Unsettled deliveries qdr_delivery_ref_list_t updated_deliveries; ///< References to deliveries (in the unsettled list) with updates. + bool admin_enabled; + qdr_link_oper_status_t oper_status; bool strip_annotations_in; bool strip_annotations_out; int capacity; @@ -291,8 +307,8 @@ struct qdr_address_t { qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry qd_address_treatment_t treatment; qdr_forwarder_t *forwarder; + int ref_count; ///< Number of link-routes + auto-links referencing this address bool toggle; - bool waypoint; bool block_deletion; bool local; @@ -581,6 +597,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_terminus_t *source, qdr_terminus_t *target); +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition); + qdr_query_t *qdr_query(qdr_core_t *core, void *context, qd_router_entity_type_t type, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8a17ef36/tools/qdstat ---------------------------------------------------------------------- diff --git a/tools/qdstat b/tools/qdstat index e98502d..bbd1973 100755 --- a/tools/qdstat +++ b/tools/qdstat @@ -46,8 +46,8 @@ def parse_args(argv): parser.add_option("-l", "--links", help="Show Router Links", action="store_const", const="l", dest="show") parser.add_option("-n", "--nodes", help="Show Router Nodes", action="store_const", const="n", dest="show") parser.add_option("-a", "--address", help="Show Router Addresses", action="store_const", const="a", dest="show") - parser.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") - parser.add_option("-A", "--autolink", help="Show Auto Links", action="store_const", const="A", dest="show") + parser.add_option("-m", "--memory", help="Show Router Memory Stats", action="store_const", const="m", dest="show") + parser.add_option("-L", "--autolink", help="Show Auto Links", action="store_const", const="L", dest="show") parser.add_option("-v", "--verbose", help="Show maximum detail", action="store_true", dest="verbose") parser.add_option("--log", help="Show recent log entries", action="store_const", const="log", dest="show") parser.add_option("--limit", help="Limit number of log entries", type="int") @@ -221,7 +221,10 @@ class BusManager(Node): heads.append(Header("undel")) heads.append(Header("unsettled")) heads.append(Header("deliveries")) - heads.append(Header("name")) + heads.append(Header("admin")) + heads.append(Header("oper")) + if self.opts.verbose: + heads.append(Header("name")) rows = [] objects = self.query('org.apache.qpid.dispatch.router.link') @@ -237,7 +240,10 @@ class BusManager(Node): row.append(link.undeliveredCount) row.append(link.unsettledCount) row.append(link.deliveryCount) - row.append(link.linkName) + row.append(link.adminStatus) + row.append(link.operStatus) + if self.opts.verbose: + row.append(link.linkName) rows.append(row) title = "Router Links" dispRows = rows @@ -389,7 +395,7 @@ class BusManager(Node): elif main == 'm': self.displayMemory() elif main == 'g': self.displayGeneral() elif main == 'c': self.displayConnections() - elif main == 'A': self.displayAutolinks() + elif main == 'L': self.displayAutolinks() elif main == 'log': self.displayLog() def display(self, identitys): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
