This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new b952edc DISPATCH-1226: fix race during link detach handling
b952edc is described below
commit b952edc093113f9b9ecada9df46322d09fdd0e87
Author: Kenneth Giusti <[email protected]>
AuthorDate: Tue Dec 18 09:52:47 2018 -0500
DISPATCH-1226: fix race during link detach handling
This closes #427
---
src/router_core/connections.c | 70 +++++++++++++++++++++--------------
src/router_core/router_core_private.h | 3 +-
2 files changed, 44 insertions(+), 29 deletions(-)
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a1717a8..54a27be 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -32,7 +32,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core,
qdr_action_t *action, boo
static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t
*action, bool discard);
static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t
*action, bool discard);
static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action,
bool discard);
-static void qdr_link_delete_CT(qdr_core_t *core, qdr_action_t *action, bool
discard);
+static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action,
bool discard);
+static void qdr_link_detach_sent(qdr_link_t *link);
ALLOC_DEFINE(qdr_connection_t);
ALLOC_DEFINE(qdr_connection_work_t);
@@ -217,7 +218,7 @@ int qdr_connection_process(qdr_connection_t *conn)
qdr_link_ref_t *ref;
qdr_link_t *link;
- bool free_link;
+ bool detach_sent;
int event_count = 0;
@@ -264,7 +265,7 @@ int qdr_connection_process(qdr_connection_t *conn)
for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
do {
qdr_link_work_t *link_work;
- free_link = false;
+ detach_sent = false;
ref = DEQ_HEAD(links_with_work[priority]);
if (ref) {
@@ -325,12 +326,11 @@ int qdr_connection_process(qdr_connection_t *conn)
break;
case QDR_LINK_WORK_FIRST_DETACH :
- core->detach_handler(core->user_context, link,
link_work->error, true, link_work->close_link);
- break;
-
case QDR_LINK_WORK_SECOND_DETACH :
- core->detach_handler(core->user_context, link,
link_work->error, false, link_work->close_link);
- free_link = true;
+ core->detach_handler(core->user_context, link,
link_work->error,
+ link_work->work_type ==
QDR_LINK_WORK_FIRST_DETACH,
+ link_work->close_link);
+ detach_sent = true;
break;
}
@@ -352,10 +352,12 @@ int qdr_connection_process(qdr_connection_t *conn)
event_count++;
}
- if (free_link)
- qdr_link_delete(link);
+ if (detach_sent) {
+ // let the core thread know so it can clean up
+ qdr_link_detach_sent(link);
+ }
}
- } while (free_link || link);
+ } while (detach_sent || link);
}
return event_count;
@@ -528,9 +530,11 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t
dt, qdr_error_t *error)
}
-void qdr_link_delete(qdr_link_t *link)
+/* let the core thread know that a dispatch has been sent by the I/O thread
+ */
+static void qdr_link_detach_sent(qdr_link_t *link)
{
- qdr_action_t *action = qdr_action(qdr_link_delete_CT, "link_delete");
+ qdr_action_t *action = qdr_action(qdr_link_detach_sent_CT,
"link_detach_sent");
action->args.connection.link = link;
qdr_action_enqueue(link->core, action);
@@ -911,9 +915,6 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core,
qdr_link_t *link, qdr_error_t
qdr_link_work_t *work = new_qdr_link_work_t();
ZERO(work);
work->work_type = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH
: QDR_LINK_WORK_SECOND_DETACH;
- if (work->work_type == QDR_LINK_WORK_SECOND_DETACH) {
- link->detach_received = true;
- }
work->close_link = close;
if (error)
@@ -1538,10 +1539,11 @@ static void qdr_link_inbound_detach_CT(qdr_core_t
*core, qdr_action_t *action, b
qd_detach_type_t dt = action->args.connection.dt;
qdr_address_t *addr = link->owning_addr;
- //
- // Bump the detach count to track half and full detaches
- //
- link->detach_count++;
+ if (link->detach_received)
+ return;
+
+ link->detach_received = true;
+ ++link->detach_count;
if (link->core_endpoint) {
qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error);
@@ -1570,7 +1572,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core,
qdr_action_t *action, b
//
// If the link is completely detached, release its resources
//
- if (link->detach_count == 2) {
+ if (link->detach_send_done) {
qdr_link_cleanup_CT(core, conn, link);
free_qdr_link_t(link);
}
@@ -1657,13 +1659,19 @@ static void qdr_link_inbound_detach_CT(qdr_core_t
*core, qdr_action_t *action, b
// Handle the disposition of any deliveries that remain on the link
//
qdr_link_cleanup_deliveries_CT(core, conn, link);
-
+
//
// If the detach occurred via protocol, send a detach back.
//
- if (dt != QD_LOST)
+ if (dt != QD_LOST) {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt
== QD_CLOSED);
- } else {
+ } else {
+ // no detach can be sent out because the connection was lost
+ qdr_link_cleanup_CT(core, conn, link);
+ free_qdr_link_t(link);
+ }
+ } else if (link->detach_send_done) { // detach count indicates detach has
been scheduled
+ // I/O thread is finished sending detach, ok to free link now
qdr_link_cleanup_CT(core, conn, link);
free_qdr_link_t(link);
}
@@ -1680,16 +1688,22 @@ static void qdr_link_inbound_detach_CT(qdr_core_t
*core, qdr_action_t *action, b
}
-static void qdr_link_delete_CT(qdr_core_t *core, qdr_action_t *action, bool
discard)
+/* invoked on core thread to signal that the I/O thread has sent the detach
+ */
+static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action,
bool discard)
{
if (discard)
return;
qdr_link_t *link = action->args.connection.link;
- if (link && link->conn) {
- qdr_link_cleanup_CT(core, link->conn, link);
- free_qdr_link_t(link);
+ if (link) {
+ link->detach_send_done = true;
+ if (link->conn && link->detach_received) {
+ // link is fully detached
+ qdr_link_cleanup_CT(core, link->conn, link);
+ free_qdr_link_t(link);
+ }
}
}
diff --git a/src/router_core/router_core_private.h
b/src/router_core/router_core_private.h
index 7d6f2a3..1cc21e3 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -441,7 +441,8 @@ struct qdr_link_t {
bool strip_annotations_out;
bool drain_mode;
bool stalled_outbound; ///< Indicates that this link
is stalled on outbound buffer backpressure
- bool detach_received;
+ bool detach_received; ///< True on core receipt of
inbound attach
+ bool detach_send_done; ///< True once the detach has
been sent by the I/O thread
bool edge; ///< True if this link is in
an edge-connection
char *strip_prefix;
char *insert_prefix;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]