This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit ace3feea6314c49ffc188fc6732a3f6b67c26e28 Author: Andrew Stitcher <[email protected]> AuthorDate: Tue Jun 22 15:51:45 2021 -0400 PROTON-2451: Refactor AMQP frame to allow code generation for fill --- c/CMakeLists.txt | 1 + c/src/core/dispatch_actions.h | 3 - c/src/core/engine-internal.h | 2 +- c/src/core/framing.c | 43 ++++++++++++ c/src/core/framing.h | 7 ++ c/src/core/post_frame.c | 77 ++++++++++++++++++++++ c/src/core/transport.c | 149 +++++++++++++----------------------------- c/src/sasl/sasl.c | 24 ++++--- 8 files changed, 188 insertions(+), 118 deletions(-) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index bba89ef..580f83f 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -227,6 +227,7 @@ set (qpid-proton-core src/core/types.c src/core/framing.c + src/core/post_frame.c src/core/value_dump.c src/core/codec.c diff --git a/c/src/core/dispatch_actions.h b/c/src/core/dispatch_actions.h index ea2d8b2..916ff55 100644 --- a/c/src/core/dispatch_actions.h +++ b/c/src/core/dispatch_actions.h @@ -24,9 +24,6 @@ #include "dispatcher.h" -#define AMQP_FRAME_TYPE (0) -#define SASL_FRAME_TYPE (1) - /* AMQP actions */ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 471f597..4313ab4 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -378,7 +378,7 @@ void pn_link_unbound(pn_link_t* link); void pn_ep_incref(pn_endpoint_t *endpoint); void pn_ep_decref(pn_endpoint_t *endpoint); -int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...); +pn_bytes_t pn_fill_performative(pn_transport_t *transport, const char *fmt, ...); #if __cplusplus } diff --git a/c/src/core/framing.c b/c/src/core/framing.c index 547938a..79ead83 100644 --- a/c/src/core/framing.c +++ b/c/src/core/framing.c @@ -115,3 +115,46 @@ size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t *logger return 0; } } + +static inline void pn_post_frame(pn_buffer_t *output, pn_logger_t *logger, uint8_t type, uint16_t ch, pn_bytes_t performative, pn_bytes_t payload) +{ + pn_frame_t frame = { + .type = type, + .channel = ch, + .frame_payload0 = performative, + .frame_payload1 = payload + }; + pn_buffer_ensure(output, AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); + pn_write_frame(output, frame, logger); +} + +int pn_framing_send_amqp(pn_transport_t *transport, uint16_t ch, pn_bytes_t performative) +{ + if (!performative.start) + return PN_ERR; + + pn_post_frame(transport->output_buffer, &transport->logger, AMQP_FRAME_TYPE, ch, performative, (pn_bytes_t){0, NULL}); + transport->output_frames_ct += 1; + return 0; +} + +int pn_framing_send_amqp_with_payload(pn_transport_t *transport, uint16_t ch, pn_bytes_t performative, pn_bytes_t payload) +{ + if (!performative.start) + return PN_ERR; + + pn_post_frame(transport->output_buffer, &transport->logger, AMQP_FRAME_TYPE, ch, performative, payload); + transport->output_frames_ct += 1; + return 0; +} + +int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative) +{ + if (!performative.start) + return PN_ERR; + + // All SASL frames go on channel 0 + pn_post_frame(transport->output_buffer, &transport->logger, SASL_FRAME_TYPE, 0, performative, (pn_bytes_t){0, NULL}); + transport->output_frames_ct += 1; + return 0; +} diff --git a/c/src/core/framing.h b/c/src/core/framing.h index c5fcaac..795db83 100644 --- a/c/src/core/framing.h +++ b/c/src/core/framing.h @@ -33,6 +33,9 @@ #define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame #define AMQP_MAX_WINDOW_SIZE (2147483647) +#define AMQP_FRAME_TYPE (0) +#define SASL_FRAME_TYPE (1) + typedef struct { uint8_t type; uint16_t channel; @@ -44,4 +47,8 @@ typedef struct { ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max, pn_logger_t *logger); size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t *logger); +int pn_framing_send_amqp(pn_transport_t *transport, uint16_t ch, pn_bytes_t performative); +int pn_framing_send_amqp_with_payload(pn_transport_t *transport, uint16_t ch, pn_bytes_t performative, pn_bytes_t payload); +int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative); + #endif /* framing.h */ diff --git a/c/src/core/post_frame.c b/c/src/core/post_frame.c new file mode 100644 index 0000000..5125816 --- /dev/null +++ b/c/src/core/post_frame.c @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/codec.h" +#include "proton/logger.h" +#include "proton/object.h" +#include "proton/type_compat.h" + +#include "buffer.h" +#include "engine-internal.h" +#include "framing.h" +#include "dispatch_actions.h" + +static inline struct out {int err; pn_bytes_t bytes;} pn_vfill_performative(pn_buffer_t *frame_buf, pn_data_t *output_args, const char *fmt, va_list ap) +{ + pn_data_clear(output_args); + int err = pn_data_vfill(output_args, fmt, ap); + if (err) { + return (struct out){err, pn_bytes_null}; + } + +encode_performatives: + pn_buffer_clear( frame_buf ); + pn_rwbytes_t buf = pn_buffer_memory( frame_buf ); + buf.size = pn_buffer_available( frame_buf ); + + ssize_t wr = pn_data_encode( output_args, buf.start, buf.size ); + if (wr < 0) { + if (wr == PN_OVERFLOW) { + pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 ); + goto encode_performatives; + } + return (struct out){wr, pn_bytes_null}; + } + return (struct out){0, {.size = wr, .start = buf.start}}; +} + +static inline void pn_log_fill_error(pn_logger_t *logger, pn_data_t *data, int error, const char *fmt) { + if (pn_error_code(pn_data_error(data))) { + pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, + "error posting frame: %s, %s: %s", fmt, pn_code(error), + pn_error_text(pn_data_error(data))); + } else { + pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, + "error posting frame: %s", pn_code(error)); + } +} + +pn_bytes_t pn_fill_performative(pn_transport_t *transport, const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + struct out out = pn_vfill_performative(transport->frame, transport->output_args, fmt, ap); + va_end(ap); + if (out.err){ + pn_log_fill_error(&transport->logger, transport->output_args, out.err, fmt); + } + return out.bytes; +} diff --git a/c/src/core/transport.c b/c/src/core/transport.c index fb1e02e..0e0f62e 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -881,53 +881,10 @@ static int pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data } } - -int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...) -{ - pn_buffer_t *frame_buf = transport->frame; - va_list ap; - va_start(ap, fmt); - pn_data_clear(transport->output_args); - int err = pn_data_vfill(transport->output_args, fmt, ap); - va_end(ap); - if (err) { - pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, - "error posting frame: %s, %s: %s", fmt, pn_code(err), - pn_error_text(pn_data_error(transport->output_args))); - return PN_ERR; - } - - encode_performatives: - pn_buffer_clear( frame_buf ); - pn_rwbytes_t buf = pn_buffer_memory( frame_buf ); - buf.size = pn_buffer_available( frame_buf ); - - ssize_t wr = pn_data_encode( transport->output_args, buf.start, buf.size ); - if (wr < 0) { - if (wr == PN_OVERFLOW) { - pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 ); - goto encode_performatives; - } - pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, - "error posting frame: %s", pn_code(wr)); - return PN_ERR; - } - - pn_frame_t frame = {AMQP_FRAME_TYPE}; - frame.type = type; - frame.channel = ch; - frame.frame_payload0 = (pn_bytes_t){.size=wr, .start=buf.start}; - pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); - pn_write_frame(transport->output_buffer, frame, &transport->logger); - transport->output_frames_ct += 1; - - return 0; -} - static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, uint32_t handle, pn_sequence_t id, - pn_bytes_t *payload, + pn_bytes_t *full_payload, const pn_bytes_t *tag, uint32_t message_format, bool settled, @@ -941,13 +898,11 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, { bool more_flag = more; unsigned framecount = 0; - pn_buffer_t *frame = transport->frame; - // create performatives, assuming 'more' flag need not change - - compute_performatives: - pn_data_clear(transport->output_args); - int err = pn_data_fill(transport->output_args, "DL[IIzI?o?on?DLC?o?o?o]", TRANSFER, + // create performative, assuming 'more' flag need not change + compute_performatives:; + pn_bytes_t performative = + pn_fill_performative(transport, "DL[IIzI?o?on?DLC?o?o?o]", TRANSFER, handle, id, tag->size, tag->start, @@ -958,36 +913,19 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, resume, resume, aborted, aborted, batchable, batchable); - if (err) { - pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, - "error posting transfer frame: %s: %s", pn_code(err), - pn_error_text(pn_data_error(transport->output_args))); + if (!performative.start) { return PN_ERR; } - do { // send as many frames as possible without changing the 'more' flag... + // At this point the side affect of the fill is to encode the performative into transport->frame - encode_performatives: - pn_buffer_clear( frame ); - pn_rwbytes_t buf = pn_buffer_memory( frame ); - buf.size = pn_buffer_available( frame ); - - ssize_t wr = pn_data_encode(transport->output_args, buf.start, buf.size); - if (wr < 0) { - if (wr == PN_OVERFLOW) { - pn_buffer_ensure( frame, pn_buffer_available( frame ) * 2 ); - goto encode_performatives; - } - pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "error posting frame: %s", pn_code(wr)); - return PN_ERR; - } - buf.size = wr; + do { // send as many frames as possible without changing the 'more' flag... // check if we need to break up the outbound frame - size_t available = payload->size; + size_t available = full_payload->size; if (transport->remote_max_frame) { - if ((available + buf.size) > transport->remote_max_frame - 8) { - available = transport->remote_max_frame - 8 - buf.size; + if ((available + performative.size) > transport->remote_max_frame - AMQP_HEADER_SIZE) { + available = transport->remote_max_frame - AMQP_HEADER_SIZE - performative.size; if (more_flag == false) { more_flag = true; goto compute_performatives; // deal with flag change @@ -998,20 +936,13 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, goto compute_performatives; } } + pn_bytes_t payload = {.size = available, .start = full_payload->start}; + pn_framing_send_amqp_with_payload(transport, ch, performative, payload); - pn_frame_t frame = {AMQP_FRAME_TYPE}; - frame.channel = ch; - frame.frame_payload0 = (pn_bytes_t){.size=buf.size, .start=buf.start}; - frame.frame_payload1 = (pn_bytes_t){.size=available, .start=payload->start}; - - payload->start += available; - payload->size -= available; - - pn_buffer_ensure(transport->output_buffer, AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); - pn_write_frame(transport->output_buffer, frame, &transport->logger); - transport->output_frames_ct += 1; + full_payload->start += available; + full_payload->size -= available; framecount++; - } while (payload->size > 0 && framecount < frame_limit); + } while (full_payload->size > 0 && framecount < frame_limit); return framecount; } @@ -1030,8 +961,9 @@ static int pni_post_close(pn_transport_t *transport, pn_condition_t *cond) info = pn_condition_info(cond); } - return pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[?DL[sSC]]", CLOSE, + pn_bytes_t buf = pn_fill_performative(transport, "DL[?DL[sSC]]", CLOSE, (bool) condition, ERROR, condition, description, info); + return pn_framing_send_amqp(transport, 0, buf); } static pn_collector_t *pni_transport_collector(pn_transport_t *transport) @@ -1917,7 +1849,7 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp pn_connection_t *connection = (pn_connection_t *) endpoint; const char *cid = pn_string_get(connection->container); pni_calculate_channel_max(transport); - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[SS?I?H?InnMMC]", OPEN, + pn_bytes_t buf = pn_fill_performative(transport, "DL[SS?I?H?InnMMC]", OPEN, cid ? cid : "", pn_string_get(connection->hostname), // TODO: This is messy, because we also have to allow local_max_frame_ to be 0 to mean unlimited @@ -1929,6 +1861,7 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp connection->offered_capabilities, connection->desired_capabilities, connection->properties); + int err = pn_framing_send_amqp(transport, 0, buf); if (err) return err; transport->open_sent = true; } @@ -1968,7 +1901,7 @@ static size_t pni_session_incoming_window(pn_session_t *ssn) pn_condition_format( pn_transport_condition(t), "amqp:internal-error", - "session capacity %" PN_ZU " is less than frame size %" PRIu32, + "session capacity %zu is less than frame size %" PRIu32, capacity, size); pn_transport_close_tail(t); return 0; @@ -2004,11 +1937,12 @@ static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpo } state->incoming_window = pni_session_incoming_window(ssn); state->outgoing_window = pni_session_outgoing_window(ssn); - pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?HIII]", BEGIN, + pn_bytes_t buf = pn_fill_performative(transport, "DL[?HIII]", BEGIN, ((int16_t) state->remote_channel >= 0), state->remote_channel, state->outgoing_transfer_count, state->incoming_window, state->outgoing_window); + pn_framing_send_amqp(transport, state->local_channel, buf); } } @@ -2060,8 +1994,7 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp pni_map_local_handle(link); const pn_distribution_mode_t dist_mode = (pn_distribution_mode_t) link->source.distribution_mode; if (link->target.type == PN_COORDINATOR) { - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, - "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH, + pn_bytes_t buf = pn_fill_performative(transport, "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH, pn_string_get(link->name), state->local_handle, endpoint->type == RECEIVER, @@ -2080,10 +2013,10 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp link->source.capabilities, COORDINATOR, link->target.capabilities, 0); + int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf); if (err) return err; } else { - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, - "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", ATTACH, + pn_bytes_t buf = pn_fill_performative(transport, "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", ATTACH, pn_string_get(link->name), state->local_handle, endpoint->type == RECEIVER, @@ -2114,6 +2047,7 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp 0, link->max_message_size, link->properties); + int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf); if (err) return err; } } @@ -2128,7 +2062,7 @@ static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t ssn->state.outgoing_window = pni_session_outgoing_window(ssn); bool linkq = (bool) link; pn_link_state_t *state = &link->state; - return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW, + pn_bytes_t buf = pn_fill_performative(transport, "DL[?IIII?I?I?In?o]", FLOW, (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count, ssn->state.incoming_window, ssn->state.outgoing_transfer_count, @@ -2137,6 +2071,7 @@ static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t linkq, linkq ? state->delivery_count : 0, linkq, linkq ? state->link_credit : 0, linkq, linkq ? link->drain : false); + return pn_framing_send_amqp(transport, ssn->state.local_channel, buf); } static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint) @@ -2162,12 +2097,13 @@ static int pni_flush_disp(pn_transport_t *transport, pn_session_t *ssn) uint64_t code = ssn->state.disp_code; bool settled = ssn->state.disp_settled; if (ssn->state.disp) { - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[oI?I?o?DL[]]", DISPOSITION, + pn_bytes_t buf = pn_fill_performative(transport, "DL[oI?I?o?DL[]]", DISPOSITION, ssn->state.disp_type, ssn->state.disp_first, ssn->state.disp_last!=ssn->state.disp_first, ssn->state.disp_last, settled, settled, (bool)code, code); + int err = pn_framing_send_amqp(transport, ssn->state.local_channel, buf); if (err) return err; ssn->state.disp_type = 0; ssn->state.disp_code = 0; @@ -2197,11 +2133,11 @@ static int pni_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) if (!pni_disposition_batchable(&delivery->local)) { pn_data_clear(transport->disp_data); PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, transport->disp_data)); - return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, - "DL[oIn?o?DLC]", DISPOSITION, + pn_bytes_t buf = pn_fill_performative(transport, "DL[oIn?o?DLC]", DISPOSITION, role, state->id, delivery->local.settled, delivery->local.settled, (bool)code, code, transport->disp_data); + return pn_framing_send_amqp(transport, ssn->state.local_channel, buf); } if (ssn_state->disp && code == ssn_state->disp_code && @@ -2435,11 +2371,11 @@ static int pni_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *e info = pn_condition_info(&endpoint->condition); } - int err = - pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, - "DL[I?o?DL[sSC]]", DETACH, state->local_handle, - !link->detached, !link->detached, - (bool)name, ERROR, name, description, info); + pn_bytes_t buf = pn_fill_performative(transport, "DL[I?o?DL[sSC]]", DETACH, + state->local_handle, + !link->detached, !link->detached, + (bool)name, ERROR, name, description, info); + int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf); if (err) return err; pni_unmap_local_handle(link); } @@ -2511,8 +2447,9 @@ static int pni_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *en info = pn_condition_info(&endpoint->condition); } - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?DL[sSC]]", END, + pn_bytes_t buf = pn_fill_performative(transport, "DL[?DL[sSC]]", END, (bool) name, ERROR, name, description, info); + int err = pn_framing_send_amqp(transport, state->local_channel, buf); if (err) return err; pni_unmap_local_channel(session); } @@ -2586,7 +2523,8 @@ static void pn_error_amqp(pn_transport_t* transport, unsigned int layer) { if (!transport->close_sent) { if (!transport->open_sent) { - pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[S]", OPEN, ""); + pn_bytes_t buf = pn_fill_performative(transport, "DL[S]", OPEN, ""); + pn_framing_send_amqp(transport, 0, buf); } pni_post_close(transport, &transport->condition); @@ -2683,7 +2621,8 @@ static int64_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, int64 transport->keepalive_deadline = now + (pn_timestamp_t)(transport->remote_idle_timeout/2.0); if (pn_buffer_size(transport->output_buffer) == 0) { // no outbound data pending // so send empty frame (and account for it!) - pn_post_frame(transport, AMQP_FRAME_TYPE, 0, ""); + pn_bytes_t buf = pn_bytes(0,""); + pn_framing_send_amqp(transport, 0, buf); transport->last_bytes_output += pn_buffer_size(transport->output_buffer); } } diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c index dfb4530..d9fd79b 100644 --- a/c/src/sasl/sasl.c +++ b/c/src/sasl/sasl.c @@ -22,7 +22,7 @@ #include "sasl-internal.h" #include "core/autodetect.h" -#include "core/dispatch_actions.h" +#include "core/framing.h" #include "core/engine-internal.h" #include "core/util.h" #include "platform/platform_fmt.h" @@ -484,11 +484,12 @@ static void pni_post_sasl_frame(pn_transport_t *transport) enum pnx_sasl_state desired_state = sasl->desired_state; while (sasl->desired_state > sasl->last_state) { switch (desired_state) { - case SASL_POSTED_INIT: - pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[szS]", SASL_INIT, sasl->selected_mechanism, - out.size, out.start, sasl->local_fqdn); + case SASL_POSTED_INIT: { + pn_bytes_t buf = pn_fill_performative(transport, "DL[szS]", SASL_INIT, sasl->selected_mechanism, out.size, out.start, sasl->local_fqdn); + pn_framing_send_sasl(transport, buf); pni_emit(transport); break; + } case SASL_POSTED_MECHANISMS: { // TODO(PROTON-2122) Replace magic number 32 with dynamically sized memory char *mechs[32]; @@ -499,14 +500,16 @@ static void pni_post_sasl_frame(pn_transport_t *transport) pni_split_mechs(mechlist, sasl->included_mechanisms, mechs, &count); } - pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[@T[*s]]", SASL_MECHANISMS, PN_SYMBOL, count, mechs); + pn_bytes_t buf = pn_fill_performative(transport, "DL[@T[*s]]", SASL_MECHANISMS, PN_SYMBOL, count, mechs); free(mechlist); + pn_framing_send_sasl(transport, buf); pni_emit(transport); break; } case SASL_POSTED_RESPONSE: if (sasl->last_state != SASL_POSTED_RESPONSE) { - pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[Z]", SASL_RESPONSE, out.size, out.start); + pn_bytes_t buf = pn_fill_performative(transport, "DL[Z]", SASL_RESPONSE, out.size, out.start); + pn_framing_send_sasl(transport, buf); pni_emit(transport); } break; @@ -515,16 +518,18 @@ static void pni_post_sasl_frame(pn_transport_t *transport) desired_state = SASL_POSTED_MECHANISMS; continue; } else if (sasl->last_state != SASL_POSTED_CHALLENGE) { - pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[Z]", SASL_CHALLENGE, out.size, out.start); + pn_bytes_t buf = pn_fill_performative(transport, "DL[Z]", SASL_CHALLENGE, out.size, out.start); + pn_framing_send_sasl(transport, buf); pni_emit(transport); } break; - case SASL_POSTED_OUTCOME: + case SASL_POSTED_OUTCOME: { if (sasl->last_state < SASL_POSTED_MECHANISMS) { desired_state = SASL_POSTED_MECHANISMS; continue; } - pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[Bz]", SASL_OUTCOME, sasl->outcome, out.size, out.start); + pn_bytes_t buf = pn_fill_performative(transport, "DL[Bz]", SASL_OUTCOME, sasl->outcome, out.size, out.start); + pn_framing_send_sasl(transport, buf); pni_emit(transport); if (sasl->outcome!=PN_SASL_OK) { pn_do_error(transport, "amqp:unauthorized-access", "Failed to authenticate client [mech=%s]", @@ -532,6 +537,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport) desired_state = SASL_ERROR; } break; + } case SASL_RECVED_SUCCESS: if (sasl->last_state < SASL_POSTED_INIT) { desired_state = SASL_POSTED_INIT; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
