This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new c47f5ad DISPATCH-1406: resupply credit to non-endpoint links if
address destination unavailable
c47f5ad is described below
commit c47f5ad7dedee2b1d6627e63b73aab2b27e687eb
Author: Kenneth Giusti <[email protected]>
AuthorDate: Fri Sep 6 13:54:42 2019 -0400
DISPATCH-1406: resupply credit to non-endpoint links if address destination
unavailable
---
python/qpid_dispatch/management/qdrouter.json | 2 +-
src/router_core/delivery.c | 19 ++++++++++++++++++
src/router_core/delivery.h | 1 +
src/router_core/transfer.c | 29 ++++++++++++++++++++++-----
4 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/python/qpid_dispatch/management/qdrouter.json
b/python/qpid_dispatch/management/qdrouter.json
index 2b494fc..707ae46 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -489,7 +489,7 @@
},
"defaultDistribution": {
"type": ["multicast", "closest", "balanced",
"unavailable"],
- "description": "Default forwarding treatment for any
address without a specified treatment. multicast - one copy of each message
delivered to all subscribers; closest - messages delivered to only the closest
subscriber; balanced - messages delivered to one subscriber with load balanced
across subscribers; unavailable - this address is unavailable, link attaches to
an address of unavilable distribution will be rejected.",
+ "description": "Default forwarding treatment for any
address without a specified treatment. multicast - one copy of each message
delivered to all subscribers; closest - messages delivered to only the closest
subscriber; balanced - messages delivered to one subscriber with load balanced
across subscribers; unavailable - this address is unavailable, messages sent
and link attaches to the address will be rejected.",
"create": true,
"required": false,
"default": "balanced"
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 3992585..3e21ed1 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -264,6 +264,25 @@ void qdr_delivery_failed_CT(qdr_core_t *core,
qdr_delivery_t *dlv)
}
+void qdr_delivery_reject_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+ bool push = dlv->disposition != PN_REJECTED;
+
+ dlv->disposition = PN_REJECTED;
+ dlv->settled = true;
+ bool moved = qdr_delivery_settled_CT(core, dlv);
+
+ if (push || moved)
+ qdr_delivery_push_CT(core, dlv);
+
+ //
+ // Remove the unsettled reference
+ //
+ if (moved)
+ qdr_delivery_decref_CT(core, dlv, "qdr_delivery_reject_CT - remove
from unsettled list");
+}
+
+
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
//
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 9619bfb..16fcc43 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -125,6 +125,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,
qdr_delivery_t *delivery)
/* update settlement and/or disposition and schedule I/O processing */
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_delivery_reject_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0d314cd..17067c4 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -480,20 +480,39 @@ static void qdr_link_forward_CT(qdr_core_t *core,
qdr_link_t *link, qdr_delivery
}
//
- // There is no address that we can send this delivery to, which means the
addr was not found in our hastable. This
+ // There is no address that we can send this delivery to, which means the
addr was not found in our hash table. This
// can be because there were no receivers or because the address was not
defined in the config file.
- // If the treatment for such addresses is set to be unavailable, we send
back a rejected disposition and detach the link
//
else if (core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE) {
- dlv->disposition = PN_REJECTED;
- dlv->error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Deliveries cannot be
sent to an unavailable address");
- qdr_delivery_push_CT(core, dlv);
+ //
+ // If the treatment for these addresses is set to be unavailable, we
+ // stop trying to forward it. If the link is a locally attached client
+ // we reject the message if the link is not anonymous as per the
+ // documentation of the router's defaultTreatment=unavailable. We
+ // simply release it for other link types as the message did have a
+ // destination at some point (it was forwarded to this router after
+ // all) - the loss of the destination may be temporary.
+ //
+ if (link->link_type == QD_LINK_ENDPOINT) {
+ dlv->error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Deliveries cannot
be sent to an unavailable address");
+ qdr_delivery_reject_CT(core, dlv);
+ if (qdr_link_is_anonymous(link)) {
+ qdr_link_issue_credit_CT(core, link, 1, false);
+ } else {
+ // cannot forward on this targeted link. withhold credit and
drain
+ qdr_link_issue_credit_CT(core, link, 0, true);
+ }
+ } else {
+ qdr_delivery_release_CT(core, dlv);
+ qdr_link_issue_credit_CT(core, link, 1, false);
+ }
//
// We will not detach this link because this could be anonymous
sender. We don't know
// which address the sender will be sending to next
// If this was not an anonymous sender, the initial attach would have
been rejected if the target address was unavailable.
//
+ qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from
action (treatment unavailable)");
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]