Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 ccf4c7086 -> af71e0182
DISPATCH-179 - Added statistics for address and link, finished agent for link. Numerous fixes. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/86852b2b Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/86852b2b Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/86852b2b Branch: refs/heads/tross-DISPATCH-179-1 Commit: 86852b2b02807c100b11eb8daea1a03e0b8ed1f9 Parents: ccf4c70 Author: Ted Ross <[email protected]> Authored: Fri Jan 29 11:29:06 2016 -0500 Committer: Ted Ross <[email protected]> Committed: Fri Jan 29 11:29:06 2016 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 4 +- python/qpid_dispatch/management/qdrouter.json | 39 +++++++-- src/router_core/agent.c | 19 ++--- src/router_core/agent_link.c | 94 +++++++++++++--------- src/router_core/connections.c | 89 +++++++++++++++----- src/router_core/forwarder.c | 30 ++++++- src/router_core/route_tables.c | 2 +- src/router_core/router_core_private.h | 13 +-- src/router_core/transfer.c | 30 ++++--- src/router_node.c | 26 ++++-- tests/parse_test.c | 1 + tools/qdstat | 20 +++-- 12 files changed, 245 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index c081b6e..0260fad 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -132,7 +132,6 @@ typedef enum { QD_LINK_CONTROL, ///< A link to a peer router for control messages QD_LINK_ROUTER ///< A link to a peer router for routed messages } qd_link_type_t; -ENUM_DECLARE(qd_link_type); typedef enum { QDR_ROLE_NORMAL, @@ -528,8 +527,7 @@ void qdr_connection_handlers(qdr_core_t *core, ****************************************************************************** */ void qdr_delivery_free(qdr_delivery_t *delivery); -void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp); -void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery); +void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp, bool settled); void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context); void *qdr_delivery_get_context(qdr_delivery_t *delivery); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index dd54457..1de8a45 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -832,13 +832,38 @@ "description": "Link to another AMQP endpoint: router node, client or other AMQP process.", "extends": "operationalEntity", "attributes": { - "linkName": {"type": "string"}, - "linkType": {"type": ["endpoint", "waypoint", "inter-router", "inter-area"]}, - "linkDir": {"type": ["in", "out"]}, - "owningAddr": {"type": "string"}, - "eventFifoDepth": {"type": "integer", "graph": true}, - "msgFifoDepth": {"type": "integer", "graph": true}, - "remoteContainer": {"type": "string"} + "linkName": { + "type": "string", + "description": "Name assigned to the link in the Attach." + }, + "linkType": { + "type": ["endpoint", "waypoint", "router-control", "inter-router"], + "description": "Type of link: endpoint: a link to a normally connected endpoint; waypoint: a link to a waypoint node; inter-router: a link to another router in the network." + }, + "linkDir": { + "type": ["in", "out"], + "description": "Direction of delivery flow over the link, inbound or outbound to or from the router." + }, + "owningAddr": { + "type": "string", + "description": "Address assigned to this link during attach: The target for inbound links or the source for outbound links." + }, + "capacity": { + "type": "integer", + "description": "The capacity, in deliveries, for the link. The number of undelivered plus unsettled deliveries shall not exceed the capacity. This is enforced by link flow control." + }, + "undeliveredCount": { + "type": "integer", + "description": "The number of undelivered messages pending for the link." + }, + "unsettledCount": { + "type": "integer", + "description": "The number of unsettled deliveries awaiting settlement on the link" + }, + "deliveryCount": { + "type": "integer", + "description": "The total number of deliveries that have traversed this link." + } } }, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent.c b/src/router_core/agent.c index 5c94374..3f42fd2 100644 --- a/src/router_core/agent.c +++ b/src/router_core/agent.c @@ -43,20 +43,21 @@ static const char *qdr_address_columns[] = static const char *qdr_link_columns[] = - {"linkType", - "name", + {"name", + "identity", + "type", + "linkName", + "linkType", "linkDir", - "msgFifoDepth", "owningAddr", - "remoteContainer", - "linkName", - "eventFifoDepth", - "type", - "identity", + "capacity", + "undeliveredCount", + "unsettledCount", + "deliveryCount", 0}; -#define QDR_LINK_COLUMN_COUNT 10 +#define QDR_LINK_COLUMN_COUNT 11 static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/agent_link.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index e3cc484..4d27425 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -19,25 +19,38 @@ #include "agent_link.h" -#define QDR_LINK_LINK_TYPE 0 -#define QDR_LINK_LINK_NAME 1 -#define QDR_LINK_LINK_DIR 2 //done -#define QDR_LINK_MSG_FIFO_DEPTH 3 //done -#define QDR_LINK_OWNING_ADDR 4 //done -#define QDR_LINK_REMOTE_CONTAINER 5 -#define QDR_LINK_NAME 6 -#define QDR_LINK_EVENT_FIFO_DEPTH 7 //done -#define QDR_LINK_TYPE 8 -#define QDR_LINK_IDENTITY 9 - -static const char *qd_link_type_names[] = { "endpoint", "waypoint", "inter-router", "inter-area" }; -ENUM_DEFINE(qd_link_type, qd_link_type_names); - -static const char *address_key(qdr_address_t *addr) { +#define QDR_LINK_NAME 0 +#define QDR_LINK_IDENTITY 1 +#define QDR_LINK_TYPE 2 +#define QDR_LINK_LINK_NAME 3 +#define QDR_LINK_LINK_TYPE 4 +#define QDR_LINK_LINK_DIR 5 +#define QDR_LINK_OWNING_ADDR 6 +#define QDR_LINK_CAPACITY 7 +#define QDR_LINK_UNDELIVERED_COUNT 8 +#define QDR_LINK_UNSETTLED_COUNT 9 +#define QDR_LINK_DELIVERY_COUNT 10 + +static const char *qd_link_type_name(qd_link_type_t lt) +{ + switch (lt) { + case QD_LINK_ENDPOINT : return "endpoint"; + case QD_LINK_WAYPOINT : return "waypoint"; + case QD_LINK_CONTROL : return "router-control"; + case QD_LINK_ROUTER : return "inter-router"; + } + + return ""; +} + + +static const char *address_key(qdr_address_t *addr) +{ return addr && addr->hash_handle ? (const char*) qd_hash_key_by_handle(addr->hash_handle) : NULL; } -static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link ) + +static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link) { qd_composed_field_t *body = query->body; @@ -54,32 +67,39 @@ static void qdr_agent_write_link_CT(qdr_query_t *query, qdr_link_t *link ) qd_compose_insert_string(body, "org.apache.qpid.dispatch.router.link"); break; - case QDR_LINK_REMOTE_CONTAINER: - qd_compose_insert_null(body); // FIXME - break; - case QDR_LINK_LINK_NAME: - qd_compose_insert_null(body); // FIXME + qd_compose_insert_string(body, link->name); break; case QDR_LINK_LINK_TYPE: qd_compose_insert_string(body, qd_link_type_name(link->link_type)); break; + case QDR_LINK_LINK_DIR: + qd_compose_insert_string(body, link->link_direction == QD_INCOMING ? "in" : "out"); + break; + case QDR_LINK_OWNING_ADDR: - qd_compose_insert_string(body, address_key(link->owning_addr)); + if (link->owning_addr) + qd_compose_insert_string(body, address_key(link->owning_addr)); + else + qd_compose_insert_null(body); break; - case QDR_LINK_LINK_DIR: - qd_compose_insert_string(body, link->link_direction == QD_INCOMING ? "in" : "out"); + case QDR_LINK_CAPACITY: + qd_compose_insert_uint(body, link->capacity); + break; + + case QDR_LINK_UNDELIVERED_COUNT: + qd_compose_insert_ulong(body, DEQ_SIZE(link->undelivered)); break; - case QDR_LINK_MSG_FIFO_DEPTH: - qd_compose_insert_ulong(body, 0); // FIXME + case QDR_LINK_UNSETTLED_COUNT: + qd_compose_insert_ulong(body, DEQ_SIZE(link->unsettled)); break; - case QDR_LINK_EVENT_FIFO_DEPTH: - qd_compose_insert_ulong(body, 0); // FIXME + case QDR_LINK_DELIVERY_COUNT: + qd_compose_insert_ulong(body, link->total_deliveries); break; default: @@ -95,10 +115,10 @@ static void qdr_manage_advance_link_CT(qdr_query_t *query, qdr_link_t *link) { query->next_offset++; link = DEQ_NEXT(link); - if (link) + if (link) { query->more = true; //query->next_key = qdr_field((const char*) qd_hash_key_by_handle(link->owning_addr->hash_handle)); - else + } else query->more = false; } @@ -113,7 +133,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) // // If the offset goes beyond the set of links, end the query now. // - if (true /*offset >= DEQ_SIZE(core->links)*/) { // FIXME + if (offset >= DEQ_SIZE(core->open_links)) { query->more = false; qdr_agent_enqueue_response_CT(core, query); return; @@ -122,7 +142,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) // // Run to the address at the offset. // - qdr_link_t *link = 0; // DEQ_HEAD(core->links); FIXME + qdr_link_t *link = DEQ_HEAD(core->open_links); for (int i = 0; i < offset && link; i++) link = DEQ_NEXT(link); assert(link); @@ -149,17 +169,11 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) { qdr_link_t *link = 0; - if (!link) { - // - // If the address was removed in the time between this get and the previous one, - // we need to use the saved offset, which is less efficient. - // - if (false /*query->next_offset < DEQ_SIZE(core->links)*/) { // FIXME - link = 0; //DEQ_HEAD(core->links); + if (query->next_offset < DEQ_SIZE(core->open_links)) { + link = DEQ_HEAD(core->open_links); for (int i = 0; i < query->next_offset && link; i++) link = DEQ_NEXT(link); } - } if (link) { // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 7e998f0..6f4e405 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -114,14 +114,10 @@ void *qdr_connection_get_context(const qdr_connection_t *conn) int qdr_connection_process(qdr_connection_t *conn) { qdr_connection_work_list_t work_list; - qdr_link_ref_list_t links_with_deliveries; - qdr_link_ref_list_t links_with_credit; qdr_core_t *core = conn->core; sys_mutex_lock(conn->work_lock); DEQ_MOVE(conn->work_list, work_list); - DEQ_MOVE(conn->links_with_deliveries, links_with_deliveries); - DEQ_MOVE(conn->links_with_credit, links_with_credit); sys_mutex_unlock(conn->work_lock); int event_count = DEQ_SIZE(work_list); @@ -150,20 +146,38 @@ int qdr_connection_process(qdr_connection_t *conn) work = DEQ_HEAD(work_list); } - qdr_link_ref_t *ref = DEQ_HEAD(links_with_deliveries); - while (ref) { - core->push_handler(core->user_context, ref->link); - qdr_del_link_ref(&links_with_deliveries, ref->link, QDR_LINK_LIST_CLASS_DELIVERY); - ref = DEQ_HEAD(links_with_deliveries); - } - - ref = DEQ_HEAD(links_with_credit); - while (ref) { - core->flow_handler(core->user_context, ref->link, ref->link->incremental_credit); - ref->link->incremental_credit = 0; - qdr_del_link_ref(&links_with_credit, ref->link, QDR_LINK_LIST_CLASS_FLOW); - ref = DEQ_HEAD(links_with_credit); - } + qdr_link_ref_t *ref; + qdr_link_t *link; + + do { + sys_mutex_lock(conn->work_lock); + ref = DEQ_HEAD(conn->links_with_deliveries); + if (ref) { + link = ref->link; + qdr_del_link_ref(&conn->links_with_deliveries, ref->link, QDR_LINK_LIST_CLASS_DELIVERY); + } else + link = 0; + sys_mutex_unlock(conn->work_lock); + + if (link) + core->push_handler(core->user_context, link); + } while (link); + + do { + sys_mutex_lock(conn->work_lock); + ref = DEQ_HEAD(conn->links_with_credit); + if (ref) { + link = ref->link; + qdr_del_link_ref(&conn->links_with_credit, ref->link, QDR_LINK_LIST_CLASS_FLOW); + } else + link = 0; + sys_mutex_unlock(conn->work_lock); + + if (link) { + core->flow_handler(core->user_context, link, link->incremental_credit); + link->incremental_credit = 0; + } + } while (link); return event_count; } @@ -402,6 +416,9 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; + DEQ_INSERT_TAIL(core->open_links, link); + qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION); + qdr_connection_work_t *work = new_qdr_connection_work_t(); ZERO(work); work->work_type = QDR_CONNECTION_WORK_FIRST_ATTACH; @@ -493,6 +510,24 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local) } +static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) +{ + // + // Remove the link from the master list links + // + DEQ_REMOVE(core->open_links, link); + + // + // Remove the reference to this link in the connection's reference lists + // + qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION); + sys_mutex_lock(conn->work_lock); + qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY); + qdr_del_link_ref(&conn->links_with_credit , link, QDR_LINK_LIST_CLASS_FLOW); + sys_mutex_unlock(conn->work_lock); +} + + /** * qdr_lookup_terminus_address_CT * @@ -666,6 +701,18 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo // This involves the links and the dispositions of deliveries stored // with the links. // + qdr_link_ref_t *link_ref = DEQ_HEAD(conn->links); + while (link_ref) { + // + // TODO - if the link is link-routed and has a peer, detach the peer. + // + + // + // Clean up the link and all its associated state. + // + qdr_link_cleanup_CT(core, conn, link_ref->link); + link_ref = DEQ_HEAD(conn->links); + } // // Discard items on the work list @@ -689,6 +736,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act qdr_terminus_t *target = action->args.connection.target; // + // Put the link into the proper lists for tracking. + // + DEQ_INSERT_TAIL(core->open_links, link); + qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION); + + // // Reject any attaches of inter-router links that arrive on connections that are not inter-router. // Reject any waypoint links. Waypoint links are always initiated by a router, not the remote container. // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 90e210f..c66721c 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -127,6 +127,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core, qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); qdr_forward_deliver_CT(core, out_link, out_delivery); fanout++; + if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) + addr->deliveries_egress++; link_ref = DEQ_NEXT(link_ref); } } @@ -199,7 +201,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core, qdr_forward_deliver_CT(core, dest_link, out_delivery); fanout++; addr->deliveries_transit++; - qdr_connection_activate_CT(core, dest_link->conn); } } @@ -214,6 +215,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, while (sub) { qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); fanout++; + addr->deliveries_to_container++; sub = DEQ_NEXT(sub); } } @@ -232,6 +234,9 @@ int qdr_forward_closest_CT(qdr_core_t *core, bool control, qd_bitmask_t *link_exclusion) { + qdr_link_t *out_link; + qdr_delivery_t *out_delivery; + // // The Anycast forwarders don't respect link exclusions. // @@ -263,6 +268,7 @@ int qdr_forward_closest_CT(qdr_core_t *core, DEQ_INSERT_TAIL(addr->subscriptions, sub); } + addr->deliveries_to_container++; return 1; } } @@ -272,8 +278,8 @@ int qdr_forward_closest_CT(qdr_core_t *core, // qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); if (link_ref) { - qdr_link_t *out_link = link_ref->link; - qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); + out_link = link_ref->link; + out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); qdr_forward_deliver_CT(core, out_link, out_delivery); // @@ -285,14 +291,30 @@ int qdr_forward_closest_CT(qdr_core_t *core, DEQ_INSERT_TAIL(addr->rlinks, link_ref); } + addr->deliveries_egress++; return 1; } // - // TODO // Forward to remote routers with subscribers using the appropriate // link for the traffic class: control or data // + // TODO - presently, this picks one remote link to send to. This needs + // to be enhanced so it properly chooses the route to the closest destination. + // + int router_bit; + if (qd_bitmask_first_set(addr->rnodes, &router_bit)) { + qdr_node_t *rnode = core->routers_by_mask_bit[router_bit]; + if (rnode) { + out_link = control ? rnode->peer_control_link : rnode->peer_data_link; + if (out_link) { + out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); + qdr_forward_deliver_CT(core, out_link, out_delivery); + addr->deliveries_transit++; + return 1; + } + } + } return 0; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 0bc31d8..cbd0a11 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -235,7 +235,7 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca // This record will be found whenever a "foreign" topological address to this // remote router is looked up. // - addr = qdr_address_CT(core, QD_SEMANTICS_MULTICAST_FLOOD); // TODO - switch back to ANYCAST_CLOSEST when able + addr = qdr_address_CT(core, QD_SEMANTICS_ANYCAST_CLOSEST); qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 24f56bc..8b8abd9 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -201,10 +201,11 @@ DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t); void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv); void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref); -#define QDR_LINK_LIST_CLASS_ADDRESS 0 -#define QDR_LINK_LIST_CLASS_DELIVERY 1 -#define QDR_LINK_LIST_CLASS_FLOW 2 -#define QDR_LINK_LIST_CLASSES 3 +#define QDR_LINK_LIST_CLASS_ADDRESS 0 +#define QDR_LINK_LIST_CLASS_DELIVERY 1 +#define QDR_LINK_LIST_CLASS_FLOW 2 +#define QDR_LINK_LIST_CLASS_CONNECTION 3 +#define QDR_LINK_LIST_CLASSES 4 struct qdr_link_t { DEQ_LINKS(qdr_link_t); @@ -225,6 +226,7 @@ struct qdr_link_t { int capacity; int incremental_credit_CT; int incremental_credit; + uint64_t total_deliveries; }; ALLOC_DECLARE(qdr_link_t); @@ -368,9 +370,9 @@ struct qdr_connection_t { bool strip_annotations_in; bool strip_annotations_out; int mask_bit; - qdr_link_list_t links; qdr_connection_work_list_t work_list; sys_mutex_t *work_lock; + qdr_link_ref_list_t links; qdr_link_ref_list_t links_with_deliveries; qdr_link_ref_list_t links_with_credit; }; @@ -393,6 +395,7 @@ struct qdr_core_t { qd_timer_t *work_timer; qdr_connection_list_t open_connections; + qdr_link_list_t open_links; // // Agent section http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index af88530..50e5320 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -119,6 +119,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) if (!dlv->settled) DEQ_INSERT_TAIL(link->unsettled, dlv); credit--; + link->total_deliveries++; offer = DEQ_SIZE(link->undelivered); } else drained = true; @@ -183,26 +184,18 @@ void qdr_delivery_free(qdr_delivery_t *delivery) { if (delivery->msg) qd_message_free(delivery->msg); + if (delivery->to_addr) + qd_field_iterator_free(delivery->to_addr); free_qdr_delivery_t(delivery); } -void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition) +void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, bool settled) { qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); action->args.delivery.delivery = delivery; action->args.delivery.disposition = disposition; - action->args.delivery.settled = false; - - qdr_action_enqueue(core, action); -} - - -void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery) -{ - qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); - action->args.delivery.delivery = delivery; - action->args.delivery.settled = true; + action->args.delivery.settled = settled; qdr_action_enqueue(core, action); } @@ -286,12 +279,17 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis qdr_address_t *addr = link->owning_addr; if (!addr && dlv->to_addr) qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr); - if (addr) + if (addr) { fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL, link_exclude); + if (link->link_type != QD_LINK_CONTROL && link->link_type != QD_LINK_ROUTER) + addr->deliveries_ingress++; + link->total_deliveries++; + } } if (fanout == 0) { + printf("TODO fanout == 0\n"); if (link->owning_addr) { // // Message was not delivered and the link is not anonymous. @@ -303,7 +301,6 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis // // TODO - Release the delivery // - printf("TODO fanout == 0\n"); } } else if (fanout == 1) { if (presettled) { @@ -335,13 +332,14 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH); qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr); - if (addr) + if (addr) { // // Forward the message. We don't care what the fanout count is. // (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control, 0); - else + addr->deliveries_from_container++; + } else qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address"); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index b16b862..2235ba3 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -311,6 +311,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd // pn_delivery_update(pnd, PN_REJECTED); pn_delivery_settle(pnd); + qd_server_activate(qd_link_connection(link)); } // @@ -333,6 +334,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd // pn_delivery_update(pnd, PN_REJECTED); pn_delivery_settle(pnd); + qd_server_activate(qd_link_connection(link)); } } @@ -349,19 +351,24 @@ static void router_disposition_handler(void* context, qd_link_t *link, pn_delive return; // - // Update the disposition of the delivery - // - qdr_delivery_update_disposition(router->router_core, delivery, pn_delivery_remote_state(pnd)); - - // - // If the delivery is settled, remove the linkage between pn-delivery and qdr-delivery - // and settle the qdr-delivery. + // If the delivery is settled, remove the linkage between the PN and QDR deliveries. // if (pn_delivery_settled(pnd)) { pn_delivery_set_context(pnd, 0); qdr_delivery_set_context(delivery, 0); - qdr_delivery_settle(router->router_core, delivery); } + + // + // Update the disposition of the delivery + // + qdr_delivery_update_disposition(router->router_core, delivery, + pn_delivery_remote_state(pnd), pn_delivery_settled(pnd)); + + // + // If settled, close out the delivery + // + if (pn_delivery_settled(pnd)) + pn_delivery_settle(pnd); } @@ -700,6 +707,9 @@ static void qd_router_delivery_update(void *context, qdr_delivery_t *dlv, uint64 { pn_delivery_t *pnd = (pn_delivery_t*) qdr_delivery_get_context(dlv); + if (!pnd) + return; + // // If the disposition has changed, update the proton delivery. // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/tests/parse_test.c ---------------------------------------------------------------------- diff --git a/tests/parse_test.c b/tests/parse_test.c index 066d8ef..4d30cce 100644 --- a/tests/parse_test.c +++ b/tests/parse_test.c @@ -56,6 +56,7 @@ struct fs_vector_t { {"\x81\x01\x02\x03\x04\x05\x06\x07\x08", 9, QD_AMQP_LONG, 0, 0, 0, 1, 0, 0x0102030405060708}, // 15 {"\x55\x08", 2, QD_AMQP_SMALLLONG, 0, 0, 0, 1, 0, 0x08}, // 16 +{"\x45", 1, QD_AMQP_LIST0, 0, 0, 0, 0, 0, 0}, // 17 {0, 0, 0, 0, 0} }; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/tools/qdstat ---------------------------------------------------------------------- diff --git a/tools/qdstat b/tools/qdstat index 3bcc33d..39bcc0c 100755 --- a/tools/qdstat +++ b/tools/qdstat @@ -202,8 +202,10 @@ class BusManager(Node): heads.append(Header("class")) heads.append(Header("address")) heads.append(Header("phase")) - heads.append(Header("event-fifo")) - heads.append(Header("msg-fifo")) + heads.append(Header("capacity")) + heads.append(Header("undelivered")) + heads.append(Header("unsettled")) + heads.append(Header("deliveries")) rows = [] objects = self.query('org.apache.qpid.dispatch.router.link') @@ -212,18 +214,14 @@ class BusManager(Node): row = [] row.append(link.linkType) row.append(link.linkDir) - if link.linkType == "inter-router": - row.append(self._identity_clean(link.identity)) - else: - row.append('-') + row.append(self._identity_clean(link.identity)) row.append(self._addr_class(link.owningAddr)) row.append(self._addr_text(link.owningAddr)) row.append(self._addr_phase(link.owningAddr)) - row.append(link.eventFifoDepth) - if link.linkDir == 'out': - row.append(link.msgFifoDepth) - else: - row.append('-') + row.append(link.capacity) + row.append(link.undeliveredCount) + row.append(link.unsettledCount) + row.append(link.deliveryCount) rows.append(row) title = "Router Links" dispRows = rows --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
