This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ad0968  DISPATCH-2136: Fix tsan race: convert message aborted flag to 
atomic
5ad0968 is described below

commit 5ad09687d17259ebb79550f1af46035613005100
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Thu May 27 10:22:09 2021 -0400

    DISPATCH-2136: Fix tsan race: convert message aborted flag to atomic
    
    The message aborted flag is read and written by I/O and core threads.
    Conversion from a naked bool to a sys_atomic ensures consistent state.
    
    This closes #1235
---
 src/message.c         | 33 ++++++++++++++++++++++-----------
 src/message_private.h |  2 +-
 tests/tsan.supp       |  3 ---
 3 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/src/message.c b/src/message.c
index f345da3..37d8370 100644
--- a/src/message.c
+++ b/src/message.c
@@ -45,6 +45,10 @@
 #define LOCK   sys_mutex_lock
 #define UNLOCK sys_mutex_unlock
 
+// Implement bool flags with atomic variables
+#define    SET_FLAG(flag)  sys_atomic_set(flag, 1)
+#define IS_FLAG_SET(flag) (sys_atomic_get(flag) == 1)
+
 const char *STR_AMQP_NULL = "null";
 const char *STR_AMQP_TRUE = "T";
 const char *STR_AMQP_FALSE = "F";
@@ -1004,6 +1008,7 @@ qd_message_t *qd_message()
     ZERO(msg->content);
     msg->content->lock = sys_mutex();
     sys_atomic_init(&msg->content->ref_count, 1);
+    sys_atomic_init(&msg->content->aborted, 0);
     msg->content->parse_depth = QD_DEPTH_NONE;
     return (qd_message_t*) msg;
 }
@@ -1083,6 +1088,7 @@ void qd_message_free(qd_message_t *in_msg)
             qd_buffer_free(content->pending);
 
         sys_mutex_free(content->lock);
+        sys_atomic_destroy(&content->aborted);
         free_qd_message_content_t(content);
     }
 
@@ -1391,13 +1397,15 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
             break;
         } else if (rc == PN_EOS || rc < 0) {
             // End of message or error: finalize message_receive handling
-            msg->content->aborted = pn_delivery_aborted(delivery);
+            if (pn_delivery_aborted(delivery)) {
+                SET_FLAG(&msg->content->aborted);
+            }
             pn_record_t *record = pn_delivery_attachments(delivery);
             pn_record_set(record, PN_DELIVERY_CTX, 0);
             if (msg->content->oversize) {
                 // Aborting the content disposes of downstream copies.
                 // This has no effect on the received message.
-                msg->content->aborted = true;
+                SET_FLAG(&msg->content->aborted);
             }
             qd_message_set_receive_complete((qd_message_t*) msg);
             break;
@@ -1519,8 +1527,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                 content->receive_complete = true;
                 content->q2_unblocker.handler = 0;
                 qd_nullify_safe_ptr(&content->q2_unblocker.context);
-                content->aborted = pn_delivery_aborted(delivery);
-
+                if (pn_delivery_aborted(delivery)) {
+                    SET_FLAG(&msg->content->aborted);
+                }
                 // unlink message and delivery
                 pn_record_set(record, PN_DELIVERY_CTX, 0);
             }
@@ -1764,11 +1773,11 @@ void qd_message_send(qd_message_t *in_msg,
 
     if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
 
-        if (content->aborted) {
+        if (IS_FLAG_SET(&content->aborted)) {
             // Message is aborted before any part of it has been sent.
             // Declare the message to be sent,
             msg->send_complete = true;
-            // the link has an outgoing deliver. abort it.
+            // If the outgoing delivery is not already aborted then abort it.
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
                 pn_delivery_abort(pn_link_current(pnl));
             }
@@ -1871,7 +1880,7 @@ void qd_message_send(qd_message_t *in_msg,
     pn_session_t              *pns        = pn_link_session(pnl);
     const size_t               q3_upper   = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
 
-    while (!content->aborted
+    while (!IS_FLAG_SET(&content->aborted)
            && buf
            && pn_session_outgoing_bytes(pns) < q3_upper) {
 
@@ -1893,7 +1902,7 @@ void qd_message_send(qd_message_t *in_msg,
             // send error - likely the link has failed and we will eventually
             // get a link detach event for this link
             //
-            content->aborted = true;
+            SET_FLAG(&content->aborted);
             msg->send_complete = true;
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
                 pn_delivery_abort(pn_link_current(pnl));
@@ -1970,7 +1979,7 @@ void qd_message_send(qd_message_t *in_msg,
     if (q2_unblock.handler)
         q2_unblock.handler(q2_unblock.context);
 
-    if (content->aborted) {
+    if (IS_FLAG_SET(&content->aborted)) {
         if (pn_link_current(pnl)) {
             msg->send_complete = true;
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
@@ -2903,7 +2912,9 @@ bool qd_message_is_Q2_blocked(const qd_message_t *msg)
 
 bool qd_message_aborted(const qd_message_t *msg)
 {
-    return ((qd_message_pvt_t *)msg)->content->aborted;
+    assert(msg);
+    qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
+    return IS_FLAG_SET(&msg_pvt->content->aborted);
 }
 
 void qd_message_set_aborted(const qd_message_t *msg)
@@ -2911,7 +2922,7 @@ void qd_message_set_aborted(const qd_message_t *msg)
     if (!msg)
         return;
     qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
-    msg_pvt->content->aborted = true;
+    SET_FLAG(&msg_pvt->content->aborted);
 }
 
 
diff --git a/src/message_private.h b/src/message_private.h
index ab2b69d..bb7970f 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -140,13 +140,13 @@ typedef struct {
     bool                 discard;                        // Should this 
message be discarded?
     bool                 receive_complete;               // true if the 
message has been completely received, false otherwise
     bool                 q2_input_holdoff;               // hold off calling 
pn_link_recv
-    bool                 aborted;                        // receive completed 
with abort flag set
     bool                 disable_q2_holdoff;             // Disable the Q2 
flow control
     bool                 priority_parsed;
     bool                 priority_present;
     bool                 oversize;                       // policy oversize 
handling in effect
     bool                 no_body;                        // Used for http2 
messages. If no_body is true, the HTTP request had no body
     uint8_t              priority;                       // The priority of 
this message
+    sys_atomic_t         aborted;
 } qd_message_content_t;
 
 struct qd_message_pvt_t {
diff --git a/tests/tsan.supp b/tests/tsan.supp
index b47c98e..8b2d294 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -59,9 +59,6 @@ race:tsan_reset_delivery_ids
 # DISPATCH-2135
 race:qd_message_Q2_holdoff_disable
 
-# DISPATCH-2136
-race:qd_message_set_aborted
-
 # DISPATCH-2137
 race:remote_sasl_process_init
 race:remote_sasl_prepare

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to