This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 1e13e0e DISPATCH-2191: avoid counting header buffers for Q2 1e13e0e is described below commit 1e13e0e7399186d50fb8d45ff0d3d8eb6ea9f1a6 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Fri Jul 9 15:35:12 2021 -0400 DISPATCH-2191: avoid counting header buffers for Q2 This closes #1293 --- include/qpid/dispatch/message.h | 20 --------- src/message.c | 81 +++++++++++++++++++++++------------- src/message_private.h | 5 +++ tests/message_test.c | 91 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 146 insertions(+), 51 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index fdeb4b7..6072e6d 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -553,26 +553,6 @@ void qd_message_add_fanout(qd_message_t *in_msg, void qd_message_Q2_holdoff_disable(qd_message_t *msg); /** - * Test if attempt to retrieve message data through qd_message_recv should block - * due to Q2 input holdoff limit being exceeded. This message has enough - * buffers in the internal buffer chain and any calls to to qd_message_receive - * will not result in a call to pn_link_receive to retrieve more data. - * - * @param msg A pointer to the message - */ -bool qd_message_Q2_holdoff_should_block(qd_message_t *msg); - -/** - * Test if a message that is blocked by Q2 input holdoff has enough room - * to begin receiving again. This message has transmitted and disposed of - * enough buffers to begin receiving more data from the underlying proton link. - * - * @param msg A pointer to the message - */ -bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg); - - -/** * Check if a message has hit its Q2 limit and is currently blocked. * When blocked no further message data will be read from the link. * diff --git a/src/message.c b/src/message.c index e80c295..e867913 100644 --- a/src/message.c +++ b/src/message.c @@ -646,7 +646,8 @@ typedef enum { QD_SECTION_NEED_MORE // not enough data in the buffer chain - try again } qd_section_status_t; -static qd_section_status_t message_section_check_LH(qd_buffer_t **buffer, +static qd_section_status_t message_section_check_LH(qd_message_content_t *content, + qd_buffer_t **buffer, unsigned char **cursor, const unsigned char *pattern, int pattern_length, @@ -770,6 +771,7 @@ static qd_section_status_t message_section_check_LH(qd_buffer_t **buffer // the end. And cursor + buffer will be null if the parsed section ends at // the end of the buffer chain, so be careful of that, too! // + bool buffers_protected = false; qd_buffer_t *start = *buffer; qd_buffer_t *last = test_buffer; if (last && last != start) { @@ -781,10 +783,27 @@ static qd_section_status_t message_section_check_LH(qd_buffer_t **buffer while (start) { qd_buffer_inc_fanout(start); + buffers_protected = true; if (start == last) break; start = DEQ_NEXT(start); } + + // DISPATCH-2191: protected buffers are never released - even after + // being sent - because they are referenced by the content->section_xxx + // location fields and remain valid for the life of the content + // instance. Since these buffers are never freed they must not be + // included in the Q2 threshold check! + if (buffers_protected) { + content->protected_buffers = 0; + start = DEQ_HEAD(content->buffers); + while (start) { + ++content->protected_buffers; + if (start == last) + break; + start = DEQ_NEXT(start); + } + } } location->parsed = 1; @@ -1036,7 +1055,7 @@ void qd_message_free(qd_message_t *in_msg) // may be released assert(DEQ_IS_EMPTY(msg->stream_data_list)); - const bool was_blocked = !qd_message_Q2_holdoff_should_unblock(in_msg); + const bool was_blocked = !_Q2_holdoff_should_unblock_LH(content); qd_buffer_t *buf = msg->cursor.buffer; while (buf) { qd_buffer_t *next_buf = DEQ_NEXT(buf); @@ -1053,7 +1072,7 @@ void qd_message_free(qd_message_t *in_msg) // if (content->q2_input_holdoff && was_blocked - && qd_message_Q2_holdoff_should_unblock(in_msg)) { + && _Q2_holdoff_should_unblock_LH(content)) { content->q2_input_holdoff = false; q2_unblock = content->q2_unblocker; } @@ -1558,7 +1577,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) qd_buffer_set_fanout(content->pending, content->fanout); DEQ_INSERT_TAIL(content->buffers, content->pending); content->pending = 0; - if (qd_message_Q2_holdoff_should_block((qd_message_t *)msg)) { + if (_Q2_holdoff_should_block_LH(content)) { if (!qd_link_is_q2_limit_unbounded(qdl)) { content->q2_input_holdoff = true; UNLOCK(content->lock); @@ -1944,7 +1963,7 @@ void qd_message_send(qd_message_t *in_msg, // by freeing a buffer there now may be room to restart a // stalled message receiver if (content->q2_input_holdoff) { - if (qd_message_Q2_holdoff_should_unblock((qd_message_t*) msg)) { + if (_Q2_holdoff_should_unblock_LH(content)) { // wake up receive side // Note: clearing holdoff here is easy compared to // clearing it in the deferred callback. Tracing @@ -2011,9 +2030,9 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co return QD_MESSAGE_DEPTH_OK; qd_section_status_t rc; - rc = message_section_check_LH(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false, protect_buffer); + rc = message_section_check_LH(content, &content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false, protect_buffer); if (rc == QD_SECTION_NO_MATCH) // try the alternative - rc = message_section_check_LH(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location, false, protect_buffer); + rc = message_section_check_LH(content, &content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location, false, protect_buffer); if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) { content->parse_depth = depth; @@ -2398,8 +2417,8 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_bl DEQ_APPEND(content->buffers, (*buffers)); count = DEQ_SIZE(content->buffers); - // buffers added - much check for Q2: - if (qd_message_Q2_holdoff_should_block(msg)) { + // buffers added - must check for Q2: + if (_Q2_holdoff_should_block_LH(content)) { content->q2_input_holdoff = true; if (q2_blocked) *q2_blocked = true; @@ -2639,7 +2658,7 @@ void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) LOCK(content->lock); - bool was_blocked = !qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt); + bool was_blocked = !_Q2_holdoff_should_unblock_LH(content); qd_message_q2_unblocker_t q2_unblock = {0}; if (pvt->is_fanout) { @@ -2670,7 +2689,7 @@ void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) // if (content->q2_input_holdoff && was_blocked - && qd_message_Q2_holdoff_should_unblock((qd_message_t*) pvt)) { + && _Q2_holdoff_should_unblock_LH(content)) { content->q2_input_holdoff = false; q2_unblock = content->q2_unblocker; } @@ -2741,14 +2760,16 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg LOCK(content->lock); - section_status = message_section_check_LH(&msg->body_buffer, &msg->body_cursor, + section_status = message_section_check_LH(content, + &msg->body_buffer, &msg->body_cursor, BODY_DATA_SHORT, 3, TAGS_BINARY, &location, true, // allow duplicates false); // do not inc buffer fanout if (section_status == QD_SECTION_NO_MATCH) { is_footer = true; - section_status = message_section_check_LH(&msg->body_buffer, &msg->body_cursor, + section_status = message_section_check_LH(content, + &msg->body_buffer, &msg->body_cursor, FOOTER_SHORT, 3, TAGS_MAP, &location, true, false); } @@ -2907,18 +2928,19 @@ void qd_message_Q2_holdoff_disable(qd_message_t *msg) } -bool qd_message_Q2_holdoff_should_block(qd_message_t *msg) +bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content) { - if (!msg) - return false; - qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg; - return !msg_pvt->content->disable_q2_holdoff && DEQ_SIZE(msg_pvt->content->buffers) >= QD_QLIMIT_Q2_UPPER; + const size_t buff_ct = DEQ_SIZE(content->buffers); + assert(buff_ct >= content->protected_buffers); + return !content->disable_q2_holdoff && (buff_ct - content->protected_buffers) >= QD_QLIMIT_Q2_UPPER; } -bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg) +bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content) { - return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) < QD_QLIMIT_Q2_LOWER; + const size_t buff_ct = DEQ_SIZE(content->buffers); + assert(buff_ct >= content->protected_buffers); + return content->disable_q2_holdoff || (buff_ct - content->protected_buffers) < QD_QLIMIT_Q2_LOWER; } @@ -2963,13 +2985,16 @@ int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, if (length == 0) return rc; - // DISPATCH-1803: ensure no body data section can violate the Q2 threshold. - // This allows the egress router to wait for an entire body data section - // to arrive and be validated before sending it out to the endpoint. + // DISPATCH-1803: ensure no body data section can exceed the + // QD_QLIMIT_Q2_LOWER. This allows the egress router to wait for an entire + // body data section to arrive and be validated before sending it out to + // the endpoint without preventing Q2 from being relieved (DISPATCH-2191). // - while (length > QD_QLIMIT_Q2_LOWER) { + const size_t buf_limit = QD_QLIMIT_Q2_LOWER - 2; // reserve 1 extra for performative header + assert(buf_limit); + while (length > buf_limit) { qd_buffer_t *buf = DEQ_HEAD(*data); - for (int i = 0; i < QD_QLIMIT_Q2_LOWER; ++i) { + for (int i = 0; i < buf_limit; ++i) { buf = DEQ_NEXT(buf); } @@ -2981,14 +3006,14 @@ int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, DEQ_TAIL(*data) = DEQ_PREV(buf); DEQ_NEXT(DEQ_TAIL(*data)) = 0; DEQ_PREV(buf) = 0; - DEQ_SIZE(trailer) = length - QD_QLIMIT_Q2_LOWER; - DEQ_SIZE(*data) = QD_QLIMIT_Q2_LOWER; + DEQ_SIZE(trailer) = length - buf_limit; + DEQ_SIZE(*data) = buf_limit; field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); qd_compose_insert_binary_buffers(field, data); DEQ_MOVE(trailer, *data); - length -= QD_QLIMIT_Q2_LOWER; + length -= buf_limit; } field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field); diff --git a/src/message_private.h b/src/message_private.h index bb7970f..cdb0e28 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -132,6 +132,7 @@ typedef struct { bool ma_stream; // indicates whether this message is streaming uint64_t max_message_size; // configured max; 0 if no max to enforce uint64_t bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size + size_t protected_buffers; // count of permanent buffers that hold the message headers uint32_t fanout; // The number of receivers for this message, including in-process subscribers. qd_message_q2_unblocker_t q2_unblocker; // callback and context to signal Q2 unblocked to receiver @@ -179,6 +180,10 @@ void qd_message_initialize(); #define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1) #define QDR_DEFAULT_PRIORITY 4 +// These expect content->lock to be locked. +bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content); +bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content); + ///@} #endif diff --git a/tests/message_test.c b/tests/message_test.c index 64969dc..5e146da 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ -484,11 +484,11 @@ static char* test_q2_input_holdoff_sensing(void *context) qd_message_content_t *content = MSG_CONTENT(msg); set_content_bufs(content, nbufs); - if (qd_message_Q2_holdoff_should_block(msg) != (nbufs >= QD_QLIMIT_Q2_UPPER)) { + if (_Q2_holdoff_should_block_LH(content) != (nbufs >= QD_QLIMIT_Q2_UPPER)) { qd_message_free(msg); return "qd_message_holdoff_would_block was miscalculated"; } - if (qd_message_Q2_holdoff_should_unblock(msg) != (nbufs < QD_QLIMIT_Q2_LOWER)) { + if (_Q2_holdoff_should_unblock_LH(content) != (nbufs < QD_QLIMIT_Q2_LOWER)) { qd_message_free(msg); return "qd_message_holdoff_would_unblock was miscalculated"; } @@ -1053,7 +1053,7 @@ static char *test_check_stream_data_append(void * context) // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers // long body_buffers += qd_message_stream_data_buffer_count(stream_data); - if (qd_message_stream_data_buffer_count(stream_data) > QD_QLIMIT_Q2_LOWER) { + if (qd_message_stream_data_buffer_count(stream_data) >= QD_QLIMIT_Q2_LOWER) { result = "Body data list length too long!"; goto exit; } @@ -1449,6 +1449,90 @@ exit: } +// Ensure that the Q2 calculation does not include header buffers. Header +// buffers are held until the message is freed, so they should not be a factor +// in flow control (DISPATCH-2191). +// +static char *test_q2_ignore_headers(void *context) +{ + char *result = 0; + qd_message_t *msg = qd_message(); + qd_message_content_t *content = MSG_CONTENT(msg); + + // create a message and add a bunch of headers. Put each header in its own + // buffer to increase the buffer count. + + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); + qd_compose_start_list(field); + qd_compose_insert_bool(field, 0); // durable + qd_compose_insert_null(field); // priority + qd_compose_end_list(field); + qd_buffer_list_t field_buffers; + qd_compose_take_buffers(field, &field_buffers); + qd_compose_free(field); + content->buffers = field_buffers; + + const qd_amqp_performative_t plist[3] = { + QD_PERFORMATIVE_DELIVERY_ANNOTATIONS, + QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, + QD_PERFORMATIVE_APPLICATION_PROPERTIES}; + + for (int i = 0; i < 3; ++i) { + field = qd_compose(plist[i], 0); + qd_compose_start_map(field); + qd_compose_insert_symbol(field, "Key"); + qd_compose_insert_string(field, "Value"); + qd_compose_end_map(field); + qd_compose_take_buffers(field, &field_buffers); + qd_compose_free(field); + DEQ_APPEND(content->buffers, field_buffers); + } + + // validate the message - this will mark the buffers that contain header + // data + if (qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) != QD_MESSAGE_DEPTH_OK) { + result = "Unexpected depth check failure"; + goto exit; + } + + const size_t header_ct = DEQ_SIZE(content->buffers); + assert(header_ct); + assert(!_Q2_holdoff_should_block_LH(content)); + + // Now append buffers until Q2 blocks + while (!_Q2_holdoff_should_block_LH(content)) { + qd_buffer_t *buffy = qd_buffer(); + qd_buffer_insert(buffy, qd_buffer_capacity(buffy)); + DEQ_INSERT_TAIL(content->buffers, buffy); + } + + // expect: block occurs when length == QD_QLIMIT_Q2_UPPER + header_ct + if (DEQ_SIZE(content->buffers) != QD_QLIMIT_Q2_UPPER + header_ct) { + result = "Wrong buffer length for Q2 activate!"; + goto exit; + } + + // now remove buffers until Q2 is relieved + + while (!_Q2_holdoff_should_unblock_LH(content)) { + qd_buffer_t *buffy = DEQ_TAIL(content->buffers); + DEQ_REMOVE_TAIL(content->buffers); + qd_buffer_free(buffy); + } + + // expect: Q2 deactivates when list length < QD_QDLIMIT_Q2_LOWER + header_ct + if (DEQ_SIZE(content->buffers) != (QD_QLIMIT_Q2_LOWER + header_ct) - 1) { + result = "Wrong buffer length for Q2 deactivate!"; + goto exit; + } + +exit: + + qd_message_free(msg); + return result; +} + + int message_tests(void) { int result = 0; @@ -1467,6 +1551,7 @@ int message_tests(void) TEST_CASE(test_check_stream_data_fanout, 0); TEST_CASE(test_check_stream_data_footer, 0); TEST_CASE(test_q2_callback_on_disable, 0); + TEST_CASE(test_q2_ignore_headers, 0); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org