This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 8ad74c6cde0217a5984947e19bdd6421a4c8c91b Author: Andrew Stitcher <[email protected]> AuthorDate: Fri Jun 11 17:52:18 2021 -0400 PROTON-2451: Refactor to separate out extracting performative code ... From rest of performative frame structure --- c/src/core/consumers.h | 61 +++++++++++++++++++++++++++++++++ c/src/core/dispatch_actions.h | 28 +++++++-------- c/src/core/dispatcher.c | 43 +++++++---------------- c/src/core/dispatcher.h | 2 +- c/src/core/framing.c | 12 +++++++ c/src/core/framing.h | 3 ++ c/src/core/logger.c | 6 ++-- c/src/core/transport.c | 80 +++++++++++++++++++++++++++++++++---------- c/src/sasl/sasl.c | 35 ++++++++++++++++--- 9 files changed, 197 insertions(+), 73 deletions(-) diff --git a/c/src/core/consumers.h b/c/src/core/consumers.h index 5bba0c0..d2542b1 100644 --- a/c/src/core/consumers.h +++ b/c/src/core/consumers.h @@ -93,6 +93,25 @@ static inline bool pni_consumer_readf32(pni_consumer_t *consumer, uint32_t* resu return true; } +static inline bool pni_consumer_readf64(pni_consumer_t *consumer, uint64_t* result) +{ + uint32_t a; + if (!pni_consumer_readf32(consumer, &a)) return false; + uint32_t b; + if (!pni_consumer_readf32(consumer, &b)) return false; + *result = (uint64_t)a << 32 | (uint64_t)b; + return true; +} + +static inline bool pni_consumer_readf128(pni_consumer_t *consumer, void *dst) +{ + if (consumer->position+16 > consumer->size) return false; + + memcpy(dst, &consumer->output_start[consumer->position], 16); + consumer->position += 16; + return true; +} + static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consumer, uint8_t type, pn_bytes_t *value) { uint8_t subcategory = type >> 4; switch (subcategory) { @@ -161,4 +180,46 @@ static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consume return false; } +/////////////////////////////////////////////////////////////////////////////// + +static inline bool consume_expected_ubyte(pni_consumer_t* consumer, uint8_t expected) +{ + uint8_t e; + return pni_consumer_readf8(consumer, &e) && e==expected; +} + +static inline bool consume_ulong(pni_consumer_t* consumer, uint64_t *ulong) { + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_SMALLULONG: { + uint8_t ul; + if (!pni_consumer_readf8(consumer, &ul)) return false; + *ulong = ul; + break; + } + case PNE_ULONG: { + uint64_t ul; + if (!pni_consumer_readf64(consumer, &ul)) return false; + *ulong = ul; + break; + } + case PNE_ULONG0: { + *ulong = 0; + break; + } + default: + return false; + } + return true; +} + +// XXX: assuming numeric - +// if we get a symbol we should map it to the numeric value and dispatch on that +static inline bool consume_descriptor(pni_consumer_t* consumer, uint64_t *descriptor) { + return + consume_expected_ubyte(consumer, PNE_DESCRIPTOR) && + consume_ulong(consumer, descriptor); +} + #endif // PROTON_CONSUMERS_H diff --git a/c/src/core/dispatch_actions.h b/c/src/core/dispatch_actions.h index 916ff55..5551364 100644 --- a/c/src/core/dispatch_actions.h +++ b/c/src/core/dispatch_actions.h @@ -26,21 +26,21 @@ /* AMQP actions */ -int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); /* SASL actions */ -int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); +int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload); #endif // _PROTON_DISPATCH_ACTIONS_H diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c index 7df9ade..97e8c0a 100644 --- a/c/src/core/dispatcher.c +++ b/c/src/core/dispatcher.c @@ -21,26 +21,27 @@ #include "dispatcher.h" +#include "consumers.h" +#include "dispatch_actions.h" #include "engine-internal.h" #include "framing.h" #include "logger_private.h" #include "protocol.h" -#include "dispatch_actions.h" -int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { +int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame: type: %d: Unknown performative", frame_type); return PN_ERR; } -int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { +int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame: Unknown frame type: %d", frame_type); return PN_ERR; } // We could use a table based approach here if we needed to dynamically // add new performatives -static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_bytes_t frame_payload) { pn_action_t *action; switch (frame_type) { @@ -72,46 +73,28 @@ static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, break; default: action = pni_bad_frame_type; break; }; - return action(transport, frame_type, channel, args, payload); + return action(transport, frame_type, channel, frame_payload); } - -static int pni_dispatch_frame(pn_frame_t frame, pn_logger_t *logger, pn_transport_t * transport, pn_data_t *args) +static int pni_dispatch_frame(pn_frame_t frame, pn_logger_t *logger, pn_transport_t * transport) { pn_bytes_t frame_payload = frame.frame_payload0; if (frame_payload.size == 0) { // ignore null frames return 0; } - ssize_t dsize = pn_data_decode(args, frame_payload.start, frame_payload.size); - if (dsize < 0) { - PN_LOG_MSG_DATA(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, frame_payload, - "Error decoding frame: %s %s\n", pn_code(dsize), pn_error_text(pn_data_error(args))); - return dsize; - } - uint8_t frame_type = frame.type; - uint16_t channel = frame.channel; - // XXX: assuming numeric - - // if we get a symbol we should map it to the numeric value and dispatch on that uint64_t lcode; - bool scanned; - int e = pn_data_scan(args, "D?L.", &scanned, &lcode); - if (e) { - PN_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Scan error"); - return e; - } - if (!scanned) { + pni_consumer_t consumer = make_consumer_from_bytes(frame_payload); + if (!consume_descriptor(&consumer, &lcode)) { PN_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame"); return PN_ERR; } - size_t payload_size = frame_payload.size - dsize; - const char *payload_mem = payload_size ? frame_payload.start + dsize : NULL; - pn_bytes_t payload = {payload_size, payload_mem}; - int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload); + uint8_t frame_type = frame.type; + uint16_t channel = frame.channel; - pn_data_clear(args); + int err = pni_dispatch_action(transport, lcode, frame_type, channel, frame_payload); return err; } @@ -128,7 +111,7 @@ ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t read += n; available -= n; transport->input_frames_ct += 1; - int e = pni_dispatch_frame(frame, &transport->logger, transport, transport->args); + int e = pni_dispatch_frame(frame, &transport->logger, transport); if (e) return e; } else if (n < 0) { pn_do_error(transport, "amqp:connection:framing-error", "malformed frame"); diff --git a/c/src/core/dispatcher.h b/c/src/core/dispatcher.h index 29881b5..2d3cf21 100644 --- a/c/src/core/dispatcher.h +++ b/c/src/core/dispatcher.h @@ -29,7 +29,7 @@ #include "proton/codec.h" #include "proton/types.h" -typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); +typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, const pn_bytes_t frame_payload); ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt); ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size); diff --git a/c/src/core/framing.c b/c/src/core/framing.c index 79ead83..1f791d5 100644 --- a/c/src/core/framing.c +++ b/c/src/core/framing.c @@ -158,3 +158,15 @@ int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative) transport->output_frames_ct += 1; return 0; } + +ssize_t pn_framing_recv_amqp(pn_data_t *args, pn_logger_t *logger, const pn_bytes_t bytes) +{ + pn_data_clear(args); + ssize_t dsize = pn_data_decode(args, bytes.start, bytes.size); + if (dsize < 0) { + PN_LOG_MSG_DATA(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, bytes, + "Error decoding frame: %s %s\n", pn_code(dsize), pn_error_text(pn_data_error(args))); + } + + return dsize; +} diff --git a/c/src/core/framing.h b/c/src/core/framing.h index 795db83..1c44ed4 100644 --- a/c/src/core/framing.h +++ b/c/src/core/framing.h @@ -25,6 +25,7 @@ #include "buffer.h" #include "logger_private.h" +#include "proton/codec.h" #include "proton/types.h" #include <stddef.h> @@ -51,4 +52,6 @@ int pn_framing_send_amqp(pn_transport_t *transport, uint16_t ch, pn_bytes_t perf int pn_framing_send_amqp_with_payload(pn_transport_t *transport, uint16_t ch, pn_bytes_t performative, pn_bytes_t payload); int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative); +ssize_t pn_framing_recv_amqp(pn_data_t *args, pn_logger_t *logger, const pn_bytes_t frame_payload); + #endif /* framing.h */ diff --git a/c/src/core/logger.c b/c/src/core/logger.c index 4f9df7e..cc794fd 100644 --- a/c/src/core/logger.c +++ b/c/src/core/logger.c @@ -191,11 +191,9 @@ void pni_logger_log_data(pn_logger_t *logger, pn_log_subsystem_t subsystem, pn_l char buf[256]; ssize_t n = pn_quote_data(buf, 256, bytes, size); if (n >= 0) { - pn_logger_logf(logger, subsystem, severity, "%s: %s", msg, buf); + pn_logger_logf(logger, subsystem, severity, "%s: \"%s\"", msg, buf); } else if (n == PN_OVERFLOW) { - pn_logger_logf(logger, subsystem, severity, "%s: %s (truncated)", msg, buf); - } else { - pn_logger_logf(logger, subsystem, severity, "%s: cannot log data: %s", msg, pn_code(n)); + pn_logger_logf(logger, subsystem, severity, "%s: \"%s\"... (truncated)", msg, buf); } } diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 0e0f62e..de63431 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1045,9 +1045,8 @@ static char *pn_bytes_strdup(pn_bytes_t str) return pn_strndup(str.start, str.size); } -int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { - pn_connection_t *conn = transport->connection; bool container_q, hostname_q, remote_channel_max_q, remote_max_frame_q; uint16_t remote_channel_max; uint32_t remote_max_frame; @@ -1055,6 +1054,10 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_clear(transport->remote_offered_capabilities); pn_data_clear(transport->remote_desired_capabilities); pn_data_clear(transport->remote_properties); + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[?S?S?I?HI..CCC]", &container_q, &remote_container, &hostname_q, &remote_hostname, @@ -1087,6 +1090,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pni_mem_deallocate(PN_CLASSCLASS(pn_strdup), transport->remote_hostname); transport->remote_hostname = hostname_q ? pn_bytes_strdup(remote_hostname) : NULL; + pn_connection_t *conn = transport->connection; if (conn) { PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE); pni_post_remote_open_events(transport, conn); @@ -1098,11 +1102,16 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, return 0; } -int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { + bool reply; uint16_t remote_channel; pn_sequence_t next; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next); if (err) return err; @@ -1206,7 +1215,7 @@ int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address) return pn_string_setn(terminus->address, address.start, address.size); } -int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pn_bytes_t name; uint32_t handle; @@ -1222,6 +1231,10 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel uint8_t snd_settle_mode, rcv_settle_mode; uint64_t max_msgsz; bool has_props; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; pn_data_t *rem_props = pn_data(0); int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?C]", &name, &handle, &is_sender, @@ -1231,8 +1244,8 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic, &idc, &max_msgsz, &has_props, rem_props); if (err) { - pn_free(rem_props); - return err; + pn_free(rem_props); + return err; } char strbuf[128]; // avoid malloc for most link names char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL; @@ -1242,10 +1255,10 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel pn_session_t *ssn = pni_channel_state(transport, channel); if (!ssn) { - pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - if (strheap) free(strheap); - pn_free(rem_props); - return PN_EOS; + pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); + if (strheap) free(strheap); + pn_free(rem_props); + return PN_EOS; } pn_link_t *link = pni_find_link(ssn, name, is_sender); if (link && (int32_t)link->state.remote_handle >= 0) { @@ -1372,7 +1385,7 @@ static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery) pn_decref(delivery); } -int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { // XXX: multi transfer uint32_t handle; @@ -1385,10 +1398,17 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann bool resume, aborted, batchable; uint64_t type; pn_data_clear(transport->disp_data); + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[I?Iz.?oo.D?LCooo]", &handle, &id_present, &id, &tag, &settled_set, &settled, &more, &has_type, &type, transport->disp_data, &resume, &aborted, &batchable); if (err) return err; + payload.size -= dsize; + payload.start += dsize; + pn_session_t *ssn = pni_channel_state(transport, channel); if (!ssn) { return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); @@ -1458,7 +1478,7 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann } if (delivery) { - pn_buffer_append(delivery->bytes, payload->start, payload->size); + pn_buffer_append(delivery->bytes, payload.start, payload.size); if (more) { if (!link->more_pending) { // First frame of a multi-frame transfer. Remember at link level. @@ -1488,7 +1508,7 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); } - ssn->incoming_bytes += payload->size; + ssn->incoming_bytes += payload.size; ssn->state.incoming_transfer_count++; ssn->state.incoming_window--; @@ -1500,12 +1520,16 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann return 0; } -int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pn_sequence_t onext, inext, delivery_count; uint32_t iwin, owin, link_credit; uint32_t handle; bool inext_init, handle_init, dcount_init, drain; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin, &onext, &owin, &handle_init, &handle, &dcount_init, &delivery_count, &link_credit, &drain); @@ -1647,13 +1671,19 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t return 0; } -int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { bool role; pn_sequence_t first, last; uint64_t type = 0; - bool last_init, settled, type_init; + bool last_init, settled; + + bool type_init; pn_data_clear(transport->disp_data); + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[oI?IoD?LC]", &role, &first, &last_init, &last, &settled, &type_init, &type, transport->disp_data); @@ -1710,10 +1740,14 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch return 0; } -int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { uint32_t handle; bool closed; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[Io]", &handle, &closed); if (err) return err; @@ -1741,12 +1775,16 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel return 0; } -int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pn_session_t *ssn = pni_channel_state(transport, channel); if (!ssn) { return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); } + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); @@ -1755,9 +1793,13 @@ int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, p return 0; } -int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pn_connection_t *conn = transport->connection; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; transport->close_rcvd = true; diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c index d9fd79b..80ffad8 100644 --- a/c/src/sasl/sasl.c +++ b/c/src/sasl/sasl.c @@ -881,7 +881,7 @@ pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl0) } // Received Server side -int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pni_sasl_t *sasl = transport->sasl; @@ -894,6 +894,11 @@ int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t mech; pn_bytes_t recv; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; + int err = pn_data_scan(args, "D.[sz]", &mech, &recv); if (err) return err; sasl->selected_mechanism = pn_strndup(mech.start, mech.size); @@ -913,7 +918,7 @@ int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, } // Received client side -int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pni_sasl_t *sasl = transport->sasl; @@ -930,6 +935,11 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha // Try array of symbols for mechanism list bool array = false; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; + int err = pn_data_scan(args, "D.[?@[", &array); if (err) return err; @@ -969,7 +979,7 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha } // Received client side -int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pni_sasl_t *sasl = transport->sasl; @@ -981,6 +991,11 @@ int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t chan if (!sasl->client) return PN_ERR; pn_bytes_t recv; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; + int err = pn_data_scan(args, "D.[z]", &recv); if (err) return err; @@ -990,7 +1005,7 @@ int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t chan } // Received server side -int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pni_sasl_t *sasl = transport->sasl; @@ -1002,6 +1017,11 @@ int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t chann if (sasl->client) return PN_ERR; pn_bytes_t recv; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; + int err = pn_data_scan(args, "D.[z]", &recv); if (err) return err; @@ -1011,7 +1031,7 @@ int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t chann } // Received client side -int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { pni_sasl_t *sasl = transport->sasl; @@ -1024,6 +1044,11 @@ int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channe uint8_t outcome; pn_bytes_t recv; + + pn_data_t *args = transport->args; + ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); + if (dsize < 0) return dsize; + int err = pn_data_scan(args, "D.[Bz]", &outcome, &recv); if (err) return err; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
