This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new fc745cd  DISPATCH-2166: Improve message, message_content multithread 
access correctness
fc745cd is described below

commit fc745cdf9759d3d2bf0bd1aea752a81e401a3653
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Wed Jul 21 16:32:02 2021 -0400

    DISPATCH-2166: Improve message, message_content multithread access 
correctness
    
    Many message and message_content variables have implicit thread ownership.
    
     * Each message has only one receiver that creates the content.
     * Each message content may have many senders that consume the content.
    
    Some variables are "owned" by the message_receive thread and are never
    read nor written by the message_send threads. And other variables are owned
    by the message send threads.
    
    Then more variables are shared among all the senders and receiver. This
    patch addresses the shared variables.
    
     * Unsuppress tsan qd_message_set_send_complete
     * Convert many bools to sys_atomic_t; adjust access methods
     * Add locking to some variable accesses
    
    This closes #1308
---
 include/qpid/dispatch/atomic.h |   7 +--
 src/message.c                  | 111 +++++++++++++++++++++++++----------------
 src/message_private.h          |  79 +++++++++++++++--------------
 tests/message_test.c           |  12 ++---
 tests/tsan.supp                |   3 --
 5 files changed, 119 insertions(+), 93 deletions(-)

diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h
index 9eb09d6..32f4967 100644
--- a/include/qpid/dispatch/atomic.h
+++ b/include/qpid/dispatch/atomic.h
@@ -205,10 +205,11 @@ static inline void sys_atomic_destroy(sys_atomic_t *ref)
 
 #endif
 
-#define    SET_ATOMIC_FLAG(flag)   sys_atomic_set(flag, 1)
-#define    CLEAR_ATOMIC_FLAG(flag) sys_atomic_set(flag, 0)
+#define    SET_ATOMIC_FLAG(flag)        sys_atomic_set((flag), 1)
+#define  CLEAR_ATOMIC_FLAG(flag)        sys_atomic_set((flag), 0)
+#define    SET_ATOMIC_BOOL(flag, value) sys_atomic_set((flag), ((value) ? 1 : 
0))
 
-#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1)
+#define IS_ATOMIC_FLAG_SET(flag)       (sys_atomic_get(flag) == 1)
 
 /** Atomic increase: NOTE returns value *before* increase, like i++ */
 static inline uint32_t sys_atomic_inc(sys_atomic_t *ref) { return 
sys_atomic_add((ref), 1); }
diff --git a/src/message.c b/src/message.c
index e867913..2d8d579 100644
--- a/src/message.c
+++ b/src/message.c
@@ -926,8 +926,7 @@ static void qd_message_parse_priority(qd_message_t *in_msg)
     qd_message_content_t *content  = MSG_CONTENT(in_msg);
     qd_iterator_t        *iter     = qd_message_field_iterator(in_msg, 
QD_FIELD_HEADER);
 
-    content->priority_parsed  = true;
-    content->priority_present = false;
+    SET_ATOMIC_FLAG(&content->priority_parsed);
 
     if (!!iter) {
         qd_parsed_field_t *field = qd_parse(iter);
@@ -936,8 +935,8 @@ static void qd_message_parse_priority(qd_message_t *in_msg)
                 qd_parsed_field_t *priority_field = qd_parse_sub_value(field, 
1);
                 if (qd_parse_tag(priority_field) != QD_AMQP_NULL) {
                     uint32_t value = qd_parse_as_uint(priority_field);
-                    content->priority = value > QDR_MAX_PRIORITY ? 
QDR_MAX_PRIORITY : (uint8_t) (value & 0x00ff);
-                    content->priority_present = true;
+                    value = MIN(value, QDR_MAX_PRIORITY);
+                    sys_atomic_set(&content->priority, value);
                 }
             }
         }
@@ -1022,8 +1021,15 @@ qd_message_t *qd_message()
 
     ZERO(msg->content);
     msg->content->lock = sys_mutex();
-    sys_atomic_init(&msg->content->ref_count, 1);
     sys_atomic_init(&msg->content->aborted, 0);
+    sys_atomic_init(&msg->content->discard, 0);
+    sys_atomic_init(&msg->content->ma_stream, 0);
+    sys_atomic_init(&msg->content->no_body, 0);
+    sys_atomic_init(&msg->content->oversize, 0);
+    sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY);
+    sys_atomic_init(&msg->content->priority_parsed, 0);
+    sys_atomic_init(&msg->content->receive_complete, 0);
+    sys_atomic_init(&msg->content->ref_count, 1);
     msg->content->parse_depth = QD_DEPTH_NONE;
     return (qd_message_t*) msg;
 }
@@ -1040,6 +1046,8 @@ void qd_message_free(qd_message_t *in_msg)
     qd_buffer_list_free_buffers(&msg->ma_trace);
     qd_buffer_list_free_buffers(&msg->ma_ingress);
 
+    sys_atomic_destroy(&msg->send_complete);
+
     qd_message_content_t *content = msg->content;
 
     if (msg->is_fanout) {
@@ -1104,6 +1112,14 @@ void qd_message_free(qd_message_t *in_msg)
 
         sys_mutex_free(content->lock);
         sys_atomic_destroy(&content->aborted);
+        sys_atomic_destroy(&content->discard);
+        sys_atomic_destroy(&content->ma_stream);
+        sys_atomic_destroy(&content->no_body);
+        sys_atomic_destroy(&content->oversize);
+        sys_atomic_destroy(&content->priority);
+        sys_atomic_destroy(&content->priority_parsed);
+        sys_atomic_destroy(&content->receive_complete);
+        sys_atomic_destroy(&content->ref_count);
         free_qd_message_content_t(content);
     }
 
@@ -1133,7 +1149,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     copy->sent_depth    = QD_DEPTH_NONE;
     copy->cursor.buffer = 0;
     copy->cursor.cursor = 0;
-    copy->send_complete = false;
+    sys_atomic_init(&copy->send_complete, 0);
     copy->tag_sent      = false;
     copy->is_fanout     = false;
 
@@ -1186,7 +1202,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
     }
 
     if (ma_pf_stream) {
-        content->ma_stream = qd_parse_as_int(ma_pf_stream);
+        SET_ATOMIC_BOOL(&content->ma_stream, qd_parse_as_int(ma_pf_stream));
         qd_parse_free(ma_pf_stream);
     }
 
@@ -1225,7 +1241,7 @@ int qd_message_get_phase_annotation(const qd_message_t 
*in_msg)
 void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream)
 {
     qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    msg->content->ma_stream = stream;
+    SET_ATOMIC_BOOL(&msg->content->ma_stream, stream);
 }
 
 void qd_message_set_ingress_annotation(qd_message_t *in_msg, 
qd_composed_field_t *ingress_field)
@@ -1241,7 +1257,7 @@ bool qd_message_is_discard(qd_message_t *msg)
     if (!msg)
         return false;
     qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
-    return pvt_msg->content->discard;
+    return IS_ATOMIC_FLAG_SET(&pvt_msg->content->discard);
 }
 
 void qd_message_set_discard(qd_message_t *msg, bool discard)
@@ -1250,7 +1266,7 @@ void qd_message_set_discard(qd_message_t *msg, bool 
discard)
         return;
 
     qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
-    pvt_msg->content->discard = discard;
+    SET_ATOMIC_BOOL(&pvt_msg->content->discard, discard);
 }
 
 
@@ -1304,10 +1320,10 @@ uint8_t qd_message_get_priority(qd_message_t *msg)
 {
     qd_message_content_t *content = MSG_CONTENT(msg);
 
-    if (!content->priority_parsed)
+    if (!IS_ATOMIC_FLAG_SET(&content->priority_parsed))
         qd_message_parse_priority(msg);
 
-    return content->priority_present ? content->priority : 
QDR_DEFAULT_PRIORITY;
+    return sys_atomic_get(&content->priority);
 }
 
 bool qd_message_receive_complete(qd_message_t *in_msg)
@@ -1315,7 +1331,7 @@ bool qd_message_receive_complete(qd_message_t *in_msg)
     if (!in_msg)
         return false;
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
-    return msg->content->receive_complete;
+    return IS_ATOMIC_FLAG_SET(&msg->content->receive_complete);
 }
 
 
@@ -1325,7 +1341,7 @@ bool qd_message_send_complete(qd_message_t *in_msg)
         return false;
 
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
-    return msg->send_complete;
+    return IS_ATOMIC_FLAG_SET(&msg->send_complete);
 }
 
 
@@ -1333,7 +1349,7 @@ void qd_message_set_send_complete(qd_message_t *in_msg)
 {
     if (!!in_msg) {
         qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-        msg->send_complete = true;
+        SET_ATOMIC_FLAG(&msg->send_complete);
     }
 }
 
@@ -1346,7 +1362,7 @@ void qd_message_set_receive_complete(qd_message_t *in_msg)
 
         LOCK(content->lock);
 
-        content->receive_complete = true;
+        SET_ATOMIC_FLAG(&content->receive_complete);
         if (content->q2_input_holdoff) {
             content->q2_input_holdoff = false;
             q2_unblock = content->q2_unblocker;
@@ -1365,7 +1381,7 @@ void qd_message_set_no_body(qd_message_t *in_msg)
 {
     if (!!in_msg) {
         qd_message_content_t *content = MSG_CONTENT(in_msg);
-        content->no_body = true;
+        SET_ATOMIC_FLAG(&content->no_body);
     }
 }
 
@@ -1373,7 +1389,7 @@ bool qd_message_no_body(qd_message_t *in_msg)
 {
     if (!!in_msg) {
         qd_message_content_t *content = MSG_CONTENT(in_msg);
-        return content->no_body;
+        return IS_ATOMIC_FLAG_SET(&content->no_body);
     }
 
     return false;
@@ -1425,7 +1441,7 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
             }
             pn_record_t *record = pn_delivery_attachments(delivery);
             pn_record_set(record, PN_DELIVERY_CTX, 0);
-            if (msg->content->oversize) {
+            if (IS_ATOMIC_FLAG_SET(&msg->content->oversize)) {
                 // Aborting the content disposes of downstream copies.
                 // This has no effect on the received message.
                 SET_ATOMIC_FLAG(&msg->content->aborted);
@@ -1499,7 +1515,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     // but not process the message for delivery.
     // Oversize messages are also discarded.
     //
-    if (msg->content->discard) {
+    if (IS_ATOMIC_FLAG_SET(&msg->content->discard)) {
         return discard_receive(delivery, link, (qd_message_t *)msg);
     }
 
@@ -1509,11 +1525,14 @@ qd_message_t *qd_message_receive(pn_delivery_t 
*delivery)
     //      have been processed and freed by outbound processing then
     //      message holdoff is cleared and receiving may continue.
     //
+    LOCK(msg->content->lock);
     if (!qd_link_is_q2_limit_unbounded(qdl) && 
!msg->content->disable_q2_holdoff) {
         if (msg->content->q2_input_holdoff) {
+            UNLOCK(msg->content->lock);
             return (qd_message_t*)msg;
         }
     }
+    UNLOCK(msg->content->lock);
 
     // Loop until msg is complete, error seen, or incoming bytes are consumed
     qd_message_content_t *content = msg->content;
@@ -1616,8 +1635,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                     qd_connection_t *conn = qd_link_connection(qdl);
                     qd_connection_log_policy_denial(qdl, "DENY AMQP Transfer 
maxMessageSize exceeded");
                     qd_policy_count_max_size_event(link, conn);
-                    content->discard = true;
-                    content->oversize = true;
+                    SET_ATOMIC_FLAG(&content->discard);
+                    SET_ATOMIC_FLAG(&content->oversize);
                     return discard_receive(delivery, link, (qd_message_t*)msg);
                 }
             }
@@ -1690,7 +1709,7 @@ static void 
compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
         !DEQ_IS_EMPTY(msg->ma_trace) ||
         !DEQ_IS_EMPTY(msg->ma_ingress) ||
         msg->ma_phase != 0 ||
-        msg->content->ma_stream) {
+        IS_ATOMIC_FLAG_SET(&msg->content->ma_stream)) {
 
         if (!map_started) {
             qd_compose_start_map(out_ma);
@@ -1721,9 +1740,9 @@ static void 
compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
             field_count++;
         }
 
-        if (msg->content->ma_stream) {
+        if (IS_ATOMIC_FLAG_SET(&msg->content->ma_stream)) {
             qd_compose_insert_symbol(field, QD_MA_STREAM);
-            qd_compose_insert_int(field, msg->content->ma_stream);
+            qd_compose_insert_int(field, 1);
             field_count++;
         }
         // pad out to N fields
@@ -1799,7 +1818,7 @@ void qd_message_send(qd_message_t *in_msg,
         if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
             // Message is aborted before any part of it has been sent.
             // Declare the message to be sent,
-            msg->send_complete = true;
+            SET_ATOMIC_FLAG(&msg->send_complete);
             // If the outgoing delivery is not already aborted then abort it.
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
                 pn_delivery_abort(pn_link_current(pnl));
@@ -1926,7 +1945,7 @@ void qd_message_send(qd_message_t *in_msg,
             // get a link detach event for this link
             //
             SET_ATOMIC_FLAG(&content->aborted);
-            msg->send_complete = true;
+            SET_ATOMIC_FLAG(&msg->send_complete);
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
                 pn_delivery_abort(pn_link_current(pnl));
             }
@@ -1979,7 +1998,7 @@ void qd_message_send(qd_message_t *in_msg,
                     msg->cursor.buffer = next_buf;
                     msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) 
: 0;
 
-                    msg->send_complete = (complete && !next_buf);
+                    SET_ATOMIC_BOOL(&msg->send_complete, (complete && 
!next_buf));
                 }
 
                 buf = next_buf;
@@ -2004,7 +2023,7 @@ void qd_message_send(qd_message_t *in_msg,
 
     if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
         if (pn_link_current(pnl)) {
-            msg->send_complete = true;
+            SET_ATOMIC_FLAG(&msg->send_complete);
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
                 pn_delivery_abort(pn_link_current(pnl));
             }
@@ -2040,7 +2059,7 @@ static qd_message_depth_status_t 
message_check_depth_LH(qd_message_content_t *co
     }
 
     if (rc == QD_SECTION_NEED_MORE) {
-        if (!content->receive_complete)
+        if (!IS_ATOMIC_FLAG_SET(&content->receive_complete))
             return QD_MESSAGE_DEPTH_INCOMPLETE;
 
         // no more data is going to come. OK if at the end and optional:
@@ -2066,7 +2085,7 @@ static qd_message_depth_status_t 
qd_message_check_LH(qd_message_content_t *conte
 
     qd_buffer_t *buffer  = DEQ_HEAD(content->buffers);
     if (!buffer) {
-        return content->receive_complete ? QD_MESSAGE_DEPTH_INVALID : 
QD_MESSAGE_DEPTH_INCOMPLETE;
+        return IS_ATOMIC_FLAG_SET(&content->receive_complete) ? 
QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE;
     }
 
     if (content->parse_buffer == 0) {
@@ -2295,7 +2314,7 @@ void qd_message_compose_1(qd_message_t *msg, const char 
*to, qd_buffer_list_t *b
 {
     qd_composed_field_t  *field   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
     qd_message_content_t *content = MSG_CONTENT(msg);
-    content->receive_complete     = true;
+    SET_ATOMIC_FLAG(&content->receive_complete);
 
     qd_compose_start_list(field);
     qd_compose_insert_bool(field, 0);     // durable
@@ -2347,7 +2366,7 @@ void qd_message_compose_2(qd_message_t *msg, 
qd_composed_field_t *field, bool co
     qd_buffer_list_t     *field_buffers = qd_compose_buffers(field);
 
     content->buffers          = *field_buffers;
-    content->receive_complete = complete;
+    SET_ATOMIC_BOOL(&content->receive_complete, complete);
 
     DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
 }
@@ -2356,7 +2375,7 @@ void qd_message_compose_2(qd_message_t *msg, 
qd_composed_field_t *field, bool co
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, 
qd_composed_field_t *field2, bool receive_complete)
 {
     qd_message_content_t *content        = MSG_CONTENT(msg);
-    content->receive_complete            = receive_complete;
+    SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
 
@@ -2369,7 +2388,7 @@ void qd_message_compose_3(qd_message_t *msg, 
qd_composed_field_t *field1, qd_com
 void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, 
qd_composed_field_t *field2, qd_composed_field_t *field3, bool receive_complete)
 {
     qd_message_content_t *content        = MSG_CONTENT(msg);
-    content->receive_complete            = receive_complete;
+    SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
     qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
@@ -2383,7 +2402,7 @@ void qd_message_compose_4(qd_message_t *msg, 
qd_composed_field_t *field1, qd_com
 void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, 
qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t 
*field4, bool receive_complete)
 {
     qd_message_content_t *content        = MSG_CONTENT(msg);
-    content->receive_complete            = receive_complete;
+    SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
     qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
@@ -2803,10 +2822,8 @@ qd_message_stream_data_result_t 
qd_message_next_stream_data(qd_message_t *in_msg
         break;
 
     case QD_SECTION_NEED_MORE:
-        if (msg->content->receive_complete)
-            result = QD_MESSAGE_STREAM_DATA_NO_MORE;
-        else
-            result = QD_MESSAGE_STREAM_DATA_INCOMPLETE;
+        result = IS_ATOMIC_FLAG_SET(&msg->content->receive_complete) ?
+            QD_MESSAGE_STREAM_DATA_NO_MORE : QD_MESSAGE_STREAM_DATA_INCOMPLETE;
         break;
     }
 
@@ -2901,7 +2918,8 @@ int qd_message_get_phase_val(qd_message_t *msg)
 
 int qd_message_is_streaming(qd_message_t *msg)
 {
-    return ((qd_message_pvt_t*) msg)->content->ma_stream;
+    qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg;
+    return IS_ATOMIC_FLAG_SET(&msg_pvt->content->ma_stream);
 }
 
 
@@ -2946,7 +2964,14 @@ bool _Q2_holdoff_should_unblock_LH(const 
qd_message_content_t *content)
 
 bool qd_message_is_Q2_blocked(const qd_message_t *msg)
 {
-    return ((const qd_message_pvt_t*)msg)->content->q2_input_holdoff;
+    qd_message_pvt_t     *msg_pvt = (qd_message_pvt_t*) msg;
+    qd_message_content_t *content = msg_pvt->content;
+
+    bool blocked;
+    LOCK(content->lock);
+    blocked = content->q2_input_holdoff;
+    UNLOCK(content->lock);
+    return blocked;
 }
 
 
@@ -2969,7 +2994,7 @@ void qd_message_set_aborted(const qd_message_t *msg)
 bool qd_message_oversize(const qd_message_t *msg)
 {
     qd_message_content_t * mc = MSG_CONTENT(msg);
-    return mc->oversize;
+    return IS_ATOMIC_FLAG_SET(&mc->oversize);
 }
 
 
diff --git a/src/message_private.h b/src/message_private.h
index cdb0e28..e7217e8 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -115,57 +115,60 @@ typedef struct {
     qd_field_location_t  field_group_sequence;
     qd_field_location_t  field_reply_to_group_id;
 
-    qd_buffer_t         *parse_buffer;                    // Pointer to the 
buffer where parsing should resume, if needed
-    unsigned char       *parse_cursor;                    // Pointer to octet 
in parse_buffer where parsing should resume, if needed
-    qd_message_depth_t   parse_depth;                     // The depth to 
which this message content has been parsed
-    qd_iterator_t       *ma_field_iter_in;                // 'message field 
iterator' for msg.FIELD_MESSAGE_ANNOTATION
+    qd_buffer_t         *parse_buffer;                    // Buffer where 
parsing should resume
+    unsigned char       *parse_cursor;                    // Octet in 
parse_buffer where parsing should resume
+    qd_message_depth_t   parse_depth;                     // Depth to which 
message content has been parsed
+    qd_iterator_t       *ma_field_iter_in;                // Iter for 
msg.FIELD_MESSAGE_ANNOTATION
 
     qd_iterator_pointer_t ma_user_annotation_blob;        // Original user 
annotations
-                                                          // with router 
annotations stripped
+                                                          //  with router 
annotations stripped
     uint32_t             ma_count;                        // Number of map 
elements in blob
-                                                          // after the router 
fields stripped
+                                                          //  after router 
fields stripped
     qd_parsed_field_t   *ma_pf_ingress;
     qd_parsed_field_t   *ma_pf_phase;
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
-    bool                 ma_stream;                      // indicates whether 
this message is streaming
-    uint64_t             max_message_size;               // configured max; 0 
if no max to enforce
-    uint64_t             bytes_received;                 // bytes returned by 
pn_link_recv() when enforcing max_message_size
-    size_t               protected_buffers;              // count of permanent 
buffers that hold the message headers
-    uint32_t             fanout;                         // The number of 
receivers for this message, including in-process subscribers.
-
-    qd_message_q2_unblocker_t q2_unblocker;              // callback and 
context to signal Q2 unblocked to receiver
-
-    bool                 ma_parsed;                      // have parsed 
annotations in incoming message
-    bool                 discard;                        // Should this 
message be discarded?
-    bool                 receive_complete;               // true if the 
message has been completely received, false otherwise
-    bool                 q2_input_holdoff;               // hold off calling 
pn_link_recv
-    bool                 disable_q2_holdoff;             // Disable the Q2 
flow control
-    bool                 priority_parsed;
-    bool                 priority_present;
-    bool                 oversize;                       // policy oversize 
handling in effect
-    bool                 no_body;                        // Used for http2 
messages. If no_body is true, the HTTP request had no body
-    uint8_t              priority;                       // The priority of 
this message
-    sys_atomic_t         aborted;
+    sys_atomic_t         ma_stream;                      // Message is 
streaming
+    uint64_t             max_message_size;               // Configured max; 0 
if no max to enforce
+    uint64_t             bytes_received;                 // Bytes returned by 
pn_link_recv()
+                                                         //  when enforcing 
max_message_size
+    size_t               protected_buffers;              // Count of permanent 
buffers that hold message headers
+    uint32_t             fanout;                         // Number of 
receivers for this message
+                                                         //  including 
in-process subscribers.
+
+    qd_message_q2_unblocker_t q2_unblocker;              // Callback and 
context to signal Q2 unblocked to receiver
+
+    bool                 ma_parsed;                      // Have parsed 
incoming message annotations message
+    sys_atomic_t         discard;                        // Message is being 
discarded
+    sys_atomic_t         receive_complete;               // Message has been 
completely received
+    bool                 q2_input_holdoff;               // Q2 state: hold off 
calling pn_link_recv
+    bool                 disable_q2_holdoff;             // Disable Q2 flow 
control
+    sys_atomic_t         priority_parsed;                // Message priority 
has been parsed
+    sys_atomic_t         oversize;                       // Policy 
oversize-message handling in effect
+    sys_atomic_t         no_body;                        // HTTP2 request has 
no body
+    sys_atomic_t         priority;                       // Message AMQP 
priority
+    sys_atomic_t         aborted;                        // Message has been 
aborted
 } qd_message_content_t;
 
 struct qd_message_pvt_t {
-    qd_iterator_pointer_t          cursor;          // A pointer to the 
current location of the outgoing byte stream.
-    qd_message_depth_t             message_depth;   // What is the depth of 
the message that has been received so far
-    qd_message_depth_t             sent_depth;      // How much of the message 
has been sent?  QD_DEPTH_NONE means nothing has been sent so far, 
QD_DEPTH_HEADER means the header has already been sent, dont send it again and 
so on.
-    qd_message_content_t          *content;         // The actual content of 
the message. The content is never copied
-    qd_buffer_list_t               ma_to_override;  // to field in outgoing 
message annotations.
-    qd_buffer_list_t               ma_trace;        // trace list in outgoing 
message annotations
-    qd_buffer_list_t               ma_ingress;      // ingress field in 
outgoing message annotations
-    int                            ma_phase;        // phase for the override 
address
-    qd_message_stream_data_list_t  stream_data_list;  // TODO - move this to 
the content for one-time parsing (TLR)
-    unsigned char                 *body_cursor;     // tracks the point in the 
content buffer chain
-    qd_buffer_t                   *body_buffer;     // to parse the next body 
data section (if it exists)
+    qd_iterator_pointer_t          cursor;          // Pointer to current 
location of outgoing byte stream.
+    qd_message_depth_t             message_depth;   // Depth of incoming 
received message
+    qd_message_depth_t             sent_depth;      // Depth of outgoing sent 
message
+    qd_message_content_t          *content;         // Singleton content 
shared by reference between
+                                                    //  incoming and all 
outgoing copies
+    qd_buffer_list_t               ma_to_override;  // To field in outgoing 
message annotations.
+    qd_buffer_list_t               ma_trace;        // Trace list in outgoing 
message annotations
+    qd_buffer_list_t               ma_ingress;      // Ingress field in 
outgoing message annotations
+    int                            ma_phase;        // Phase for override 
address
+    qd_message_stream_data_list_t  stream_data_list;// Stream data parse 
structure
+                                                    // TODO - move this to the 
content for one-time parsing (TLR)
+    unsigned char                 *body_cursor;     // Stream: tracks the 
point in the content buffer chain
+    qd_buffer_t                   *body_buffer;     // Stream: to parse the 
next body data section, if any
     bool                           strip_annotations_in;
-    bool                           send_complete;   // Has the message been 
completely received and completely sent?
+    sys_atomic_t                   send_complete;   // Message has been been 
completely sent
     bool                           tag_sent;        // Tags are sent
-    bool                           is_fanout;       // If msg is an outgoing 
fanout
+    bool                           is_fanout;       // Message is an outgoing 
fanout
 };
 
 ALLOC_DECLARE(qd_message_t);
diff --git a/tests/message_test.c b/tests/message_test.c
index 5e146da..242f819 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -63,7 +63,7 @@ static void set_content(qd_message_content_t *content, 
unsigned char *buffer, si
         qd_buffer_insert(buf, segment);
         DEQ_INSERT_TAIL(content->buffers, buf);
     }
-    content->receive_complete = true;
+    SET_ATOMIC_FLAG(&content->receive_complete);
 }
 
 
@@ -593,7 +593,7 @@ static char *test_incomplete_annotations(void *context)
     msg = qd_message();
     qd_message_content_t *content = MSG_CONTENT(msg);
     set_content(content, buffer, 100);
-    content->receive_complete = false;   // more data coming!
+    CLEAR_ATOMIC_FLAG(&content->receive_complete);   // more data coming!
     if (qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) != 
QD_MESSAGE_DEPTH_INCOMPLETE) {
         result = "Error: incomplete message was not detected!";
         goto exit;
@@ -626,7 +626,7 @@ static char *test_check_weird_messages(void *context)
                               0xc1, 0x01, 0x00};
     // first test an incomplete pattern:
     set_content(MSG_CONTENT(msg), da_map, 4);
-    MSG_CONTENT(msg)->receive_complete = false;
+    CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete));
     qd_message_depth_status_t mc = qd_message_check_depth(msg, 
QD_DEPTH_DELIVERY_ANNOTATIONS);
     if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) {
         result = "Expected INCOMPLETE status";
@@ -635,7 +635,7 @@ static char *test_check_weird_messages(void *context)
 
     // full pattern, but no tag
     set_content(MSG_CONTENT(msg), &da_map[4], 6);
-    MSG_CONTENT(msg)->receive_complete = false;
+    CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete));
     mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS);
     if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) {
         result = "Expected INCOMPLETE status";
@@ -644,7 +644,7 @@ static char *test_check_weird_messages(void *context)
 
     // add tag, but incomplete field:
     set_content(MSG_CONTENT(msg), &da_map[10], 1);
-    MSG_CONTENT(msg)->receive_complete = false;
+    CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete));
     mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS);
     if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) {
         result = "Expected INCOMPLETE status";
@@ -664,7 +664,7 @@ static char *test_check_weird_messages(void *context)
     qd_message_free(msg);
     msg = qd_message();
     set_content(MSG_CONTENT(msg), bad_hdr, sizeof(bad_hdr));
-    MSG_CONTENT(msg)->receive_complete = false;
+    CLEAR_ATOMIC_FLAG(&(MSG_CONTENT(msg)->receive_complete));
     mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); // 
looking _past_ header!
     if (mc != QD_MESSAGE_DEPTH_INVALID) {
         result = "Bad tag not detected!";
diff --git a/tests/tsan.supp b/tests/tsan.supp
index 4312eae..9196c62 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -79,9 +79,6 @@ race:qdr_delivery_mcast_outbound_disposition_CT
 # DISPATCH-2157
 race:^qd_message_send$
 
-# DISPATCH-2166
-race:qd_message_set_send_complete
-
 #
 # External libraries
 #

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to