This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 06aad3c9c570841981aaf2fedb8e465010eb8110 Author: Ted Ross <[email protected]> AuthorDate: Fri Mar 15 11:12:35 2019 -0400 DISPATCH-1289 - Added link-level logging at the INFO level, including full terminus information and terminal stats DISPATCH-1289 - Added uptime-ticks value in the core state. DISPATCH-1289 - Record the ingress time (in core ticks) for each delivery. DISPATCH-1289 - Added counters for delayed settlements and management access to those counters. DISPATCH-1289 - Cleaned up the columns in qdstat -a: In-process stats are now under --verbose. Local destinations and local containers (link-routes) now combined into one column. DISPATCH-1289 - Fixed test to handle consolidation of ROUTER_CORE into ROUTER (logs). DISPATCH-1289 - Added settlement rate computation on links. DISPATCH-1289 - Added 'C' prefix to connection numbers in the server.c INFO logs. --- docs/man/qdstat.8.adoc | 11 ++- include/qpid/dispatch/router_core.h | 13 +++ python/qpid_dispatch/management/qdrouter.json | 25 ++++++ src/router_core/agent_link.c | 53 ++++++++--- src/router_core/agent_link.h | 2 +- src/router_core/agent_router.c | 22 +++-- src/router_core/agent_router.h | 2 +- src/router_core/connections.c | 53 ++++++++--- src/router_core/core_timer.c | 2 + src/router_core/forwarder.c | 2 + src/router_core/router_core.c | 2 +- src/router_core/router_core_private.h | 35 +++++--- src/router_core/terminus.c | 122 ++++++++++++++++++++++++++ src/router_core/transfer.c | 38 ++++++++ src/router_node.c | 23 +++-- src/server.c | 8 +- tests/system_tests_core_client.py | 2 +- tools/qdstat.in | 48 ++++++---- 18 files changed, 390 insertions(+), 73 deletions(-) diff --git a/docs/man/qdstat.8.adoc b/docs/man/qdstat.8.adoc index 42491a2..a7eff2a 100644 --- a/docs/man/qdstat.8.adoc +++ b/docs/man/qdstat.8.adoc @@ -145,6 +145,12 @@ The number of deliveries on this link that were released. mod:: The number of deliveries on this link that were modified. +delay:: +The number of settled deliveries on this link that were unsettled for more than one second. + +rate:: +The average rate (over a period of five seconds) at which deliveries have been settled on this link. + admin:: The administrative status of the link: - 'enabled' - The link is enabled for normal operation. @@ -212,14 +218,11 @@ in-proc:: The number of in-process consumers for this address. local:: -For this router, the number of local consumers for this address. +For this router, the number of local consumers for this address, or for link-routed addresses, the number of locally-attached containers that are destinations for this address. remote:: The number of remote routers that have at least one consumer for this address. -cntnr:: -The number of locally-attached containers that are destinations for link routes on this address. - in:: The number of deliveries for this address that entered the network on this router. diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 2f7da25..65b1393 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -286,6 +286,19 @@ qdr_terminus_t *qdr_terminus(pn_terminus_t *pn); void qdr_terminus_free(qdr_terminus_t *terminus); /** + * qdr_terminus_format + * + * Write a human-readable representation of the terminus content to the string + * in 'output'. + * + * @param terminus The pointer returned by qdr_terminus() + * @param output The string buffer where the result shall be written + * @param size Input: the number of bytes availabie in output for writing. Output: the + * number of bytes remaining after the operation. + */ +void qdr_terminus_format(qdr_terminus_t *terminus, char *output, size_t *size); + +/** * qdr_terminus_copy * * Copy the contents of the qdr_terminus into a proton terminus diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 5a5900e..b1b253d 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -554,6 +554,16 @@ "description":"Number of deliveries whose delivery state was set to MODIFIED by the router. These deliveries were modified but not processed.", "graph": true }, + "deliveriesDelayed1Sec": { + "type": "integer", + "graph": true, + "description": "The total number of settled deliveries that were held in the router for 1 to 10 seconds." + }, + "deliveriesDelayed10Sec": { + "type": "integer", + "graph": true, + "description": "The total number of settled deliveries that were held in the router for more than 10 seconds." + }, "deliveriesIngress": { "type": "integer", "description":"Number of deliveries that were sent to it by a sender that is directly attached to the router.", @@ -1442,6 +1452,21 @@ "graph": true, "description": "The total number of modified deliveries." }, + "deliveriesDelayed1Sec": { + "type": "integer", + "graph": true, + "description": "The total number of settled deliveries that were held in the router for 1 to 10 seconds." + }, + "deliveriesDelayed10Sec": { + "type": "integer", + "graph": true, + "description": "The total number of settled deliveries that were held in the router for more than 10 seconds." + }, + "settleRate": { + "type": "integer", + "graph": true, + "description": "The average rate (over five seconds) of settlement in deliveries-per-second. This is included for egress links only." + }, "ingressHistogram": { "type": "list", "description": "For outgoing links on connections with 'normal' role. This histogram shows the number of settled deliveries on the link that ingressed the network at each interior router node." diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index 9600aca..4ed4055 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -42,8 +42,11 @@ #define QDR_LINK_REJECTED_COUNT 18 #define QDR_LINK_RELEASED_COUNT 19 #define QDR_LINK_MODIFIED_COUNT 20 -#define QDR_LINK_INGRESS_HISTOGRAM 21 -#define QDR_LINK_PRIORITY 22 +#define QDR_LINK_DELAYED_1SEC 21 +#define QDR_LINK_DELAYED_10SEC 22 +#define QDR_LINK_INGRESS_HISTOGRAM 23 +#define QDR_LINK_PRIORITY 24 +#define QDR_LINK_SETTLE_RATE 25 const char *qdr_link_columns[] = {"name", @@ -67,8 +70,11 @@ const char *qdr_link_columns[] = "rejectedCount", "releasedCount", "modifiedCount", + "deliveriesDelayed1Sec", + "deliveriesDelayed10Sec", "ingressHistogram", "priority", + "settleRate", 0}; static const char *qd_link_type_name(qd_link_type_t lt) @@ -89,7 +95,7 @@ static const char *address_key(qdr_address_t *addr) return addr && addr->hash_handle ? (const char*) qd_hash_key_by_handle(addr->hash_handle) : NULL; } -static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_link_t *link) +static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *body, int col, qdr_link_t *link) { char *text = 0; @@ -213,6 +219,14 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li qd_compose_insert_ulong(body, link->modified_deliveries); break; + case QDR_LINK_DELAYED_1SEC: + qd_compose_insert_ulong(body, link->deliveries_delayed_1sec); + break; + + case QDR_LINK_DELAYED_10SEC: + qd_compose_insert_ulong(body, link->deliveries_delayed_10sec); + break; + case QDR_LINK_INGRESS_HISTOGRAM: if (link->ingress_histogram) { qd_compose_start_list(body); @@ -227,20 +241,39 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li qd_compose_insert_uint(body, link->priority); break; + case QDR_LINK_SETTLE_RATE: { + uint32_t delta_time = core->uptime_ticks - link->core_ticks; + if (delta_time > 0) { + if (delta_time > QDR_LINK_RATE_DEPTH) + delta_time = QDR_LINK_RATE_DEPTH; + for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++) { + link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH; + link->settled_deliveries[link->rate_cursor] = 0; + } + link->core_ticks = core->uptime_ticks; + } + + uint64_t total = 0; + for (uint8_t i = 0; i < QDR_LINK_RATE_DEPTH; i++) + total += link->settled_deliveries[i]; + qd_compose_insert_uint(body, total / QDR_LINK_RATE_DEPTH); + } + break; + default: qd_compose_insert_null(body); break; } } -static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link) +static void qdr_agent_write_link_CT(qdr_core_t *core, qdr_query_t *query, qdr_link_t *link) { qd_composed_field_t *body = query->body; qd_compose_start_list(body); int i = 0; while (query->columns[i] >= 0) { - qdr_agent_write_column_CT(body, query->columns[i], link); + qdr_agent_write_column_CT(core, body, query->columns[i], link); i++; } qd_compose_end_list(body); @@ -285,7 +318,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) // // Write the columns of the link into the response body. // - qdr_agent_write_link_CT(query, link); + qdr_agent_write_link_CT(core, query, link); // // Advance to the next address @@ -314,7 +347,7 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) // // Write the columns of the link entity into the response body. // - qdr_agent_write_link_CT(query, link); + qdr_agent_write_link_CT(core, query, link); // // Advance to the next link @@ -330,13 +363,13 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) } -static void qdr_manage_write_response_map_CT(qd_composed_field_t *body, qdr_link_t *link) +static void qdr_manage_write_response_map_CT(qdr_core_t *core, qd_composed_field_t *body, qdr_link_t *link) { qd_compose_start_map(body); for(int i = 0; i < QDR_LINK_COLUMN_COUNT; i++) { qd_compose_insert_string(body, qdr_link_columns[i]); - qdr_agent_write_column_CT(body, i, link); + qdr_agent_write_column_CT(core, body, i, link); } qd_compose_end_map(body); @@ -386,7 +419,7 @@ static void qdra_link_update_set_status(qdr_core_t *core, qdr_query_t *query, qd { if (link) { //link->admin_state = qd_iterator_copy(adm_state); - qdr_manage_write_response_map_CT(query->body, link); + qdr_manage_write_response_map_CT(core, query->body, link); query->status = QD_AMQP_OK; } else { diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h index cd92c1b..80fa8a4 100644 --- a/src/router_core/agent_link.h +++ b/src/router_core/agent_link.h @@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_t *in_body); -#define QDR_LINK_COLUMN_COUNT 23 +#define QDR_LINK_COLUMN_COUNT 26 const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index f9303c8..d7ea523 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -43,11 +43,13 @@ #define QDR_ROUTER_REJECTED_DELIVERIES 16 #define QDR_ROUTER_RELEASED_DELIVERIES 17 #define QDR_ROUTER_MODIFIED_DELIVERIES 18 -#define QDR_ROUTER_DELIVERIES_INGRESS 19 -#define QDR_ROUTER_DELIVERIES_EGRESS 20 -#define QDR_ROUTER_DELIVERIES_TRANSIT 21 -#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 22 -#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 23 +#define QDR_ROUTER_DELAYED_1SEC 19 +#define QDR_ROUTER_DELAYED_10SEC 20 +#define QDR_ROUTER_DELIVERIES_INGRESS 21 +#define QDR_ROUTER_DELIVERIES_EGRESS 22 +#define QDR_ROUTER_DELIVERIES_TRANSIT 23 +#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 24 +#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 25 const char *qdr_router_columns[] = @@ -70,6 +72,8 @@ const char *qdr_router_columns[] = "rejectedDeliveries", "releasedDeliveries", "modifiedDeliveries", + "deliveriesDelayed1Sec", + "deliveriesDelayed10Sec", "deliveriesIngress", "deliveriesEgress", "deliveriesTransit", @@ -174,6 +178,14 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co qd_compose_insert_ulong(body, core->modified_deliveries); break; + case QDR_ROUTER_DELAYED_1SEC: + qd_compose_insert_ulong(body, core->deliveries_delayed_1sec); + break; + + case QDR_ROUTER_DELAYED_10SEC: + qd_compose_insert_ulong(body, core->deliveries_delayed_10sec); + break; + case QDR_ROUTER_DELIVERIES_INGRESS: qd_compose_insert_ulong(body, core->deliveries_ingress); break; diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h index dffc0eb..6a6e35f 100644 --- a/src/router_core/agent_router.h +++ b/src/router_core/agent_router.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -#define QDR_ROUTER_COLUMN_COUNT 24 +#define QDR_ROUTER_COLUMN_COUNT 26 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1]; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 8498ff3..0c58da4 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -23,6 +23,7 @@ #include <qpid/dispatch/amqp.h> #include <stdio.h> #include <strings.h> +#include <inttypes.h> #include "router_core_private.h" #include "core_link_endpoint.h" @@ -475,6 +476,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, link->credit_pending = conn->link_capacity; link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; + link->core_ticks = conn->core->uptime_ticks; link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus); link->strip_annotations_in = conn->strip_annotations_in; @@ -816,7 +818,7 @@ static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link) } -static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) +static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text) { // // Remove the link from the master list of links @@ -900,6 +902,17 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li free(link->ingress_histogram); free(link->insert_prefix); free(link->strip_prefix); + + // + // Log the link closure + // + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] %s: del=%"PRIu64" presett=%"PRIu64" psdrop=%"PRIu64 + " acc=%"PRIu64" rej=%"PRIu64" rel=%"PRIu64" mod=%"PRIu64" delay1=%"PRIu64" delay10=%"PRIu64, + conn->identity, link->identity, log_text, link->total_deliveries, link->presettled_deliveries, + link->dropped_presettled_deliveries, link->accepted_deliveries, link->rejected_deliveries, + link->released_deliveries, link->modified_deliveries, link->deliveries_delayed_1sec, + link->deliveries_delayed_10sec); + free_qdr_link_t(link); } @@ -931,9 +944,10 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_generate_link_name("qdlink", link->name, QD_DISCRIMINATOR_SIZE + 8); link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; - link->insert_prefix = 0; - link->strip_prefix = 0; - link->attach_count = 1; + link->insert_prefix = 0; + link->strip_prefix = 0; + link->attach_count = 1; + link->core_ticks = core->uptime_ticks; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -1196,7 +1210,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo qdr_connection_t *conn = action->args.connection.conn; - // // Deactivate routes associated with this connection // @@ -1236,7 +1249,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo // // Clean up the link and all its associated state. // - qdr_link_cleanup_CT(core, conn, link); // link_cleanup disconnects and frees the ref. + qdr_link_cleanup_CT(core, conn, link, "Link closed due to connection loss"); // link_cleanup disconnects and frees the ref. link_ref = DEQ_HEAD(conn->links); } @@ -1260,6 +1273,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo qdrc_event_conn_raise(core, QDRC_EVENT_CONN_CLOSED, conn); + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Connection Closed", conn->identity); + DEQ_REMOVE(core->open_connections, conn); qdr_connection_free(conn); } @@ -1375,6 +1390,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true); qdr_terminus_free(source); qdr_terminus_free(target); + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Router attach forbidden on non-inter-router connection", conn->identity); return; } @@ -1388,6 +1404,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true); qdr_terminus_free(source); qdr_terminus_free(target); + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach forbidden on inter-router connection", conn->identity); return; } @@ -1412,6 +1429,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); qdr_terminus_free(source); qdr_terminus_free(target); + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach failed - no address lookup handler", conn->identity); return; } } @@ -1439,6 +1457,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); qdr_terminus_free(source); qdr_terminus_free(target); + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach failed - no address lookup handler", conn->identity); return; } break; @@ -1460,6 +1479,17 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act break; } } + + char source_str[1000]; + char target_str[1000]; + size_t source_len = 1000; + size_t target_len = 1000; + + qdr_terminus_format(source, source_str, &source_len); + qdr_terminus_format(target, target_str, &target_len); + + qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] Link attached: dir=%s source=%s target=%s", + conn->identity, link->identity, dir == QD_INCOMING ? "in" : "out", source_str, target_str); } @@ -1615,9 +1645,8 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b // // If the link is completely detached, release its resources // - if (link->detach_send_done) { - qdr_link_cleanup_CT(core, conn, link); - } + if (link->detach_send_done) + qdr_link_cleanup_CT(core, conn, link, "Link detached"); return; } @@ -1709,11 +1738,11 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED); } else { // no detach can be sent out because the connection was lost - qdr_link_cleanup_CT(core, conn, link); + qdr_link_cleanup_CT(core, conn, link, "Link lost"); } } else if (link->detach_send_done) { // detach count indicates detach has been scheduled // I/O thread is finished sending detach, ok to free link now - qdr_link_cleanup_CT(core, conn, link); + qdr_link_cleanup_CT(core, conn, link, "Link detached"); } // @@ -1741,7 +1770,7 @@ static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool link->detach_send_done = true; if (link->conn && link->detach_received) { // link is fully detached - qdr_link_cleanup_CT(core, link->conn, link); + qdr_link_cleanup_CT(core, link->conn, link, "Link detached"); } } } diff --git a/src/router_core/core_timer.c b/src/router_core/core_timer.c index a20cd64..d34beb1 100644 --- a/src/router_core/core_timer.c +++ b/src/router_core/core_timer.c @@ -109,6 +109,8 @@ void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard) if (discard) return; + core->uptime_ticks++; + qdr_core_timer_t *timer = DEQ_HEAD(core->scheduled_timers); qdr_core_timer_t *timer_next = 0; diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 9d2437c..1d6a208 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -125,6 +125,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in out_dlv->tag_length = 8; out_dlv->error = 0; + out_dlv->ingress_time = in_dlv ? in_dlv->ingress_time : core->uptime_ticks; out_dlv->ingress_index = in_dlv ? in_dlv->ingress_index : -1; // @@ -185,6 +186,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link free_qdr_link_work_t(dlv->link_work); dlv->link_work = 0; } + dlv->disposition = PN_RELEASED; qdr_delivery_decref_CT(core, dlv, "qdr_forward_drop_presettled_CT_LH - remove from link-work list"); // Increment the presettled_dropped_deliveries on the out_link diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 5b5fbec..a66a703 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -55,7 +55,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, // // Set up the logging sources for the router core // - core->log = qd_log_source("ROUTER_CORE"); + core->log = qd->router->log_source; core->agent_log = qd_log_source("AGENT"); // diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index ba5f790..eb58837 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -393,6 +393,7 @@ struct qdr_delivery_t { qd_iterator_t *to_addr; qd_iterator_t *origin; uint64_t disposition; + uint32_t ingress_time; pn_data_t *extension_state; qdr_error_t *error; bool settled; @@ -431,6 +432,8 @@ typedef enum { QDR_LINK_OPER_IDLE } qdr_link_oper_status_t; +#define QDR_LINK_RATE_DEPTH 5 + struct qdr_link_t { DEQ_LINKS(qdr_link_t); qdr_core_t *core; @@ -480,8 +483,13 @@ struct qdr_link_t { uint64_t rejected_deliveries; uint64_t released_deliveries; uint64_t modified_deliveries; + uint64_t deliveries_delayed_1sec; + uint64_t deliveries_delayed_10sec; + uint64_t settled_deliveries[QDR_LINK_RATE_DEPTH]; uint64_t *ingress_histogram; uint8_t priority; + uint8_t rate_cursor; + uint32_t core_ticks; }; ALLOC_DECLARE(qdr_link_t); @@ -762,6 +770,7 @@ struct qdr_core_t { qdr_core_timer_list_t scheduled_timers; qdr_general_work_list_t work_list; qd_timer_t *work_timer; + uint32_t uptime_ticks; qdr_connection_list_t open_connections; qdr_connection_t *active_edge_connection; @@ -847,19 +856,19 @@ struct qdr_core_t { qdr_delivery_cleanup_list_t delivery_cleanup_list; ///< List of delivery cleanup items to be processed in an IO thread // Overall delivery counters - uint64_t presettled_deliveries; - uint64_t dropped_presettled_deliveries; - uint64_t accepted_deliveries; - uint64_t rejected_deliveries; - uint64_t released_deliveries; - uint64_t modified_deliveries; - uint64_t deliveries_ingress; - uint64_t deliveries_egress; - uint64_t deliveries_transit; - uint64_t deliveries_egress_route_container; - uint64_t deliveries_ingress_route_container; - - + uint64_t presettled_deliveries; + uint64_t dropped_presettled_deliveries; + uint64_t accepted_deliveries; + uint64_t rejected_deliveries; + uint64_t released_deliveries; + uint64_t modified_deliveries; + uint64_t deliveries_ingress; + uint64_t deliveries_egress; + uint64_t deliveries_transit; + uint64_t deliveries_egress_route_container; + uint64_t deliveries_ingress_route_container; + uint64_t deliveries_delayed_1sec; + uint64_t deliveries_delayed_10sec; }; struct qdr_terminus_t { diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index 58ec9e5..b88befc 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -20,6 +20,7 @@ #include "router_core_private.h" #include <strings.h> #include <stdio.h> +#include <inttypes.h> ALLOC_DEFINE(qdr_terminus_t); @@ -75,6 +76,127 @@ void qdr_terminus_free(qdr_terminus_t *term) } +void qdr_terminus_format(qdr_terminus_t *term, char *output, size_t *size) +{ + size_t len = snprintf(output, *size, "{"); + + output += len; + *size -= len; + len = 0; + + do { + if (term == 0) + break; + + if (term->coordinator) { + len = snprintf(output, *size, "<coordinator>"); + break; + } + + if (term->dynamic) + len = snprintf(output, *size, "<dynamic>"); + else if (term->address && term->address->iterator) { + qd_iterator_reset_view(term->address->iterator, ITER_VIEW_ALL); + len = qd_iterator_ncopy(term->address->iterator, (unsigned char*) output, *size); + } else if (term->address == 0) + len = snprintf(output, *size, "<none>"); + + output += len; + *size -= len; + + char *text = ""; + switch (term->durability) { + case PN_NONDURABLE: break; + case PN_CONFIGURATION: text = " dur:config"; break; + case PN_DELIVERIES: text = " dur:deliveries"; break; + } + + len = snprintf(output, *size, text); + output += len; + *size -= len; + + switch (term->expiry_policy) { + case PN_EXPIRE_WITH_LINK: text = " expire:link"; break; + case PN_EXPIRE_WITH_SESSION: text = " expire:sess"; break; + case PN_EXPIRE_WITH_CONNECTION: text = " expire:conn"; break; + case PN_EXPIRE_NEVER: text = ""; break; + } + + len = snprintf(output, *size, text); + output += len; + *size -= len; + + switch (term->distribution_mode) { + case PN_DIST_MODE_UNSPECIFIED: text = ""; break; + case PN_DIST_MODE_COPY: text = " dist:copy"; break; + case PN_DIST_MODE_MOVE: text = " dist:move"; break; + } + + len = snprintf(output, *size, text); + output += len; + *size -= len; + + if (term->timeout > 0) { + len = snprintf(output, *size, " timeout:%"PRIu32, term->timeout); + output += len; + *size -= len; + } + + if (term->capabilities && pn_data_size(term->capabilities) > 0) { + len = snprintf(output, *size, " caps:"); + output += len; + *size -= len; + + len = *size; + pn_data_format(term->capabilities, output, &len); + output += len; + *size -= len; + } + + if (term->filter && pn_data_size(term->filter) > 0) { + len = snprintf(output, *size, " flt:"); + output += len; + *size -= len; + + len = *size; + pn_data_format(term->filter, output, &len); + output += len; + *size -= len; + } + + if (term->outcomes && pn_data_size(term->outcomes) > 0) { + len = snprintf(output, *size, " outcomes:"); + output += len; + *size -= len; + + len = *size; + pn_data_format(term->outcomes, output, &len); + output += len; + *size -= len; + } + + if (term->properties && pn_data_size(term->properties) > 0) { + len = snprintf(output, *size, " props:"); + output += len; + *size -= len; + + len = *size; + pn_data_format(term->properties, output, &len); + output += len; + *size -= len; + } + + len = 0; + } while (false); + + output += len; + *size -= len; + + len = snprintf(output, *size, "}"); + *size -= len; +} + + void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to) { if (!from) { diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 08b6cde..a9caffd 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -554,17 +554,22 @@ void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delive { qdr_link_t *link = delivery->link; if (link) { + bool do_rate = false; + if (delivery->presettled) { + do_rate = delivery->disposition != PN_RELEASED; link->presettled_deliveries++; if (link->link_direction == QD_INCOMING && link->link_type == QD_LINK_ENDPOINT) core->presettled_deliveries++; } else if (delivery->disposition == PN_ACCEPTED) { + do_rate = true; link->accepted_deliveries++; if (link->link_direction == QD_INCOMING) core->accepted_deliveries++; } else if (delivery->disposition == PN_REJECTED) { + do_rate = true; link->rejected_deliveries++; if (link->link_direction == QD_INCOMING) core->rejected_deliveries++; @@ -580,8 +585,36 @@ void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delive core->modified_deliveries++; } + uint32_t delay = core->uptime_ticks - delivery->ingress_time; + if (delay > 10) { + link->deliveries_delayed_10sec++; + if (link->link_direction == QD_INCOMING) + core->deliveries_delayed_10sec++; + } else if (delay > 1) { + link->deliveries_delayed_1sec++; + if (link->link_direction == QD_INCOMING) + core->deliveries_delayed_1sec++; + } + if (qd_bitmask_valid_bit_value(delivery->ingress_index) && link->ingress_histogram) link->ingress_histogram[delivery->ingress_index]++; + + // + // Compute the settlement rate + // + if (do_rate) { + uint32_t delta_time = core->uptime_ticks - link->core_ticks; + if (delta_time > 0) { + if (delta_time > QDR_LINK_RATE_DEPTH) + delta_time = QDR_LINK_RATE_DEPTH; + for (uint8_t delta_slots = 0; delta_slots < delta_time; delta_slots++) { + link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH; + link->settled_deliveries[link->rate_cursor] = 0; + } + link->core_ticks = core->uptime_ticks; + } + link->settled_deliveries[link->rate_cursor]++; + } } } @@ -1019,6 +1052,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis qdr_link_t *link = dlv->link; // + // Record the ingress time so we can track the age of this delivery. + // + dlv->ingress_time = core->uptime_ticks; + + // // If the link is an edge link, mark this delivery as via-edge // dlv->via_edge = link->edge; diff --git a/src/router_node.c b/src/router_node.c index 6e9cc25..e87adcd 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -292,7 +292,7 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link, qd_messa const char *src = pn_terminus_get_address(pn_link_source(pn_link)); const char *tgt = pn_terminus_get_address(pn_link_target(pn_link)); qd_log(qd_message_log_source(), QD_LOG_TRACE, - "[%"PRIu64"]: %s %s on link '%s' (%s -> %s)", + "[C%"PRIu64"]: %s %s on link '%s' (%s -> %s)", qd_connection_connection_id(conn), pn_link_is_sender(pn_link) ? "Sent" : "Received", msg_str, @@ -1145,15 +1145,18 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool } - qdr_connection_info_t *connection_info = qdr_connection_info(tport && pn_transport_is_encrypted(tport), - tport && pn_transport_is_authenticated(tport), + bool encrypted = tport && pn_transport_is_encrypted(tport); + bool authenticated = tport && pn_transport_is_authenticated(tport); + + qdr_connection_info_t *connection_info = qdr_connection_info(encrypted, + authenticated, conn->opened, - (char *)mech, + (char*) mech, conn->connector ? QD_OUTGOING : QD_INCOMING, host, proto, cipher, - (char *)user, + (char*) user, container, props, ssl_ssf, @@ -1168,6 +1171,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool vhost, connection_info, bind_connection_context, conn); + + char props_str[1000]; + size_t props_len = 1000; + + pn_data_format(props, props_str, &props_len); + + qd_log(router->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s" + " auth=%s user=%s container_id=%s props=%s", + connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no", + authenticated ? mech : "no", (char*) user, container, props_str); } static int AMQP_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) diff --git a/src/server.c b/src/server.c index 760126d..bcd4a82 100644 --- a/src/server.c +++ b/src/server.c @@ -692,7 +692,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication); sys_mutex_unlock(ctx->server->lock); - qd_log(ctx->server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Accepted connection to %s from %s", + qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Accepted connection to %s from %s", ctx->connection_id, name, ctx->rhost_port); } else if (ctx->connector) { /* Establishing an outgoing connection */ config = &ctx->connector->config; @@ -969,14 +969,14 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co qd_increment_conn_index(ctx); const qd_server_config_t *config = &ctx->connector->config; if (condition && pn_condition_is_set(condition)) { - qd_log(qd_server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Connection to %s failed: %s %s", ctx->connection_id, config->host_port, + qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection to %s failed: %s %s", ctx->connection_id, config->host_port, pn_condition_get_name(condition), pn_condition_get_description(condition)); } else { - qd_log(qd_server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Connection to %s failed", ctx->connection_id, config->host_port); + qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection to %s failed", ctx->connection_id, config->host_port); } } else if (ctx && ctx->listener) { /* Incoming connection */ if (condition && pn_condition_is_set(condition)) { - qd_log(ctx->server->log_source, QD_LOG_INFO, "[%"PRIu64"]: Connection from %s (to %s) failed: %s %s", + qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection from %s (to %s) failed: %s %s", ctx->connection_id, ctx->rhost_port, ctx->listener->config.host_port, pn_condition_get_name(condition), pn_condition_get_description(condition)); } diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py index 9030bcd..742c6e0 100644 --- a/tests/system_tests_core_client.py +++ b/tests/system_tests_core_client.py @@ -236,7 +236,7 @@ class TestCallTimeout(TestService): def on_timer_task(self, event): log = self.service.qm.get_log() for e in log: - if (e[0] == 'ROUTER_CORE'and e[1] == 'error' + if (e[0] == 'ROUTER' and e[1] == 'error' and e[2] == 'client test request done ' 'error=Timed out'): # yes this is the line you're looking for: diff --git a/tools/qdstat.in b/tools/qdstat.in index b0846e0..51c2e20 100755 --- a/tools/qdstat.in +++ b/tools/qdstat.in @@ -321,6 +321,11 @@ class BusManager(Node): rows.append(('Rejected Count', router.rejectedDeliveries)) rows.append(('Released Count', router.releasedDeliveries)) rows.append(('Modified Count', router.modifiedDeliveries)) + try: + rows.append(('Deliveries Delayed > 1sec', router.deliveriesDelayed1Sec)) + rows.append(('Deliveries Delayed > 10sec', router.deliveriesDelayed10Sec)) + except: + pass rows.append(('Ingress Count', router.deliveriesIngress)) rows.append(('Egress Count', router.deliveriesEgress)) rows.append(('Transit Count', router.deliveriesTransit)) @@ -351,12 +356,14 @@ class BusManager(Node): cols = ('linkType', 'linkDir', 'connectionId', 'identity', 'peer', 'owningAddr', 'capacity', 'undeliveredCount', 'unsettledCount', 'deliveryCount', 'presettledCount', 'droppedPresettledCount', 'acceptedCount', 'rejectedCount', 'releasedCount', - 'modifiedCount', 'adminStatus', 'operStatus', 'linkName', 'priority') + 'modifiedCount', 'deliveriesDelayed1Sec', 'deliveriesDelayed10Sec', 'adminStatus', 'operStatus', + 'linkName', 'priority', 'settleRate') objects = self.query('org.apache.qpid.dispatch.router.link', cols, limit=self.opts.limit) has_dropped_presettled_count = False has_priority = False + has_delayed = False if objects: first_row = objects[0] @@ -365,15 +372,16 @@ class BusManager(Node): has_dropped_presettled_count = True if hasattr(first_row, 'priority'): has_priority = True + if hasattr(first_row, 'deliveriesDelayed1Sec'): + has_delayed = True if has_priority: heads.append(Header("pri")) heads.append(Header("undel")) heads.append(Header("unsett")) - heads.append(Header("del")) + heads.append(Header("deliv")) heads.append(Header("presett")) - if has_dropped_presettled_count: heads.append(Header("psdrop")) @@ -381,9 +389,12 @@ class BusManager(Node): heads.append(Header("rej")) heads.append(Header("rel")) heads.append(Header("mod")) - heads.append(Header("admin")) - heads.append(Header("oper")) + if has_delayed: + heads.append(Header("delay")) + heads.append(Header("rate")) if self.opts.verbose: + heads.append(Header("admin")) + heads.append(Header("oper")) heads.append(Header("name")) for link in objects: @@ -409,9 +420,12 @@ class BusManager(Node): row.append(link.rejectedCount) row.append(link.releasedCount) row.append(link.modifiedCount) - row.append(link.adminStatus) - row.append(link.operStatus) + if has_delayed: + row.append(link.deliveriesDelayed1Sec + link.deliveriesDelayed10Sec) + row.append(link.settleRate) if self.opts.verbose: + row.append(link.adminStatus) + row.append(link.operStatus) row.append(link.linkName) rows.append(row) title = "Router Links" @@ -491,15 +505,16 @@ class BusManager(Node): if has_priority: heads.append(Header("pri")) - heads.append(Header("in-proc", Header.COMMAS)) + if self.opts.verbose: + heads.append(Header("in-proc", Header.COMMAS)) heads.append(Header("local", Header.COMMAS)) heads.append(Header("remote", Header.COMMAS)) - heads.append(Header("cntnr", Header.COMMAS)) heads.append(Header("in", Header.COMMAS)) heads.append(Header("out", Header.COMMAS)) heads.append(Header("thru", Header.COMMAS)) - heads.append(Header("to-proc", Header.COMMAS)) - heads.append(Header("from-proc", Header.COMMAS)) + if self.opts.verbose: + heads.append(Header("to-proc", Header.COMMAS)) + heads.append(Header("from-proc", Header.COMMAS)) for addr in objects: row = [] @@ -509,15 +524,16 @@ class BusManager(Node): row.append(addr.distribution) if has_priority: row.append(addr.priority if addr.priority >= 0 else "") - row.append(addr.inProcess) - row.append(addr.subscriberCount) + if self.opts.verbose: + row.append(addr.inProcess) + row.append(addr.containerCount if addr.name[0] in 'CDEF' else addr.subscriberCount) row.append(addr.remoteCount) - row.append(addr.containerCount) row.append(addr.deliveriesIngress) row.append(addr.deliveriesEgress) row.append(addr.deliveriesTransit) - row.append(addr.deliveriesToContainer) - row.append(addr.deliveriesFromContainer) + if self.opts.verbose: + row.append(addr.deliveriesToContainer) + row.append(addr.deliveriesFromContainer) rows.append(row) title = "Router Addresses" sorter = Sorter(heads, rows, 'addr', 0, True) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
