Author: kgiusti
Date: Mon Oct 22 21:50:45 2012
New Revision: 1401095

URL: http://svn.apache.org/viewvc?rev=1401095&view=rev
Log:
PROTON-79: use variable size buffer for output frames.

Modified:
    qpid/proton/trunk/proton-c/include/proton/framing.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c

Modified: qpid/proton/trunk/proton-c/include/proton/framing.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/framing.h?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/framing.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/framing.h Mon Oct 22 21:50:45 2012
@@ -30,6 +30,7 @@ extern "C" {
 #endif
 
 #define AMQP_HEADER_SIZE (8)
+#define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
 
 typedef struct {
   uint8_t type;

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Mon Oct 22 21:50:45 
2012
@@ -46,6 +46,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
   disp->size = 0;
 
   disp->output_args = pn_data(16);
+  disp->frame = pn_buffer( 4*1024 );
   // XXX
   disp->capacity = 4*1024;
   disp->output = malloc(disp->capacity);
@@ -222,51 +223,38 @@ int pn_post_frame(pn_dispatcher_t *disp,
 
   pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, 
disp->output_size);
 
-  size_t size = 1024;
-
-  while (true) {
-    char buf[size];
-    ssize_t wr = pn_data_encode(disp->output_args, buf, size);
-    if (wr < 0)
-    {
-      if (wr == PN_OVERFLOW) {
-        size *= 2;
-        continue;
-      } else {
-        fprintf(stderr, "error posting frame: %s", pn_code(wr));
-        return PN_ERR;
-      }
-    } else if (size - wr < disp->output_size) {
-      size += wr + disp->output_size - size;
-      continue;
-    } else {
-      if (disp->output_size) {
-        memmove(buf + wr, disp->output_payload, disp->output_size);
-        wr += disp->output_size;
-        disp->output_payload = NULL;
-        disp->output_size = 0;
-      }
-
-      pn_frame_t frame = {disp->frame_type};
-      frame.channel = ch;
-      frame.payload = buf;
-      frame.size = wr;
-      size_t n;
-      while (!(n = pn_write_frame(disp->output + disp->available,
-                                  disp->capacity - disp->available, frame))) {
-        disp->capacity *= 2;
-        disp->output = realloc(disp->output, disp->capacity);
-      }
-      if (disp->trace & PN_TRACE_RAW) {
-        fprintf(stderr, "RAW: \"");
-        pn_fprint_data(stderr, disp->output + disp->available, n);
-        fprintf(stderr, "\"\n");
-      }
-      disp->available += n;
-      break;
+ encode_performatives:
+  pn_buffer_clear( disp->frame );
+  pn_bytes_t buf = pn_buffer_bytes( disp->frame );
+  buf.size = pn_buffer_available( disp->frame );
+
+  ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size );
+  if (wr < 0) {
+    if (wr == PN_OVERFLOW) {
+      pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 );
+      goto encode_performatives;
     }
+    fprintf(stderr, "error posting frame: %s", pn_code(wr));
+    return PN_ERR;
   }
 
+  pn_frame_t frame = {disp->frame_type};
+  frame.channel = ch;
+  frame.payload = buf.start;
+  frame.size = wr;
+  size_t n;
+  while (!(n = pn_write_frame(disp->output + disp->available,
+                              disp->capacity - disp->available, frame))) {
+    disp->capacity *= 2;
+    disp->output = realloc(disp->output, disp->capacity);
+  }
+  if (disp->trace & PN_TRACE_RAW) {
+    fprintf(stderr, "RAW: \"");
+    pn_fprint_data(stderr, disp->output + disp->available, n);
+    fprintf(stderr, "\"\n");
+  }
+  disp->available += n;
+
   return 0;
 }
 
@@ -289,53 +277,70 @@ int pn_post_transfer_frame(pn_dispatcher
                            bool settled,
                            bool more)
 {
+  bool more_flag = more;
 
-  if (!disp->remote_max_frame)  // "unlimited" frame size accepted
-    return pn_post_frame( disp, ch, "DL[IIzIoo]", TRANSFER,
-                          handle, id, tag->size, tag->start,
-                          message_format,
-                          settled, more);
-
-  bool more_flag = true;        // optimize: check against hard-code size of 
performative
-  do {
-    pn_data_clear(disp->output_args);
-    int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
-                           handle, id, tag->size, tag->start,
-                           message_format,
-                           settled, more_flag);
-    if (err) {
-      fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err), 
pn_data_error(disp->output_args));
-      return PN_ERR;
-    }
+  // create preformatives, assuming 'more' flag need not change
+
+ compute_performatives:
+  pn_data_clear(disp->output_args);
+  int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
+                         handle, id, tag->size, tag->start,
+                         message_format,
+                         settled, more_flag);
+  if (err) {
+    fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err), 
pn_data_error(disp->output_args));
+    return PN_ERR;
+  }
+
+  do { // send as many frames as possible without changing the 'more' flag...
 
-    pn_buffer_clear(disp->frame);
-    pn_bytes_t frame_buf = pn_buffer_bytes( disp->frame );
-    ssize_t wr = pn_data_encode(disp->output_args, frame_buf.start,
-                                pn_buffer_available( disp->frame ));
+  encode_performatives:
+    pn_buffer_clear( disp->frame );
+    pn_bytes_t buf = pn_buffer_bytes( disp->frame );
+    buf.size = pn_buffer_available( disp->frame );
+
+    ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size);
     if (wr < 0) {
+      if (wr == PN_OVERFLOW) {
+        pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 
);
+        goto encode_performatives;
+      }
       fprintf(stderr, "error posting frame: %s", pn_code(wr));
       return PN_ERR;
     }
-    frame_buf.size = wr;
+    buf.size = wr;
 
-    if (more == false && (frame_buf.size + disp->output_size) <= 
disp->remote_max_frame) {
-      more_flag = false;
-      continue;         // rebuild performative with new value for more flag
+    // check if we need to break up the outbound frame
+    size_t available = disp->output_size;
+    if (disp->remote_max_frame) {
+      if ((available + buf.size) > disp->remote_max_frame) {
+        available = disp->remote_max_frame - buf.size;
+        if (more_flag == false) {
+          more_flag = true;
+          goto compute_performatives;  // deal with flag change
+        }
+      } else if (more_flag == true && more == false) {
+        // caller has no more, and this is the last frame
+        more_flag = false;
+        goto compute_performatives;
+      }
     }
 
-    size_t available = disp->remote_max_frame - frame_buf.size;
-    if (available > disp->output_size) available = disp->output_size;
-    memmove(frame_buf.start + frame_buf.size, disp->output_payload, available);
-    frame_buf.size += available;
-    //disp->output_payload = NULL;
+    if (pn_buffer_available( disp->frame ) < (available + buf.size)) {
+      // not enough room for payload - try again...
+      pn_buffer_ensure( disp->frame, available + buf.size );
+      goto encode_performatives;
+    }
+    memmove( buf.start + buf.size, disp->output_payload, available);
     disp->output_size -= available;
-
-    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, 
available);
+    buf.size += available;
 
     pn_frame_t frame = {disp->frame_type};
     frame.channel = ch;
-    frame.payload = frame_buf.start;
-    frame.size = frame_buf.size;
+    frame.payload = buf.start;
+    frame.size = buf.size;
+
+    pn_do_trace(disp, ch, OUT, disp->output_args, frame.payload, frame.size);
 
     size_t n;
     while (!(n = pn_write_frame(disp->output + disp->available,

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Mon Oct 22 21:50:45 
2012
@@ -56,7 +56,7 @@ struct pn_dispatcher_t {
   void *context;
   bool halt;
   bool batch;
-  char scratch[SCRATCH];        // ? Rafi - can I use this instead of frame 
(size for remote-max-frame?)
+  char scratch[SCRATCH];
 };
 
 pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context);

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Oct 22 21:50:45 
2012
@@ -89,9 +89,6 @@ typedef struct {
 #include <proton/sasl.h>
 #include <proton/ssl.h>
 
-// minimum allowable max-frame
-#define PN_MIN_MAX_FRAME_SIZE ((uint32_t)512)
-
 struct pn_transport_t {
   ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
   ssize_t (*process_output)(pn_transport_t *, char *, size_t);

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 22 21:50:45 2012
@@ -1330,14 +1330,14 @@ int pn_do_open(pn_dispatcher_t *disp)
                          transport->remote_desired_capabilities);
   if (err) return err;
   if (transport->remote_max_frame > 0) {
-    if (transport->remote_max_frame < PN_MIN_MAX_FRAME_SIZE) {
+    if (transport->remote_max_frame < AMQP_MIN_MAX_FRAME_SIZE) {
       fprintf(stderr, "Peer advertised bad max-frame (%u), forcing to %u\n",
-              transport->remote_max_frame, PN_MIN_MAX_FRAME_SIZE);
-      transport->remote_max_frame = PN_MIN_MAX_FRAME_SIZE;
+              transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
+      transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE;
     }
     disp->remote_max_frame = transport->remote_max_frame;
-    if (disp->frame) pn_buffer_free( disp->frame );
-    disp->frame = pn_buffer( disp->remote_max_frame );
+    pn_buffer_clear( disp->frame );
+    pn_buffer_ensure( disp->frame, disp->remote_max_frame );
   }
   if (container_q) {
     transport->remote_container = pn_bytes_strdup(remote_container);
@@ -2375,8 +2375,8 @@ uint32_t pn_transport_get_max_frame(pn_t
 
 void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
 {
-  if (size && size < PN_MIN_MAX_FRAME_SIZE)
-    size = PN_MIN_MAX_FRAME_SIZE;
+  if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
+    size = AMQP_MIN_MAX_FRAME_SIZE;
   transport->local_max_frame = size;
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to