PROTON-1809: PROTON-636: Unable to receive messages when max-frame-size > 2^20

Caused when the frame size was greater than the default session-capacity, so
the incoming window was always 0.

Fixes:

1. No default session-capacity. Session flow control is enabled only if both
   session-capacity and max-frame-size are set. Neither value is deduced
   automatically.

2. Transport error if both are set and session-capacity is less than
   max-frame-size. In this case the incoming window is always 0 so
   communication is impossible.

3. Update API doc for pn_session_set_incoming_capacity

4. Add tests to verify this behavior


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e828055b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e828055b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e828055b

Branch: refs/heads/master
Commit: e828055b2a0fade8835cd597e91d0ae3c8bb3f5a
Parents: c7717a4
Author: Alan Conway <acon...@redhat.com>
Authored: Tue Apr 10 10:31:56 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Tue Apr 10 13:28:24 2018 -0400

----------------------------------------------------------------------
 c/examples/receive.c        |  2 +-
 c/include/proton/message.h  |  3 ++
 c/include/proton/session.h  | 11 ++++---
 c/src/core/engine.c         |  5 ++--
 c/src/core/framing.h        |  1 +
 c/src/core/transport.c      | 20 +++++++++----
 c/tests/connection_driver.c | 64 +++++++++++++++++++++++++++++++++++++++-
 c/tests/test_handler.h      | 12 ++++++++
 c/tests/test_tools.h        |  8 ++++-
 9 files changed, 110 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/examples/receive.c
----------------------------------------------------------------------
diff --git a/c/examples/receive.c b/c/examples/receive.c
index 0d0c988..8280345 100644
--- a/c/examples/receive.c
+++ b/c/examples/receive.c
@@ -93,7 +93,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    } break;
 
    case PN_DELIVERY: {
-     /* A message has been received */
+     /* A message (or part of a message) has been received */
      pn_delivery_t *d = pn_event_delivery(event);
      if (pn_delivery_readable(d)) {
        pn_link_t *l = pn_delivery_link(d);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/include/proton/message.h
----------------------------------------------------------------------
diff --git a/c/include/proton/message.h b/c/include/proton/message.h
index 0f094be..d7b9663 100644
--- a/c/include/proton/message.h
+++ b/c/include/proton/message.h
@@ -748,6 +748,9 @@ struct pn_link_t;
  * - call pn_link_send() to send the encoded message bytes
  * - call pn_link_advance() to indicate the message is complete
  *
+ * Note: you must create a delivery for the message before calling
+ * pn_message_send() see pn_delivery()
+ *
  * @param[in] msg A message object.
  * @param[in] sender A sending link.
  * The message will be encoded and sent with pn_link_send()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/include/proton/session.h
----------------------------------------------------------------------
diff --git a/c/include/proton/session.h b/c/include/proton/session.h
index cabb1f2..512e004 100644
--- a/c/include/proton/session.h
+++ b/c/include/proton/session.h
@@ -198,9 +198,7 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
  * Get the incoming capacity of the session measured in bytes.
  *
  * The incoming capacity of a session determines how much incoming
- * message data the session will buffer. Note that if this value is
- * less than the negotiated frame size of the transport, it will be
- * rounded up to one full frame.
+ * message data the session will buffer.
  *
  * @param[in] session the session object
  * @return the incoming capacity of the session in bytes
@@ -211,9 +209,10 @@ PN_EXTERN size_t 
pn_session_get_incoming_capacity(pn_session_t *session);
  * Set the incoming capacity for a session object.
  *
  * The incoming capacity of a session determines how much incoming
- * message data the session will buffer. Note that if this value is
- * less than the negotiated frame size of the transport, it will be
- * rounded up to one full frame.
+ * message data the session will buffer.
+ *
+ * NOTE: If set, this value must be greater than or equal to the negotiated
+ * frame size of the transport.
  *
  * @param[in] session the session object
  * @param[in] capacity the incoming capacity for the session

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index f49886d..070c751 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -20,6 +20,7 @@
  */
 
 #include "engine-internal.h"
+#include "framing.h"
 #include <stdlib.h>
 #include <string.h>
 #include "protocol.h"
@@ -987,12 +988,12 @@ pn_session_t *pn_session(pn_connection_t *conn)
   ssn->links = pn_list(PN_WEAKREF, 0);
   ssn->freed = pn_list(PN_WEAKREF, 0);
   ssn->context = pn_record();
-  ssn->incoming_capacity = 1024*1024;
+  ssn->incoming_capacity = 0;
   ssn->incoming_bytes = 0;
   ssn->outgoing_bytes = 0;
   ssn->incoming_deliveries = 0;
   ssn->outgoing_deliveries = 0;
-  ssn->outgoing_window = 2147483647;
+  ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
 
   // begin transport state
   memset(&ssn->state, 0, sizeof(ssn->state));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/framing.h
----------------------------------------------------------------------
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
index 792d664..92c1f7d 100644
--- a/c/src/core/framing.h
+++ b/c/src/core/framing.h
@@ -30,6 +30,7 @@
 
 #define AMQP_HEADER_SIZE (8)
 #define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
+#define AMQP_MAX_WINDOW_SIZE (2147483647)
 
 typedef struct {
   uint8_t type;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/transport.c
----------------------------------------------------------------------
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 96b54f2..1a05261 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1893,11 +1893,21 @@ static size_t pni_session_outgoing_window(pn_session_t 
*ssn)
 
 static size_t pni_session_incoming_window(pn_session_t *ssn)
 {
-  uint32_t size = ssn->connection->transport->local_max_frame;
-  if (!size) {
-    return 2147483647; // biggest legal value
-  } else {
-    return (ssn->incoming_capacity - ssn->incoming_bytes)/size;
+  pn_transport_t *t = ssn->connection->transport;
+  uint32_t size = t->local_max_frame;
+  size_t capacity = ssn->incoming_capacity;
+  if (!size || !capacity) {     /* session flow control is not enabled */
+    return AMQP_MAX_WINDOW_SIZE;
+  } else if (capacity >= size) { /* precondition */
+    return (capacity - ssn->incoming_bytes) / size;
+  } else {                     /* error: we will never have a non-zero window 
*/
+    pn_condition_format(
+      pn_transport_condition(t),
+      "amqp:internal-error",
+      "session capacity %"PN_ZU" is less than frame size %"PN_ZU,
+      capacity, size);
+    pn_transport_close_tail(t);
+    return 0;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/connection_driver.c
----------------------------------------------------------------------
diff --git a/c/tests/connection_driver.c b/c/tests/connection_driver.c
index 62655a7..5997f52 100644
--- a/c/tests/connection_driver.c
+++ b/c/tests/connection_driver.c
@@ -379,6 +379,68 @@ static void test_message_abort_mixed(test_t *t) {
   test_connection_driver_destroy(&server);
 }
 
+/* Set capacity and max frame, send a single message */
+static void set_capacity_and_max_frame(
+  size_t capacity, size_t max_frame,
+  test_connection_driver_t *client, test_connection_driver_t *server,
+  const char* data)
+{
+  pn_transport_set_max_frame(client->driver.transport, max_frame);
+  pn_connection_open(client->driver.connection);
+  pn_session_t *ssn = pn_session(client->driver.connection);
+  pn_session_set_incoming_capacity(ssn, capacity);
+  pn_session_open(ssn);
+  pn_link_t *snd = pn_sender(ssn, "x");
+  pn_link_open(snd);
+  test_connection_drivers_run(client, server);
+  pn_link_flow(server->handler.link, 1);
+  test_connection_drivers_run(client, server);
+  if (pn_transport_closed(client->driver.transport) ||
+      pn_transport_closed(server->driver.transport))
+    return;
+  /* Send a message */
+  pn_message_t *m = pn_message();
+  pn_message_set_address(m, data);
+  pn_delivery(snd, PN_BYTES_LITERAL(x));
+  pn_message_send(m, snd, NULL);
+  pn_message_free(m);
+  test_connection_drivers_run(client, server);
+}
+
+#define MAX_WINDOW (2147483647)
+#define MAX_FRAME (4294967295)
+/* Test different settings for max-frame, outgoing-window, incoming-capacity */
+static void test_session_flow_control(test_t *t) {
+  test_connection_driver_t client, server;
+  test_connection_drivers_init(t, &client, open_handler, &server, 
delivery_handler);
+  pn_message_t *m = pn_message();
+  pn_rwbytes_t buf= {0};
+
+  /* Capacity equal to frame size OK */
+  set_capacity_and_max_frame(1234, 1234, &client, &server, "foo");
+  pn_delivery_t *dlv = server.handler.delivery;
+  if (TEST_CHECK(t, dlv)) {
+    message_decode(m, dlv, &buf);
+    TEST_STR_EQUAL(t, "foo", pn_message_get_address(m));
+  }
+
+  /* Capacity bigger than frame size OK */
+  set_capacity_and_max_frame(12345, 1234, &client, &server, "foo");
+  dlv = server.handler.delivery;
+  if (TEST_CHECK(t, dlv)) {
+    message_decode(m, dlv, &buf);
+    TEST_STR_EQUAL(t, "foo", pn_message_get_address(m));
+  }
+
+  /* Capacity smaller than frame size is an error */
+  set_capacity_and_max_frame(1234, 12345, &client, &server, "foo");
+  TEST_COND_NAME(t, "amqp:internal-error", 
pn_transport_condition(client.driver.transport));
+  TEST_COND_DESC(t, "session capacity 1234 is less than frame size 12345", 
pn_transport_condition(client.driver.transport));
+
+  pn_message_free(m);
+  free(buf.start);
+  test_connection_drivers_destroy(&client, &server);
+}
 
 int main(int argc, char **argv) {
   int failed = 0;
@@ -386,6 +448,6 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_message_stream(&t));
   RUN_ARGV_TEST(failed, t, test_message_abort(&t));
   RUN_ARGV_TEST(failed, t, test_message_abort_mixed(&t));
+  RUN_ARGV_TEST(failed, t, test_session_flow_control(&t));
   return failed;
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/test_handler.h
----------------------------------------------------------------------
diff --git a/c/tests/test_handler.h b/c/tests/test_handler.h
index 108d0d9..7ffd4d3 100644
--- a/c/tests/test_handler.h
+++ b/c/tests/test_handler.h
@@ -46,6 +46,7 @@ typedef struct test_handler_t {
   pn_link_t *sender;
   pn_link_t *receiver;
   pn_delivery_t *delivery;
+  pn_message_t *message;
   pn_ssl_domain_t *ssl_domain;
 } test_handler_t;
 
@@ -167,4 +168,15 @@ test_connection_driver_t* 
test_connection_drivers_run(test_connection_driver_t *
   return NULL;
 }
 
+/* Initialize a client-server driver pair */
+void test_connection_drivers_init(test_t *t, test_connection_driver_t *a, 
test_handler_fn fa, test_connection_driver_t *b, test_handler_fn fb) {
+  test_connection_driver_init(a, t, fa, NULL);
+  test_connection_driver_init(b, t, fb, NULL);
+  pn_transport_set_server(b->driver.transport);
+}
+
+void test_connection_drivers_destroy(test_connection_driver_t *a, 
test_connection_driver_t *b) {
+  test_connection_driver_destroy(a);
+  test_connection_driver_destroy(b);
+}
 #endif // TESTS_TEST_DRIVER_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/c/tests/test_tools.h b/c/tests/test_tools.h
index d046a43..7596d60 100644
--- a/c/tests/test_tools.h
+++ b/c/tests/test_tools.h
@@ -148,6 +148,11 @@ bool test_int_equal_(test_t *t, int want, int got, const 
char *file, int line) {
 }
 #define TEST_INT_EQUAL(TEST, WANT, GOT) test_int_equal_((TEST), (WANT), (GOT), 
__FILE__, __LINE__)
 
+bool test_size_equal_(test_t *t, size_t want, size_t got, const char *file, 
int line) {
+  return test_check_(t, want == got, NULL, file, line, "want %zd, got %zd", 
want, got);
+}
+#define TEST_SIZE_EQUAL(TEST, WANT, GOT) test_size_equal_((TEST), (WANT), 
(GOT), __FILE__, __LINE__)
+
 bool test_str_equal_(test_t *t, const char* want, const char* got, const char 
*file, int line) {
   return test_check_(t, !strcmp(want, got), NULL, file, line, "want '%s', got 
'%s'", want, got);
 }
@@ -231,7 +236,8 @@ void message_decode(pn_message_t *m, pn_delivery_t *d, 
pn_rwbytes_t *buf) {
   pn_link_t *l = pn_delivery_link(d);
   ssize_t size = pn_delivery_pending(d);
   rwbytes_ensure(buf, size);
-  TEST_ASSERT(size == pn_link_recv(l, buf->start, size));
+  ssize_t result = pn_link_recv(l, buf->start, size);
+  TEST_ASSERTF(size == result, "%ld != %ld", (long)size, (long)result);
   pn_message_clear(m);
   TEST_ASSERTF(!pn_message_decode(m, buf->start, size), "decode: %s", 
pn_error_text(pn_message_error(m)));
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to