PROTON-1687: consolidate transport buffers in transport - Use pn_buffer_t for transport output buffer.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/85755ac4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/85755ac4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/85755ac4 Branch: refs/heads/go1 Commit: 85755ac4bde591a02953d080a36a23af80287be8 Parents: 7a936e9 Author: Andrew Stitcher <[email protected]> Authored: Tue Aug 16 17:32:23 2016 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Dec 8 15:46:21 2017 -0500 ---------------------------------------------------------------------- proton-c/src/core/buffer.c | 19 +++++++++++----- proton-c/src/core/buffer.h | 3 ++- proton-c/src/core/dispatcher.c | 6 ++--- proton-c/src/core/engine-internal.h | 7 +++--- proton-c/src/core/framing.c | 12 ++++++---- proton-c/src/core/framing.h | 4 +++- proton-c/src/core/transport.c | 38 +++++++++++--------------------- proton-c/src/core/util.c | 12 ++++++---- proton-c/src/core/util.h | 2 ++ proton-c/src/sasl/sasl.c | 2 +- 10 files changed, 56 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/buffer.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/buffer.c b/proton-c/src/core/buffer.c index 96c6b37..7bfc813 100644 --- a/proton-c/src/core/buffer.c +++ b/proton-c/src/core/buffer.c @@ -238,6 +238,11 @@ int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right) { if (left + right > buf->size) return PN_ARG_ERR; + // In special case where we trim everything just clear buffer + if (left + right == buf->size) { + pn_buffer_clear(buf); + return 0; + } buf->start += left; if (buf->start >= buf->capacity) buf->start -= buf->capacity; @@ -301,11 +306,15 @@ pn_rwbytes_t pn_buffer_memory(pn_buffer_t *buf) } } -int pn_buffer_print(pn_buffer_t *buf) +int pn_buffer_quote(pn_buffer_t *buf, pn_string_t *str, size_t n) { - printf("pn_buffer(\""); - pn_print_data(buf->bytes + pni_buffer_head(buf), pni_buffer_head_size(buf)); - pn_print_data(buf->bytes, pni_buffer_tail_size(buf)); - printf("\")"); + size_t hsize = pni_buffer_head_size(buf); + size_t tsize = pni_buffer_tail_size(buf); + if (hsize >= n) { + pn_quote(str, buf->bytes + pni_buffer_head(buf), n); + return 0; + } + pn_quote(str, buf->bytes + pni_buffer_head(buf), hsize); + if (tsize-(n-hsize) > 0) pn_quote(str, buf->bytes, tsize-(n-hsize)); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/buffer.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/buffer.h b/proton-c/src/core/buffer.h index da557ef..7193f8f 100644 --- a/proton-c/src/core/buffer.h +++ b/proton-c/src/core/buffer.h @@ -23,6 +23,7 @@ */ #include <proton/import_export.h> +#include <proton/object.h> #include <proton/types.h> #ifdef __cplusplus @@ -45,7 +46,7 @@ void pn_buffer_clear(pn_buffer_t *buf); int pn_buffer_defrag(pn_buffer_t *buf); pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf); pn_rwbytes_t pn_buffer_memory(pn_buffer_t *buf); -int pn_buffer_print(pn_buffer_t *buf); +int pn_buffer_quote(pn_buffer_t *buf, pn_string_t *string, size_t n); #ifdef __cplusplus } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/dispatcher.c b/proton-c/src/core/dispatcher.c index 36f8cc9..87e4d97 100644 --- a/proton-c/src/core/dispatcher.c +++ b/proton-c/src/core/dispatcher.c @@ -149,10 +149,8 @@ ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size) { - int n = transport->available < size ? transport->available : size; - memmove(bytes, transport->output, n); - memmove(transport->output, transport->output + n, transport->available - n); - transport->available -= n; + int n = pn_buffer_get(transport->output_buffer, 0, size, bytes); + pn_buffer_trim(transport->output_buffer, n, 0); // XXX: need to check for errors return n; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/engine-internal.h b/proton-c/src/core/engine-internal.h index 39d1572..ec1603d 100644 --- a/proton-c/src/core/engine-internal.h +++ b/proton-c/src/core/engine-internal.h @@ -164,10 +164,9 @@ struct pn_transport_t { pn_data_t *args; pn_data_t *output_args; pn_buffer_t *frame; // frame under construction - // Temporary - size_t capacity; - size_t available; /* number of raw bytes pending output */ - char *output; + + // Temporary - ?? + pn_buffer_t *output_buffer; /* statistics */ uint64_t bytes_input; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/framing.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/framing.c b/proton-c/src/core/framing.c index 18e3c38..9f78666 100644 --- a/proton-c/src/core/framing.c +++ b/proton-c/src/core/framing.c @@ -83,20 +83,24 @@ ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, ui return size; } -size_t pn_write_frame(char *bytes, size_t available, pn_frame_t frame) +size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame) { size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size; - if (size <= available) + if (size <= pn_buffer_available(buffer)) { + // Prepare header + char bytes[8]; pn_i_write32(&bytes[0], size); int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1; bytes[4] = doff; bytes[5] = frame.type; pn_i_write16(&bytes[6], frame.channel); + // Write header then rest of frame + pn_buffer_append(buffer, bytes, 8); if (frame.extended) - memmove(bytes + AMQP_HEADER_SIZE, frame.extended, frame.ex_size); - memmove(bytes + 4*doff, frame.payload, frame.size); + pn_buffer_append(buffer, frame.extended, frame.ex_size); + pn_buffer_append(buffer, frame.payload, frame.size); return size; } else { return 0; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/framing.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/framing.h b/proton-c/src/core/framing.h index ecb88a4..792d664 100644 --- a/proton-c/src/core/framing.h +++ b/proton-c/src/core/framing.h @@ -22,6 +22,8 @@ * */ +#include "buffer.h" + #include <proton/import_export.h> #include <proton/type_compat.h> #include <proton/error.h> @@ -39,6 +41,6 @@ typedef struct { } pn_frame_t; ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max); -size_t pn_write_frame(char *bytes, size_t size, pn_frame_t frame); +size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame); #endif /* framing.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/transport.c b/proton-c/src/core/transport.c index 3538e93..21d62db 100644 --- a/proton-c/src/core/transport.c +++ b/proton-c/src/core/transport.c @@ -578,10 +578,8 @@ pn_transport_t *pn_transport(void) return NULL; } - transport->capacity = 4*1024; - transport->available = 0; - transport->output = (char *) malloc(transport->capacity); - if (!transport->output) { + transport->output_buffer = pn_buffer(4*1024); + if (!transport->output_buffer) { pn_transport_free(transport); return NULL; } @@ -682,7 +680,7 @@ static void pn_transport_finalize(void *object) pn_data_free(transport->output_args); pn_buffer_free(transport->frame); pn_free(transport->context); - free(transport->output); + pn_buffer_free(transport->output_buffer); } static void pni_post_remote_open_events(pn_transport_t *transport, pn_connection_t *connection) { @@ -943,25 +941,20 @@ int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const ch return PN_ERR; } - pn_frame_t frame = {0,}; + pn_frame_t frame = {AMQP_FRAME_TYPE}; frame.type = type; frame.channel = ch; frame.payload = buf.start; frame.size = wr; - size_t n; - while (!(n = pn_write_frame(transport->output + transport->available, - transport->capacity - transport->available, frame))) { - transport->capacity *= 2; - transport->output = (char *) realloc(transport->output, transport->capacity); - } + pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.ex_size+frame.size); + pn_write_frame(transport->output_buffer, frame); transport->output_frames_ct += 1; if (transport->trace & PN_TRACE_RAW) { pn_string_set(transport->scratch, "RAW: \""); - pn_quote(transport->scratch, transport->output + transport->available, n); + pn_buffer_quote(transport->output_buffer, transport->scratch, AMQP_HEADER_SIZE+frame.ex_size+frame.size); pn_string_addf(transport->scratch, "\""); pn_transport_log(transport, pn_string_get(transport->scratch)); } - transport->available += n; return 0; } @@ -1053,21 +1046,16 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, frame.payload = buf.start; frame.size = buf.size; - size_t n; - while (!(n = pn_write_frame(transport->output + transport->available, - transport->capacity - transport->available, frame))) { - transport->capacity *= 2; - transport->output = (char *) realloc(transport->output, transport->capacity); - } + pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.ex_size+frame.size); + pn_write_frame(transport->output_buffer, frame); transport->output_frames_ct += 1; framecount++; if (transport->trace & PN_TRACE_RAW) { pn_string_set(transport->scratch, "RAW: \""); - pn_quote(transport->scratch, transport->output + transport->available, n); + pn_buffer_quote(transport->output_buffer, transport->scratch, AMQP_HEADER_SIZE+frame.ex_size+frame.size); pn_string_addf(transport->scratch, "\""); pn_transport_log(transport, pn_string_get(transport->scratch)); } - transport->available += n; } while (payload->size > 0 && framecount < frame_limit); return framecount; @@ -2608,10 +2596,10 @@ static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer transport->last_bytes_output = transport->bytes_output; } else if (transport->keepalive_deadline <= now) { transport->keepalive_deadline = now + (pn_timestamp_t)(transport->remote_idle_timeout/2.0); - if (transport->available == 0) { // no outbound data pending + if (pn_buffer_size(transport->output_buffer) == 0) { // no outbound data pending // so send empty frame (and account for it!) pn_post_frame(transport, AMQP_FRAME_TYPE, 0, ""); - transport->last_bytes_output += transport->available; + transport->last_bytes_output += pn_buffer_size(transport->output_buffer); } } timeout = pn_timestamp_min( timeout, transport->keepalive_deadline ); @@ -2653,7 +2641,7 @@ static ssize_t pn_output_write_amqp(pn_transport_t* transport, unsigned int laye // write out any buffered data _before_ returning PN_EOS, else we // could truncate an outgoing Close frame containing a useful error // status - if (!transport->available && transport->close_sent) { + if (!pn_buffer_size(transport->output_buffer) && transport->close_sent) { return PN_EOS; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/util.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/util.c b/proton-c/src/core/util.c index 4309de3..a676e9f 100644 --- a/proton-c/src/core/util.c +++ b/proton-c/src/core/util.c @@ -19,15 +19,19 @@ * */ +#include "util.h" + +#include "buffer.h" + +#include <proton/error.h> +#include <proton/types.h> +#include <proton/type_compat.h> + #include <stdarg.h> #include <stdio.h> #include <stdlib.h> -#include <proton/type_compat.h> #include <ctype.h> #include <string.h> -#include <proton/error.h> -#include <proton/types.h> -#include "util.h" ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/core/util.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/util.h b/proton-c/src/core/util.h index 4d3ba3b..78b1c4d 100644 --- a/proton-c/src/core/util.h +++ b/proton-c/src/core/util.h @@ -22,6 +22,8 @@ * */ +#include "buffer.h" + #include <errno.h> #ifndef __cplusplus #include <stdbool.h> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/85755ac4/proton-c/src/sasl/sasl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c index fe778be..bb95429 100644 --- a/proton-c/src/sasl/sasl.c +++ b/proton-c/src/sasl/sasl.c @@ -637,7 +637,7 @@ static ssize_t pn_output_write_sasl(pn_transport_t* transport, unsigned int laye pni_post_sasl_frame(transport); - if (transport->available != 0 || !pni_sasl_is_final_output_state(sasl)) { + if (pn_buffer_size(transport->output_buffer) != 0 || !pni_sasl_is_final_output_state(sasl)) { return pn_dispatcher_output(transport, bytes, available); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
