This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 5ad0968 DISPATCH-2136: Fix tsan race: convert message aborted flag to atomic 5ad0968 is described below commit 5ad09687d17259ebb79550f1af46035613005100 Author: Chuck Rolke <c...@apache.org> AuthorDate: Thu May 27 10:22:09 2021 -0400 DISPATCH-2136: Fix tsan race: convert message aborted flag to atomic The message aborted flag is read and written by I/O and core threads. Conversion from a naked bool to a sys_atomic ensures consistent state. This closes #1235 --- src/message.c | 33 ++++++++++++++++++++++----------- src/message_private.h | 2 +- tests/tsan.supp | 3 --- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/message.c b/src/message.c index f345da3..37d8370 100644 --- a/src/message.c +++ b/src/message.c @@ -45,6 +45,10 @@ #define LOCK sys_mutex_lock #define UNLOCK sys_mutex_unlock +// Implement bool flags with atomic variables +#define SET_FLAG(flag) sys_atomic_set(flag, 1) +#define IS_FLAG_SET(flag) (sys_atomic_get(flag) == 1) + const char *STR_AMQP_NULL = "null"; const char *STR_AMQP_TRUE = "T"; const char *STR_AMQP_FALSE = "F"; @@ -1004,6 +1008,7 @@ qd_message_t *qd_message() ZERO(msg->content); msg->content->lock = sys_mutex(); sys_atomic_init(&msg->content->ref_count, 1); + sys_atomic_init(&msg->content->aborted, 0); msg->content->parse_depth = QD_DEPTH_NONE; return (qd_message_t*) msg; } @@ -1083,6 +1088,7 @@ void qd_message_free(qd_message_t *in_msg) qd_buffer_free(content->pending); sys_mutex_free(content->lock); + sys_atomic_destroy(&content->aborted); free_qd_message_content_t(content); } @@ -1391,13 +1397,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, break; } else if (rc == PN_EOS || rc < 0) { // End of message or error: finalize message_receive handling - msg->content->aborted = pn_delivery_aborted(delivery); + if (pn_delivery_aborted(delivery)) { + SET_FLAG(&msg->content->aborted); + } pn_record_t *record = pn_delivery_attachments(delivery); pn_record_set(record, PN_DELIVERY_CTX, 0); if (msg->content->oversize) { // Aborting the content disposes of downstream copies. // This has no effect on the received message. - msg->content->aborted = true; + SET_FLAG(&msg->content->aborted); } qd_message_set_receive_complete((qd_message_t*) msg); break; @@ -1519,8 +1527,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) content->receive_complete = true; content->q2_unblocker.handler = 0; qd_nullify_safe_ptr(&content->q2_unblocker.context); - content->aborted = pn_delivery_aborted(delivery); - + if (pn_delivery_aborted(delivery)) { + SET_FLAG(&msg->content->aborted); + } // unlink message and delivery pn_record_set(record, PN_DELIVERY_CTX, 0); } @@ -1764,11 +1773,11 @@ void qd_message_send(qd_message_t *in_msg, if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { - if (content->aborted) { + if (IS_FLAG_SET(&content->aborted)) { // Message is aborted before any part of it has been sent. // Declare the message to be sent, msg->send_complete = true; - // the link has an outgoing deliver. abort it. + // If the outgoing delivery is not already aborted then abort it. if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); } @@ -1871,7 +1880,7 @@ void qd_message_send(qd_message_t *in_msg, pn_session_t *pns = pn_link_session(pnl); const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; - while (!content->aborted + while (!IS_FLAG_SET(&content->aborted) && buf && pn_session_outgoing_bytes(pns) < q3_upper) { @@ -1893,7 +1902,7 @@ void qd_message_send(qd_message_t *in_msg, // send error - likely the link has failed and we will eventually // get a link detach event for this link // - content->aborted = true; + SET_FLAG(&content->aborted); msg->send_complete = true; if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); @@ -1970,7 +1979,7 @@ void qd_message_send(qd_message_t *in_msg, if (q2_unblock.handler) q2_unblock.handler(q2_unblock.context); - if (content->aborted) { + if (IS_FLAG_SET(&content->aborted)) { if (pn_link_current(pnl)) { msg->send_complete = true; if (!pn_delivery_aborted(pn_link_current(pnl))) { @@ -2903,7 +2912,9 @@ bool qd_message_is_Q2_blocked(const qd_message_t *msg) bool qd_message_aborted(const qd_message_t *msg) { - return ((qd_message_pvt_t *)msg)->content->aborted; + assert(msg); + qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; + return IS_FLAG_SET(&msg_pvt->content->aborted); } void qd_message_set_aborted(const qd_message_t *msg) @@ -2911,7 +2922,7 @@ void qd_message_set_aborted(const qd_message_t *msg) if (!msg) return; qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; - msg_pvt->content->aborted = true; + SET_FLAG(&msg_pvt->content->aborted); } diff --git a/src/message_private.h b/src/message_private.h index ab2b69d..bb7970f 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -140,13 +140,13 @@ typedef struct { bool discard; // Should this message be discarded? bool receive_complete; // true if the message has been completely received, false otherwise bool q2_input_holdoff; // hold off calling pn_link_recv - bool aborted; // receive completed with abort flag set bool disable_q2_holdoff; // Disable the Q2 flow control bool priority_parsed; bool priority_present; bool oversize; // policy oversize handling in effect bool no_body; // Used for http2 messages. If no_body is true, the HTTP request had no body uint8_t priority; // The priority of this message + sys_atomic_t aborted; } qd_message_content_t; struct qd_message_pvt_t { diff --git a/tests/tsan.supp b/tests/tsan.supp index b47c98e..8b2d294 100644 --- a/tests/tsan.supp +++ b/tests/tsan.supp @@ -59,9 +59,6 @@ race:tsan_reset_delivery_ids # DISPATCH-2135 race:qd_message_Q2_holdoff_disable -# DISPATCH-2136 -race:qd_message_set_aborted - # DISPATCH-2137 race:remote_sasl_process_init race:remote_sasl_prepare --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org