Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 b8facf8fe -> c695e93f5


DISPATCH-179 - Wired in the management agent (core and python).
               Handle settlement of messages consumed in-process.
               Added infrastructure for delivery disposition and settlement.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c695e93f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c695e93f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c695e93f

Branch: refs/heads/tross-DISPATCH-179-1
Commit: c695e93f52f42172ffe08cc53c0008a143104264
Parents: b8facf8
Author: Ted Ross <[email protected]>
Authored: Thu Jan 21 15:53:01 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Thu Jan 21 15:53:01 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h             | 12 +--
 .../qpid_dispatch_internal/management/agent.py  |  7 +-
 .../qpid_dispatch_internal/management/config.py |  2 +-
 src/router_core/connections.c                   | 26 +++---
 src/router_core/forwarder.c                     |  9 ++
 src/router_core/management_agent.c              |  2 +-
 src/router_core/router_core.c                   | 17 ++++
 src/router_core/router_core_private.h           | 55 +++++++++----
 src/router_core/transfer.c                      | 87 +++++++++++++++++---
 src/router_node.c                               | 45 ++++++++--
 tools/qdstat                                    |  1 +
 11 files changed, 207 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 70b9294..ceaa57b 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -498,7 +498,8 @@ typedef void (*qdr_link_flow_t)          (void *context, 
qdr_link_t *link, int c
 typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int 
delivery_count);
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
 typedef void (*qdr_link_push_t)          (void *context, qdr_link_t *link);
-typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, 
qdr_delivery_t *delivery);
+typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled);
+typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled);
 
 void qdr_connection_handlers(qdr_core_t                *core,
                              void                      *context,
@@ -510,21 +511,22 @@ void qdr_connection_handlers(qdr_core_t                
*core,
                              qdr_link_offer_t           offer,
                              qdr_link_drained_t         drained,
                              qdr_link_push_t            push,
-                             qdr_link_deliver_t         deliver);
+                             qdr_link_deliver_t         deliver,
+                             qdr_delivery_update_t      delivery_update);
 
 /**
  ******************************************************************************
  * Delivery functions
  ******************************************************************************
  */
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disp);
+void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery);
+
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
 void *qdr_delivery_get_context(qdr_delivery_t *delivery);
-uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
-bool qdr_delivery_is_settled(const qdr_delivery_t *delivery);
 void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int 
*length);
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 
-void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
 void qdr_delivery_update_flow(qdr_delivery_t *delivery);
 void qdr_delivery_process(qdr_delivery_t *delivery);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index 0750971..4d75f8b 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -73,7 +73,7 @@ from cProfile import Profile
 from cStringIO import StringIO
 from ctypes import c_void_p, py_object, c_long
 from subprocess import Popen
-from ..dispatch import IoAdapter, LogAdapter, LOG_INFO, LOG_DEBUG, LOG_ERROR
+from ..dispatch import IoAdapter, LogAdapter, LOG_INFO, LOG_DEBUG, LOG_ERROR, 
SEMANTICS_ANYCAST_CLOSEST
 from qpid_dispatch.management.error import ManagementError, OK, CREATED, 
NO_CONTENT, STATUS_TEXT, \
     BadRequestStatus, InternalServerErrorStatus, NotImplementedStatus, 
NotFoundStatus
 from qpid_dispatch.management.entity import camelcase
@@ -603,8 +603,7 @@ class Agent(object):
         """Register the management address to receive management requests"""
         self.entities.refresh_from_c()
         self.log(LOG_INFO, "Activating management agent on %s" % address)
-        self.io = [IoAdapter(self.receive, address),
-                   IoAdapter(self.receive, address, True)] # Global
+        self.io = IoAdapter(self.receive, address, 'L', '0', 
SEMANTICS_ANYCAST_CLOSEST)
 
     def entity_class(self, entity_type):
         """Return the class that implements entity_type"""
@@ -636,7 +635,7 @@ class Agent(object):
             body=body)
         self.log(LOG_DEBUG, "Agent response:\n  %s\n  Responding to: \n  
%s"%(response, request))
         try:
-            self.io[0].send(response)
+            self.io.send(response)
         except:
             self.log(LOG_ERROR, "Can't respond to %s: %s"%(request, 
format_exc()))
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py
index 0ab9bbb..1ecae6b 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -146,8 +146,8 @@ def configure_dispatch(dispatch, lib_handle, filename):
     configure(config.by_type('container')[0])
     configure(config.by_type('router')[0])
     qd.qd_dispatch_prepare(dispatch)
-    #agent.activate("$management_internal")
     qd.qd_router_setup_late(dispatch) # Actions requiring active management 
agent.
+    agent.activate("$_management_internal")
 
     # Remaining configuration
     for t in "fixedAddress", "listener", "connector", "waypoint", 
"linkRoutePattern":

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index e641b00..7e998f0 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -294,18 +294,20 @@ void qdr_connection_handlers(qdr_core_t                
*core,
                              qdr_link_offer_t           offer,
                              qdr_link_drained_t         drained,
                              qdr_link_push_t            push,
-                             qdr_link_deliver_t         deliver)
-{
-    core->user_context          = context;
-    core->activate_handler      = activate;
-    core->first_attach_handler  = first_attach;
-    core->second_attach_handler = second_attach;
-    core->detach_handler        = detach;
-    core->flow_handler          = flow;
-    core->offer_handler         = offer;
-    core->drained_handler       = drained;
-    core->push_handler          = push;
-    core->deliver_handler       = deliver;
+                             qdr_link_deliver_t         deliver,
+                             qdr_delivery_update_t      delivery_update)
+{
+    core->user_context            = context;
+    core->activate_handler        = activate;
+    core->first_attach_handler    = first_attach;
+    core->second_attach_handler   = second_attach;
+    core->detach_handler          = detach;
+    core->flow_handler            = flow;
+    core->offer_handler           = offer;
+    core->drained_handler         = drained;
+    core->push_handler            = push;
+    core->deliver_handler         = deliver;
+    core->delivery_update_handler = delivery_update;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 865b4da..30a6d9c 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -242,6 +242,15 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
 
             //
+            // If the incoming delivery is not settled, it should be accepted 
and settled here.
+            //
+            if (in_delivery) {
+                in_delivery->disposition = PN_ACCEPTED;
+                in_delivery->settled     = true;
+                qdr_delivery_push_CT(core, in_delivery);
+            }
+
+            //
             // Rotate this subscription to the end of the list to get 
round-robin distribution.
             //
             if (DEQ_SIZE(addr->subscriptions) > 1) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index bfd0d80..27041cb 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -48,7 +48,7 @@ const char * const correlation_id = "correlation-id";
 const char * const results = "results";
 const char * const status_code = "statusCode";
 
-const char * MANAGEMENT_INTERNAL = "$management_internal";
+const char * MANAGEMENT_INTERNAL = "_local/$_management_internal";
 
 //TODO - Move these to amqp.h
 const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 98badb2..6fbf2b7 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -24,6 +24,7 @@ ALLOC_DEFINE(qdr_query_t);
 ALLOC_DEFINE(qdr_address_t);
 ALLOC_DEFINE(qdr_node_t);
 ALLOC_DEFINE(qdr_delivery_t);
+ALLOC_DEFINE(qdr_delivery_ref_t);
 ALLOC_DEFINE(qdr_link_t);
 ALLOC_DEFINE(qdr_router_ref_t);
 ALLOC_DEFINE(qdr_link_ref_t);
@@ -272,6 +273,22 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, 
qdr_node_t *rnode)
 }
 
 
+void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv)  // Append a new reference record pointing at 'dlv' to the tail of 'list'.
+{
+    qdr_delivery_ref_t *ref = new_qdr_delivery_ref_t();  // Released later by qdr_del_delivery_ref(). NOTE(review): result is used unchecked; presumably the pool allocator never returns NULL - confirm.
+    DEQ_ITEM_INIT(ref);
+    ref->dlv = dlv;  // Only the pointer is stored; nothing here takes ownership of the delivery itself.
+    DEQ_INSERT_TAIL(*list, ref);
+}
+
+// Unlink 'ref' from 'list' and free the reference record. The delivery that ref->dlv points to is not touched.
+void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t 
*ref)
+{
+    DEQ_REMOVE(*list, ref);  // 'ref' must currently be a member of 'list'; no membership check is made here.
+    free_qdr_delivery_ref_t(ref);
+}
+
+
 static void qdr_general_handler(void *context)
 {
     qdr_core_t              *core = (qdr_core_t*) context;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index f31f1e5..dd7d1e0 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -91,6 +91,15 @@ struct qdr_action_t {
         } connection;
 
         //
+        // Arguments for delivery state updates
+        //
+        struct {
+            qdr_delivery_t *delivery;
+            uint64_t        disposition;
+            bool            settled;
+        } delivery;
+
+        //
         // Arguments for in-process messaging
         //
         struct {
@@ -181,6 +190,17 @@ struct qdr_delivery_t {
 ALLOC_DECLARE(qdr_delivery_t);
 DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
 
+typedef struct qdr_delivery_ref_t {  // List node that refers to a delivery; created by qdr_add_delivery_ref() and freed by qdr_del_delivery_ref().
+    DEQ_LINKS(struct qdr_delivery_ref_t);  // Linkage fields used by the DEQ_* list macros.
+    qdr_delivery_t *dlv;  // The referenced delivery; it is not freed when the reference record is freed.
+} qdr_delivery_ref_t;
+
+ALLOC_DECLARE(qdr_delivery_ref_t);
+DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t);
+
+void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv);
+void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t 
*ref);
+
 #define QDR_LINK_LIST_CLASS_ADDRESS  0
 #define QDR_LINK_LIST_CLASS_DELIVERY 1
 #define QDR_LINK_LIST_CLASS_FLOW     2
@@ -188,22 +208,23 @@ DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
 
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
-    qdr_core_t          *core;
-    void                *user_context;
-    qdr_connection_t    *conn;            ///< [ref] Connection that owns this 
link
-    qd_link_type_t       link_type;
-    qd_direction_t       link_direction;
-    char                *name;
-    qdr_address_t       *owning_addr;     ///< [ref] Address record that owns 
this link
-    qdr_link_t          *connected_link;  ///< [ref] If this is a link-route, 
reference the connected link
-    qdr_link_ref_t      *ref[QDR_LINK_LIST_CLASSES];  ///< Pointers to 
containing reference objects
-    qdr_delivery_list_t  undelivered;     ///< Deliveries to be forwarded or 
sent
-    qdr_delivery_list_t  unsettled;       ///< Unsettled deliveries
-    bool                 strip_annotations_in;
-    bool                 strip_annotations_out;
-    int                  capacity;
-    int                  incremental_credit_CT;
-    int                  incremental_credit;
+    qdr_core_t              *core;
+    void                    *user_context;
+    qdr_connection_t        *conn;               ///< [ref] Connection that 
owns this link
+    qd_link_type_t           link_type;
+    qd_direction_t           link_direction;
+    char                    *name;
+    qdr_address_t           *owning_addr;        ///< [ref] Address record 
that owns this link
+    qdr_link_t              *connected_link;     ///< [ref] If this is a 
link-route, reference the connected link
+    qdr_link_ref_t          *ref[QDR_LINK_LIST_CLASSES];  ///< Pointers to 
containing reference objects
+    qdr_delivery_list_t      undelivered;        ///< Deliveries to be 
forwarded or sent
+    qdr_delivery_list_t      unsettled;          ///< Unsettled deliveries
+    qdr_delivery_ref_list_t  updated_deliveries; ///< References to deliveries 
(in the unsettled list) with updates.
+    bool                     strip_annotations_in;
+    bool                     strip_annotations_out;
+    int                      capacity;
+    int                      incremental_credit_CT;
+    int                      incremental_credit;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -403,6 +424,7 @@ struct qdr_core_t {
     qdr_link_drained_t         drained_handler;
     qdr_link_push_t            push_handler;
     qdr_link_deliver_t         deliver_handler;
+    qdr_delivery_update_t      delivery_update_handler;
 
     const char *router_area;
     const char *router_id;
@@ -440,6 +462,7 @@ void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char 
*label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
+void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
 void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5ff321d..5c44a7f 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -24,7 +24,7 @@
 
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
-
+static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 
 
//==================================================================================
 // Internal Functions
@@ -124,7 +124,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
         sys_mutex_unlock(conn->work_lock);
 
         if (dlv)
-            core->deliver_handler(core->user_context, link, dlv);
+            core->deliver_handler(core->user_context, link, dlv, dlv->settled);
     }
 
     if (drained)
@@ -133,8 +133,19 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
         core->offer_handler(core->user_context, link, offer);
 
     //
-    // TODO - handle disposition/settlement updates
+    // Handle disposition/settlement updates
     //
+    qdr_delivery_ref_list_t updated_deliveries;
+    sys_mutex_lock(conn->work_lock);
+    DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+    sys_mutex_unlock(conn->work_lock);
+
+    qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
+    while (ref) {
+        core->delivery_update_handler(core->user_context, ref->dlv, 
ref->dlv->disposition, ref->dlv->settled);
+        qdr_del_delivery_ref(&updated_deliveries, ref);
+        ref = DEQ_HEAD(updated_deliveries);
+    }
 }
 
 
@@ -162,27 +173,36 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, 
const char *addr, bool ex
 }
 
 
-void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disposition)
 {
-    delivery->context = context;
+    qdr_action_t *action = qdr_action(qdr_update_delivery_CT, 
"update_delivery");
+    action->args.delivery.delivery    = delivery;
+    action->args.delivery.disposition = disposition;
+    action->args.delivery.settled     = false;
+
+    qdr_action_enqueue(core, action);
 }
 
 
-void *qdr_delivery_get_context(qdr_delivery_t *delivery)
+void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery)
 {
-    return delivery->context;
+    qdr_action_t *action = qdr_action(qdr_update_delivery_CT, 
"update_delivery");
+    action->args.delivery.delivery = delivery;
+    action->args.delivery.settled  = true;
+
+    qdr_action_enqueue(core, action);
 }
 
 
-uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
+void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
 {
-    return delivery->disposition;
+    delivery->context = context;
 }
 
 
-bool qdr_delivery_is_settled(const qdr_delivery_t *delivery)
+void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 {
-    return delivery->settled;
+    return delivery->context;
 }
 
 
@@ -270,7 +290,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
             //
         }
     } else if (count == 1) {
-        if (qdr_delivery_is_settled(dlv))
+        if (dlv->settled)
+            //
+            // The delivery was pre-settled.  Issue replacement credit now 
that it's
+            // been forwarded.
+            //
             qdr_link_issue_credit_CT(core, link, 1);
         else
             DEQ_INSERT_TAIL(link->unsettled, dlv);
@@ -306,3 +330,42 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
     qd_message_free(msg);
 }
 
+// Action handler for the "update_delivery" actions enqueued by qdr_delivery_update_disposition() and qdr_delivery_settle(): records the requested disposition on the delivery.
+static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+{
+    qdr_delivery_t *dlv     = action->args.delivery.delivery;  // NOTE(review): 'discard' is never consulted in this handler - confirm that is intended.
+    uint64_t        disp    = action->args.delivery.disposition;  // NOTE(review): qdr_delivery_settle() does not assign this field, so the value seen here depends on how qdr_action() initializes the action - confirm.
+    //    bool            settled = action->args.delivery.settled;  (not consumed yet: a settle request currently has no settlement effect in this handler)
+
+    if (disp != dlv->disposition) {
+        //
+        // Disposition has changed, propagate the change to the peer delivery.
+        // NOTE(review): only the local field is updated below; no propagation to a peer delivery is visible in this function.
+        dlv->disposition = disp;
+    }
+}
+
+// Queue 'dlv' on its link's updated_deliveries list and activate the owning connection. That list is drained by qdr_link_process_deliveries(), which calls core->delivery_update_handler for each entry.
+void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+    if (!dlv || !dlv->link)  // Nothing to report for a null delivery or one with no link.
+        return;
+
+    qdr_link_t *link = dlv->link;
+
+    sys_mutex_lock(link->conn->work_lock);  // Same lock that qdr_link_process_deliveries() holds while moving updated_deliveries aside.
+    qdr_add_delivery_ref(&link->updated_deliveries, dlv);
+
+    //
+    // Put this link on the connection's list of links with delivery activity.
+    // NOTE(review): called unconditionally; presumably qdr_add_link_ref() tolerates a link that is already on this list - confirm.
+    qdr_add_link_ref(&link->conn->links_with_deliveries, link, 
QDR_LINK_LIST_CLASS_DELIVERY);
+    sys_mutex_unlock(link->conn->work_lock);
+
+    //
+    // Activate the connection
+    // (done after work_lock has been released)
+    qdr_connection_activate_CT(core, link->conn);
+}
+
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index d40fa2c..188f244 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -338,13 +338,26 @@ static void router_rx_handler(void* context, qd_link_t 
*link, pn_delivery_t *pnd
  */
 static void router_disposition_handler(void* context, qd_link_t *link, 
pn_delivery_t *pnd)
 {
-    //qd_router_t    *router   = (qd_router_t*) context;
+    qd_router_t    *router   = (qd_router_t*) context;
     qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd);
 
     if (!delivery)
         return;
 
-    // TODO - hook into the core
+    //
+    // Update the disposition of the delivery
+    //
+    qdr_delivery_update_disposition(router->router_core, delivery, 
pn_delivery_remote_state(pnd));
+
+    //
+    // If the delivery is settled, remove the linkage between pn-delivery and 
qdr-delivery
+    // and settle the qdr-delivery.
+    //
+    if (pn_delivery_settled(pnd)) {
+        pn_delivery_set_context(pnd, 0);
+        qdr_delivery_set_context(delivery, 0);
+        qdr_delivery_settle(router->router_core, delivery);
+    }
 }
 
 
@@ -655,7 +668,7 @@ static void qd_router_link_push(void *context, qdr_link_t 
*link)
 }
 
 
-static void qd_router_link_deliver(void *context, qdr_link_t *link, 
qdr_delivery_t *dlv)
+static void qd_router_link_deliver(void *context, qdr_link_t *link, 
qdr_delivery_t *dlv, bool settled)
 {
     qd_link_t  *qlink = (qd_link_t*) qdr_link_get_context(link);
     pn_link_t  *plink = qd_link_pn(qlink);
@@ -671,12 +684,33 @@ static void qd_router_link_deliver(void *context, 
qdr_link_t *link, qdr_delivery
     qdr_delivery_set_context(dlv, pdlv);
 
     qd_message_send(qdr_delivery_message(dlv), qlink, 
qdr_link_strip_annotations_out(link));
-    if (qdr_delivery_is_settled(dlv))
+    if (settled)
         pn_delivery_settle(pdlv);
     pn_link_advance(plink);
 }
 
 
+static void qd_router_delivery_update(void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled)  // delivery_update handler registered in qd_router_setup_late(): applies a core disposition/settlement update to the proton delivery linked to 'dlv'. 'context' is unused here.
+{
+    pn_delivery_t *pnd = (pn_delivery_t*) qdr_delivery_get_context(dlv);  // NOTE(review): not checked for NULL before use; router_disposition_handler() resets this context to 0 on settlement, so a later update could presumably arrive with no proton delivery attached - confirm and guard.
+
+    // NOTE(review): the test below reads the *remote* state while pn_delivery_update() sets the local state - confirm this comparison is the intended one.
+    // If the disposition has changed, update the proton delivery.
+    //
+    if (disp != pn_delivery_remote_state(pnd))
+        pn_delivery_update(pnd, disp);
+
+    // On settlement both context pointers are cleared first, then the proton delivery is settled.
+    // If the delivery is settled, remove the linkage and settle the proton 
delivery.
+    //
+    if (settled) {
+        qdr_delivery_set_context(dlv, 0);
+        pn_delivery_set_context(pnd, 0);
+        pn_delivery_settle(pnd);
+    }
+}
+
+
 void qd_router_setup_late(qd_dispatch_t *qd)
 {
     qd->router->tracemask   = qd_tracemask();
@@ -691,7 +725,8 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             qd_router_link_offer,
                             qd_router_link_drained,
                             qd_router_link_push,
-                            qd_router_link_deliver);
+                            qd_router_link_deliver,
+                            qd_router_delivery_update);
 
     qd_router_python_setup(qd->router);
     qd_timer_schedule(qd->router->timer, 1000);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index ce1e850..3bcc33d 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -138,6 +138,7 @@ class BusManager(Node):
         if addr[0] == 'R' : return "router"
         if addr[0] == 'A' : return "area"
         if addr[0] == 'L' : return "local"
+        if addr[0] == 'T' : return "topo"
         if addr[0] == 'C' : return "link-incoming"
         if addr[0] == 'D' : return "link-outgoing"
         return "unknown: %s" % addr[0]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to