This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 60ab050bd PROTON-2790: finer grained session flow control
60ab050bd is described below

commit 60ab050bd4da40fd845a0a329bb134bdb4e3903a
Author: Clifford Jansen <[email protected]>
AuthorDate: Tue Oct 8 17:22:34 2024 -0700

    PROTON-2790: finer grained session flow control
---
 c/docs/buffering.md                |   2 +-
 c/include/proton/session.h         |  82 +++++++++++
 c/include/proton/transport.h       |   8 +-
 c/include/proton/types.h           |   7 +
 c/src/core/engine-internal.h       |   6 +
 c/src/core/engine.c                |  86 ++++++++++-
 c/src/core/transport.c             |  75 +++++++---
 c/tests/connection_driver_test.cpp |   7 +-
 c/tests/engine_test.cpp            | 290 +++++++++++++++++++++++++++++++++++++
 9 files changed, 533 insertions(+), 30 deletions(-)

diff --git a/c/docs/buffering.md b/c/docs/buffering.md
index 71dafbf7b..32567a2f0 100644
--- a/c/docs/buffering.md
+++ b/c/docs/buffering.md
@@ -16,7 +16,7 @@ gets a @ref PN_LINK_FLOW event.
 
 The AMQP protocol allows peers to exchange session limits so they can predict
 their buffering requirements for incoming data (
-`pn_session_set_incoming_capacity()` and
+`pn_session_set_incoming_incoming_window_and_lwm()` and
 `pn_session_set_outgoing_window()`). Proton will not exceed those limits when
 sending to or receiving from the peer. However proton does *not* limit the
 amount of data buffered in local memory at the request of the application.  It
diff --git a/c/include/proton/session.h b/c/include/proton/session.h
index e09d41113..ac30ccd2b 100644
--- a/c/include/proton/session.h
+++ b/c/include/proton/session.h
@@ -194,6 +194,8 @@ PN_EXTERN void pn_session_open(pn_session_t *session);
 PN_EXTERN void pn_session_close(pn_session_t *session);
 
 /**
+ * **Deprecated** - Use ::pn_session_incoming_window().
+ *
  * Get the incoming capacity of the session measured in bytes.
  *
  * The incoming capacity of a session determines how much incoming
@@ -205,6 +207,8 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
 PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
 
 /**
+ * **Deprecated** - Use ::pn_session_set_incoming_window_and_lwm().
+ *
  * Set the incoming capacity for a session object.
  *
  * The incoming capacity of a session determines how much incoming message
@@ -223,6 +227,84 @@ PN_EXTERN size_t 
pn_session_get_incoming_capacity(pn_session_t *session);
  */
 PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t 
capacity);
 
+/**
+ * Get the maximum incoming window window for a session object.
+ *
+ * The maximum incoming window can be set by 
::pn_session_set_incoming_window_and_lwm.
+ *
+ * @param[in] session the session object
+ * @return the maximum size of the incoming window or 0 if not set.
+ **/
+PN_EXTERN pn_frame_count_t pn_session_incoming_window(pn_session_t *session);
+
+/**
+ * Get the low water mark for the session incoming window.
+ *
+ * The low water mark governs how frequently the session updates the remote
+ * peer with changes to the incoming window.
+ *
+ * A value of zero indicates that Proton will choose a default strategy for
+ * updating the peer.
+ *
+ * The low water mark can be set by ::pn_session_set_incoming_window_and_lwm.
+ *
+ * @param[in] session the session object
+ * @return the low water mark of incoming window.
+ **/
+PN_EXTERN pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t 
*session);
+
+/**
+ * Set the maximum incoming window and low water mark for a session object.
+ *
+ * The session incoming window is a count of the number of AMQP transfer frames
+ * that can be accepted and buffered locally by the session object until
+ * processed by the application (i.e. consumed by ::pn_link_recv or dropped by
+ * ::pn_link_advance).  The maximum bytes buffered by the session will never
+ * exceed (max_incoming_window * max_frame_size). The incoming window frame 
count
+ * decreases 1-1 with incoming AMQP transfer frames.  Whenever the application
+ * processes the buffered incoming bytes, the incoming window increases to the
+ * largest frame count that can be used by the peer without causing the local
+ * buffered bytes to exceed the maximum stated above.
+ *
+ * The session will defer updating the peer with a changed incoming window 
until
+ * it drops below the low water mark (lwm).  Too many updates can delay
+ * other traffic on the connection without providing improved performance on 
the
+ * session.  Too few can leave a remote sender frequently unable to send due
+ * to a closed window.  The best balance is application specific.  Note that 
the
+ * session incoming window is always updated along with the link credit on any
+ * of its child links, so the frequency of link credit updates is also a
+ * consideration when choosing a low water mark.
+ *
+ * The low water mark must be less than or equal to the incoming window.  If
+ * set to zero, Proton will choose a default strategy for updating the
+ * incoming window.
+ *
+ * This call is only valid before the call to ::pn_session_open on the session.
+ * Subsequently, the settings are fixed for the life of the session and only
+ * have effect if a max frame size is also set on the session's connection.
+ *
+ * @param[in] session the session object
+ * @param[in] window the maximum incoming window buffered by the session
+ * @param[in] lwm the low water mark (or 0 for default window updating)
+ *
+ * @return 0 on success, PN_ARG_ERR if window is zero or lwm is greater than
+ * window, or PN_STATE_ERR if the session is already open.
+ */
+PN_EXTERN int pn_session_set_incoming_window_and_lwm(pn_session_t *session, 
pn_frame_count_t window, pn_frame_count_t lwm);
+
+/**
+ * Get the remote view of the incoming window for the session.
+ *
+ * This evaluates to the most recent incoming window value communicated by the
+ * peer minus any subsequent transfer frames for the session that have been
+ * sent.  It does not include transfer frames that may be created in future
+ * for locally buffered content tracked by @ref pn_session_outgoing_bytes.
+ *
+ * @param[in] session the session object
+ * @return the remote incoming window
+ */
+PN_EXTERN pn_frame_count_t pn_session_remote_incoming_window(pn_session_t 
*session);
+
 /**
  * Get the outgoing window for a session object.
  *
diff --git a/c/include/proton/transport.h b/c/include/proton/transport.h
index 8b8b66403..3aea0f281 100644
--- a/c/include/proton/transport.h
+++ b/c/include/proton/transport.h
@@ -432,10 +432,16 @@ PN_EXTERN uint32_t 
pn_transport_get_max_frame(pn_transport_t *transport);
 /**
  * Set the maximum frame size of a transport.
  *
+ * The negotiated frame size cannot change over the life of the transport.  
After
+ * the transport has started sending AMQP frames to the peer, this function 
call
+ * has no effect.  Typically, the maximum frame size is set when the transport 
is
+ * created.
+ *
  * @param[in] transport a transport object
  * @param[in] size the maximum frame size for the transport object
  *
- * @internal XXX Deprecate when moved to connection
+ * @internal XXX Deprecate when moved to connection, note size can change on
+ * reconnect with new transport, consider status return on new API.
  */
 PN_EXTERN void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t 
size);
 
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index 2fed2b74d..b0b8faae2 100644
--- a/c/include/proton/types.h
+++ b/c/include/proton/types.h
@@ -141,6 +141,13 @@ extern "C" {
  */
 typedef uint32_t  pn_sequence_t;
 
+/**
+ * A count or limit of AMQP transfer frames.
+ *
+ * @ingroup api_types
+ */
+typedef uint32_t pn_frame_count_t;
+
 /**
  * A span of time in milliseconds.
  *
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 2e836b8cf..62aa12434 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -272,6 +272,11 @@ struct pn_session_t {
   pn_sequence_t incoming_deliveries;
   pn_sequence_t outgoing_deliveries;
   pn_sequence_t outgoing_window;
+  pn_frame_count_t incoming_window_lwm;
+  pn_frame_count_t max_incoming_window;
+  bool check_flow;
+  bool need_flow;
+  bool lwm_default;
 };
 
 struct pn_terminus_t {
@@ -395,6 +400,7 @@ void pn_ep_incref(pn_endpoint_t *endpoint);
 void pn_ep_decref(pn_endpoint_t *endpoint);
 
 ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n);
+  void pni_session_update_incoming_lwm(pn_session_t *ssn);
 
 #if __cplusplus
 }
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index 3d3de7ade..4ad8c4b05 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1049,6 +1049,10 @@ pn_session_t *pn_session(pn_connection_t *conn)
   ssn->outgoing_deliveries = 0;
   ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
   ssn->local_handle_max = PN_IMPL_HANDLE_MAX;
+  ssn->incoming_window_lwm = 1;
+  ssn->check_flow = false;
+  ssn->need_flow = false;
+  ssn->lwm_default = true;
 
   // begin transport state
   memset(&ssn->state, 0, sizeof(ssn->state));
@@ -1095,11 +1099,74 @@ size_t pn_session_get_incoming_capacity(pn_session_t 
*ssn)
   return ssn->incoming_capacity;
 }
 
+// Update required when (re)set by user or when session started (proxy: BEGIN 
frame).  No
+// session flow control actually means flow control with huge window, so set 
lwm to 1.  There is
+// low probability of a stall.  Any link credit flow frame will update session 
credit too.
+void pni_session_update_incoming_lwm(pn_session_t *ssn) {
+  if (ssn->incoming_capacity) {
+    // Old API.
+    if (!ssn->connection->transport)
+      return; // Defer until called again from BEGIN frame setup with max 
frame known.
+    if (ssn->connection->transport->local_max_frame) {
+      ssn->incoming_window_lwm = (ssn->incoming_capacity / 
ssn->connection->transport->local_max_frame) / 2;
+      if (!ssn->incoming_window_lwm)
+        ssn->incoming_window_lwm = 1; // Zero may hang.
+    } else {
+      ssn->incoming_window_lwm = 1;
+    }
+  } else if (ssn->max_incoming_window) {
+    // New API.
+    // Only need to deal with default.  Called whensending BEGIN frame.
+    if (ssn->connection->transport && 
ssn->connection->transport->local_max_frame && ssn->lwm_default) {
+      ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2;
+    }
+  } else {
+    ssn->incoming_window_lwm = 1;
+  }
+  assert(ssn->incoming_window_lwm != 0);  // 0 allows session flow to hang
+}
+
 void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
 {
   assert(ssn);
-  // XXX: should this trigger a flow?
   ssn->incoming_capacity = capacity;
+  ssn->max_incoming_window = 0;
+  ssn->incoming_window_lwm = 1;
+  ssn->lwm_default = true;
+  if (ssn->connection->transport) {
+    ssn->check_flow = true;
+    ssn->need_flow = true;
+    pn_modified(ssn->connection, &ssn->endpoint, false);
+  }
+  pni_session_update_incoming_lwm(ssn);
+  // If capacity invalid, failure occurs when transport calculates value of 
incoming window.
+}
+
+int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t 
window, pn_frame_count_t lwm) {
+  assert(ssn);
+  if (!window || (lwm && lwm > window))
+    return PN_ARG_ERR;
+  // Settings fixed after session open for simplicity.  AMPQ actually allows 
dynamic change with risk
+  // of overflow if window reduced while transfers in flight.
+  if (ssn->endpoint.state & PN_LOCAL_ACTIVE)
+    return PN_STATE_ERR;
+  ssn->incoming_capacity = 0;
+  ssn->max_incoming_window = window;
+  ssn->lwm_default = (lwm == 0);
+  ssn->incoming_window_lwm = lwm;
+  return 0;
+}
+
+pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) {
+  return ssn->max_incoming_window;
+}
+
+pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) {
+  return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 : 
ssn->incoming_window_lwm;
+}
+
+pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) {
+  return ssn->state.remote_incoming_window;
 }
 
 size_t pn_session_get_outgoing_window(pn_session_t *ssn)
@@ -1873,11 +1940,16 @@ static void pni_advance_receiver(pn_link_t *link)
   link->session->incoming_deliveries--;
 
   pn_delivery_t *current = link->current;
-  link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+  size_t drop_count = pn_buffer_size(current->bytes);
   pn_buffer_clear(current->bytes);
 
-  if (!link->session->state.incoming_window) {
-    pni_add_tpwork(current);
+  if (drop_count) {
+    pn_session_t *ssn = link->session;
+    ssn->incoming_bytes -= drop_count;
+    if (!ssn->check_flow && ssn->state.incoming_window < 
ssn->incoming_window_lwm) {
+      ssn->check_flow = true;
+      pni_add_tpwork(current);
+    }
   }
 
   link->current = link->current->unsettled_next;
@@ -2025,8 +2097,10 @@ ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, 
size_t n)
   size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
   pn_buffer_trim(delivery->bytes, size, 0);
   if (size) {
-    receiver->session->incoming_bytes -= size;
-    if (!receiver->session->state.incoming_window) {
+    pn_session_t *ssn = receiver->session;
+    ssn->incoming_bytes -= size;
+    if (!ssn->check_flow && ssn->state.incoming_window < 
ssn->incoming_window_lwm) {
+      ssn->check_flow = true;
       pni_add_tpwork(delivery);
     }
     return size;
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 89a50f70b..d89625797 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1101,6 +1101,7 @@ int pn_do_begin(pn_transport_t *transport, uint8_t 
frame_type, uint16_t channel,
   } else {
     ssn = pn_session(transport->connection);
   }
+  ssn->state.remote_incoming_window = incoming_window;
   ssn->state.incoming_transfer_count = next;
   if (handle_max_q) {
     ssn->state.remote_handle_max = handle_max;
@@ -1459,9 +1460,11 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t 
frame_type, uint16_t chann
   ssn->state.incoming_transfer_count++;
   ssn->state.incoming_window--;
 
-  // XXX: need better policy for when to refresh window
-  if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) {
-    pni_post_flow(transport, ssn, link);
+  if ((int32_t) link->state.local_handle >= 0 && ssn->state.incoming_window < 
ssn->incoming_window_lwm) {
+    if (!ssn->check_flow) {
+      ssn->check_flow = true;
+      pn_modified(ssn->connection, &link->endpoint, false);
+    }
   }
 
   return 0;
@@ -1873,23 +1876,35 @@ static size_t pni_session_outgoing_window(pn_session_t 
*ssn)
   return ssn->outgoing_window;
 }
 
-static size_t pni_session_incoming_window(pn_session_t *ssn)
+// Calculate the available incomming window
+static pn_frame_count_t pni_session_incoming_window(pn_session_t *ssn)
 {
   pn_transport_t *t = ssn->connection->transport;
   uint32_t size = t->local_max_frame;
   size_t capacity = ssn->incoming_capacity;
-  if (!size || !capacity) {     /* session flow control is not enabled */
+  if (!size || (!capacity && !ssn->max_incoming_window)) {     /* session flow 
control is not enabled */
     return AMQP_MAX_WINDOW_SIZE;
-  } else if (capacity >= size) { /* precondition */
-    return (capacity - ssn->incoming_bytes) / size;
-  } else {                     /* error: we will never have a non-zero window 
*/
-    pn_condition_format(
-      pn_transport_condition(t),
-      "amqp:internal-error",
-      "session capacity %zu is less than frame size %" PRIu32,
-      capacity, size);
-    pn_transport_close_tail(t);
-    return 0;
+  }
+  // Calculate depending on whether application specified capacity or a frame 
count.
+  assert(capacity || ssn->max_incoming_window);
+  if (capacity) {
+    // Old API
+    if (capacity >= size) { /* precondition */
+      return capacity > ssn->incoming_bytes ? (pn_frame_count_t) (capacity - 
ssn->incoming_bytes) / size : 0;
+    } else {                     /* error: we will never have a non-zero 
window */
+      pn_condition_format(
+                          pn_transport_condition(t),
+                          "amqp:internal-error",
+                          "session capacity %zu is less than frame size %" 
PRIu32,
+                          capacity, size);
+      pn_transport_close_tail(t);
+      return 0;
+    }
+  } else {
+    // New API
+    // Find smallest number of frames that could have sent the buffered bytes.
+    pn_frame_count_t nominal_fc = (ssn->incoming_bytes + size - 1) / size;
+    return nominal_fc >= ssn->max_incoming_window ? 0 : 
ssn->max_incoming_window - nominal_fc;
   }
 }
 
@@ -1920,6 +1935,8 @@ static int pni_process_ssn_setup(pn_transport_t 
*transport, pn_endpoint_t *endpo
         pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, 
PN_LEVEL_WARNING, "unable to find an open available channel within limit of 
%u", transport->channel_max );
         return PN_ERR;
       }
+      if (ssn->incoming_capacity || ssn->max_incoming_window)
+        pni_session_update_incoming_lwm(ssn);
       state->incoming_window = pni_session_incoming_window(ssn);
       state->outgoing_window = pni_session_outgoing_window(ssn);
       /* "DL[?HIIII]" */
@@ -2055,10 +2072,12 @@ static int pni_process_link_setup(pn_transport_t 
*transport, pn_endpoint_t *endp
 
 static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, 
pn_link_t *link)
 {
+  ssn->check_flow = false;
+  ssn->need_flow = false;
   ssn->state.incoming_window = pni_session_incoming_window(ssn);
   ssn->state.outgoing_window = pni_session_outgoing_window(ssn);
   bool linkq = (bool) link;
-  pn_link_state_t *state = &link->state;
+  pn_link_state_t *state = linkq ? &link->state : NULL;
   /* "DL[?IIII?I?I?In?o]" */
   pn_bytes_t buf = 
pn_amqp_encode_DLEQIIIIQIQIQInQoe(&transport->scratch_space, FLOW,
                        (int16_t) ssn->state.remote_channel >= 0, 
ssn->state.incoming_transfer_count,
@@ -2072,6 +2091,17 @@ static int pni_post_flow(pn_transport_t *transport, 
pn_session_t *ssn, pn_link_t
   return pn_framing_send_amqp(transport, ssn->state.local_channel, buf);
 }
 
+static inline bool pni_session_need_flow(pn_session_t *ssn) {
+  if (ssn->need_flow)
+    return true;
+  if (ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm 
&&
+      pni_session_incoming_window(ssn) > ssn->state.incoming_window)
+    return true;
+
+  ssn->check_flow = false;
+  return false;
+}
+
 static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t 
*endpoint)
 {
   if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
@@ -2081,7 +2111,7 @@ static int pni_process_flow_receiver(pn_transport_t 
*transport, pn_endpoint_t *e
     pn_link_state_t *state = &rcv->state;
     if ((int16_t) ssn->state.local_channel >= 0 &&
         (int32_t) state->local_handle >= 0 &&
-        ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || 
!ssn->state.incoming_window)) {
+        ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || 
pni_session_need_flow(ssn))) {
       state->link_credit = rcv->credit - rcv->queued;
       return pni_post_flow(transport, ssn, rcv);
     }
@@ -2244,8 +2274,7 @@ static int pni_process_tpwork_receiver(pn_transport_t 
*transport, pn_delivery_t
     if (err) return err;
   }
 
-  // XXX: need to centralize this policy and improve it
-  if (!ssn->state.incoming_window) {
+  if (pni_session_need_flow(ssn)) {
     int err = pni_post_flow(transport, ssn, link);
     if (err) return err;
   }
@@ -2299,6 +2328,10 @@ static int pni_process_flush_disp(pn_transport_t 
*transport, pn_endpoint_t *endp
     {
       int err = pni_flush_disp(transport, session);
       if (err) return err;
+      if (session->need_flow) {
+        err = pni_post_flow(transport, session, NULL);
+        if (err) return err;
+      }
     }
   }
 
@@ -2832,6 +2865,10 @@ uint32_t pn_transport_get_max_frame(pn_transport_t 
*transport)
 
 void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
 {
+  if (transport->open_sent) {
+    pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_WARNING, 
"Cannot change local max-frame after OPEN frame sent.");
+    return;
+  }
   // if size == 0, no advertised limit to input frame size.
   if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
     size = AMQP_MIN_MAX_FRAME_SIZE;
diff --git a/c/tests/connection_driver_test.cpp 
b/c/tests/connection_driver_test.cpp
index 46dcb9c59..ff1b597c0 100644
--- a/c/tests/connection_driver_test.cpp
+++ b/c/tests/connection_driver_test.cpp
@@ -397,7 +397,8 @@ TEST_CASE("driver_message_abort_mixed") {
 static void set_capacity_and_max_frame(size_t capacity, size_t max_frame,
                                        pn_test::driver_pair &d,
                                        const char *data) {
-  pn_transport_set_max_frame(d.client.transport, max_frame);
+  if (pn_transport_get_max_frame(d.client.transport) != max_frame)
+    pn_transport_set_max_frame(d.client.transport, max_frame);
   pn_connection_open(d.client.connection);
   pn_session_t *ssn = pn_session(d.client.connection);
   pn_session_set_incoming_capacity(ssn, capacity);
@@ -444,11 +445,11 @@ TEST_CASE("driver_session_flow_control") {
   }
 
   /* Capacity smaller than frame size is an error */
-  set_capacity_and_max_frame(1234, 12345, d, "foo");
+  set_capacity_and_max_frame(1233, 1234, d, "foo");
   CHECK_THAT(
       *client.last_condition,
       cond_matches("amqp:internal-error",
-                   "session capacity 1234 is less than frame size 12345"));
+                   "session capacity 1233 is less than frame size 1234"));
   free(buf.start);
 }
 
diff --git a/c/tests/engine_test.cpp b/c/tests/engine_test.cpp
index f8b174e7c..2a4825545 100644
--- a/c/tests/engine_test.cpp
+++ b/c/tests/engine_test.cpp
@@ -22,6 +22,7 @@
 #include "./pn_test.hpp"
 
 #include <proton/engine.h>
+#include <cstring>
 
 using namespace pn_test;
 
@@ -366,3 +367,292 @@ TEST_CASE("link_properties)") {
   pn_transport_free(t2);
   pn_connection_free(c2);
 }
+
+static ssize_t link_send(pn_link_t *s, size_t n) {
+  char buf[5120];
+  if (n > 5120) return PN_ARG_ERR;
+  memset(buf, 'x', n);
+  return pn_link_send(s, buf, n);
+}
+
+static ssize_t link_recv(pn_link_t *r, size_t n) {
+  char buf[5120];
+  if (n > 5120) return PN_ARG_ERR;
+  return pn_link_recv(r, buf, n);
+}
+
+TEST_CASE("session_capacity") {
+  pn_connection_t *c1 = pn_connection();
+  pn_transport_t *t1 = pn_transport();
+  pn_transport_bind(t1, c1);
+
+  pn_connection_t *c2 = pn_connection();
+  pn_transport_t *t2 = pn_transport();
+  pn_transport_set_server(t2);
+  // Use 1K max frame size for test.
+  pn_transport_set_max_frame(t2, 1024);
+  pn_transport_bind(t2, c2);
+
+  pn_connection_open(c1);
+  pn_connection_open(c2);
+
+  pn_session_t *s1 = pn_session(c1);
+  REQUIRE(pn_session_get_incoming_capacity(s1) == 0);
+  pn_session_open(s1);  
+
+  pn_link_t *tx = pn_sender(s1, "tx");
+  pn_link_open(tx);
+
+  while (pump(t1, t2)) {
+    process_endpoints(c1);
+    process_endpoints(c2);
+  }
+
+  // session and link should be up, c2 should have a receiver link:
+  REQUIRE(pn_link_state(tx) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  pn_session_t *s2 = pn_session_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  pn_link_t *rx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  REQUIRE(s2 != NULL);
+  REQUIRE(rx != NULL);
+
+  REQUIRE(pn_session_get_incoming_capacity(s2) == 0);
+
+  // Don't count partial max frame.
+  pn_session_set_incoming_capacity(s2, (1024 * 4) + 512); // 4.5 max frame
+  pn_link_flow(rx, 1);
+  while (pump(t1, t2));
+  REQUIRE(pn_session_remote_incoming_window(s1) == 4);
+  REQUIRE(pn_link_credit(tx) > 0);
+
+  // Send frames and check window.
+
+  // This is complicated by messy accounting: max_frame_size is a proxy for 
frames buffered on the
+  // receiver side, but payload per transfer frame is strictly less than max 
frame size due to
+  // frame headers.  For this test 997 bytes of payload fits in a 1024 byte 
transfer frame.
+  // Senders and receivers count/update frames a bit differently.
+
+  size_t payloadsz = 997;
+  size_t onefrm = 1 * payloadsz;
+  size_t fourfrm = 4 * payloadsz;
+  size_t fivefrm = 5 * payloadsz;
+
+  pn_delivery_t *d1 = pn_delivery(tx, pn_dtag("tag-1", 6));
+  REQUIRE(link_send(tx, fivefrm) == (ssize_t) fivefrm);
+  while (pump(t1, t2));
+  // Expect 4 frames sent and 1 remaining, window 0
+  pn_delivery_t *d2 = pn_link_current(rx);
+  REQUIRE(d2);
+  REQUIRE(pn_delivery_pending(d2) == fourfrm);
+  REQUIRE(pn_delivery_partial(d2));
+  REQUIRE(pn_delivery_pending(d1) == onefrm);
+  REQUIRE(pn_session_remote_incoming_window(s1) == 0);
+
+  // Extract 3 frames.  tx can send remaining bytes.
+  REQUIRE(link_recv(rx, 3072) == 3072);
+  while (pump(t1, t2));
+  // Window should be 2
+  REQUIRE(pn_delivery_pending(d1) == 0);
+  int remaining = pn_delivery_pending(d2);
+  REQUIRE(link_recv(rx, 5120) == remaining);
+  while (pump(t1, t2));
+
+  pn_transport_unbind(t1);
+  pn_transport_free(t1);
+  pn_connection_free(c1);
+
+  pn_transport_unbind(t2);
+  pn_transport_free(t2);
+  pn_connection_free(c2);
+}
+
+TEST_CASE("session_window") {
+  // 1 = client/sender, 2=server/receiver
+  // "a" = default lwm, "b" = user specified lwm
+  pn_connection_t *c1 = pn_connection();
+  pn_transport_t *t1 = pn_transport();
+  pn_transport_bind(t1, c1);
+
+  pn_connection_t *c2 = pn_connection();
+  pn_transport_t *t2 = pn_transport();
+  pn_transport_set_server(t2);
+  // Use 1K max frame size for test.
+  pn_transport_set_max_frame(t2, 1024);
+  pn_transport_bind(t2, c2);
+  pn_connection_open(c1);
+  pn_connection_open(c2);
+
+  // s0 not used for transfers, only for non-runtime checks.
+  pn_session_t *s0 = pn_session(c1);
+  REQUIRE(pn_session_incoming_window(s0) == 0);
+  REQUIRE(pn_session_incoming_window_lwm(s0) == 0);
+  // Incoming window arg 0
+  REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 0, 0) == PN_ARG_ERR);
+  // lwm > incoming window
+  REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 5, 6) == PN_ARG_ERR);
+  REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 6, 5) == 0);
+  pn_session_open(s0);
+  // Check can't change after open
+  REQUIRE(pn_session_set_incoming_window_and_lwm(s0, 7, 3) == PN_STATE_ERR);
+  REQUIRE(pn_session_incoming_window(s0) == 6);
+  REQUIRE(pn_session_incoming_window_lwm(s0) == 5);
+
+  // Set up sessions for transfers
+  pn_session_t *s2a = pn_session(c2);
+  pn_session_t *s2b = pn_session(c2);
+  // Test relies on knowing implemented default lwm is ((max_incoming_window + 
1) / 2)
+  REQUIRE(pn_session_set_incoming_window_and_lwm(s2a, 4, 0) == 0); // lwm will 
be 2
+  REQUIRE(pn_session_set_incoming_window_and_lwm(s2b, 4, 3) == 0);
+  pn_link_t *rxa = pn_receiver(s2a, "linka");
+  pn_link_t *rxb = pn_receiver(s2b, "linkb");
+  pn_session_open(s2a);
+  pn_session_open(s2b);
+  pn_link_open(rxa);
+  pn_link_open(rxb);
+  pn_link_flow(rxa, 1);
+  pn_link_flow(rxb, 1);
+
+
+  while (pump(t1, t2)) {
+    process_endpoints(c1);
+    process_endpoints(c2);
+  }
+
+  // sessions and links should be up, c2 should have two receiver links
+  REQUIRE(pn_link_state(rxa) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  REQUIRE(pn_link_state(rxb) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  pn_link_t *txa = pn_link_head(c1, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  REQUIRE(strcmp("linka", pn_link_name(txa)) == 0);
+  pn_link_t *txb = pn_link_next(txa, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+  REQUIRE(strcmp("linkb", pn_link_name(txb)) == 0);
+  pn_session_t *s1a = pn_link_session(txa);
+  pn_session_t *s1b = pn_link_session(txb);
+  REQUIRE(pn_session_remote_incoming_window(s1a) == 4);
+  REQUIRE(pn_session_remote_incoming_window(s1b) == 4);
+
+  // Send frames and check window.
+
+  // This is complicated by messy accounting: max_frame_size is a proxy for 
frames buffered on the
+  // receiver side, but payload per transfer frame is strictly less than max 
frame size due to
+  // frame headers.  For this test 997 bytes of payload fits in a 1024 byte 
transfer frame.
+  // Senders and receivers count/update frames a bit differently.
+
+  size_t payloadsz = 997;
+  size_t onefrm = 1 * payloadsz;
+  size_t fourfrm = 4 * payloadsz;
+  size_t fivefrm = 5 * payloadsz;
+
+  REQUIRE(pn_link_credit(txa) > 0);
+  REQUIRE(pn_link_credit(txb) > 0);
+  pn_delivery_t *dta1 = pn_delivery(txa, pn_dtag("dt-a1", 6));
+  pn_delivery_t *dtb1 = pn_delivery(txb, pn_dtag("dt-b1", 6));
+  REQUIRE(link_send(txa, fivefrm) == (ssize_t) fivefrm);
+  REQUIRE(link_send(txb, fivefrm) == (ssize_t) fivefrm);
+  while (pump(t1, t2));
+
+  // Expect 4 frames sent and 1 remaining, window 0
+  // linka
+  pn_delivery_t *dra1 = pn_link_current(rxa);
+  REQUIRE(dra1);
+  REQUIRE(pn_delivery_pending(dra1) == fourfrm);
+  REQUIRE(pn_delivery_partial(dra1));
+  REQUIRE(pn_delivery_pending(dta1) == onefrm);
+  REQUIRE(pn_session_remote_incoming_window(s1a) == 0);
+  // linkb
+  pn_delivery_t *drb1 = pn_link_current(rxb);
+  REQUIRE(drb1);
+  REQUIRE(pn_delivery_pending(drb1) == fourfrm);
+  REQUIRE(pn_delivery_partial(drb1));
+  REQUIRE(pn_delivery_pending(dtb1) == onefrm);
+  REQUIRE(pn_session_remote_incoming_window(s1b) == 0);
+
+  // Extract 3 frames, tx can send remaining bytes in one frame.
+  REQUIRE(link_recv(rxa, 3072) == 3072);
+  REQUIRE(link_recv(rxb, 3072) == 3072);
+  while (pump(t1, t2));
+  REQUIRE(pn_delivery_pending(dta1) == 0);
+  REQUIRE(pn_delivery_pending(dtb1) == 0);
+  // Window should be 2 as seen by sender
+  REQUIRE(pn_session_remote_incoming_window(s1a) == 2);
+  REQUIRE(pn_session_remote_incoming_window(s1b) == 2);
+
+  // Drain receivers.  "b" is below lwm so peer gets update.  Opposite for "a".
+  int remaining = pn_delivery_pending(dra1);
+  REQUIRE(link_recv(rxa, 5120) == remaining);
+  remaining = pn_delivery_pending(drb1);
+  REQUIRE(link_recv(rxb, 5120) == remaining);
+  while (pump(t1, t2));
+  REQUIRE(pn_session_remote_incoming_window(s1a) == 2);
+  REQUIRE(pn_session_remote_incoming_window(s1b) == 4);
+
+  // Send and consume one more frame.  Now "a" incoming_window drops below lwm 
but "b" does not.
+  REQUIRE(link_send(txa, onefrm) == (ssize_t) onefrm);
+  REQUIRE(link_send(txb, onefrm) == (ssize_t) onefrm);
+  REQUIRE(xfer(t1,t2) > 0);
+  REQUIRE(pn_session_remote_incoming_window(s1a) == 1);
+  REQUIRE(pn_session_remote_incoming_window(s1b) == 3);
+  remaining = pn_delivery_pending(dra1);
+  REQUIRE(link_recv(rxa, 5120) == remaining);
+  remaining = pn_delivery_pending(drb1);
+  REQUIRE(link_recv(rxb, 5120) == remaining);
+  while (pump(t1, t2));
+  REQUIRE(pn_session_remote_incoming_window(s1a) == 4);
+  REQUIRE(pn_session_remote_incoming_window(s1b) == 3);
+
+  pn_transport_unbind(t1);
+  pn_transport_free(t1);
+  pn_connection_free(c1);
+
+  pn_transport_unbind(t2);
+  pn_transport_free(t2);
+  pn_connection_free(c2);
+}
+
+TEST_CASE("max_frame") {
+  const uint32_t amqp_min_max_frame_size = 512;
+  pn_connection_t *c1 = pn_connection();
+  pn_transport_t *t1 = pn_transport();
+  pn_transport_bind(t1, c1);
+
+  pn_connection_t *c2 = pn_connection();
+  pn_transport_t *t2 = pn_transport();
+  pn_transport_set_server(t2);
+
+  // Can set to zero, i.e. no max frame
+  pn_transport_set_max_frame(t2, 0);
+  REQUIRE(pn_transport_get_max_frame(t2) == 0);
+  // Restricted to AMQP minimum.
+  pn_transport_set_max_frame(t2, 1);
+  REQUIRE(pn_transport_get_max_frame(t2) == amqp_min_max_frame_size);
+  // Otherwise OK
+  pn_transport_set_max_frame(t2, amqp_min_max_frame_size + 1);
+  REQUIRE(pn_transport_get_max_frame(t2) == amqp_min_max_frame_size + 1);
+  pn_transport_set_max_frame(t2, UINT32_MAX);
+  REQUIRE(pn_transport_get_max_frame(t2) == UINT32_MAX);
+
+  // Can still change post bind
+  pn_transport_bind(t2, c2);
+  pn_transport_set_max_frame(t2, 4096);
+  REQUIRE(pn_transport_get_max_frame(t2) == 4096);
+
+  pn_connection_open(c1);
+  pn_connection_open(c2);
+  while (pump(t1, t2)) {
+    process_endpoints(c1);
+    process_endpoints(c2);
+  }
+  REQUIRE(pn_connection_state(c2) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+
+  // No change allowed after peers have negotiated OPEN frame.
+  pn_transport_set_max_frame(t2, 4097);  // Should be silently ignored.
+  REQUIRE(pn_transport_get_max_frame(t2) == 4096);
+  pn_transport_set_max_frame(t2, 0);     // Can't turn off either.
+  REQUIRE(pn_transport_get_max_frame(t2) == 4096);
+
+  pn_transport_unbind(t1);
+  pn_transport_free(t1);
+  pn_connection_free(c1);
+
+  pn_transport_unbind(t2);
+  pn_transport_free(t2);
+  pn_connection_free(c2);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to