DISPATCH-807: Define limits, state, and accessors for Q2 input holdoff
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/71aaf446 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/71aaf446 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/71aaf446 Branch: refs/heads/master Commit: 71aaf446cfcb983e4c5cb7ddb433481dfc1cddcf Parents: e584a3e Author: Chuck Rolke <cro...@redhat.com> Authored: Thu Sep 14 09:53:55 2017 -0400 Committer: Chuck Rolke <cro...@redhat.com> Committed: Fri Sep 15 14:50:33 2017 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/message.h | 41 ++++++++++++++++++++++++++++++++++++ src/message.c | 24 +++++++++++++++++++++ src/message_private.h | 1 + tests/message_test.c | 33 +++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/71aaf446/include/qpid/dispatch/message.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 111c6a0..e0df420 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -36,6 +36,13 @@ * @{ */ +// DISPATCH-807 Queue depth limits +// upper and lower limits for bang bang hysteresis control +// +// Q2 defines the number of buffers allowed in a message's buffer chain +#define QD_QLIMIT_Q2_UPPER 128 +#define QD_QLIMIT_Q2_LOWER 120 + // Callback for status change (confirmed persistent, loaded-in-memory, etc.) 
typedef struct qd_message_t qd_message_t; @@ -351,6 +358,40 @@ size_t qd_message_fanout(qd_message_t *msg); */ void qd_message_add_fanout(qd_message_t *msg); +/** + * Setter for message Q2 input_holdoff state + * + * @param msg A pointer to the message + */ +void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff); + +/** + * Accessor for message Q2 input_holdoff state + * + * @param msg A pointer to the message + * @return true if input is being held off + */ +bool qd_message_get_Q2_input_holdoff(qd_message_t *msg); + +/** + * Test if attempt to retrieve message data through qd_message_receive should block + * due to Q2 input holdoff limit being exceeded. This message has enough + * buffers in the internal buffer chain and any calls to qd_message_receive + * will not result in a call to pn_link_recv to retrieve more data. + * + * @param msg A pointer to the message + */ +bool qd_message_Q2_holdoff_should_block(qd_message_t *msg); + +/** + * Test if a message that is blocked by Q2 input holdoff has enough room + * to begin receiving again. This message has transmitted and disposed of + * enough buffers to begin receiving more data from the underlying proton link. 
+ * + * @param msg A pointer to the message + */ +bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg); + ///@} #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/71aaf446/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index 6074ebb..12b09ce 100644 --- a/src/message.c +++ b/src/message.c @@ -1865,3 +1865,27 @@ int qd_message_get_phase_val(qd_message_t *msg) { return ((qd_message_pvt_t*)msg)->content->ma_int_phase; } + + +void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff) +{ + ((qd_message_pvt_t*)msg)->q2_input_holdoff = holdoff; +} + + +bool qd_message_get_Q2_input_holdoff(qd_message_t *msg) +{ + return ((qd_message_pvt_t*)msg)->q2_input_holdoff; +} + + +bool qd_message_Q2_holdoff_should_block(qd_message_t *msg) +{ + return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) >= QD_QLIMIT_Q2_UPPER; +} + + +bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg) +{ + return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) < QD_QLIMIT_Q2_LOWER; +} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/71aaf446/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index eab4ae0..aa01831 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -127,6 +127,7 @@ typedef struct { qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations int ma_phase; // phase for the override address bool strip_annotations_in; + bool q2_input_holdoff;// hold off calling pn_link_recv } qd_message_pvt_t; ALLOC_DECLARE(qd_message_t); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/71aaf446/tests/message_test.c ---------------------------------------------------------------------- diff --git a/tests/message_test.c b/tests/message_test.c index 116e304..0ef1d8b 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ 
-61,6 +61,17 @@ static void set_content(qd_message_content_t *content, size_t len) } +static void set_content_bufs(qd_message_content_t *content, int nbufs) +{ + for (; nbufs > 0; nbufs--) { + qd_buffer_t *buf = qd_buffer(); + size_t segment = qd_buffer_capacity(buf); + qd_buffer_insert(buf, segment); + DEQ_INSERT_TAIL(content->buffers, buf); + } +} + + static char* test_send_to_messenger(void *context) { qd_message_t *msg = qd_message(); @@ -314,6 +325,27 @@ static char* test_send_message_annotations(void *context) } +static char* test_q2_input_holdoff_sensing(void *context) +{ + if (QD_QLIMIT_Q2_LOWER >= QD_QLIMIT_Q2_UPPER) + return "QD_LIMIT_Q2 lower limit is bigger than upper limit"; + + for (int nbufs=1; nbufs<QD_QLIMIT_Q2_UPPER + 1; nbufs++) { + qd_message_t *msg = qd_message(); + qd_message_content_t *content = MSG_CONTENT(msg); + + set_content_bufs(content, nbufs); + if (qd_message_Q2_holdoff_should_block(msg) != (nbufs >= QD_QLIMIT_Q2_UPPER)) + return "qd_message_holdoff_would_block was miscalculated"; + if (qd_message_Q2_holdoff_should_unblock(msg) != (nbufs < QD_QLIMIT_Q2_LOWER)) + return "qd_message_holdoff_would_unblock was miscalculated"; + + qd_message_free(msg); + } + return 0; +} + + int message_tests(void) { int result = 0; @@ -324,6 +356,7 @@ int message_tests(void) TEST_CASE(test_message_properties, 0); TEST_CASE(test_check_multiple, 0); TEST_CASE(test_send_message_annotations, 0); + TEST_CASE(test_q2_input_holdoff_sensing, 0); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org