PROTON-1687: consolidate transport buffers in transport
- Use pn_buffer_t for transport output buffer.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85755ac4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85755ac4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85755ac4

Branch: refs/heads/go1
Commit: 85755ac4bde591a02953d080a36a23af80287be8
Parents: 7a936e9
Author: Andrew Stitcher <[email protected]>
Authored: Tue Aug 16 17:32:23 2016 -0400
Committer: Andrew Stitcher <[email protected]>
Committed: Fri Dec 8 15:46:21 2017 -0500

----------------------------------------------------------------------
 proton-c/src/core/buffer.c          | 19 +++++++++++-----
 proton-c/src/core/buffer.h          |  3 ++-
 proton-c/src/core/dispatcher.c      |  6 ++---
 proton-c/src/core/engine-internal.h |  7 +++---
 proton-c/src/core/framing.c         | 12 ++++++----
 proton-c/src/core/framing.h         |  4 +++-
 proton-c/src/core/transport.c       | 38 +++++++++++---------------------
 proton-c/src/core/util.c            | 12 ++++++----
 proton-c/src/core/util.h            |  2 ++
 proton-c/src/sasl/sasl.c            |  2 +-
 10 files changed, 56 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/buffer.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/buffer.c b/proton-c/src/core/buffer.c
index 96c6b37..7bfc813 100644
--- a/proton-c/src/core/buffer.c
+++ b/proton-c/src/core/buffer.c
@@ -238,6 +238,11 @@ int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t 
right)
 {
   if (left + right > buf->size) return PN_ARG_ERR;
 
+  // In special case where we trim everything just clear buffer
+  if (left + right == buf->size) {
+    pn_buffer_clear(buf);
+    return 0;
+  }
   buf->start += left;
   if (buf->start >= buf->capacity)
     buf->start -= buf->capacity;
@@ -301,11 +306,15 @@ pn_rwbytes_t pn_buffer_memory(pn_buffer_t *buf)
   }
 }
 
-int pn_buffer_print(pn_buffer_t *buf)
+int pn_buffer_quote(pn_buffer_t *buf, pn_string_t *str, size_t n)
 {
-  printf("pn_buffer(\"");
-  pn_print_data(buf->bytes + pni_buffer_head(buf), pni_buffer_head_size(buf));
-  pn_print_data(buf->bytes, pni_buffer_tail_size(buf));
-  printf("\")");
+  size_t hsize = pni_buffer_head_size(buf);
+  size_t tsize = pni_buffer_tail_size(buf);
+  if (hsize >= n) {
+    pn_quote(str, buf->bytes + pni_buffer_head(buf), n);
+    return 0;
+  }
+  pn_quote(str, buf->bytes + pni_buffer_head(buf), hsize);
+  if (tsize-(n-hsize) > 0) pn_quote(str, buf->bytes, tsize-(n-hsize));
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/buffer.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/buffer.h b/proton-c/src/core/buffer.h
index da557ef..7193f8f 100644
--- a/proton-c/src/core/buffer.h
+++ b/proton-c/src/core/buffer.h
@@ -23,6 +23,7 @@
  */
 
 #include <proton/import_export.h>
+#include <proton/object.h>
 #include <proton/types.h>
 
 #ifdef __cplusplus
@@ -45,7 +46,7 @@ void pn_buffer_clear(pn_buffer_t *buf);
 int pn_buffer_defrag(pn_buffer_t *buf);
 pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf);
 pn_rwbytes_t pn_buffer_memory(pn_buffer_t *buf);
-int pn_buffer_print(pn_buffer_t *buf);
+int pn_buffer_quote(pn_buffer_t *buf, pn_string_t *string, size_t n);
 
 #ifdef __cplusplus
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/dispatcher.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/dispatcher.c b/proton-c/src/core/dispatcher.c
index 36f8cc9..87e4d97 100644
--- a/proton-c/src/core/dispatcher.c
+++ b/proton-c/src/core/dispatcher.c
@@ -149,10 +149,8 @@ ssize_t pn_dispatcher_input(pn_transport_t *transport, 
const char *bytes, size_t
 
 ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t 
size)
 {
-    int n = transport->available < size ? transport->available : size;
-    memmove(bytes, transport->output, n);
-    memmove(transport->output, transport->output + n, transport->available - 
n);
-    transport->available -= n;
+    int n = pn_buffer_get(transport->output_buffer, 0, size, bytes);
+    pn_buffer_trim(transport->output_buffer, n, 0);
     // XXX: need to check for errors
     return n;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine-internal.h 
b/proton-c/src/core/engine-internal.h
index 39d1572..ec1603d 100644
--- a/proton-c/src/core/engine-internal.h
+++ b/proton-c/src/core/engine-internal.h
@@ -164,10 +164,9 @@ struct pn_transport_t {
   pn_data_t *args;
   pn_data_t *output_args;
   pn_buffer_t *frame;  // frame under construction
-  // Temporary
-  size_t capacity;
-  size_t available; /* number of raw bytes pending output */
-  char *output;
+
+  // Temporary - ??
+  pn_buffer_t *output_buffer;
 
   /* statistics */
   uint64_t bytes_input;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/framing.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/framing.c b/proton-c/src/core/framing.c
index 18e3c38..9f78666 100644
--- a/proton-c/src/core/framing.c
+++ b/proton-c/src/core/framing.c
@@ -83,20 +83,24 @@ ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, 
size_t available, ui
   return size;
 }
 
-size_t pn_write_frame(char *bytes, size_t available, pn_frame_t frame)
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame)
 {
   size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size;
-  if (size <= available)
+  if (size <= pn_buffer_available(buffer))
   {
+    // Prepare header
+    char bytes[8];
     pn_i_write32(&bytes[0], size);
     int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1;
     bytes[4] = doff;
     bytes[5] = frame.type;
     pn_i_write16(&bytes[6], frame.channel);
 
+    // Write header then rest of frame
+    pn_buffer_append(buffer, bytes, 8);
     if (frame.extended)
-      memmove(bytes + AMQP_HEADER_SIZE, frame.extended, frame.ex_size);
-    memmove(bytes + 4*doff, frame.payload, frame.size);
+        pn_buffer_append(buffer, frame.extended, frame.ex_size);
+    pn_buffer_append(buffer, frame.payload, frame.size);
     return size;
   } else {
     return 0;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/framing.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/framing.h b/proton-c/src/core/framing.h
index ecb88a4..792d664 100644
--- a/proton-c/src/core/framing.h
+++ b/proton-c/src/core/framing.h
@@ -22,6 +22,8 @@
  *
  */
 
+#include "buffer.h"
+
 #include <proton/import_export.h>
 #include <proton/type_compat.h>
 #include <proton/error.h>
@@ -39,6 +41,6 @@ typedef struct {
 } pn_frame_t;
 
 ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, 
uint32_t max);
-size_t pn_write_frame(char *bytes, size_t size, pn_frame_t frame);
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame);
 
 #endif /* framing.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/transport.c b/proton-c/src/core/transport.c
index 3538e93..21d62db 100644
--- a/proton-c/src/core/transport.c
+++ b/proton-c/src/core/transport.c
@@ -578,10 +578,8 @@ pn_transport_t *pn_transport(void)
     return NULL;
   }
 
-  transport->capacity = 4*1024;
-  transport->available = 0;
-  transport->output = (char *) malloc(transport->capacity);
-  if (!transport->output) {
+  transport->output_buffer = pn_buffer(4*1024);
+  if (!transport->output_buffer) {
     pn_transport_free(transport);
     return NULL;
   }
@@ -682,7 +680,7 @@ static void pn_transport_finalize(void *object)
   pn_data_free(transport->output_args);
   pn_buffer_free(transport->frame);
   pn_free(transport->context);
-  free(transport->output);
+  pn_buffer_free(transport->output_buffer);
 }
 
 static void pni_post_remote_open_events(pn_transport_t *transport, 
pn_connection_t *connection) {
@@ -943,25 +941,20 @@ int pn_post_frame(pn_transport_t *transport, uint8_t 
type, uint16_t ch, const ch
     return PN_ERR;
   }
 
-  pn_frame_t frame = {0,};
+  pn_frame_t frame = {AMQP_FRAME_TYPE};
   frame.type = type;
   frame.channel = ch;
   frame.payload = buf.start;
   frame.size = wr;
-  size_t n;
-  while (!(n = pn_write_frame(transport->output + transport->available,
-                              transport->capacity - transport->available, 
frame))) {
-    transport->capacity *= 2;
-    transport->output = (char *) realloc(transport->output, 
transport->capacity);
-  }
+  pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
+  pn_write_frame(transport->output_buffer, frame);
   transport->output_frames_ct += 1;
   if (transport->trace & PN_TRACE_RAW) {
     pn_string_set(transport->scratch, "RAW: \"");
-    pn_quote(transport->scratch, transport->output + transport->available, n);
+    pn_buffer_quote(transport->output_buffer, transport->scratch, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
     pn_string_addf(transport->scratch, "\"");
     pn_transport_log(transport, pn_string_get(transport->scratch));
   }
-  transport->available += n;
 
   return 0;
 }
@@ -1053,21 +1046,16 @@ static int pni_post_amqp_transfer_frame(pn_transport_t 
*transport, uint16_t ch,
     frame.payload = buf.start;
     frame.size = buf.size;
 
-    size_t n;
-    while (!(n = pn_write_frame(transport->output + transport->available,
-                                transport->capacity - transport->available, 
frame))) {
-      transport->capacity *= 2;
-      transport->output = (char *) realloc(transport->output, 
transport->capacity);
-    }
+    pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
+    pn_write_frame(transport->output_buffer, frame);
     transport->output_frames_ct += 1;
     framecount++;
     if (transport->trace & PN_TRACE_RAW) {
       pn_string_set(transport->scratch, "RAW: \"");
-      pn_quote(transport->scratch, transport->output + transport->available, 
n);
+      pn_buffer_quote(transport->output_buffer, transport->scratch, 
AMQP_HEADER_SIZE+frame.ex_size+frame.size);
       pn_string_addf(transport->scratch, "\"");
       pn_transport_log(transport, pn_string_get(transport->scratch));
     }
-    transport->available += n;
   } while (payload->size > 0 && framecount < frame_limit);
 
   return framecount;
@@ -2608,10 +2596,10 @@ static pn_timestamp_t pn_tick_amqp(pn_transport_t* 
transport, unsigned int layer
       transport->last_bytes_output = transport->bytes_output;
     } else if (transport->keepalive_deadline <= now) {
       transport->keepalive_deadline = now + 
(pn_timestamp_t)(transport->remote_idle_timeout/2.0);
-      if (transport->available == 0) {    // no outbound data pending
+      if (pn_buffer_size(transport->output_buffer) == 0) {    // no outbound 
data pending
         // so send empty frame (and account for it!)
         pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "");
-        transport->last_bytes_output += transport->available;
+        transport->last_bytes_output += 
pn_buffer_size(transport->output_buffer);
       }
     }
     timeout = pn_timestamp_min( timeout, transport->keepalive_deadline );
@@ -2653,7 +2641,7 @@ static ssize_t pn_output_write_amqp(pn_transport_t* 
transport, unsigned int laye
   // write out any buffered data _before_ returning PN_EOS, else we
   // could truncate an outgoing Close frame containing a useful error
   // status
-  if (!transport->available && transport->close_sent) {
+  if (!pn_buffer_size(transport->output_buffer) && transport->close_sent) {
     return PN_EOS;
   }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/util.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/util.c b/proton-c/src/core/util.c
index 4309de3..a676e9f 100644
--- a/proton-c/src/core/util.c
+++ b/proton-c/src/core/util.c
@@ -19,15 +19,19 @@
  *
  */
 
+#include "util.h"
+
+#include "buffer.h"
+
+#include <proton/error.h>
+#include <proton/types.h>
+#include <proton/type_compat.h>
+
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <proton/type_compat.h>
 #include <ctype.h>
 #include <string.h>
-#include <proton/error.h>
-#include <proton/types.h>
-#include "util.h"
 
 ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size)
 {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/util.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/util.h b/proton-c/src/core/util.h
index 4d3ba3b..78b1c4d 100644
--- a/proton-c/src/core/util.h
+++ b/proton-c/src/core/util.h
@@ -22,6 +22,8 @@
  *
  */
 
+#include "buffer.h"
+
 #include <errno.h>
 #ifndef __cplusplus
 #include <stdbool.h>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index fe778be..bb95429 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -637,7 +637,7 @@ static ssize_t pn_output_write_sasl(pn_transport_t* 
transport, unsigned int laye
 
   pni_post_sasl_frame(transport);
 
-  if (transport->available != 0 || !pni_sasl_is_final_output_state(sasl)) {
+  if (pn_buffer_size(transport->output_buffer) != 0 || 
!pni_sasl_is_final_output_state(sasl)) {
     return pn_dispatcher_output(transport, bytes, available);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to