asfgit closed pull request #431: DISPATCH-1231: correct credit handling in core
client
URL: https://github.com/apache/qpid-dispatch/pull/431
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
made from a fork) once it has been merged, the diff is reproduced below
for the sake of provenance:
diff --git a/src/router_core/core_client_api.c
b/src/router_core/core_client_api.c
index 74838692..479a45ff 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -435,11 +435,11 @@ static void _sender_flow_CT(void *context,
qdrc_client_t *client = (qdrc_client_t *)context;
qdr_core_t *core = client->core;
+ client->tx_credit += available_credit;
qd_log(core->log, QD_LOG_TRACE,
"Core client sender flow granted c=%p credit=%d d=%s",
- client, available_credit, (drain) ? "T" : "F");
- client->tx_credit = available_credit;
- if (available_credit > 0) {
+ client, client->tx_credit, (drain) ? "T" : "F");
+ if (client->tx_credit > 0) {
_flush_send_queue_CT(client);
}
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c
b/src/router_core/modules/address_lookup_client/lookup_client.c
index 048a3997..197bd34d 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -575,12 +575,12 @@ static void on_state(qdr_core_t *core,
static void on_flow(qdr_core_t *core,
qdrc_client_t *api_client,
void *user_context,
- int available_credit,
+ int more_credit,
bool drain)
{
qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
- client->request_credit = available_credit;
+ client->request_credit += more_credit;
//
// If we have positive credit, process any pending requests
diff --git a/src/router_core/modules/edge_router/edge_mgmt.c
b/src/router_core/modules/edge_router/edge_mgmt.c
index 7dd62454..e3f85324 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.c
+++ b/src/router_core/modules/edge_router/edge_mgmt.c
@@ -139,10 +139,7 @@ static void _mgmt_on_state_cb_CT(qdr_core_t *core,
user_context,
(active) ? "active" : "down");
- if (!active) {
- // stop the syncing of link routes by setting credit=0
- qcm_edge_link_route_proxy_flow_CT(core, 0, true);
- }
+ qcm_edge_link_route_proxy_state_CT(core, active);
}
@@ -150,16 +147,16 @@ static void _mgmt_on_state_cb_CT(qdr_core_t *core,
static void _mgmt_on_flow_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
- int available_credit,
+ int more_credit,
bool drain)
{
qd_log(core->log, QD_LOG_TRACE,
"edge mgmt client flow: uc=%p c=%d d=%s",
- user_context, available_credit,
+ user_context, more_credit,
(drain) ? "T" : "F");
qcm_edge_link_route_proxy_flow_CT(core,
- available_credit,
+ more_credit,
drain);
}
diff --git a/src/router_core/modules/edge_router/link_route_proxy.c
b/src/router_core/modules/edge_router/link_route_proxy.c
index 79f51ede..eaf6ea9c 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.c
+++ b/src/router_core/modules/edge_router/link_route_proxy.c
@@ -430,11 +430,20 @@ static void _on_addr_event(void *context,
// Public API
//
+// called by edge mgmt API when link(s) detach
+void qcm_edge_link_route_proxy_state_CT(qdr_core_t *core, bool active)
+{
+ if (!active)
+ _available_credit = 0; // stop sending pending syncs
+ else if (_available_credit > 0)
+ _sync_interior_proxies(core);
+}
+
// called by the edge mgmt API when credit has been granted:
void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit,
bool drain)
{
- _available_credit = available_credit;
+ _available_credit += available_credit;
_sync_interior_proxies(core);
if (drain) {
_available_credit = 0;
diff --git a/src/router_core/modules/edge_router/link_route_proxy.h
b/src/router_core/modules/edge_router/link_route_proxy.h
index 3f8b0eec..f5307c58 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.h
+++ b/src/router_core/modules/edge_router/link_route_proxy.h
@@ -29,5 +29,5 @@
void qcm_edge_link_route_init_CT(qdr_core_t *core);
void qcm_edge_link_route_final_CT(qdr_core_t *core);
void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit,
bool drain);
-
+void qcm_edge_link_route_proxy_state_CT(qdr_core_t *core, bool active);
#endif
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c
b/src/router_core/modules/test_hooks/core_test_hooks.c
index c3d25a3f..ed9cc0d2 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -474,6 +474,8 @@ static void
qdrc_test_hooks_core_endpoint_finalize(test_module_t *module)
// tests. Any changes here may require updates to those tests.
//
+static void _do_send(test_client_t *tc);
+
struct test_client_t {
test_module_t *module;
qdrc_event_subscription_t *conn_events;
@@ -511,6 +513,7 @@ static void _client_on_ack_cb(qdr_core_t *core,
request_context, disposition);
assert((int64_t)request_context < tc->counter);
}
+
static void _client_on_done_cb(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
@@ -519,16 +522,21 @@ static void _client_on_done_cb(qdr_core_t *core,
{
// the system_tests_core_client.py looks for the following
// log message during the tests
+ test_client_t *tc = (test_client_t *)user_context;
qd_log_level_t level = (error) ? QD_LOG_ERROR : QD_LOG_TRACE;
qd_log(core->log, level,
"client test request done error=%s",
(error) ? error : "None");
+ if (!error && tc->credit > 0) {
+ _do_send(tc);
+ }
}
+// send a single request if credit available
static void _do_send(test_client_t *tc)
{
int rc = 0;
- while (tc->credit > 0) {
+ if (tc->credit > 0) {
qd_composed_field_t *props =
qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_composed_field_t *body =
qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
@@ -553,7 +561,7 @@ static void _do_send(test_client_t *tc)
++tc->counter;
--tc->credit;
qd_log(tc->module->core->log, QD_LOG_TRACE,
- "client test message sent id=%"PRIi64" c=%d", tc->counter + 1,
tc->credit);
+ "client test message sent id=%"PRIi64" c=%d", tc->counter - 1,
tc->credit);
}
}
@@ -577,9 +585,12 @@ static void _client_on_flow_cb(qdr_core_t *core,
qdrc_client_t *core_client,
qd_log(tc->module->core->log, QD_LOG_TRACE,
"client test on flow c=%d d=%c", available_credit, drain ? 'T' :
'F');
tc->credit = available_credit;
- _do_send(tc);
- if (drain)
- tc->credit = 0;
+ if (drain) {
+ while (tc->credit > 0)
+ _do_send(tc);
+ } else {
+ _do_send(tc);
+ }
}
static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t
*conn)
@@ -607,7 +618,7 @@ static void _on_conn_event(void *context, qdrc_event_t
type, qdr_connection_t *c
tc->core_client = qdrc_client_CT(tc->module->core,
tc->conn,
target,
- 10, // credit window
+ 10, // reply credit window
tc, // user context
_client_on_state_cb,
_client_on_flow_cb);
diff --git a/tests/system_tests_core_client.py
b/tests/system_tests_core_client.py
index 83c3338e..9030bcdb 100644
--- a/tests/system_tests_core_client.py
+++ b/tests/system_tests_core_client.py
@@ -32,11 +32,11 @@
from proton.handlers import MessagingHandler
from proton.reactor import Container
-# test the request/response core client messaging API These tests rely on
-# enabling the router test hooks, which instantiates a test client (see
-# modules/test_hooks/core_test_hooks)
+# test the request/response core client messaging API
+#
+# These tests rely on enabling the router test hooks, which instantiates a test
+# client (see modules/test_hooks/core_test_hooks) see core_test_hooks.c
-# see core_test_hooks.c
CONTAINER_ID = "org.apache.qpid.dispatch.test_core_client"
TARGET_ADDR = "test_core_client_address"
@@ -53,11 +53,11 @@ def setUpClass(cls):
cls.router = cls.tester.qdrouterd("A", config, cl_args=["-T"])
def test_send_receive(self):
- ts = TestService(self.router.addresses[0], credit=10)
+ ts = TestService(self.router.addresses[0], credit=250)
ts.run()
self.assertTrue(ts.error is None)
- self.assertEqual(10, ts.in_count)
- self.assertEqual(10, ts.out_count)
+ self.assertEqual(250, ts.in_count)
+ self.assertEqual(250, ts.out_count)
def test_credit_starve(self):
ts = TestCreditStarve(self.router.addresses[0])
@@ -72,7 +72,6 @@ def test_unexpected_conn_close(self):
self.assertTrue(ts.error is None)
self.assertTrue(ts.in_count >= 1)
-
def test_bad_format(self):
ts = TestNoCorrelationId(self.router.addresses[0])
ts.run()
@@ -125,7 +124,6 @@ def timeout(self):
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, self.Timeout(self))
-
self._conn = event.container.connect(self.address)
def on_link_opening(self, event):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]