This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit a2c54b3a245356b734e22bbdd5f732ed00d44398 Author: Andrew Stitcher <[email protected]> AuthorDate: Mon May 8 20:15:42 2023 -0400 PROTON-2751: Use pn_rwbytes_t not pn_buffer_t for scratch space --- c/src/core/emitters.h | 15 ++++++++------- c/src/core/engine-internal.h | 2 +- c/src/core/transport.c | 28 ++++++++++++++-------------- c/src/core/util.h | 18 ++++++++++++++++++ c/src/sasl/sasl.c | 10 +++++----- c/tools/codec-generator/generate.py | 4 ++-- 6 files changed, 48 insertions(+), 29 deletions(-) diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h index 9b4ec1b50..a9399ba5c 100644 --- a/c/src/core/emitters.h +++ b/c/src/core/emitters.h @@ -24,7 +24,9 @@ /* Definitions of AMQP type codes */ #include "encodings.h" -#include "buffer.h" +#include "engine-internal.h" +#include "protocol.h" +#include "util.h" #include <proton/codec.h> @@ -56,11 +58,10 @@ typedef struct pni_emitter_t { size_t position; } pni_emitter_t; -static inline pni_emitter_t make_emitter_from_buffer(pn_buffer_t* buffer) { - pn_rwbytes_t output_bytes = pn_buffer_free_memory(buffer); +static inline pni_emitter_t make_emitter_from_rwbytes(pn_rwbytes_t* output_bytes) { return (pni_emitter_t){ - .output_start = output_bytes.start, - .size = output_bytes.size, + .output_start = output_bytes->start, + .size = output_bytes->size, .position = 0 }; } @@ -81,8 +82,8 @@ static inline bool resize_required(pni_emitter_t* emitter) { return emitter->position > emitter->size; } -static inline void size_buffer_to_emitter(pn_buffer_t* buffer, pni_emitter_t* emitter) { - pn_buffer_ensure(buffer, pn_buffer_capacity(buffer)+(emitter->position-emitter->size)); +static inline void size_buffer_to_emitter(pn_rwbytes_t* buffer, pni_emitter_t* emitter) { + pn_rwbytes_realloc(buffer, buffer->size+emitter->position-emitter->size); } static inline bool encode_succeeded(pni_emitter_t* emitter, pni_compound_context* compound) { diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 29e84eb02..4cf6ea502 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -165,7 +165,7 @@ struct pn_transport_t { /* scratch area */ - pn_buffer_t *frame; // frame under construction + pn_rwbytes_t scratch_space; // Temporary - ?? pn_buffer_t *output_buffer; diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 38d3a188d..e414789e5 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -410,7 +410,7 @@ static void pn_transport_initialize(void *object) transport->tracer = NULL; transport->sasl = NULL; transport->ssl = NULL; - transport->frame = pn_buffer(PN_TRANSPORT_INITIAL_FRAME_SIZE); + transport->scratch_space = pn_rwbytes_alloc(PN_TRANSPORT_INITIAL_FRAME_SIZE); transport->input_frames_ct = 0; transport->output_frames_ct = 0; @@ -662,7 +662,7 @@ static void pn_transport_finalize(void *object) pn_free(transport->remote_channels); pni_mem_subdeallocate(pn_class(transport), transport, transport->input_buf); pni_mem_subdeallocate(pn_class(transport), transport, transport->output_buf); - pn_buffer_free(transport->frame); + pn_rwbytes_free(transport->scratch_space); pn_free(transport->context); pn_buffer_free(transport->output_buffer); pni_logger_fini(&transport->logger); @@ -901,7 +901,7 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, compute_performatives:; /* "DL[IIzI?o?on?DLC?o?o?o]" */ pn_bytes_t performative = - pn_amqp_encode_DLEIIzIQoQonQDLCQoQoQoe(transport->frame, TRANSFER, + pn_amqp_encode_DLEIIzIQoQonQDLCQoQoQoe(&transport->scratch_space, TRANSFER, handle, id, tag->size, tag->start, @@ -960,7 +960,7 @@ static int pni_post_close(pn_transport_t *transport, pn_condition_t *cond) info = pn_condition_info(cond); } /* "DL[?DL[sSC]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEQDLEsSCee(transport->frame, CLOSE, + pn_bytes_t buf = pn_amqp_encode_DLEQDLEsSCee(&transport->scratch_space, CLOSE, (bool) condition, ERROR, condition, description, info); return pn_framing_send_amqp(transport, 0, buf); } @@ -1871,7 +1871,7 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp const char *cid = pn_string_get(connection->container); pni_calculate_channel_max(transport); /* "DL[SS?I?H?InnMMC]" */ - pn_bytes_t buf = pn_amqp_encode_DLESSQIQHQInnMMCe(transport->frame, OPEN, + pn_bytes_t buf = pn_amqp_encode_DLESSQIQHQInnMMCe(&transport->scratch_space, OPEN, cid ? cid : "", pn_string_get(connection->hostname), // TODO: This is messy, because we also have to allow local_max_frame_ to be 0 to mean unlimited @@ -1960,7 +1960,7 @@ static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpo state->incoming_window = pni_session_incoming_window(ssn); state->outgoing_window = pni_session_outgoing_window(ssn); /* "DL[?HIIII]" */ - pn_bytes_t buf = pn_amqp_encode_DLEQHIIIIe(transport->frame, BEGIN, + pn_bytes_t buf = pn_amqp_encode_DLEQHIIIIe(&transport->scratch_space, BEGIN, ((int16_t) state->remote_channel >= 0), state->remote_channel, state->outgoing_transfer_count, state->incoming_window, @@ -2021,7 +2021,7 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp const pn_distribution_mode_t dist_mode = (pn_distribution_mode_t) link->source.distribution_mode; if (link->target.type == PN_COORDINATOR) { /* "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]" */ - pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoCQsCnCCeDLECennIe(transport->frame, ATTACH, + pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoCQsCnCCeDLECennIe(&transport->scratch_space, ATTACH, pn_string_get(link->name), state->local_handle, endpoint->type == RECEIVER, @@ -2044,7 +2044,7 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp if (err) return err; } else { /* "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]" */ - pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoCQsCnMMeQDLESIsIoCMennILnnCe(transport->frame, ATTACH, + pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoCQsCnMMeQDLESIsIoCMennILnnCe(&transport->scratch_space, ATTACH, pn_string_get(link->name), state->local_handle, endpoint->type == RECEIVER, @@ -2091,7 +2091,7 @@ static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t bool linkq = (bool) link; pn_link_state_t *state = &link->state; /* "DL[?IIII?I?I?In?o]" */ - pn_bytes_t buf = pn_amqp_encode_DLEQIIIIQIQIQInQoe(transport->frame, FLOW, + pn_bytes_t buf = pn_amqp_encode_DLEQIIIIQIQIQInQoe(&transport->scratch_space, FLOW, (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count, ssn->state.incoming_window, ssn->state.outgoing_transfer_count, @@ -2127,7 +2127,7 @@ static int pni_flush_disp(pn_transport_t *transport, pn_session_t *ssn) bool settled = ssn->state.disp_settled; if (ssn->state.disp) { /* "DL[oI?I?o?DL[]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEoIQIQoQDLEee(transport->frame, DISPOSITION, + pn_bytes_t buf = pn_amqp_encode_DLEoIQIQoQDLEee(&transport->scratch_space, DISPOSITION, ssn->state.disp_type, ssn->state.disp_first, ssn->state.disp_last!=ssn->state.disp_first, ssn->state.disp_last, @@ -2164,7 +2164,7 @@ static int pni_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) pn_data_clear(transport->disp_data); PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, transport->disp_data)); /* "DL[oIn?o?DLC]" */ - pn_bytes_t buf = pn_amqp_encode_DLEoInQoQDLCe(transport->frame,DISPOSITION, + pn_bytes_t buf = pn_amqp_encode_DLEoInQoQDLCe(&transport->scratch_space,DISPOSITION, role, state->id, delivery->local.settled, delivery->local.settled, (bool)code, code, transport->disp_data); @@ -2402,7 +2402,7 @@ static int pni_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *e info = pn_condition_info(&endpoint->condition); } /* "DL[I?o?DL[sSC]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEIQoQDLEsSCee(transport->frame, DETACH, + pn_bytes_t buf = pn_amqp_encode_DLEIQoQDLEsSCee(&transport->scratch_space, DETACH, state->local_handle, !link->detached, !link->detached, (bool)name, ERROR, name, description, info); @@ -2478,7 +2478,7 @@ static int pni_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *en info = pn_condition_info(&endpoint->condition); } /* "DL[?DL[sSC]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEQDLEsSCee(transport->frame, END, + pn_bytes_t buf = pn_amqp_encode_DLEQDLEsSCee(&transport->scratch_space, END, (bool) name, ERROR, name, description, info); int err = pn_framing_send_amqp(transport, state->local_channel, buf); if (err) return err; @@ -2555,7 +2555,7 @@ static void pn_error_amqp(pn_transport_t* transport, unsigned int layer) if (!transport->close_sent) { if (!transport->open_sent) { /* "DL[S]" */ - pn_bytes_t buf = pn_amqp_encode_DLESe(transport->frame, OPEN, ""); + pn_bytes_t buf = pn_amqp_encode_DLESe(&transport->scratch_space, OPEN, ""); pn_framing_send_amqp(transport, 0, buf); } diff --git a/c/src/core/util.h b/c/src/core/util.h index 7924bc9bd..7ed5b925f 100644 --- a/c/src/core/util.h +++ b/c/src/core/util.h @@ -55,6 +55,24 @@ static inline pn_bytes_t pn_string_bytes(struct pn_string_t *s) { return pn_bytes(pn_string_size(s), pn_string_get(s)); } +static inline pn_rwbytes_t pn_rwbytes_alloc(size_t size) { + char* space = malloc(size); + size_t s = space ? size : 0; + return (pn_rwbytes_t){.size=s, .start=space}; +} + + +static inline pn_rwbytes_t pn_rwbytes_realloc(pn_rwbytes_t *in, size_t size) { + char* space = realloc(in->start, size); + size_t s = space ? size : 0; + *in = (pn_rwbytes_t){.size=s, .start=space}; + return *in; +} + +static inline void pn_rwbytes_free(pn_rwbytes_t in) { + free((void*)in.start); +} + static inline void pni_write16(char *bytes, uint16_t value) { bytes[0] = 0xFF & (value >> 8); diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c index 1e1d877af..0dff90a6a 100644 --- a/c/src/sasl/sasl.c +++ b/c/src/sasl/sasl.c @@ -491,7 +491,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport) switch (desired_state) { case SASL_POSTED_INIT: { /* GENERATE_CODEC_CODE: "DL[szS]" */ - pn_bytes_t buf = pn_amqp_encode_DLEszSe(transport->frame, SASL_INIT, sasl->selected_mechanism, out.size, out.start, sasl->local_fqdn); + pn_bytes_t buf = pn_amqp_encode_DLEszSe(&transport->scratch_space, SASL_INIT, sasl->selected_mechanism, out.size, out.start, sasl->local_fqdn); pn_framing_send_sasl(transport, buf); pni_emit(transport); break; @@ -506,7 +506,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport) pni_split_mechs(mechlist, sasl->included_mechanisms, mechs, &count); } /* GENERATE_CODEC_CODE: "DL[@T[*s]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEATEjsee(transport->frame, SASL_MECHANISMS, PN_SYMBOL, count, mechs); + pn_bytes_t buf = pn_amqp_encode_DLEATEjsee(&transport->scratch_space, SASL_MECHANISMS, PN_SYMBOL, count, mechs); free(mechlist); pn_framing_send_sasl(transport, buf); pni_emit(transport); @@ -515,7 +515,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport) case SASL_POSTED_RESPONSE: if (sasl->last_state != SASL_POSTED_RESPONSE) { /* "DL[Z]" */ - pn_bytes_t buf = pn_amqp_encode_DLEZe(transport->frame, SASL_RESPONSE, out.size, out.start); + pn_bytes_t buf = pn_amqp_encode_DLEZe(&transport->scratch_space, SASL_RESPONSE, out.size, out.start); pn_framing_send_sasl(transport, buf); pni_emit(transport); } @@ -526,7 +526,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport) continue; } else if (sasl->last_state != SASL_POSTED_CHALLENGE) { /* "DL[Z]" */ - pn_bytes_t buf = pn_amqp_encode_DLEZe(transport->frame, SASL_CHALLENGE, out.size, out.start); + pn_bytes_t buf = pn_amqp_encode_DLEZe(&transport->scratch_space, SASL_CHALLENGE, out.size, out.start); pn_framing_send_sasl(transport, buf); pni_emit(transport); } @@ -537,7 +537,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport) continue; } /* "DL[Bz]" */ - pn_bytes_t buf = pn_amqp_encode_DLEBze(transport->frame, SASL_OUTCOME, sasl->outcome, out.size, out.start); + pn_bytes_t buf = pn_amqp_encode_DLEBze(&transport->scratch_space, SASL_OUTCOME, sasl->outcome, out.size, out.start); pn_framing_send_sasl(transport, buf); pni_emit(transport); if (sasl->outcome!=PN_SASL_OK) { diff --git a/c/tools/codec-generator/generate.py b/c/tools/codec-generator/generate.py index 348df1bc0..e9ec2445c 100644 --- a/c/tools/codec-generator/generate.py +++ b/c/tools/codec-generator/generate.py @@ -414,7 +414,7 @@ def emit_function(name_prefix: str, fill_spec: str, prefix_args: List[Tuple[str, f'{function_spec}', '{', f'{p.mk_indent(1)}do {{', - f'{p.mk_indent(2)}pni_emitter_t emitter = make_emitter_from_buffer({", ".join(prefix_params)});', + f'{p.mk_indent(2)}pni_emitter_t emitter = make_emitter_from_rwbytes({", ".join(prefix_params)});', f'{p.mk_indent(2)}if ({p.mk_funcall(inner_function_name, ["&emitter", *args])}) {{', f'{p.mk_indent(3)}{p.mk_funcall("size_buffer_to_emitter", [*prefix_params, "&emitter"])};', f'{p.mk_indent(3)}continue;', @@ -494,7 +494,7 @@ def emit(fill_specs, decl_filename, impl_filename): decls: Dict[str, str] = {} defns: Dict[str, List[str]] = {} for fill_spec in fill_specs: - decl, defn = emit_function('pn_amqp_encode', fill_spec, [('buffer', 'pn_buffer_t*')]) + decl, defn = emit_function('pn_amqp_encode', fill_spec, [('buffer', 'pn_rwbytes_t*')]) decls[fill_spec] = decl defns[fill_spec] = defn if decl_filename and impl_filename: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
