This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 60ab050bd PROTON-2790: finer grained session flow control
60ab050bd is described below
commit 60ab050bd4da40fd845a0a329bb134bdb4e3903a
Author: Clifford Jansen <[email protected]>
AuthorDate: Tue Oct 8 17:22:34 2024 -0700
PROTON-2790: finer grained session flow control
---
c/docs/buffering.md | 2 +-
c/include/proton/session.h | 82 +++++++++++
c/include/proton/transport.h | 8 +-
c/include/proton/types.h | 7 +
c/src/core/engine-internal.h | 6 +
c/src/core/engine.c | 86 ++++++++++-
c/src/core/transport.c | 75 +++++++---
c/tests/connection_driver_test.cpp | 7 +-
c/tests/engine_test.cpp | 290 +++++++++++++++++++++++++++++++++++++
9 files changed, 533 insertions(+), 30 deletions(-)
diff --git a/c/docs/buffering.md b/c/docs/buffering.md
index 71dafbf7b..32567a2f0 100644
--- a/c/docs/buffering.md
+++ b/c/docs/buffering.md
@@ -16,7 +16,7 @@ gets a @ref PN_LINK_FLOW event.
The AMQP protocol allows peers to exchange session limits so they can predict
their buffering requirements for incoming data (
-`pn_session_set_incoming_capacity()` and
+`pn_session_set_incoming_window_and_lwm()` and
`pn_session_set_outgoing_window()`). Proton will not exceed those limits when
sending to or receiving from the peer. However proton does *not* limit the
amount of data buffered in local memory at the request of the application. It
diff --git a/c/include/proton/session.h b/c/include/proton/session.h
index e09d41113..ac30ccd2b 100644
--- a/c/include/proton/session.h
+++ b/c/include/proton/session.h
@@ -194,6 +194,8 @@ PN_EXTERN void pn_session_open(pn_session_t *session);
PN_EXTERN void pn_session_close(pn_session_t *session);
/**
+ * **Deprecated** - Use ::pn_session_incoming_window().
+ *
* Get the incoming capacity of the session measured in bytes.
*
* The incoming capacity of a session determines how much incoming
@@ -205,6 +207,8 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
/**
+ * **Deprecated** - Use ::pn_session_set_incoming_window_and_lwm().
+ *
* Set the incoming capacity for a session object.
*
* The incoming capacity of a session determines how much incoming message
@@ -223,6 +227,84 @@ PN_EXTERN size_t
pn_session_get_incoming_capacity(pn_session_t *session);
*/
PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t
capacity);
+/**
+ * Get the maximum incoming window for a session object.
+ *
+ * The maximum incoming window can be set by
::pn_session_set_incoming_window_and_lwm.
+ *
+ * @param[in] session the session object
+ * @return the maximum size of the incoming window or 0 if not set.
+ **/
+PN_EXTERN pn_frame_count_t pn_session_incoming_window(pn_session_t *session);
+
+/**
+ * Get the low water mark for the session incoming window.
+ *
+ * The low water mark governs how frequently the session updates the remote
+ * peer with changes to the incoming window.
+ *
+ * A value of zero indicates that Proton will choose a default strategy for
+ * updating the peer.
+ *
+ * The low water mark can be set by ::pn_session_set_incoming_window_and_lwm.
+ *
+ * @param[in] session the session object
+ * @return the low water mark of incoming window.
+ **/
+PN_EXTERN pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t
*session);
+
+/**
+ * Set the maximum incoming window and low water mark for a session object.
+ *
+ * The session incoming window is a count of the number of AMQP transfer frames
+ * that can be accepted and buffered locally by the session object until
+ * processed by the application (i.e. consumed by ::pn_link_recv or dropped by
+ * ::pn_link_advance). The maximum bytes buffered by the session will never
+ * exceed (max_incoming_window * max_frame_size). The incoming window frame
count
+ * decreases 1-1 with incoming AMQP transfer frames. Whenever the application
+ * processes the buffered incoming bytes, the incoming window increases to the
+ * largest frame count that can be used by the peer without causing the local
+ * buffered bytes to exceed the maximum stated above.
+ *
+ * The session will defer updating the peer with a changed incoming window
until
+ * it drops below the low water mark (lwm). Too many updates can delay
+ * other traffic on the connection without providing improved performance on
the
+ * session. Too few can leave a remote sender frequently unable to send due
+ * to a closed window. The best balance is application specific. Note that
the
+ * session incoming window is always updated along with the link credit on any
+ * of its child links, so the frequency of link credit updates is also a
+ * consideration when choosing a low water mark.
+ *
+ * The low water mark must be less than or equal to the incoming window. If
+ * set to zero, Proton will choose a default strategy for updating the
+ * incoming window.
+ *
+ * This call is only valid before the call to ::pn_session_open on the session.
+ * Subsequently, the settings are fixed for the life of the session and only
+ * have effect if a max frame size is also set on the session's connection.
+ *
+ * @param[in] session the session object
+ * @param[in] window the maximum incoming window buffered by the session
+ * @param[in] lwm the low water mark (or 0 for default window updating)
+ *
+ * @return 0 on success, PN_ARG_ERR if window is zero or lwm is greater than
+ * window, or PN_STATE_ERR if the session is already open.
+ */
+PN_EXTERN int pn_session_set_incoming_window_and_lwm(pn_session_t *session,
pn_frame_count_t window, pn_frame_count_t lwm);
+
+/**
+ * Get the remote view of the incoming window for the session.
+ *
+ * This evaluates to the most recent incoming window value communicated by the
+ * peer minus any subsequent transfer frames for the session that have been
+ * sent. It does not include transfer frames that may be created in future
+ * for locally buffered content tracked by @ref pn_session_outgoing_bytes.
+ *
+ * @param[in] session the session object
+ * @return the remote incoming window
+ */
+PN_EXTERN pn_frame_count_t pn_session_remote_incoming_window(pn_session_t
*session);
+
/**
* Get the outgoing window for a session object.
*
diff --git a/c/include/proton/transport.h b/c/include/proton/transport.h
index 8b8b66403..3aea0f281 100644
--- a/c/include/proton/transport.h
+++ b/c/include/proton/transport.h
@@ -432,10 +432,16 @@ PN_EXTERN uint32_t
pn_transport_get_max_frame(pn_transport_t *transport);
/**
* Set the maximum frame size of a transport.
*
+ * The negotiated frame size cannot change over the life of the transport.
After
+ * the transport has started sending AMQP frames to the peer, this function
call
+ * has no effect. Typically, the maximum frame size is set when the transport
is
+ * created.
+ *
* @param[in] transport a transport object
* @param[in] size the maximum frame size for the transport object
*
- * @internal XXX Deprecate when moved to connection
+ * @internal XXX Deprecate when moved to connection, note size can change on
+ * reconnect with new transport, consider status return on new API.
*/
PN_EXTERN void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t
size);
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index 2fed2b74d..b0b8faae2 100644
--- a/c/include/proton/types.h
+++ b/c/include/proton/types.h
@@ -141,6 +141,13 @@ extern "C" {
*/
typedef uint32_t pn_sequence_t;
+/**
+ * A count or limit of AMQP transfer frames.
+ *
+ * @ingroup api_types
+ */
+typedef uint32_t pn_frame_count_t;
+
/**
* A span of time in milliseconds.
*
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 2e836b8cf..62aa12434 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -272,6 +272,11 @@ struct pn_session_t {
pn_sequence_t incoming_deliveries;
pn_sequence_t outgoing_deliveries;
pn_sequence_t outgoing_window;
+ pn_frame_count_t incoming_window_lwm;
+ pn_frame_count_t max_incoming_window;
+ bool check_flow;
+ bool need_flow;
+ bool lwm_default;
};
struct pn_terminus_t {
@@ -395,6 +400,7 @@ void pn_ep_incref(pn_endpoint_t *endpoint);
void pn_ep_decref(pn_endpoint_t *endpoint);
ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
+ void pni_session_update_incoming_lwm(pn_session_t *ssn);
#if __cplusplus
}
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index 3d3de7ade..4ad8c4b05 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1049,6 +1049,10 @@ pn_session_t *pn_session(pn_connection_t *conn)
ssn->outgoing_deliveries = 0;
ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
ssn->local_handle_max = PN_IMPL_HANDLE_MAX;
+ ssn->incoming_window_lwm = 1;
+ ssn->check_flow = false;
+ ssn->need_flow = false;
+ ssn->lwm_default = true;
// begin transport state
memset(&ssn->state, 0, sizeof(ssn->state));
@@ -1095,11 +1099,74 @@ size_t pn_session_get_incoming_capacity(pn_session_t
*ssn)
return ssn->incoming_capacity;
}
+// Update required when (re)set by user or when session started (proxy: BEGIN
frame). No
+// session flow control actually means flow control with huge window, so set
lwm to 1. There is
+// low probability of a stall. Any link credit flow frame will update session
credit too.
+void pni_session_update_incoming_lwm(pn_session_t *ssn) {
+ if (ssn->incoming_capacity) {
+ // Old API.
+ if (!ssn->connection->transport)
+ return; // Defer until called again from BEGIN frame setup with max
frame known.
+ if (ssn->connection->transport->local_max_frame) {
+ ssn->incoming_window_lwm = (ssn->incoming_capacity /
ssn->connection->transport->local_max_frame) / 2;
+ if (!ssn->incoming_window_lwm)
+ ssn->incoming_window_lwm = 1; // Zero may hang.
+ } else {
+ ssn->incoming_window_lwm = 1;
+ }
+ } else if (ssn->max_incoming_window) {
+ // New API.
+ // Only need to deal with default. Called when sending BEGIN frame.
+ if (ssn->connection->transport &&
ssn->connection->transport->local_max_frame && ssn->lwm_default) {
+ ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2;
+ }
+ } else {
+ ssn->incoming_window_lwm = 1;
+ }
+ assert(ssn->incoming_window_lwm != 0); // 0 allows session flow to hang
+}
+
void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
{
assert(ssn);
- // XXX: should this trigger a flow?
ssn->incoming_capacity = capacity;
+ ssn->max_incoming_window = 0;
+ ssn->incoming_window_lwm = 1;
+ ssn->lwm_default = true;
+ if (ssn->connection->transport) {
+ ssn->check_flow = true;
+ ssn->need_flow = true;
+ pn_modified(ssn->connection, &ssn->endpoint, false);
+ }
+ pni_session_update_incoming_lwm(ssn);
+ // If capacity invalid, failure occurs when transport calculates value of
incoming window.
+}
+
+int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t
window, pn_frame_count_t lwm) {
+ assert(ssn);
+ if (!window || (lwm && lwm > window))
+ return PN_ARG_ERR;
+ // Settings fixed after session open for simplicity. AMQP actually allows
dynamic change with risk
+ // of overflow if window reduced while transfers in flight.
+ if (ssn->endpoint.state & PN_LOCAL_ACTIVE)
+ return PN_STATE_ERR;
+ ssn->incoming_capacity = 0;
+ ssn->max_incoming_window = window;
+ ssn->lwm_default = (lwm == 0);
+ ssn->incoming_window_lwm = lwm;
+ return 0;
+}
+
+pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) {
+ return ssn->max_incoming_window;
+}
+
+pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) {
+ return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 :
ssn->incoming_window_lwm;
+}
+
+pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) {
+ return ssn->state.remote_incoming_window;
}
size_t pn_session_get_outgoing_window(pn_session_t *ssn)
@@ -1873,11 +1940,16 @@ static void pni_advance_receiver(pn_link_t *link)
link->session->incoming_deliveries--;
pn_delivery_t *current = link->current;
- link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+ size_t drop_count = pn_buffer_size(current->bytes);
pn_buffer_clear(current->bytes);
- if (!link->session->state.incoming_window) {
- pni_add_tpwork(current);
+ if (drop_count) {
+ pn_session_t *ssn = link->session;
+ ssn->incoming_bytes -= drop_count;
+ if (!ssn->check_flow && ssn->state.incoming_window <
ssn->incoming_window_lwm) {
+ ssn->check_flow = true;
+ pni_add_tpwork(current);
+ }
}
link->current = link->current->unsettled_next;
@@ -2025,8 +2097,10 @@ ssize_t pn_link_recv(pn_link_t *receiver, char *bytes,
size_t n)
size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
pn_buffer_trim(delivery->bytes, size, 0);
if (size) {
- receiver->session->incoming_bytes -= size;
- if (!receiver->session->state.incoming_window) {
+ pn_session_t *ssn = receiver->session;
+ ssn->incoming_bytes -= size;
+ if (!ssn->check_flow && ssn->state.incoming_window <
ssn->incoming_window_lwm) {
+ ssn->check_flow = true;
pni_add_tpwork(delivery);
}
return size;
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 89a50f70b..d89625797 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1101,6 +1101,7 @@ int pn_do_begin(pn_transport_t *transport, uint8_t
frame_type, uint16_t channel,
} else {
ssn = pn_session(transport->connection);
}
+ ssn->state.remote_incoming_window = incoming_window;
ssn->state.incoming_transfer_count = next;
if (handle_max_q) {
ssn->state.remote_handle_max = handle_max;
@@ -1459,9 +1460,11 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t
frame_type, uint16_t chann
ssn->state.incoming_transfer_count++;
ssn->state.incoming_window--;
- // XXX: need better policy for when to refresh window
- if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) {
- pni_post_flow(transport, ssn, link);
+ if ((int32_t) link->state.local_handle >= 0 && ssn->state.incoming_window <
ssn->incoming_window_lwm) {
+ if (!ssn->check_flow) {
+ ssn->check_flow = true;
+ pn_modified(ssn->connection, &link->endpoint, false);
+ }
}
return 0;
@@ -1873,23 +1876,35 @@ static size_t pni_session_outgoing_window(pn_session_t
*ssn)
return ssn->outgoing_window;
}
-static size_t pni_session_incoming_window(pn_session_t *ssn)
+// Calculate the available incoming window
+static pn_frame_count_t pni_session_incoming_window(pn_session_t *ssn)
{
pn_transport_t *t = ssn->connection->transport;
uint32_t size = t->local_max_frame;
size_t capacity = ssn->incoming_capacity;
- if (!size || !capacity) { /* session flow control is not enabled */
+ if (!size || (!capacity && !ssn->max_incoming_window)) { /* session flow
control is not enabled */
return AMQP_MAX_WINDOW_SIZE;
- } else if (capacity >= size) { /* precondition */
- return (capacity - ssn->incoming_bytes) / size;
- } else { /* error: we will never have a non-zero window
*/
- pn_condition_format(
- pn_transport_condition(t),
- "amqp:internal-error",
- "session capacity %zu is less than frame size %" PRIu32,
- capacity, size);
- pn_transport_close_tail(t);
- return 0;
+ }
+ // Calculate depending on whether application specified capacity or a frame
count.
+ assert(capacity || ssn->max_incoming_window);
+ if (capacity) {
+ // Old API
+ if (capacity >= size) { /* precondition */
+ return capacity > ssn->incoming_bytes ? (pn_frame_count_t) (capacity -
ssn->incoming_bytes) / size : 0;
+ } else { /* error: we will never have a non-zero
window */
+ pn_condition_format(
+ pn_transport_condition(t),
+ "amqp:internal-error",
+ "session capacity %zu is less than frame size %"
PRIu32,
+ capacity, size);
+ pn_transport_close_tail(t);
+ return 0;
+ }
+ } else {
+ // New API
+ // Find smallest number of frames that could have sent the buffered bytes.
+ pn_frame_count_t nominal_fc = (ssn->incoming_bytes + size - 1) / size;
+ return nominal_fc >= ssn->max_incoming_window ? 0 :
ssn->max_incoming_window - nominal_fc;
}
}
@@ -1920,6 +1935,8 @@ static int pni_process_ssn_setup(pn_transport_t
*transport, pn_endpoint_t *endpo
pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP,
PN_LEVEL_WARNING, "unable to find an open available channel within limit of
%u", transport->channel_max );
return PN_ERR;
}
+ if (ssn->incoming_capacity || ssn->max_incoming_window)
+ pni_session_update_incoming_lwm(ssn);
state->incoming_window = pni_session_incoming_window(ssn);
state->outgoing_window = pni_session_outgoing_window(ssn);
/* "DL[?HIIII]" */
@@ -2055,10 +2072,12 @@ static int pni_process_link_setup(pn_transport_t
*transport, pn_endpoint_t *endp
static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn,
pn_link_t *link)
{
+ ssn->check_flow = false;
+ ssn->need_flow = false;
ssn->state.incoming_window = pni_session_incoming_window(ssn);
ssn->state.outgoing_window = pni_session_outgoing_window(ssn);
bool linkq = (bool) link;
- pn_link_state_t *state = &link->state;
+ pn_link_state_t *state = linkq ? &link->state : NULL;
/* "DL[?IIII?I?I?In?o]" */
pn_bytes_t buf =
pn_amqp_encode_DLEQIIIIQIQIQInQoe(&transport->scratch_space, FLOW,
(int16_t) ssn->state.remote_channel >= 0,
ssn->state.incoming_transfer_count,
@@ -2072,6 +2091,17 @@ static int pni_post_flow(pn_transport_t *transport,
pn_session_t *ssn, pn_link_t
return pn_framing_send_amqp(transport, ssn->state.local_channel, buf);
}
+static inline bool pni_session_need_flow(pn_session_t *ssn) {
+ if (ssn->need_flow)
+ return true;
+ if (ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm
&&
+ pni_session_incoming_window(ssn) > ssn->state.incoming_window)
+ return true;
+
+ ssn->check_flow = false;
+ return false;
+}
+
static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t
*endpoint)
{
if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
@@ -2081,7 +2111,7 @@ static int pni_process_flow_receiver(pn_transport_t
*transport, pn_endpoint_t *e
pn_link_state_t *state = &rcv->state;
if ((int16_t) ssn->state.local_channel >= 0 &&
(int32_t) state->local_handle >= 0 &&
- ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) ||
!ssn->state.incoming_window)) {
+ ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) ||
pni_session_need_flow(ssn))) {
state->link_credit = rcv->credit - rcv->queued;
return pni_post_flow(transport, ssn, rcv);
}
@@ -2244,8 +2274,7 @@ static int pni_process_tpwork_receiver(pn_transport_t
*transport, pn_delivery_t
if (err) return err;
}
- // XXX: need to centralize this policy and improve it
- if (!ssn->state.incoming_window) {
+ if (pni_session_need_flow(ssn)) {
int err = pni_post_flow(transport, ssn, link);
if (err) return err;
}
@@ -2299,6 +2328,10 @@ static int pni_process_flush_disp(pn_transport_t
*transport, pn_endpoint_t *endp
{
int err = pni_flush_disp(transport, session);
if (err) return err;
+ if (session->need_flow) {
+ err = pni_post_flow(transport, session, NULL);
+ if (err) return err;
+ }
}
}
@@ -2832,6 +2865,10 @@ uint32_t pn_transport_get_max_frame(pn_transport_t
*transport)
void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
{
+ if (transport->open_sent) {
+ pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_WARNING,
"Cannot change local max-frame after OPEN frame sent.");
+ return;
+ }
// if size == 0, no advertised limit to input frame size.
if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
size = AMQP_MIN_MAX_FRAME_SIZE;
diff --git a/c/tests/connection_driver_test.cpp
b/c/tests/connection_driver_test.cpp
index 46dcb9c59..ff1b597c0 100644
--- a/c/tests/connection_driver_test.cpp
+++ b/c/tests/connection_driver_test.cpp
@@ -397,7 +397,8 @@ TEST_CASE("driver_message_abort_mixed") {
static void set_capacity_and_max_frame(size_t capacity, size_t max_frame,
pn_test::driver_pair &d,
const char *data) {
- pn_transport_set_max_frame(d.client.transport, max_frame);
+ if (pn_transport_get_max_frame(d.client.transport) != max_frame)
+ pn_transport_set_max_frame(d.client.transport, max_frame);
pn_connection_open(d.client.connection);
pn_session_t *ssn = pn_session(d.client.connection);
pn_session_set_incoming_capacity(ssn, capacity);
@@ -444,11 +445,11 @@ TEST_CASE("driver_session_flow_control") {
}
/* Capacity smaller than frame size is an error */
- set_capacity_and_max_frame(1234, 12345, d, "foo");
+ set_capacity_and_max_frame(1233, 1234, d, "foo");
CHECK_THAT(
*client.last_condition,
cond_matches("amqp:internal-error",
- "session capacity 1234 is less than frame size 12345"));
+ "session capacity 1233 is less than frame size 1234"));
free(buf.start);
}
diff --git a/c/tests/engine_test.cpp b/c/tests/engine_test.cpp
index f8b174e7c..2a4825545 100644
--- a/c/tests/engine_test.cpp
+++ b/c/tests/engine_test.cpp
@@ -22,6 +22,7 @@
#include "./pn_test.hpp"
#include <proton/engine.h>
+#include <cstring>
using namespace pn_test;
@@ -366,3 +367,292 @@ TEST_CASE("link_properties)") {
pn_transport_free(t2);
pn_connection_free(c2);
}
+
+static ssize_t link_send(pn_link_t *s, size_t n) {
+ char buf[5120];
+ if (n > 5120) return PN_ARG_ERR;
+ memset(buf, 'x', n);
+ return pn_link_send(s, buf, n);
+}
+
+static ssize_t link_recv(pn_link_t *r, size_t n) {
+ char buf[5120];
+ if (n > 5120) return PN_ARG_ERR;
+ return pn_link_recv(r, buf, n);
+}
+
+TEST_CASE("session_capacity") {
+ pn_connection_t *c1 = pn_connection();
+ pn_transport_t *t1 = pn_transport();
+ pn_transport_bind(t1, c1);
+
+ pn_connection_t *c2 = pn_connection();
+ pn_transport_t *t2 = pn_transport();
+ pn_transport_set_server(t2);
+ // Use 1K max frame size for test.
+ pn_transport_set_max_frame(t2, 1024);
+ pn_transport_bind(t2, c2);
+
+ pn_connection_open(c1);
+ pn_connection_open(c2);
+
+ pn_session_t *s1 = pn_session(c1);
+ REQUIRE(pn_session_get_incoming_capacity(s1) == 0);
+ pn_session_open(s1);
+
+ pn_link_t *tx = pn_sender(s1, "tx");
+ pn_link_open(tx);
+
+ while (pump(t1, t2)) {
+ process_endpoints(c1);
+ process_endpoints(c2);
+ }
+
+ // session and link should be up, c2 should have a receiver link:
+ REQUIRE(pn_link_state(tx) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ pn_session_t *s2 = pn_session_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ REQUIRE(s2 != NULL);
+ REQUIRE(rx != NULL);
+
+ REQUIRE(pn_session_get_incoming_capacity(s2) == 0);
+
+ // Don't count partial max frame.
+ pn_session_set_incoming_capacity(s2, (1024 * 4) + 512); // 4.5 max frame
+ pn_link_flow(rx, 1);
+ while (pump(t1, t2));
+ REQUIRE(pn_session_remote_incoming_window(s1) == 4);
+ REQUIRE(pn_link_credit(tx) > 0);
+
+ // Send frames and check window.
+
+ // This is complicated by messy accounting: max_frame_size is a proxy for
frames buffered on the
+ // receiver side, but payload per transfer frame is strictly less than max
frame size due to
+ // frame headers. For this test 997 bytes of payload fits in a 1024 byte
transfer frame.
+ // Senders and receivers count/update frames a bit differently.
+
+ size_t payloadsz = 997;
+ size_t onefrm = 1 * payloadsz;
+ size_t fourfrm = 4 * payloadsz;
+ size_t fivefrm = 5 * payloadsz;
+
+ pn_delivery_t *d1 = pn_delivery(tx, pn_dtag("tag-1", 6));
+ REQUIRE(link_send(tx, fivefrm) == (ssize_t) fivefrm);
+ while (pump(t1, t2));
+ // Expect 4 frames sent and 1 remaining, window 0
+ pn_delivery_t *d2 = pn_link_current(rx);
+ REQUIRE(d2);
+ REQUIRE(pn_delivery_pending(d2) == fourfrm);
+ REQUIRE(pn_delivery_partial(d2));
+ REQUIRE(pn_delivery_pending(d1) == onefrm);
+ REQUIRE(pn_session_remote_incoming_window(s1) == 0);
+
+ // Extract 3 frames. tx can send remaining bytes.
+ REQUIRE(link_recv(rx, 3072) == 3072);
+ while (pump(t1, t2));
+ // Window should be 2
+ REQUIRE(pn_delivery_pending(d1) == 0);
+ int remaining = pn_delivery_pending(d2);
+ REQUIRE(link_recv(rx, 5120) == remaining);
+ while (pump(t1, t2));
+
+ pn_transport_unbind(t1);
+ pn_transport_free(t1);
+ pn_connection_free(c1);
+
+ pn_transport_unbind(t2);
+ pn_transport_free(t2);
+ pn_connection_free(c2);
+}
+
+TEST_CASE("session_window") {
+ // 1 = client/sender, 2=server/receiver
+ // "a" = default lwm, "b" = user specified lwm
+ pn_connection_t *c1 = pn_connection();
+ pn_transport_t *t1 = pn_transport();
+ pn_transport_bind(t1, c1);
+
+ pn_connection_t *c2 = pn_connection();
+ pn_transport_t *t2 = pn_transport();
+ pn_transport_set_server(t2);
+ // Use 1K max frame size for test.
+ pn_transport_set_max_frame(t2, 1024);
+ pn_transport_bind(t2, c2);
+ pn_connection_open(c1);
+ pn_connection_open(c2);
+
+ // s0 not used for transfers, only for non-runtime checks.
+ pn_session_t *s0 = pn_session(c1);
+ REQUIRE(pn_session_incoming_window(s0) == 0);
+ REQUIRE(pn_session_incoming_window_lwm(s0) == 0);
+ // Incoming window arg 0
+ REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 0, 0) == PN_ARG_ERR);
+ // lwm > incoming window
+ REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 5, 6) == PN_ARG_ERR);
+ REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 6, 5) == 0);
+ pn_session_open(s0);
+ // Check can't change after open
+ REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 7, 3) == PN_STATE_ERR);
+ REQUIRE(pn_session_incoming_window(s0) == 6);
+ REQUIRE(pn_session_incoming_window_lwm(s0) == 5);
+
+ // Set up sessions for transfers
+ pn_session_t *s2a = pn_session(c2);
+ pn_session_t *s2b = pn_session(c2);
+ // Test relies on knowing implemented default lwm is ((max_incoming_window +
1) / 2)
+ REQUIRE(pn_session_set_incoming_window_and_lwm(s2a, 4, 0) == 0); // lwm will
be 2
+ REQUIRE(pn_session_set_incoming_window_and_lwm(s2b, 4, 3) == 0);
+ pn_link_t *rxa = pn_receiver(s2a, "linka");
+ pn_link_t *rxb = pn_receiver(s2b, "linkb");
+ pn_session_open(s2a);
+ pn_session_open(s2b);
+ pn_link_open(rxa);
+ pn_link_open(rxb);
+ pn_link_flow(rxa, 1);
+ pn_link_flow(rxb, 1);
+
+
+ while (pump(t1, t2)) {
+ process_endpoints(c1);
+ process_endpoints(c2);
+ }
+
+ // sessions and links should be up, c2 should have two receiver links
+ REQUIRE(pn_link_state(rxa) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ REQUIRE(pn_link_state(rxb) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ pn_link_t *txa = pn_link_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ REQUIRE(strcmp("linka", pn_link_name(txa)) == 0);
+ pn_link_t *txb = pn_link_next(txa, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ REQUIRE(strcmp("linkb", pn_link_name(txb)) == 0);
+ pn_session_t *s1a = pn_link_session(txa);
+ pn_session_t *s1b = pn_link_session(txb);
+ REQUIRE(pn_session_remote_incoming_window(s1a) == 4);
+ REQUIRE(pn_session_remote_incoming_window(s1b) == 4);
+
+ // Send frames and check window.
+
+ // This is complicated by messy accounting: max_frame_size is a proxy for
frames buffered on the
+ // receiver side, but payload per transfer frame is strictly less than max
frame size due to
+ // frame headers. For this test 997 bytes of payload fits in a 1024 byte
transfer frame.
+ // Senders and receivers count/update frames a bit differently.
+
+ size_t payloadsz = 997;
+ size_t onefrm = 1 * payloadsz;
+ size_t fourfrm = 4 * payloadsz;
+ size_t fivefrm = 5 * payloadsz;
+
+ REQUIRE(pn_link_credit(txa) > 0);
+ REQUIRE(pn_link_credit(txb) > 0);
+ pn_delivery_t *dta1 = pn_delivery(txa, pn_dtag("dt-a1", 6));
+ pn_delivery_t *dtb1 = pn_delivery(txb, pn_dtag("dt-b1", 6));
+ REQUIRE(link_send(txa, fivefrm) == (ssize_t) fivefrm);
+ REQUIRE(link_send(txb, fivefrm) == (ssize_t) fivefrm);
+ while (pump(t1, t2));
+
+ // Expect 4 frames sent and 1 remaining, window 0
+ // linka
+ pn_delivery_t *dra1 = pn_link_current(rxa);
+ REQUIRE(dra1);
+ REQUIRE(pn_delivery_pending(dra1) == fourfrm);
+ REQUIRE(pn_delivery_partial(dra1));
+ REQUIRE(pn_delivery_pending(dta1) == onefrm);
+ REQUIRE(pn_session_remote_incoming_window(s1a) == 0);
+ // linkb
+ pn_delivery_t *drb1 = pn_link_current(rxb);
+ REQUIRE(drb1);
+ REQUIRE(pn_delivery_pending(drb1) == fourfrm);
+ REQUIRE(pn_delivery_partial(drb1));
+ REQUIRE(pn_delivery_pending(dtb1) == onefrm);
+ REQUIRE(pn_session_remote_incoming_window(s1b) == 0);
+
+ // Extract 3 frames, tx can send remaining bytes in one frame.
+ REQUIRE(link_recv(rxa, 3072) == 3072);
+ REQUIRE(link_recv(rxb, 3072) == 3072);
+ while (pump(t1, t2));
+ REQUIRE(pn_delivery_pending(dta1) == 0);
+ REQUIRE(pn_delivery_pending(dtb1) == 0);
+ // Window should be 2 as seen by sender
+ REQUIRE(pn_session_remote_incoming_window(s1a) == 2);
+ REQUIRE(pn_session_remote_incoming_window(s1b) == 2);
+
+ // Drain receivers. "b" is below lwm so peer gets update. Opposite for "a".
+ int remaining = pn_delivery_pending(dra1);
+ REQUIRE(link_recv(rxa, 5120) == remaining);
+ remaining = pn_delivery_pending(drb1);
+ REQUIRE(link_recv(rxb, 5120) == remaining);
+ while (pump(t1, t2));
+ REQUIRE(pn_session_remote_incoming_window(s1a) == 2);
+ REQUIRE(pn_session_remote_incoming_window(s1b) == 4);
+
+ // Send and consume one more frame. Now "a" incoming_window drops below lwm
but "b" does not.
+ REQUIRE(link_send(txa, onefrm) == (ssize_t) onefrm);
+ REQUIRE(link_send(txb, onefrm) == (ssize_t) onefrm);
+ REQUIRE(xfer(t1,t2) > 0);
+ REQUIRE(pn_session_remote_incoming_window(s1a) == 1);
+ REQUIRE(pn_session_remote_incoming_window(s1b) == 3);
+ remaining = pn_delivery_pending(dra1);
+ REQUIRE(link_recv(rxa, 5120) == remaining);
+ remaining = pn_delivery_pending(drb1);
+ REQUIRE(link_recv(rxb, 5120) == remaining);
+ while (pump(t1, t2));
+ REQUIRE(pn_session_remote_incoming_window(s1a) == 4);
+ REQUIRE(pn_session_remote_incoming_window(s1b) == 3);
+
+ pn_transport_unbind(t1);
+ pn_transport_free(t1);
+ pn_connection_free(c1);
+
+ pn_transport_unbind(t2);
+ pn_transport_free(t2);
+ pn_connection_free(c2);
+}
+
+TEST_CASE("max_frame") {
+ const uint32_t amqp_min_max_frame_size = 512;
+ pn_connection_t *c1 = pn_connection();
+ pn_transport_t *t1 = pn_transport();
+ pn_transport_bind(t1, c1);
+
+ pn_connection_t *c2 = pn_connection();
+ pn_transport_t *t2 = pn_transport();
+ pn_transport_set_server(t2);
+
+ // Can set to zero, i.e. no max frame
+ pn_transport_set_max_frame(t2, 0);
+ REQUIRE(pn_transport_get_max_frame(t2) == 0);
+ // Restricted to AMQP minimum.
+ pn_transport_set_max_frame(t2, 1);
+ REQUIRE(pn_transport_get_max_frame(t2) == amqp_min_max_frame_size);
+ // Otherwise OK
+ pn_transport_set_max_frame(t2, amqp_min_max_frame_size + 1);
+ REQUIRE(pn_transport_get_max_frame(t2) == amqp_min_max_frame_size + 1);
+ pn_transport_set_max_frame(t2, UINT32_MAX);
+ REQUIRE(pn_transport_get_max_frame(t2) == UINT32_MAX);
+
+ // Can still change post bind
+ pn_transport_bind(t2, c2);
+ pn_transport_set_max_frame(t2, 4096);
+ REQUIRE(pn_transport_get_max_frame(t2) == 4096);
+
+ pn_connection_open(c1);
+ pn_connection_open(c2);
+ while (pump(t1, t2)) {
+ process_endpoints(c1);
+ process_endpoints(c2);
+ }
+ REQUIRE(pn_connection_state(c2) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+
+ // No change allowed after peers have negotiated OPEN frame.
+ pn_transport_set_max_frame(t2, 4097); // Should be silently ignored.
+ REQUIRE(pn_transport_get_max_frame(t2) == 4096);
+ pn_transport_set_max_frame(t2, 0); // Can't turn off either.
+ REQUIRE(pn_transport_get_max_frame(t2) == 4096);
+
+ pn_transport_unbind(t1);
+ pn_transport_free(t1);
+ pn_connection_free(c1);
+
+ pn_transport_unbind(t2);
+ pn_transport_free(t2);
+ pn_connection_free(c2);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]