Repository: qpid-proton
Updated Branches:
  refs/heads/master dc724c140 -> bdd986a1d (forced update)


PROTON-1823: [c] pn_message_send() simplified message sending for C

Encapsulates the awkward allocate-encode-expand dance required by
pn_message_encode().
Supports the following two scenarios:

1. Simple: don't care about allocations, just send `pn_message_t *msg` and 
forget it:

    pn_message_send(msg, sender, NULL)

2. Efficient: re-use a buffer, buffer is allocated and expanded as required:

    pn_rwbytes_t buffer={0};     // Zero initialize, library will do the allocation
    ...
    pn_message_send(msg, sender, &buffer); // Expand as needed
    pn_message_send(msg2, sender2, &buffer); // etc.
    ...
    free(buffer.start);         // Application must do final free of buffer

Note: scenario 2 assumes the use of malloc/realloc/free. Apps that need custom
allocation can use the original pn_message_encode() API, or we could add a
version that takes a pointer to a function equivalent to realloc().

Updated examples to use this API.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bdd986a1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bdd986a1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bdd986a1

Branch: refs/heads/master
Commit: bdd986a1d8da5b5b0811aa244d974f48a757e3d7
Parents: 3cb7a5c
Author: Alan Conway <acon...@redhat.com>
Authored: Fri Apr 6 15:54:11 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Fri Apr 6 17:45:38 2018 -0400

----------------------------------------------------------------------
 c/examples/direct.c         | 37 ++++++++----------------------------
 c/examples/send.c           | 41 +++++++++++-----------------------------
 c/include/proton/message.h  | 24 ++++++++++++++++++++++-
 c/src/core/message.c        | 28 +++++++++++++++++++++++++++
 c/tests/connection_driver.c |  8 +++-----
 5 files changed, 73 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/examples/direct.c
----------------------------------------------------------------------
diff --git a/c/examples/direct.c b/c/examples/direct.c
index 6d8642c..d2a4ae3 100644
--- a/c/examples/direct.c
+++ b/c/examples/direct.c
@@ -76,7 +76,7 @@ static void check_condition(pn_event_t *e, pn_condition_t 
*cond, app_data_t *app
 }
 
 /* Create a message with a map { "sequence" : number } encode it and return 
the encoded buffer. */
-static pn_bytes_t encode_message(app_data_t* app) {
+static void send_message(app_data_t *app, pn_link_t *sender) {
   /* Construct a message with the map { "sequence": app.sent } */
   pn_message_t* message = pn_message();
   pn_data_t* body = pn_message_body(message);
@@ -86,29 +86,11 @@ static pn_bytes_t encode_message(app_data_t* app) {
   pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
   pn_data_put_int(body, app->sent); /* The sequence number */
   pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->msgout.start == NULL) {
-    static const size_t initial_size = 128;
-    app->msgout = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-  }
-  /* app->msgout is the total buffer space available. */
-  /* mbuf wil point at just the portion used by the encoded message */
-  {
-  pn_rwbytes_t mbuf = pn_rwbytes(app->msgout.size, app->msgout.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == 
PN_OVERFLOW) {
-    app->msgout.size *= 2;
-    app->msgout.start = (char*)realloc(app->msgout.start, app->msgout.size);
-    mbuf.size = app->msgout.size;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", 
pn_error_text(pn_message_error(message)));
-    exit(1);
+  if (pn_message_send(message, sender, &app->msgout) < 0) {
+    fprintf(stderr, "send error: %s\n", 
pn_error_text(pn_message_error(message)));
+    exit_code = 1;
   }
   pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-  }
 }
 
 static void decode_message(pn_rwbytes_t data) {
@@ -124,7 +106,7 @@ static void decode_message(pn_rwbytes_t data) {
     pn_message_free(m);
     free(data.start);
   } else {
-    fprintf(stderr, "decode_message: %s\n", pn_code(err));
+    fprintf(stderr, "decode error: %s\n", pn_error_text(pn_message_error(m)));
     exit_code = 1;
   }
 }
@@ -142,7 +124,7 @@ static void handle_receive(app_data_t *app, pn_event_t* 
event) {
    case PN_DELIVERY: {          /* Incoming message data */
      pn_delivery_t *d = pn_event_delivery(event);
      if (pn_delivery_readable(d)) {
-       pn_link_t *l = pn_delivery_link(d);
+     pn_link_t *l = pn_delivery_link(d);
        size_t size = pn_delivery_pending(d);
        pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message 
buffer */
        ssize_t recv;
@@ -198,11 +180,8 @@ static void handle_send(app_data_t* app, pn_event_t* 
event) {
        ++app->sent;
        /* Use sent counter as unique delivery tag. */
        pn_delivery(sender, pn_dtag((const char *)&app->sent, 
sizeof(app->sent)));
-       {
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       send_message(app, sender);
        pn_link_advance(sender);
-       }
      }
      break;
    }
@@ -308,7 +287,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
      }
    }
   }
-  return true;
+  return exit_code == 0;
 }
 
 void run(app_data_t *app) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/examples/send.c
----------------------------------------------------------------------
diff --git a/c/examples/send.c b/c/examples/send.c
index 9e8cc4a..8d979e6 100644
--- a/c/examples/send.c
+++ b/c/examples/send.c
@@ -38,6 +38,7 @@ typedef struct app_data_t {
   int message_count;
 
   pn_proactor_t *proactor;
+  pn_message_t *message;
   pn_rwbytes_t message_buffer;
   int sent;
   int acknowledged;
@@ -55,40 +56,21 @@ static void check_condition(pn_event_t *e, pn_condition_t 
*cond) {
 }
 
 /* Create a message with a map { "sequence" : number } encode it and return 
the encoded buffer. */
-static pn_bytes_t encode_message(app_data_t* app) {
+static void send_message(app_data_t* app, pn_link_t *sender) {
   /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id 
also */
+  pn_data_t* body;
+  pn_message_clear(app->message);
+  body = pn_message_body(app->message);
+  pn_data_put_int(pn_message_id(app->message), app->sent); /* Set the 
message_id also */
   pn_data_put_map(body);
   pn_data_enter(body);
   pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
   pn_data_put_int(body, app->sent); /* The sequence number */
   pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    app->message_buffer = pn_rwbytes(initial_size, 
(char*)malloc(initial_size));
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf wil point at just the portion used by the encoded message */
-  {
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, 
app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == 
PN_OVERFLOW) {
-    app->message_buffer.size *= 2;
-    app->message_buffer.start = (char*)realloc(app->message_buffer.start, 
app->message_buffer.size);
-    mbuf.size = app->message_buffer.size;
-    mbuf.start = app->message_buffer.start;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", 
pn_error_text(pn_message_error(message)));
+  if (pn_message_send(app->message, sender, &app->message_buffer) < 0) {
+    fprintf(stderr, "error sending message: %s\n", 
pn_error_text(pn_message_error(app->message)));
     exit(1);
   }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-  }
 }
 
 /* Returns true to continue, false if finished */
@@ -116,10 +98,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
        ++app->sent;
        /* Use sent counter as unique delivery tag. */
        pn_delivery(sender, pn_dtag((const char *)&app->sent, 
sizeof(app->sent)));
-       {
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
-       }
+       send_message(app, sender);
        pn_link_advance(sender);
      }
      break;
@@ -193,6 +172,7 @@ int main(int argc, char **argv) {
   app.port = (argc > 2) ? argv[2] : "amqp";
   app.amqp_address = (argc > 3) ? argv[3] : "examples";
   app.message_count = (argc > 4) ? atoi(argv[4]) : 10;
+  app.message = pn_message();
 
   app.proactor = pn_proactor();
   pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
@@ -200,5 +180,6 @@ int main(int argc, char **argv) {
   run(&app);
   pn_proactor_free(app.proactor);
   free(app.message_buffer.start);
+  pn_message_free(app.message);
   return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/include/proton/message.h
----------------------------------------------------------------------
diff --git a/c/include/proton/message.h b/c/include/proton/message.h
index f9d62b0..fd69688 100644
--- a/c/include/proton/message.h
+++ b/c/include/proton/message.h
@@ -723,7 +723,7 @@ PN_EXTERN pn_data_t *pn_message_body(pn_message_t *msg);
 PN_EXTERN int pn_message_decode(pn_message_t *msg, const char *bytes, size_t 
size);
 
 /**
- * Encode/save message content as AMQP formatted binary data.
+ * Encode a message as AMQP formatted binary data.
  *
  * If the buffer space provided is insufficient to store the content
  * held in the message, the operation will fail and return a
@@ -737,6 +737,28 @@ PN_EXTERN int pn_message_decode(pn_message_t *msg, const 
char *bytes, size_t siz
  */
 PN_EXTERN int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size);
 
+struct pn_link_t;
+
+/**
+ * Encode and send a message on a sender link.
+ *
+ * @param[in] msg A message object.
+ * @param[in] sender A sending link.
+ * The message will be encoded and sent with pn_link_send()
+ * @param[inout] buf Used to encode the message.
+ * - if buf == NULL, temporary space will be allocated and freed with 
malloc()/free()
+ * - if buf->start != NULL and buf->size is large enough, the message is 
encoded to
+ *   buf->start
+ * - if buf->start == NULL or buf->size is not enough, the buffer will be 
extended like this:
+ *
+ *       buf->size = new_size; buf->start = realloc(buf->start, new_size)
+ *
+ *   it is possible for the buffer to be extended more than once.
+ * @return The number of bytes encoded and sent on success.
+ * Returns an error code (< 0) on failure and sets pn_message_error() on msg
+ */
+PN_EXTERN ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, 
pn_rwbytes_t *buf);
+
 /**
  * Save message content into a pn_data_t object data. The data object will 
first be cleared.
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/src/core/message.c
----------------------------------------------------------------------
diff --git a/c/src/core/message.c b/c/src/core/message.c
index 8f0bcf7..3312f8c 100644
--- a/c/src/core/message.c
+++ b/c/src/core/message.c
@@ -26,6 +26,7 @@
 #include "protocol.h"
 #include "util.h"
 
+#include <proton/link.h>
 #include <proton/object.h>
 #include <proton/codec.h>
 #include <proton/error.h>
@@ -895,3 +896,30 @@ pn_data_t *pn_message_body(pn_message_t *msg)
 {
   return msg ? msg->body : NULL;
 }
+
+PN_EXTERN ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, 
pn_rwbytes_t *buffer) {
+  static const size_t initial_size = 256;
+  pn_rwbytes_t local_buf = { 0 };
+  ssize_t err = 0;
+  size_t size = 0;
+
+  if (buffer == NULL) buffer = &local_buf;
+  if (buffer->start == NULL) {
+    buffer->start = (char*)malloc(initial_size);
+    buffer->size = initial_size;
+  }
+  if (buffer->start == NULL) return PN_OUT_OF_MEMORY;
+  size = buffer->size;
+  while ((err = pn_message_encode(msg, buffer->start, &size)) == PN_OVERFLOW) {
+    buffer->size *= 2;
+    buffer->start = (char*)realloc(buffer->start, buffer->size);
+    if (buffer->start == NULL) return PN_OUT_OF_MEMORY;
+    size = buffer->size;
+  }
+  if (err == 0) {
+    err = pn_link_send(sender, buffer->start, size);
+    if (err < 0) pn_error_copy(pn_message_error(msg), pn_link_error(sender));
+  }
+  free(local_buf.start);
+  return err;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/tests/connection_driver.c
----------------------------------------------------------------------
diff --git a/c/tests/connection_driver.c b/c/tests/connection_driver.c
index f152e89..7e4489a 100644
--- a/c/tests/connection_driver.c
+++ b/c/tests/connection_driver.c
@@ -102,11 +102,10 @@ static void test_message_transfer(test_t *t) {
   /* Encode and send a message */
   pn_message_t *m = pn_message();
   pn_data_put_string(pn_message_body(m), pn_bytes(4, "abc")); /* Include 
trailing NULL */
-  pn_rwbytes_t buf = { 0 };
-  ssize_t size = message_encode(m, &buf);
-  pn_message_free(m);
   pn_delivery(snd, PN_BYTES_LITERAL(x));
-  TEST_INT_EQUAL(t, size, pn_link_send(snd, buf.start, size));
+  pn_message_send(m, snd, NULL);
+  pn_message_free(m);
+
   TEST_CHECK(t, pn_link_advance(snd));
   test_connection_drivers_run(&client, &server);
   TEST_HANDLER_EXPECT(&server.handler, PN_TRANSPORT, PN_DELIVERY, 0);
@@ -125,7 +124,6 @@ static void test_message_transfer(test_t *t) {
   TEST_STR_EQUAL(t, "abc", pn_data_get_string(pn_message_body(m2)).start);
   pn_message_free(m2);
 
-  free(buf.start);
   free(buf2.start);
   test_connection_driver_destroy(&client);
   test_connection_driver_destroy(&server);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to