This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e13e0e  DISPATCH-2191: avoid counting header buffers for Q2
1e13e0e is described below

commit 1e13e0e7399186d50fb8d45ff0d3d8eb6ea9f1a6
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Fri Jul 9 15:35:12 2021 -0400

    DISPATCH-2191: avoid counting header buffers for Q2
    
    This closes #1293
---
 include/qpid/dispatch/message.h | 20 ---------
 src/message.c                   | 81 +++++++++++++++++++++++-------------
 src/message_private.h           |  5 +++
 tests/message_test.c            | 91 +++++++++++++++++++++++++++++++++++++++--
 4 files changed, 146 insertions(+), 51 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index fdeb4b7..6072e6d 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -553,26 +553,6 @@ void qd_message_add_fanout(qd_message_t *in_msg,
 void qd_message_Q2_holdoff_disable(qd_message_t *msg);
 
 /**
- * Test if attempt to retrieve message data through qd_message_recv should 
block
- * due to Q2 input holdoff limit being exceeded. This message has enough
- * buffers in the internal buffer chain and any calls to to qd_message_receive
- * will not result in a call to pn_link_receive to retrieve more data.
- *
- * @param msg A pointer to the message
- */
-bool qd_message_Q2_holdoff_should_block(qd_message_t *msg);
-
-/**
- * Test if a message that is blocked by Q2 input holdoff has enough room
- * to begin receiving again. This message has transmitted and disposed of
- * enough buffers to begin receiving more data from the underlying proton link.
- *
- * @param msg A pointer to the message
- */
-bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg);
-
-
-/**
  * Check if a message has hit its Q2 limit and is currently blocked.
  * When blocked no further message data will be read from the link.
  *
diff --git a/src/message.c b/src/message.c
index e80c295..e867913 100644
--- a/src/message.c
+++ b/src/message.c
@@ -646,7 +646,8 @@ typedef enum {
     QD_SECTION_NEED_MORE  // not enough data in the buffer chain - try again
 } qd_section_status_t;
 
-static qd_section_status_t message_section_check_LH(qd_buffer_t         
**buffer,
+static qd_section_status_t message_section_check_LH(qd_message_content_t 
*content,
+                                                    qd_buffer_t         
**buffer,
                                                     unsigned char       
**cursor,
                                                     const unsigned char  
*pattern,
                                                     int                   
pattern_length,
@@ -770,6 +771,7 @@ static qd_section_status_t 
message_section_check_LH(qd_buffer_t         **buffer
         // the end.  And cursor + buffer will be null if the parsed section 
ends at
         // the end of the buffer chain, so be careful of that, too!
         //
+        bool buffers_protected = false;
         qd_buffer_t *start = *buffer;
         qd_buffer_t *last = test_buffer;
         if (last && last != start) {
@@ -781,10 +783,27 @@ static qd_section_status_t 
message_section_check_LH(qd_buffer_t         **buffer
 
         while (start) {
             qd_buffer_inc_fanout(start);
+            buffers_protected = true;
             if (start == last)
                 break;
             start = DEQ_NEXT(start);
         }
+
+        // DISPATCH-2191: protected buffers are never released - even after
+        // being sent - because they are referenced by the content->section_xxx
+        // location fields and remain valid for the life of the content
+        // instance.  Since these buffers are never freed they must not be
+        // included in the Q2 threshold check!
+        if (buffers_protected) {
+            content->protected_buffers = 0;
+            start = DEQ_HEAD(content->buffers);
+            while (start) {
+                ++content->protected_buffers;
+                if (start == last)
+                    break;
+                start = DEQ_NEXT(start);
+            }
+        }
     }
 
     location->parsed = 1;
@@ -1036,7 +1055,7 @@ void qd_message_free(qd_message_t *in_msg)
         // may be released
         assert(DEQ_IS_EMPTY(msg->stream_data_list));
 
-        const bool was_blocked = !qd_message_Q2_holdoff_should_unblock(in_msg);
+        const bool was_blocked = !_Q2_holdoff_should_unblock_LH(content);
         qd_buffer_t *buf = msg->cursor.buffer;
         while (buf) {
             qd_buffer_t *next_buf = DEQ_NEXT(buf);
@@ -1053,7 +1072,7 @@ void qd_message_free(qd_message_t *in_msg)
         //
         if (content->q2_input_holdoff
             && was_blocked
-            && qd_message_Q2_holdoff_should_unblock(in_msg)) {
+            && _Q2_holdoff_should_unblock_LH(content)) {
             content->q2_input_holdoff = false;
             q2_unblock = content->q2_unblocker;
         }
@@ -1558,7 +1577,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                 qd_buffer_set_fanout(content->pending, content->fanout);
                 DEQ_INSERT_TAIL(content->buffers, content->pending);
                 content->pending = 0;
-                if (qd_message_Q2_holdoff_should_block((qd_message_t *)msg)) {
+                if (_Q2_holdoff_should_block_LH(content)) {
                     if (!qd_link_is_q2_limit_unbounded(qdl)) {
                         content->q2_input_holdoff = true;
                         UNLOCK(content->lock);
@@ -1944,7 +1963,7 @@ void qd_message_send(qd_message_t *in_msg,
                         // by freeing a buffer there now may be room to 
restart a
                         // stalled message receiver
                         if (content->q2_input_holdoff) {
-                            if 
(qd_message_Q2_holdoff_should_unblock((qd_message_t*) msg)) {
+                            if (_Q2_holdoff_should_unblock_LH(content)) {
                                 // wake up receive side
                                 // Note: clearing holdoff here is easy 
compared to
                                 // clearing it in the deferred callback. 
Tracing
@@ -2011,9 +2030,9 @@ static qd_message_depth_status_t 
message_check_depth_LH(qd_message_content_t *co
         return QD_MESSAGE_DEPTH_OK;
 
     qd_section_status_t rc;
-    rc = message_section_check_LH(&content->parse_buffer, 
&content->parse_cursor, short_pattern, SHORT, expected_tags, location, false, 
protect_buffer);
+    rc = message_section_check_LH(content, &content->parse_buffer, 
&content->parse_cursor, short_pattern, SHORT, expected_tags, location, false, 
protect_buffer);
     if (rc == QD_SECTION_NO_MATCH)  // try the alternative
-        rc = message_section_check_LH(&content->parse_buffer, 
&content->parse_cursor, long_pattern,  LONG,  expected_tags, location, false, 
protect_buffer);
+        rc = message_section_check_LH(content, &content->parse_buffer, 
&content->parse_cursor, long_pattern,  LONG,  expected_tags, location, false, 
protect_buffer);
 
     if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) {
         content->parse_depth = depth;
@@ -2398,8 +2417,8 @@ int qd_message_extend(qd_message_t *msg, 
qd_composed_field_t *field, bool *q2_bl
     DEQ_APPEND(content->buffers, (*buffers));
     count = DEQ_SIZE(content->buffers);
 
-    // buffers added - much check for Q2:
-    if (qd_message_Q2_holdoff_should_block(msg)) {
+    // buffers added - must check for Q2:
+    if (_Q2_holdoff_should_block_LH(content)) {
         content->q2_input_holdoff = true;
         if (q2_blocked)
             *q2_blocked = true;
@@ -2639,7 +2658,7 @@ void 
qd_message_stream_data_release(qd_message_stream_data_t *stream_data)
 
     LOCK(content->lock);
 
-    bool                      was_blocked = 
!qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt);
+    bool                      was_blocked = 
!_Q2_holdoff_should_unblock_LH(content);
     qd_message_q2_unblocker_t q2_unblock  = {0};
 
     if (pvt->is_fanout) {
@@ -2670,7 +2689,7 @@ void 
qd_message_stream_data_release(qd_message_stream_data_t *stream_data)
     //
     if (content->q2_input_holdoff
         && was_blocked
-        && qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt)) {
+        && _Q2_holdoff_should_unblock_LH(content)) {
         content->q2_input_holdoff = false;
         q2_unblock = content->q2_unblocker;
     }
@@ -2741,14 +2760,16 @@ qd_message_stream_data_result_t 
qd_message_next_stream_data(qd_message_t *in_msg
 
     LOCK(content->lock);
 
-    section_status = message_section_check_LH(&msg->body_buffer, 
&msg->body_cursor,
+    section_status = message_section_check_LH(content,
+                                              &msg->body_buffer, 
&msg->body_cursor,
                                               BODY_DATA_SHORT, 3, TAGS_BINARY,
                                               &location,
                                               true,  // allow duplicates
                                               false);  // do not inc buffer 
fanout
     if (section_status == QD_SECTION_NO_MATCH) {
         is_footer      = true;
-        section_status = message_section_check_LH(&msg->body_buffer, 
&msg->body_cursor,
+        section_status = message_section_check_LH(content,
+                                                  &msg->body_buffer, 
&msg->body_cursor,
                                                   FOOTER_SHORT, 3, TAGS_MAP,
                                                   &location, true, false);
     }
@@ -2907,18 +2928,19 @@ void qd_message_Q2_holdoff_disable(qd_message_t *msg)
 }
 
 
-bool qd_message_Q2_holdoff_should_block(qd_message_t *msg)
+bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content)
 {
-    if (!msg)
-        return false;
-    qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg;
-    return !msg_pvt->content->disable_q2_holdoff && 
DEQ_SIZE(msg_pvt->content->buffers) >= QD_QLIMIT_Q2_UPPER;
+    const size_t buff_ct = DEQ_SIZE(content->buffers);
+    assert(buff_ct >= content->protected_buffers);
+    return !content->disable_q2_holdoff && (buff_ct - 
content->protected_buffers) >= QD_QLIMIT_Q2_UPPER;
 }
 
 
-bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg)
+bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content)
 {
-    return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) < 
QD_QLIMIT_Q2_LOWER;
+    const size_t buff_ct = DEQ_SIZE(content->buffers);
+    assert(buff_ct >= content->protected_buffers);
+    return content->disable_q2_holdoff || (buff_ct - 
content->protected_buffers) < QD_QLIMIT_Q2_LOWER;
 }
 
 
@@ -2963,13 +2985,16 @@ int qd_message_stream_data_append(qd_message_t 
*message, qd_buffer_list_t *data,
     if (length == 0)
         return rc;
 
-    // DISPATCH-1803: ensure no body data section can violate the Q2 threshold.
-    // This allows the egress router to wait for an entire body data section
-    // to arrive and be validated before sending it out to the endpoint.
+    // DISPATCH-1803: ensure no body data section can exceed the
+    // QD_QLIMIT_Q2_LOWER.  This allows the egress router to wait for an entire
+    // body data section to arrive and be validated before sending it out to
+    // the endpoint without preventing Q2 from being relieved (DISPATCH-2191).
     //
-    while (length > QD_QLIMIT_Q2_LOWER) {
+    const size_t buf_limit = QD_QLIMIT_Q2_LOWER - 2;  // reserve 1 extra for 
performative header
+    assert(buf_limit);
+    while (length > buf_limit) {
         qd_buffer_t *buf = DEQ_HEAD(*data);
-        for (int i = 0; i < QD_QLIMIT_Q2_LOWER; ++i) {
+        for (int i = 0; i < buf_limit; ++i) {
             buf = DEQ_NEXT(buf);
         }
 
@@ -2981,14 +3006,14 @@ int qd_message_stream_data_append(qd_message_t 
*message, qd_buffer_list_t *data,
         DEQ_TAIL(*data) = DEQ_PREV(buf);
         DEQ_NEXT(DEQ_TAIL(*data)) = 0;
         DEQ_PREV(buf) = 0;
-        DEQ_SIZE(trailer) = length - QD_QLIMIT_Q2_LOWER;
-        DEQ_SIZE(*data) = QD_QLIMIT_Q2_LOWER;
+        DEQ_SIZE(trailer) = length - buf_limit;
+        DEQ_SIZE(*data) = buf_limit;
 
         field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
         qd_compose_insert_binary_buffers(field, data);
 
         DEQ_MOVE(trailer, *data);
-        length -= QD_QLIMIT_Q2_LOWER;
+        length -= buf_limit;
     }
 
     field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
diff --git a/src/message_private.h b/src/message_private.h
index bb7970f..cdb0e28 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -132,6 +132,7 @@ typedef struct {
     bool                 ma_stream;                      // indicates whether 
this message is streaming
     uint64_t             max_message_size;               // configured max; 0 
if no max to enforce
     uint64_t             bytes_received;                 // bytes returned by 
pn_link_recv() when enforcing max_message_size
+    size_t               protected_buffers;              // count of permanent 
buffers that hold the message headers
     uint32_t             fanout;                         // The number of 
receivers for this message, including in-process subscribers.
 
     qd_message_q2_unblocker_t q2_unblocker;              // callback and 
context to signal Q2 unblocked to receiver
@@ -179,6 +180,10 @@ void qd_message_initialize();
 #define QDR_MAX_PRIORITY     (QDR_N_PRIORITIES - 1)
 #define QDR_DEFAULT_PRIORITY  4
 
+// These expect content->lock to be locked.
+bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content);
+bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content);
+
 ///@}
 
 #endif
diff --git a/tests/message_test.c b/tests/message_test.c
index 64969dc..5e146da 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -484,11 +484,11 @@ static char* test_q2_input_holdoff_sensing(void *context)
         qd_message_content_t *content = MSG_CONTENT(msg);
 
         set_content_bufs(content, nbufs);
-        if (qd_message_Q2_holdoff_should_block(msg) != (nbufs >= 
QD_QLIMIT_Q2_UPPER)) {
+        if (_Q2_holdoff_should_block_LH(content) != (nbufs >= 
QD_QLIMIT_Q2_UPPER)) {
             qd_message_free(msg);
             return "qd_message_holdoff_would_block was miscalculated";
         }
-        if (qd_message_Q2_holdoff_should_unblock(msg) != (nbufs < 
QD_QLIMIT_Q2_LOWER)) {
+        if (_Q2_holdoff_should_unblock_LH(content) != (nbufs < 
QD_QLIMIT_Q2_LOWER)) {
             qd_message_free(msg);
             return "qd_message_holdoff_would_unblock was miscalculated";
         }
@@ -1053,7 +1053,7 @@ static char *test_check_stream_data_append(void * context)
             // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers
             // long
             body_buffers += qd_message_stream_data_buffer_count(stream_data);
-            if (qd_message_stream_data_buffer_count(stream_data) > 
QD_QLIMIT_Q2_LOWER) {
+            if (qd_message_stream_data_buffer_count(stream_data) >= 
QD_QLIMIT_Q2_LOWER) {
                 result = "Body data list length too long!";
                 goto exit;
             }
@@ -1449,6 +1449,90 @@ exit:
 }
 
 
+// Ensure that the Q2 calculation does not include header buffers.  Header
+// buffers are held until the message is freed, so they should not be a factor
+// in flow control (DISPATCH-2191).
+//
+static char *test_q2_ignore_headers(void *context)
+{
+    char *result = 0;
+    qd_message_t *msg = qd_message();
+    qd_message_content_t *content = MSG_CONTENT(msg);
+
+    // create a message and add a bunch of headers.  Put each header in its own
+    // buffer to increase the buffer count.
+
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(field);
+    qd_compose_insert_bool(field, 0);     // durable
+    qd_compose_insert_null(field);        // priority
+    qd_compose_end_list(field);
+    qd_buffer_list_t field_buffers;
+    qd_compose_take_buffers(field, &field_buffers);
+    qd_compose_free(field);
+    content->buffers = field_buffers;
+
+    const qd_amqp_performative_t plist[3] = {
+        QD_PERFORMATIVE_DELIVERY_ANNOTATIONS,
+        QD_PERFORMATIVE_MESSAGE_ANNOTATIONS,
+        QD_PERFORMATIVE_APPLICATION_PROPERTIES};
+
+    for (int i = 0; i < 3; ++i) {
+        field = qd_compose(plist[i], 0);
+        qd_compose_start_map(field);
+        qd_compose_insert_symbol(field, "Key");
+        qd_compose_insert_string(field, "Value");
+        qd_compose_end_map(field);
+        qd_compose_take_buffers(field, &field_buffers);
+        qd_compose_free(field);
+        DEQ_APPEND(content->buffers, field_buffers);
+    }
+
+    // validate the message - this will mark the buffers that contain header
+    // data
+    if (qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) != 
QD_MESSAGE_DEPTH_OK) {
+        result = "Unexpected depth check failure";
+        goto exit;
+    }
+
+    const size_t header_ct = DEQ_SIZE(content->buffers);
+    assert(header_ct);
+    assert(!_Q2_holdoff_should_block_LH(content));
+
+    // Now append buffers until Q2 blocks
+    while (!_Q2_holdoff_should_block_LH(content)) {
+        qd_buffer_t *buffy = qd_buffer();
+        qd_buffer_insert(buffy, qd_buffer_capacity(buffy));
+        DEQ_INSERT_TAIL(content->buffers, buffy);
+    }
+
+    // expect: block occurs when length == QD_QLIMIT_Q2_UPPER + header_ct
+    if (DEQ_SIZE(content->buffers) != QD_QLIMIT_Q2_UPPER + header_ct) {
+        result = "Wrong buffer length for Q2 activate!";
+        goto exit;
+    }
+
+    // now remove buffers until Q2 is relieved
+
+    while (!_Q2_holdoff_should_unblock_LH(content)) {
+        qd_buffer_t *buffy = DEQ_TAIL(content->buffers);
+        DEQ_REMOVE_TAIL(content->buffers);
+        qd_buffer_free(buffy);
+    }
+
+    // expect: Q2 deactivates when list length < QD_QDLIMIT_Q2_LOWER + 
header_ct
+    if (DEQ_SIZE(content->buffers) != (QD_QLIMIT_Q2_LOWER + header_ct) - 1) {
+        result = "Wrong buffer length for Q2 deactivate!";
+        goto exit;
+    }
+
+exit:
+
+    qd_message_free(msg);
+    return result;
+}
+
+
 int message_tests(void)
 {
     int result = 0;
@@ -1467,6 +1551,7 @@ int message_tests(void)
     TEST_CASE(test_check_stream_data_fanout, 0);
     TEST_CASE(test_check_stream_data_footer, 0);
     TEST_CASE(test_q2_callback_on_disable, 0);
+    TEST_CASE(test_q2_ignore_headers, 0);
 
     return result;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to