Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 ccf4c7086 -> af71e0182


DISPATCH-179 - Added statistics for address and link, finished agent for link. 
Numerous fixes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/86852b2b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/86852b2b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/86852b2b

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 86852b2b02807c100b11eb8daea1a03e0b8ed1f9
Parents: ccf4c70
Author: Ted Ross <[email protected]>
Authored: Fri Jan 29 11:29:06 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Fri Jan 29 11:29:06 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h           |  4 +-
 python/qpid_dispatch/management/qdrouter.json | 39 +++++++--
 src/router_core/agent.c                       | 19 ++---
 src/router_core/agent_link.c                  | 94 +++++++++++++---------
 src/router_core/connections.c                 | 89 +++++++++++++++-----
 src/router_core/forwarder.c                   | 30 ++++++-
 src/router_core/route_tables.c                |  2 +-
 src/router_core/router_core_private.h         | 13 +--
 src/router_core/transfer.c                    | 30 ++++---
 src/router_node.c                             | 26 ++++--
 tests/parse_test.c                            |  1 +
 tools/qdstat                                  | 20 +++--
 12 files changed, 245 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index c081b6e..0260fad 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -132,7 +132,6 @@ typedef enum {
     QD_LINK_CONTROL,    ///< A link to a peer router for control messages
     QD_LINK_ROUTER      ///< A link to a peer router for routed messages
 } qd_link_type_t;
-ENUM_DECLARE(qd_link_type);
 
 typedef enum {
     QDR_ROLE_NORMAL,
@@ -528,8 +527,7 @@ void qdr_connection_handlers(qdr_core_t                
*core,
  ******************************************************************************
  */
 void qdr_delivery_free(qdr_delivery_t *delivery);
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disp);
-void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disp, bool settled);
 
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
 void *qdr_delivery_get_context(qdr_delivery_t *delivery);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index dd54457..1de8a45 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -832,13 +832,38 @@
             "description": "Link to another AMQP endpoint: router node, client 
or other AMQP process.",
             "extends": "operationalEntity",
             "attributes": {
-                "linkName": {"type": "string"},
-                "linkType": {"type": ["endpoint", "waypoint", "inter-router", 
"inter-area"]},
-                "linkDir": {"type": ["in", "out"]},
-                "owningAddr": {"type": "string"},
-                "eventFifoDepth": {"type": "integer", "graph": true},
-                "msgFifoDepth": {"type": "integer", "graph": true},
-                "remoteContainer": {"type": "string"}
+                "linkName": {
+                    "type": "string",
+                    "description": "Name assigned to the link in the Attach."
+                },
+                "linkType": {
+                    "type": ["endpoint", "waypoint", "router-control", 
"inter-router"],
+                    "description": "Type of link: endpoint: a link to a 
normally connected endpoint; waypoint: a link to a waypoint node; inter-router: 
a link to another router in the network."
+                },
+                "linkDir": {
+                    "type": ["in", "out"],
+                    "description": "Direction of delivery flow over the link, 
inbound or outbound to or from the router."
+                },
+                "owningAddr": {
+                    "type": "string",
+                    "description": "Address assigned to this link during 
attach: The target for inbound links or the source for outbound links."
+                },
+                "capacity": {
+                    "type": "integer",
+                    "description": "The capacity, in deliveries, for the link. 
 The number of undelivered plus unsettled deliveries shall not exceed the 
capacity.  This is enforced by link flow control."
+                },
+                "undeliveredCount": {
+                    "type": "integer",
+                    "description": "The number of undelivered messages pending 
for the link."
+                },
+                "unsettledCount": {
+                    "type": "integer",
+                    "description": "The number of unsettled deliveries 
awaiting settlement on the link"
+                },
+                "deliveryCount": {
+                    "type": "integer",
+                    "description": "The total number of deliveries that have 
traversed this link."
+                }
             }
         },
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 5c94374..3f42fd2 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -43,20 +43,21 @@ static const char *qdr_address_columns[] =
 
 
 static const char *qdr_link_columns[] =
-    {"linkType",
-     "name",
+    {"name",
+     "identity",
+     "type",
+     "linkName",
+     "linkType",
      "linkDir",
-     "msgFifoDepth",
      "owningAddr",
-     "remoteContainer",
-     "linkName",
-     "eventFifoDepth",
-     "type",
-     "identity",
+     "capacity",
+     "undeliveredCount",
+     "unsettledCount",
+     "deliveryCount",
      0};
 
 
-#define QDR_LINK_COLUMN_COUNT     10
+#define QDR_LINK_COLUMN_COUNT     11
 
 static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
 static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index e3cc484..4d27425 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -19,25 +19,38 @@
 
 #include "agent_link.h"
 
-#define QDR_LINK_LINK_TYPE          0
-#define QDR_LINK_LINK_NAME          1
-#define QDR_LINK_LINK_DIR           2 //done
-#define QDR_LINK_MSG_FIFO_DEPTH     3 //done
-#define QDR_LINK_OWNING_ADDR        4 //done
-#define QDR_LINK_REMOTE_CONTAINER   5
-#define QDR_LINK_NAME               6
-#define QDR_LINK_EVENT_FIFO_DEPTH   7 //done
-#define QDR_LINK_TYPE               8
-#define QDR_LINK_IDENTITY           9
-
-static const char *qd_link_type_names[] = { "endpoint", "waypoint", 
"inter-router", "inter-area" };
-ENUM_DEFINE(qd_link_type, qd_link_type_names);
-
-static const char *address_key(qdr_address_t *addr) {
+#define QDR_LINK_NAME               0
+#define QDR_LINK_IDENTITY           1
+#define QDR_LINK_TYPE               2
+#define QDR_LINK_LINK_NAME          3
+#define QDR_LINK_LINK_TYPE          4
+#define QDR_LINK_LINK_DIR           5
+#define QDR_LINK_OWNING_ADDR        6
+#define QDR_LINK_CAPACITY           7
+#define QDR_LINK_UNDELIVERED_COUNT  8
+#define QDR_LINK_UNSETTLED_COUNT    9
+#define QDR_LINK_DELIVERY_COUNT     10
+
+static const char *qd_link_type_name(qd_link_type_t lt)
+{
+    switch (lt) {
+    case QD_LINK_ENDPOINT : return "endpoint";
+    case QD_LINK_WAYPOINT : return "waypoint";
+    case QD_LINK_CONTROL  : return "router-control";
+    case QD_LINK_ROUTER   : return "inter-router";
+    }
+
+    return "";
+}
+
+
+static const char *address_key(qdr_address_t *addr)
+{
     return addr && addr->hash_handle ? (const char*) 
qd_hash_key_by_handle(addr->hash_handle) : NULL;
 }
 
-static void qdr_agent_write_link_CT(qdr_query_t *query,  qdr_link_t *link )
+
+static void qdr_agent_write_link_CT(qdr_query_t *query,  qdr_link_t *link)
 {
     qd_composed_field_t *body = query->body;
 
@@ -54,32 +67,39 @@ static void qdr_agent_write_link_CT(qdr_query_t *query,  
qdr_link_t *link )
             qd_compose_insert_string(body, 
"org.apache.qpid.dispatch.router.link");
             break;
 
-        case QDR_LINK_REMOTE_CONTAINER:
-            qd_compose_insert_null(body); // FIXME
-            break;
-
         case QDR_LINK_LINK_NAME:
-            qd_compose_insert_null(body); // FIXME
+            qd_compose_insert_string(body, link->name);
             break;
 
         case QDR_LINK_LINK_TYPE:
             qd_compose_insert_string(body, qd_link_type_name(link->link_type));
             break;
 
+        case QDR_LINK_LINK_DIR:
+            qd_compose_insert_string(body, link->link_direction == QD_INCOMING 
? "in" : "out");
+            break;
+
         case QDR_LINK_OWNING_ADDR:
-            qd_compose_insert_string(body, address_key(link->owning_addr));
+            if (link->owning_addr)
+                qd_compose_insert_string(body, address_key(link->owning_addr));
+            else
+                qd_compose_insert_null(body);
             break;
 
-        case QDR_LINK_LINK_DIR:
-            qd_compose_insert_string(body, link->link_direction == QD_INCOMING 
? "in" : "out");
+        case QDR_LINK_CAPACITY:
+            qd_compose_insert_uint(body, link->capacity);
+            break;
+
+        case QDR_LINK_UNDELIVERED_COUNT:
+            qd_compose_insert_ulong(body, DEQ_SIZE(link->undelivered));
             break;
 
-        case QDR_LINK_MSG_FIFO_DEPTH:
-            qd_compose_insert_ulong(body, 0); // FIXME
+        case QDR_LINK_UNSETTLED_COUNT:
+            qd_compose_insert_ulong(body, DEQ_SIZE(link->unsettled));
             break;
 
-        case QDR_LINK_EVENT_FIFO_DEPTH:
-            qd_compose_insert_ulong(body, 0); // FIXME
+        case QDR_LINK_DELIVERY_COUNT:
+            qd_compose_insert_ulong(body, link->total_deliveries);
             break;
 
         default:
@@ -95,10 +115,10 @@ static void qdr_manage_advance_link_CT(qdr_query_t *query, 
qdr_link_t *link)
 {
     query->next_offset++;
     link = DEQ_NEXT(link);
-    if (link)
+    if (link) {
         query->more     = true;
         //query->next_key = qdr_field((const char*) 
qd_hash_key_by_handle(link->owning_addr->hash_handle));
-    else
+    } else
         query->more = false;
 }
 
@@ -113,7 +133,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
     //
     // If the offset goes beyond the set of links, end the query now.
     //
-    if (true /*offset >= DEQ_SIZE(core->links)*/) {  // FIXME
+    if (offset >= DEQ_SIZE(core->open_links)) {
         query->more = false;
         qdr_agent_enqueue_response_CT(core, query);
         return;
@@ -122,7 +142,7 @@ void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
     //
     // Run to the address at the offset.
     //
-    qdr_link_t *link = 0; // DEQ_HEAD(core->links);  FIXME
+    qdr_link_t *link = DEQ_HEAD(core->open_links);
     for (int i = 0; i < offset && link; i++)
         link = DEQ_NEXT(link);
     assert(link);
@@ -149,17 +169,11 @@ void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t 
*query)
 {
     qdr_link_t *link = 0;
 
-    if (!link) {
-        //
-        // If the address was removed in the time between this get and the 
previous one,
-        // we need to use the saved offset, which is less efficient.
-        //
-        if (false /*query->next_offset < DEQ_SIZE(core->links)*/) {  // FIXME
-            link = 0; //DEQ_HEAD(core->links);
+        if (query->next_offset < DEQ_SIZE(core->open_links)) {
+            link = DEQ_HEAD(core->open_links);
             for (int i = 0; i < query->next_offset && link; i++)
                 link = DEQ_NEXT(link);
         }
-    }
 
     if (link) {
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 7e998f0..6f4e405 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -114,14 +114,10 @@ void *qdr_connection_get_context(const qdr_connection_t 
*conn)
 int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
-    qdr_link_ref_list_t         links_with_deliveries;
-    qdr_link_ref_list_t         links_with_credit;
     qdr_core_t                 *core = conn->core;
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
-    DEQ_MOVE(conn->links_with_deliveries, links_with_deliveries);
-    DEQ_MOVE(conn->links_with_credit, links_with_credit);
     sys_mutex_unlock(conn->work_lock);
 
     int event_count = DEQ_SIZE(work_list);
@@ -150,20 +146,38 @@ int qdr_connection_process(qdr_connection_t *conn)
         work = DEQ_HEAD(work_list);
     }
 
-    qdr_link_ref_t *ref = DEQ_HEAD(links_with_deliveries);
-    while (ref) {
-        core->push_handler(core->user_context, ref->link);
-        qdr_del_link_ref(&links_with_deliveries, ref->link, 
QDR_LINK_LIST_CLASS_DELIVERY);
-        ref = DEQ_HEAD(links_with_deliveries);
-    }
-
-    ref = DEQ_HEAD(links_with_credit);
-    while (ref) {
-        core->flow_handler(core->user_context, ref->link, 
ref->link->incremental_credit);
-        ref->link->incremental_credit = 0;
-        qdr_del_link_ref(&links_with_credit, ref->link, 
QDR_LINK_LIST_CLASS_FLOW);
-        ref = DEQ_HEAD(links_with_credit);
-    }
+    qdr_link_ref_t *ref;
+    qdr_link_t     *link;
+
+    do {
+        sys_mutex_lock(conn->work_lock);
+        ref = DEQ_HEAD(conn->links_with_deliveries);
+        if (ref) {
+            link = ref->link;
+            qdr_del_link_ref(&conn->links_with_deliveries, ref->link, 
QDR_LINK_LIST_CLASS_DELIVERY);
+        } else
+            link = 0;
+        sys_mutex_unlock(conn->work_lock);
+
+        if (link)
+            core->push_handler(core->user_context, link);
+    } while (link);
+
+    do {
+        sys_mutex_lock(conn->work_lock);
+        ref = DEQ_HEAD(conn->links_with_credit);
+        if (ref) {
+            link = ref->link;
+            qdr_del_link_ref(&conn->links_with_credit, ref->link, 
QDR_LINK_LIST_CLASS_FLOW);
+        } else
+            link = 0;
+        sys_mutex_unlock(conn->work_lock);
+
+        if (link) {
+            core->flow_handler(core->user_context, link, 
link->incremental_credit);
+            link->incremental_credit = 0;
+        }
+    } while (link);
 
     return event_count;
 }
@@ -402,6 +416,9 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t       
*core,
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
 
+    DEQ_INSERT_TAIL(core->open_links, link);
+    qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
+
     qdr_connection_work_t *work = new_qdr_connection_work_t();
     ZERO(work);
     work->work_type = QDR_CONNECTION_WORK_FIRST_ATTACH;
@@ -493,6 +510,24 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t 
*addr, bool was_local)
 }
 
 
+static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, 
qdr_link_t *link)
+{
+    //
+    // Remove the link from the master list of links
+    //
+    DEQ_REMOVE(core->open_links, link);
+
+    //
+    // Remove the reference to this link in the connection's reference lists
+    //
+    qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
+    sys_mutex_lock(conn->work_lock);
+    qdr_del_link_ref(&conn->links_with_deliveries, link, 
QDR_LINK_LIST_CLASS_DELIVERY);
+    qdr_del_link_ref(&conn->links_with_credit    , link, 
QDR_LINK_LIST_CLASS_FLOW);
+    sys_mutex_unlock(conn->work_lock);
+}
+
+
 /**
  * qdr_lookup_terminus_address_CT
  *
@@ -666,6 +701,18 @@ static void qdr_connection_closed_CT(qdr_core_t *core, 
qdr_action_t *action, boo
     //        This involves the links and the dispositions of deliveries stored
     //        with the links.
     //
+    qdr_link_ref_t *link_ref = DEQ_HEAD(conn->links);
+    while (link_ref) {
+        //
+        // TODO - if the link is link-routed and has a peer, detach the peer.
+        //
+
+        //
+        // Clean up the link and all its associated state.
+        //
+        qdr_link_cleanup_CT(core, conn, link_ref->link);
+        link_ref = DEQ_HEAD(conn->links);
+    }
 
     //
     // Discard items on the work list
@@ -689,6 +736,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t 
*core, qdr_action_t *act
     qdr_terminus_t    *target = action->args.connection.target;
 
     //
+    // Put the link into the proper lists for tracking.
+    //
+    DEQ_INSERT_TAIL(core->open_links, link);
+    qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
+
+    //
     // Reject any attaches of inter-router links that arrive on connections 
that are not inter-router.
     // Reject any waypoint links.  Waypoint links are always initiated by a 
router, not the remote container.
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 90e210f..c66721c 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -127,6 +127,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, 
in_delivery, out_link, msg);
             qdr_forward_deliver_CT(core, out_link, out_delivery);
             fanout++;
+            if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type 
!= QD_LINK_ROUTER)
+                addr->deliveries_egress++;
             link_ref = DEQ_NEXT(link_ref);
         }
     }
@@ -199,7 +201,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                 qdr_forward_deliver_CT(core, dest_link, out_delivery);
                 fanout++;
                 addr->deliveries_transit++;
-                qdr_connection_activate_CT(core, dest_link->conn);
             }
         }
 
@@ -214,6 +215,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         while (sub) {
             qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
             fanout++;
+            addr->deliveries_to_container++;
             sub = DEQ_NEXT(sub);
         }
     }
@@ -232,6 +234,9 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
                            bool             control,
                            qd_bitmask_t    *link_exclusion)
 {
+    qdr_link_t     *out_link;
+    qdr_delivery_t *out_delivery;
+
     //
     // The Anycast forwarders don't respect link exclusions.
     //
@@ -263,6 +268,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
                 DEQ_INSERT_TAIL(addr->subscriptions, sub);
             }
 
+            addr->deliveries_to_container++;
             return 1;
         }
     }
@@ -272,8 +278,8 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     //
     qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
     if (link_ref) {
-        qdr_link_t     *out_link     = link_ref->link;
-        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, 
in_delivery, out_link, msg);
+        out_link     = link_ref->link;
+        out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, 
out_link, msg);
         qdr_forward_deliver_CT(core, out_link, out_delivery);
 
         //
@@ -285,14 +291,30 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             DEQ_INSERT_TAIL(addr->rlinks, link_ref);
         }
 
+        addr->deliveries_egress++;
         return 1;
     }
 
     //
-    // TODO
     // Forward to remote routers with subscribers using the appropriate
     // link for the traffic class: control or data
     //
+    // TODO - presently, this picks one remote link to send to.  This needs
+    //        to be enhanced so it properly chooses the route to the closest 
destination.
+    //
+    int router_bit;
+    if (qd_bitmask_first_set(addr->rnodes, &router_bit)) {
+        qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
+        if (rnode) {
+            out_link = control ? rnode->peer_control_link : 
rnode->peer_data_link;
+            if (out_link) {
+                out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, 
out_link, msg);
+                qdr_forward_deliver_CT(core, out_link, out_delivery);
+                addr->deliveries_transit++;
+                return 1;
+            }
+        }
+    }
 
     return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 0bc31d8..cbd0a11 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -235,7 +235,7 @@ static void qdr_add_router_CT(qdr_core_t *core, 
qdr_action_t *action, bool disca
         // This record will be found whenever a "foreign" topological address 
to this
         // remote router is looked up.
         //
-        addr = qdr_address_CT(core, QD_SEMANTICS_MULTICAST_FLOOD); // TODO - 
switch back to ANYCAST_CLOSEST when able
+        addr = qdr_address_CT(core, QD_SEMANTICS_ANYCAST_CLOSEST);
         qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_INSERT_TAIL(core->addrs, addr);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 24f56bc..8b8abd9 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -201,10 +201,11 @@ DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t);
 void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv);
 void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t 
*ref);
 
-#define QDR_LINK_LIST_CLASS_ADDRESS  0
-#define QDR_LINK_LIST_CLASS_DELIVERY 1
-#define QDR_LINK_LIST_CLASS_FLOW     2
-#define QDR_LINK_LIST_CLASSES        3
+#define QDR_LINK_LIST_CLASS_ADDRESS    0
+#define QDR_LINK_LIST_CLASS_DELIVERY   1
+#define QDR_LINK_LIST_CLASS_FLOW       2
+#define QDR_LINK_LIST_CLASS_CONNECTION 3
+#define QDR_LINK_LIST_CLASSES          4
 
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
@@ -225,6 +226,7 @@ struct qdr_link_t {
     int                      capacity;
     int                      incremental_credit_CT;
     int                      incremental_credit;
+    uint64_t                 total_deliveries;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -368,9 +370,9 @@ struct qdr_connection_t {
     bool                        strip_annotations_in;
     bool                        strip_annotations_out;
     int                         mask_bit;
-    qdr_link_list_t             links;
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;
+    qdr_link_ref_list_t         links;
     qdr_link_ref_list_t         links_with_deliveries;
     qdr_link_ref_list_t         links_with_credit;
 };
@@ -393,6 +395,7 @@ struct qdr_core_t {
     qd_timer_t              *work_timer;
 
     qdr_connection_list_t open_connections;
+    qdr_link_list_t       open_links;
 
     //
     // Agent section

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index af88530..50e5320 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -119,6 +119,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
             if (!dlv->settled)
                 DEQ_INSERT_TAIL(link->unsettled, dlv);
             credit--;
+            link->total_deliveries++;
             offer = DEQ_SIZE(link->undelivered);
         } else
             drained = true;
@@ -183,26 +184,18 @@ void qdr_delivery_free(qdr_delivery_t *delivery)
 {
     if (delivery->msg)
         qd_message_free(delivery->msg);
+    if (delivery->to_addr)
+        qd_field_iterator_free(delivery->to_addr);
     free_qdr_delivery_t(delivery);
 }
 
 
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disposition)
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disposition, bool settled)
 {
     qdr_action_t *action = qdr_action(qdr_update_delivery_CT, 
"update_delivery");
     action->args.delivery.delivery    = delivery;
     action->args.delivery.disposition = disposition;
-    action->args.delivery.settled     = false;
-
-    qdr_action_enqueue(core, action);
-}
-
-
-void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery)
-{
-    qdr_action_t *action = qdr_action(qdr_update_delivery_CT, 
"update_delivery");
-    action->args.delivery.delivery = delivery;
-    action->args.delivery.settled  = true;
+    action->args.delivery.settled     = settled;
 
     qdr_action_enqueue(core, action);
 }
@@ -286,12 +279,17 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
         qdr_address_t *addr = link->owning_addr;
         if (!addr && dlv->to_addr)
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
-        if (addr)
+        if (addr) {
             fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false,
                                             link->link_type == 
QD_LINK_CONTROL, link_exclude);
+            if (link->link_type != QD_LINK_CONTROL && link->link_type != 
QD_LINK_ROUTER)
+                addr->deliveries_ingress++;
+            link->total_deliveries++;
+        }
     }
 
     if (fanout == 0) {
+        printf("TODO fanout == 0\n");
         if (link->owning_addr) {
             //
             // Message was not delivered and the link is not anonymous.
@@ -303,7 +301,6 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
             //
             // TODO - Release the delivery
             //
-            printf("TODO fanout == 0\n");
         }
     } else if (fanout == 1) {
         if (presettled) {
@@ -335,13 +332,14 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
 
         qd_address_iterator_reset_view(addr_field->iterator, 
ITER_VIEW_ADDRESS_HASH);
         qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) 
&addr);
-        if (addr)
+        if (addr) {
             //
             // Forward the message.  We don't care what the fanout count is.
             //
             (void) qdr_forward_message_CT(core, addr, msg, 0, 
action->args.io.exclude_inprocess,
                                           action->args.io.control, 0);
-        else
+            addr->deliveries_from_container++;
+        } else
             qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown 
address");
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index b16b862..2235ba3 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -311,6 +311,7 @@ static void router_rx_handler(void* context, qd_link_t 
*link, pn_delivery_t *pnd
             //
             pn_delivery_update(pnd, PN_REJECTED);
             pn_delivery_settle(pnd);
+            qd_server_activate(qd_link_connection(link));
         }
 
         //
@@ -333,6 +334,7 @@ static void router_rx_handler(void* context, qd_link_t 
*link, pn_delivery_t *pnd
         //
         pn_delivery_update(pnd, PN_REJECTED);
         pn_delivery_settle(pnd);
+        qd_server_activate(qd_link_connection(link));
     }
 }
 
@@ -349,19 +351,24 @@ static void router_disposition_handler(void* context, 
qd_link_t *link, pn_delive
         return;
 
     //
-    // Update the disposition of the delivery
-    //
-    qdr_delivery_update_disposition(router->router_core, delivery, 
pn_delivery_remote_state(pnd));
-
-    //
-    // If the delivery is settled, remove the linkage between pn-delivery and 
qdr-delivery
-    // and settle the qdr-delivery.
+    // If the delivery is settled, remove the linkage between the PN and QDR 
deliveries.
     //
     if (pn_delivery_settled(pnd)) {
         pn_delivery_set_context(pnd, 0);
         qdr_delivery_set_context(delivery, 0);
-        qdr_delivery_settle(router->router_core, delivery);
     }
+
+    //
+    // Update the disposition of the delivery
+    //
+    qdr_delivery_update_disposition(router->router_core, delivery,
+                                    pn_delivery_remote_state(pnd), 
pn_delivery_settled(pnd));
+
+    //
+    // If settled, close out the delivery
+    //
+    if (pn_delivery_settled(pnd))
+        pn_delivery_settle(pnd);
 }
 
 
@@ -700,6 +707,9 @@ static void qd_router_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64
 {
     pn_delivery_t *pnd = (pn_delivery_t*) qdr_delivery_get_context(dlv);
 
+    if (!pnd)
+        return;
+
     //
     // If the disposition has changed, update the proton delivery.
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/tests/parse_test.c
----------------------------------------------------------------------
diff --git a/tests/parse_test.c b/tests/parse_test.c
index 066d8ef..4d30cce 100644
--- a/tests/parse_test.c
+++ b/tests/parse_test.c
@@ -56,6 +56,7 @@ struct fs_vector_t {
 {"\x81\x01\x02\x03\x04\x05\x06\x07\x08",
                          9, QD_AMQP_LONG,       0, 0, 0, 1, 0, 
0x0102030405060708},  // 15
 {"\x55\x08",             2, QD_AMQP_SMALLLONG,  0, 0, 0, 1, 0, 0x08},          
      // 16
+{"\x45",                 1, QD_AMQP_LIST0,      0, 0, 0, 0, 0, 0},             
      // 17
 {0, 0, 0, 0, 0}
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/86852b2b/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index 3bcc33d..39bcc0c 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -202,8 +202,10 @@ class BusManager(Node):
         heads.append(Header("class"))
         heads.append(Header("address"))
         heads.append(Header("phase"))
-        heads.append(Header("event-fifo"))
-        heads.append(Header("msg-fifo"))
+        heads.append(Header("capacity"))
+        heads.append(Header("undelivered"))
+        heads.append(Header("unsettled"))
+        heads.append(Header("deliveries"))
         rows = []
 
         objects = self.query('org.apache.qpid.dispatch.router.link')
@@ -212,18 +214,14 @@ class BusManager(Node):
             row = []
             row.append(link.linkType)
             row.append(link.linkDir)
-            if link.linkType == "inter-router":
-                row.append(self._identity_clean(link.identity))
-            else:
-                row.append('-')
+            row.append(self._identity_clean(link.identity))
             row.append(self._addr_class(link.owningAddr))
             row.append(self._addr_text(link.owningAddr))
             row.append(self._addr_phase(link.owningAddr))
-            row.append(link.eventFifoDepth)
-            if link.linkDir == 'out':
-                row.append(link.msgFifoDepth)
-            else:
-                row.append('-')
+            row.append(link.capacity)
+            row.append(link.undeliveredCount)
+            row.append(link.unsettledCount)
+            row.append(link.deliveryCount)
             rows.append(row)
         title = "Router Links"
         dispRows = rows


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to