Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 b8facf8fe -> c695e93f5
DISPATCH-179 - Wired in the management agent (core and python).
Handle settlement of messages consumed in-process.
Added infrastructure for delivery disposition and settlement.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c695e93f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c695e93f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c695e93f
Branch: refs/heads/tross-DISPATCH-179-1
Commit: c695e93f52f42172ffe08cc53c0008a143104264
Parents: b8facf8
Author: Ted Ross <[email protected]>
Authored: Thu Jan 21 15:53:01 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Thu Jan 21 15:53:01 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 12 +--
.../qpid_dispatch_internal/management/agent.py | 7 +-
.../qpid_dispatch_internal/management/config.py | 2 +-
src/router_core/connections.c | 26 +++---
src/router_core/forwarder.c | 9 ++
src/router_core/management_agent.c | 2 +-
src/router_core/router_core.c | 17 ++++
src/router_core/router_core_private.h | 55 +++++++++----
src/router_core/transfer.c | 87 +++++++++++++++++---
src/router_node.c | 45 ++++++++--
tools/qdstat | 1 +
11 files changed, 207 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h
b/include/qpid/dispatch/router_core.h
index 70b9294..ceaa57b 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -498,7 +498,8 @@ typedef void (*qdr_link_flow_t) (void *context,
qdr_link_t *link, int c
typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int
delivery_count);
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link);
-typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link,
qdr_delivery_t *delivery);
+typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link,
qdr_delivery_t *delivery, bool settled);
+typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv,
uint64_t disp, bool settled);
void qdr_connection_handlers(qdr_core_t *core,
void *context,
@@ -510,21 +511,22 @@ void qdr_connection_handlers(qdr_core_t
*core,
qdr_link_offer_t offer,
qdr_link_drained_t drained,
qdr_link_push_t push,
- qdr_link_deliver_t deliver);
+ qdr_link_deliver_t deliver,
+ qdr_delivery_update_t delivery_update);
/**
******************************************************************************
* Delivery functions
******************************************************************************
*/
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t
*delivery, uint64_t disp);
+void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery);
+
void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
void *qdr_delivery_get_context(qdr_delivery_t *delivery);
-uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
-bool qdr_delivery_is_settled(const qdr_delivery_t *delivery);
void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int
*length);
qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
-void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
void qdr_delivery_update_flow(qdr_delivery_t *delivery);
void qdr_delivery_process(qdr_delivery_t *delivery);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py
b/python/qpid_dispatch_internal/management/agent.py
index 0750971..4d75f8b 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -73,7 +73,7 @@ from cProfile import Profile
from cStringIO import StringIO
from ctypes import c_void_p, py_object, c_long
from subprocess import Popen
-from ..dispatch import IoAdapter, LogAdapter, LOG_INFO, LOG_DEBUG, LOG_ERROR
+from ..dispatch import IoAdapter, LogAdapter, LOG_INFO, LOG_DEBUG, LOG_ERROR,
SEMANTICS_ANYCAST_CLOSEST
from qpid_dispatch.management.error import ManagementError, OK, CREATED,
NO_CONTENT, STATUS_TEXT, \
BadRequestStatus, InternalServerErrorStatus, NotImplementedStatus,
NotFoundStatus
from qpid_dispatch.management.entity import camelcase
@@ -603,8 +603,7 @@ class Agent(object):
"""Register the management address to receive management requests"""
self.entities.refresh_from_c()
self.log(LOG_INFO, "Activating management agent on %s" % address)
- self.io = [IoAdapter(self.receive, address),
- IoAdapter(self.receive, address, True)] # Global
+ self.io = IoAdapter(self.receive, address, 'L', '0',
SEMANTICS_ANYCAST_CLOSEST)
def entity_class(self, entity_type):
"""Return the class that implements entity_type"""
@@ -636,7 +635,7 @@ class Agent(object):
body=body)
self.log(LOG_DEBUG, "Agent response:\n %s\n Responding to: \n
%s"%(response, request))
try:
- self.io[0].send(response)
+ self.io.send(response)
except:
self.log(LOG_ERROR, "Can't respond to %s: %s"%(request,
format_exc()))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py
b/python/qpid_dispatch_internal/management/config.py
index 0ab9bbb..1ecae6b 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -146,8 +146,8 @@ def configure_dispatch(dispatch, lib_handle, filename):
configure(config.by_type('container')[0])
configure(config.by_type('router')[0])
qd.qd_dispatch_prepare(dispatch)
- #agent.activate("$management_internal")
qd.qd_router_setup_late(dispatch) # Actions requiring active management
agent.
+ agent.activate("$_management_internal")
# Remaining configuration
for t in "fixedAddress", "listener", "connector", "waypoint",
"linkRoutePattern":
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index e641b00..7e998f0 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -294,18 +294,20 @@ void qdr_connection_handlers(qdr_core_t
*core,
qdr_link_offer_t offer,
qdr_link_drained_t drained,
qdr_link_push_t push,
- qdr_link_deliver_t deliver)
-{
- core->user_context = context;
- core->activate_handler = activate;
- core->first_attach_handler = first_attach;
- core->second_attach_handler = second_attach;
- core->detach_handler = detach;
- core->flow_handler = flow;
- core->offer_handler = offer;
- core->drained_handler = drained;
- core->push_handler = push;
- core->deliver_handler = deliver;
+ qdr_link_deliver_t deliver,
+ qdr_delivery_update_t delivery_update)
+{
+ core->user_context = context;
+ core->activate_handler = activate;
+ core->first_attach_handler = first_attach;
+ core->second_attach_handler = second_attach;
+ core->detach_handler = detach;
+ core->flow_handler = flow;
+ core->offer_handler = offer;
+ core->drained_handler = drained;
+ core->push_handler = push;
+ core->deliver_handler = deliver;
+ core->delivery_update_handler = delivery_update;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 865b4da..30a6d9c 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -242,6 +242,15 @@ int qdr_forward_closest_CT(qdr_core_t *core,
qdr_forward_on_message_CT(core, sub, in_delivery ?
in_delivery->link : 0, msg);
//
+ // If the incoming delivery is not settled, it should be accepted
and settled here.
+ //
+ if (in_delivery) {
+ in_delivery->disposition = PN_ACCEPTED;
+ in_delivery->settled = true;
+ qdr_delivery_push_CT(core, in_delivery);
+ }
+
+ //
// Rotate this subscription to the end of the list to get
round-robin distribution.
//
if (DEQ_SIZE(addr->subscriptions) > 1) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c
b/src/router_core/management_agent.c
index bfd0d80..27041cb 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -48,7 +48,7 @@ const char * const correlation_id = "correlation-id";
const char * const results = "results";
const char * const status_code = "statusCode";
-const char * MANAGEMENT_INTERNAL = "$management_internal";
+const char * MANAGEMENT_INTERNAL = "_local/$_management_internal";
//TODO - Move these to amqp.h
const unsigned char *MANAGEMENT_QUERY = (unsigned char*) "QUERY";
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 98badb2..6fbf2b7 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -24,6 +24,7 @@ ALLOC_DEFINE(qdr_query_t);
ALLOC_DEFINE(qdr_address_t);
ALLOC_DEFINE(qdr_node_t);
ALLOC_DEFINE(qdr_delivery_t);
+ALLOC_DEFINE(qdr_delivery_ref_t);
ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
@@ -272,6 +273,22 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list,
qdr_node_t *rnode)
}
+void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv)
+{
+    //
+    // Wrap the delivery in a newly allocated reference record and queue that
+    // record at the tail of the supplied list.
+    //
+    qdr_delivery_ref_t *entry = new_qdr_delivery_ref_t();
+
+    entry->dlv = dlv;
+    DEQ_ITEM_INIT(entry);
+    DEQ_INSERT_TAIL(*list, entry);
+}
+
+
+void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t
*ref)
+{ // Removes 'ref' from 'list' and releases the reference record.
+ DEQ_REMOVE(*list, ref); // Unlink the record from the list before it is released.
+ free_qdr_delivery_ref_t(ref); // Only the reference record is released here; ref->dlv is not touched by this function.
+}
+
+
static void qdr_general_handler(void *context)
{
qdr_core_t *core = (qdr_core_t*) context;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h
b/src/router_core/router_core_private.h
index f31f1e5..dd7d1e0 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -91,6 +91,15 @@ struct qdr_action_t {
} connection;
//
+ // Arguments for delivery state updates
+ //
+ struct {
+ qdr_delivery_t *delivery;
+ uint64_t disposition;
+ bool settled;
+ } delivery;
+
+ //
// Arguments for in-process messaging
//
struct {
@@ -181,6 +190,17 @@ struct qdr_delivery_t {
ALLOC_DECLARE(qdr_delivery_t);
DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
+typedef struct qdr_delivery_ref_t { // List element that refers to a delivery by pointer.
+ DEQ_LINKS(struct qdr_delivery_ref_t); // Linkage for membership in a qdr_delivery_ref_list_t.
+ qdr_delivery_t *dlv; // The referenced delivery (assigned by qdr_add_delivery_ref).
+} qdr_delivery_ref_t;
+
+ALLOC_DECLARE(qdr_delivery_ref_t);
+DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t);
+
+void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv);
+void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t
*ref);
+
#define QDR_LINK_LIST_CLASS_ADDRESS 0
#define QDR_LINK_LIST_CLASS_DELIVERY 1
#define QDR_LINK_LIST_CLASS_FLOW 2
@@ -188,22 +208,23 @@ DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
struct qdr_link_t {
DEQ_LINKS(qdr_link_t);
- qdr_core_t *core;
- void *user_context;
- qdr_connection_t *conn; ///< [ref] Connection that owns this
link
- qd_link_type_t link_type;
- qd_direction_t link_direction;
- char *name;
- qdr_address_t *owning_addr; ///< [ref] Address record that owns
this link
- qdr_link_t *connected_link; ///< [ref] If this is a link-route,
reference the connected link
- qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to
containing reference objects
- qdr_delivery_list_t undelivered; ///< Deliveries to be forwarded or
sent
- qdr_delivery_list_t unsettled; ///< Unsettled deliveries
- bool strip_annotations_in;
- bool strip_annotations_out;
- int capacity;
- int incremental_credit_CT;
- int incremental_credit;
+ qdr_core_t *core;
+ void *user_context;
+ qdr_connection_t *conn; ///< [ref] Connection that
owns this link
+ qd_link_type_t link_type;
+ qd_direction_t link_direction;
+ char *name;
+ qdr_address_t *owning_addr; ///< [ref] Address record
that owns this link
+ qdr_link_t *connected_link; ///< [ref] If this is a
link-route, reference the connected link
+ qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to
containing reference objects
+ qdr_delivery_list_t undelivered; ///< Deliveries to be
forwarded or sent
+ qdr_delivery_list_t unsettled; ///< Unsettled deliveries
+ qdr_delivery_ref_list_t updated_deliveries; ///< References to deliveries
(in the unsettled list) with updates.
+ bool strip_annotations_in;
+ bool strip_annotations_out;
+ int capacity;
+ int incremental_credit_CT;
+ int incremental_credit;
};
ALLOC_DECLARE(qdr_link_t);
@@ -403,6 +424,7 @@ struct qdr_core_t {
qdr_link_drained_t drained_handler;
qdr_link_push_t push_handler;
qdr_link_deliver_t deliver_handler;
+ qdr_delivery_update_t delivery_update_handler;
const char *router_area;
const char *router_id;
@@ -440,6 +462,7 @@ void qdr_forwarder_setup_CT(qdr_core_t *core);
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char
*label);
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
+void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5ff321d..5c44a7f 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -24,7 +24,7 @@
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool
discard);
static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool
discard);
-
+static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action,
bool discard);
//==================================================================================
// Internal Functions
@@ -124,7 +124,7 @@ void qdr_link_process_deliveries(qdr_core_t *core,
qdr_link_t *link, int credit)
sys_mutex_unlock(conn->work_lock);
if (dlv)
- core->deliver_handler(core->user_context, link, dlv);
+ core->deliver_handler(core->user_context, link, dlv, dlv->settled);
}
if (drained)
@@ -133,8 +133,19 @@ void qdr_link_process_deliveries(qdr_core_t *core,
qdr_link_t *link, int credit)
core->offer_handler(core->user_context, link, offer);
//
- // TODO - handle disposition/settlement updates
+ // Handle disposition/settlement updates
//
+ qdr_delivery_ref_list_t updated_deliveries;
+ sys_mutex_lock(conn->work_lock);
+ DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+ sys_mutex_unlock(conn->work_lock);
+
+ qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
+ while (ref) {
+ core->delivery_update_handler(core->user_context, ref->dlv,
ref->dlv->disposition, ref->dlv->settled);
+ qdr_del_delivery_ref(&updated_deliveries, ref);
+ ref = DEQ_HEAD(updated_deliveries);
+ }
}
@@ -162,27 +173,36 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg,
const char *addr, bool ex
}
-void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t
*delivery, uint64_t disposition)
{
- delivery->context = context;
+ qdr_action_t *action = qdr_action(qdr_update_delivery_CT,
"update_delivery");
+ action->args.delivery.delivery = delivery;
+ action->args.delivery.disposition = disposition;
+ action->args.delivery.settled = false;
+
+ qdr_action_enqueue(core, action);
}
-void *qdr_delivery_get_context(qdr_delivery_t *delivery)
+void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery)
{
- return delivery->context;
+ qdr_action_t *action = qdr_action(qdr_update_delivery_CT,
"update_delivery");
+ action->args.delivery.delivery = delivery;
+ action->args.delivery.settled = true;
+
+ qdr_action_enqueue(core, action);
}
-uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
+void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
{
- return delivery->disposition;
+ delivery->context = context;
}
-bool qdr_delivery_is_settled(const qdr_delivery_t *delivery)
+void *qdr_delivery_get_context(qdr_delivery_t *delivery)
{
- return delivery->settled;
+ return delivery->context;
}
@@ -270,7 +290,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core,
qdr_action_t *action, bool dis
//
}
} else if (count == 1) {
- if (qdr_delivery_is_settled(dlv))
+ if (dlv->settled)
+ //
+ // The delivery was pre-settled. Issue replacement credit now
that it's
+ // been forwarded.
+ //
qdr_link_issue_credit_CT(core, link, 1);
else
DEQ_INSERT_TAIL(link->unsettled, dlv);
@@ -306,3 +330,42 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t
*action, bool discard)
qd_message_free(msg);
}
+
+/**
+ * Core-thread action handler for the "update_delivery" actions queued by
+ * qdr_delivery_update_disposition() and qdr_delivery_settle().
+ *
+ * @param core    The router core (not used by the current implementation).
+ * @param action  The action; reads args.delivery.{delivery,disposition,settled}.
+ * @param discard If true the action is being discarded and must not be acted on.
+ */
+static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    //
+    // A discarded action must not modify the delivery.
+    //
+    if (discard)
+        return;
+
+    qdr_delivery_t *dlv     = action->args.delivery.delivery;
+    uint64_t        disp    = action->args.delivery.disposition;
+    bool            settled = action->args.delivery.settled;
+
+    //
+    // A settle request carries no disposition: qdr_delivery_settle() assigns only
+    // the delivery and the settled flag.  Do not let the unassigned disposition
+    // argument overwrite the disposition already recorded on the delivery.
+    //
+    // TODO - act on the settlement itself (not yet implemented here).
+    //
+    if (settled)
+        return;
+
+    if (disp != dlv->disposition) {
+        //
+        // Disposition has changed; record the new value on the delivery.
+        // Propagation to a peer delivery is not implemented in this handler.
+        //
+        dlv->disposition = disp;
+    }
+}
+
+
+void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+    //
+    // Nothing to do unless there is a delivery and it is attached to a link.
+    //
+    qdr_link_t *link = dlv ? dlv->link : 0;
+    if (!link)
+        return;
+
+    qdr_connection_t *conn = link->conn;
+
+    //
+    // Under the connection's work lock, record the delivery as updated and put
+    // the link on the connection's list of links with delivery activity.
+    //
+    sys_mutex_lock(conn->work_lock);
+    qdr_add_delivery_ref(&link->updated_deliveries, dlv);
+    qdr_add_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+    sys_mutex_unlock(conn->work_lock);
+
+    //
+    // Activate the connection
+    //
+    qdr_connection_activate_CT(core, conn);
+}
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index d40fa2c..188f244 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -338,13 +338,26 @@ static void router_rx_handler(void* context, qd_link_t
*link, pn_delivery_t *pnd
*/
static void router_disposition_handler(void* context, qd_link_t *link,
pn_delivery_t *pnd)
{
- //qd_router_t *router = (qd_router_t*) context;
+ qd_router_t *router = (qd_router_t*) context;
qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd);
if (!delivery)
return;
- // TODO - hook into the core
+ //
+ // Update the disposition of the delivery
+ //
+ qdr_delivery_update_disposition(router->router_core, delivery,
pn_delivery_remote_state(pnd));
+
+ //
+ // If the delivery is settled, remove the linkage between pn-delivery and
qdr-delivery
+ // and settle the qdr-delivery.
+ //
+ if (pn_delivery_settled(pnd)) {
+ pn_delivery_set_context(pnd, 0);
+ qdr_delivery_set_context(delivery, 0);
+ qdr_delivery_settle(router->router_core, delivery);
+ }
}
@@ -655,7 +668,7 @@ static void qd_router_link_push(void *context, qdr_link_t
*link)
}
-static void qd_router_link_deliver(void *context, qdr_link_t *link,
qdr_delivery_t *dlv)
+static void qd_router_link_deliver(void *context, qdr_link_t *link,
qdr_delivery_t *dlv, bool settled)
{
qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
pn_link_t *plink = qd_link_pn(qlink);
@@ -671,12 +684,33 @@ static void qd_router_link_deliver(void *context,
qdr_link_t *link, qdr_delivery
qdr_delivery_set_context(dlv, pdlv);
qd_message_send(qdr_delivery_message(dlv), qlink,
qdr_link_strip_annotations_out(link));
- if (qdr_delivery_is_settled(dlv))
+ if (settled)
pn_delivery_settle(pdlv);
pn_link_advance(plink);
}
+/**
+ * Delivery-update callback registered with the core via qdr_connection_handlers().
+ * Applies a delivery's disposition and/or settlement to the proton delivery that
+ * is stored as the qdr-delivery's context.
+ *
+ * @param context  Handler context supplied at registration (unused here).
+ * @param dlv      The core delivery whose state was updated.
+ * @param disp     The disposition to apply to the proton delivery.
+ * @param settled  True if the proton delivery is to be settled.
+ */
+static void qd_router_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
+{
+    pn_delivery_t *pnd = (pn_delivery_t*) qdr_delivery_get_context(dlv);
+
+    //
+    // The linkage between the qdr-delivery and the pn-delivery is cleared once
+    // either side settles (below, and in router_disposition_handler).  If it is
+    // already gone there is no proton delivery left to update.
+    //
+    if (!pnd)
+        return;
+
+    //
+    // If the disposition has changed, update the proton delivery.
+    //
+    if (disp != pn_delivery_remote_state(pnd))
+        pn_delivery_update(pnd, disp);
+
+    //
+    // If the delivery is settled, remove the linkage and settle the proton delivery.
+    // The linkage is removed first so neither object refers to the other once the
+    // proton delivery has been settled.
+    //
+    if (settled) {
+        qdr_delivery_set_context(dlv, 0);
+        pn_delivery_set_context(pnd, 0);
+        pn_delivery_settle(pnd);
+    }
+}
+
+
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd->router->tracemask = qd_tracemask();
@@ -691,7 +725,8 @@ void qd_router_setup_late(qd_dispatch_t *qd)
qd_router_link_offer,
qd_router_link_drained,
qd_router_link_push,
- qd_router_link_deliver);
+ qd_router_link_deliver,
+ qd_router_delivery_update);
qd_router_python_setup(qd->router);
qd_timer_schedule(qd->router->timer, 1000);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c695e93f/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index ce1e850..3bcc33d 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -138,6 +138,7 @@ class BusManager(Node):
if addr[0] == 'R' : return "router"
if addr[0] == 'A' : return "area"
if addr[0] == 'L' : return "local"
+ if addr[0] == 'T' : return "topo"
if addr[0] == 'C' : return "link-incoming"
if addr[0] == 'D' : return "link-outgoing"
return "unknown: %s" % addr[0]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]