This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 72c87537a136dbfc63a0c811d159631ecd0e3986 Author: Andrew Stitcher <[email protected]> AuthorDate: Fri Jul 30 13:37:41 2021 -0400 PROTON-2451: Use generated code in transport/message/sasl receive code --- c/src/core/message.c | 142 +++++++++++++++++++++++++--- c/src/core/transport.c | 249 ++++++++++++++++++++++++++++++++++++++++++++++--- c/src/sasl/sasl.c | 73 +++++++++++++-- 3 files changed, 430 insertions(+), 34 deletions(-) diff --git a/c/src/core/message.c b/c/src/core/message.c index 55bbb18..a94c034 100644 --- a/c/src/core/message.c +++ b/c/src/core/message.c @@ -28,6 +28,7 @@ #ifndef GENERATE_CODEC_CODE #include "core/frame_generators.h" +#include "core/frame_consumers.h" #endif #include <proton/link.h> @@ -55,12 +56,13 @@ struct pn_message_t { pn_string_t *group_id; pn_string_t *reply_to_group_id; - pn_data_t *data; pn_data_t *instructions; pn_data_t *annotations; pn_data_t *properties; pn_data_t *body; - +#ifdef GENERATE_CODEC_CODE + pn_data_t *data; +#endif pn_error_t *error; pn_sequence_t group_sequence; @@ -87,7 +89,9 @@ void pn_message_finalize(void *obj) pn_free(msg->reply_to_group_id); pn_data_free(msg->id); pn_data_free(msg->correlation_id); +#ifdef GENERATE_CODEC_CODE pn_data_free(msg->data); +#endif pn_data_free(msg->instructions); pn_data_free(msg->annotations); pn_data_free(msg->properties); @@ -333,7 +337,9 @@ static pn_message_t *pni_message_new(size_t size) msg->reply_to_group_id = pn_string(NULL); msg->inferred = false; +#ifdef GENERATE_CODEC_CODE msg->data = pn_data(16); +#endif msg->instructions = pn_data(16); msg->annotations = pn_data(16); msg->properties = pn_data(16); @@ -387,7 +393,9 @@ void pn_message_clear(pn_message_t *msg) msg->group_sequence = 0; pn_string_clear(msg->reply_to_group_id); msg->inferred = false; +#ifdef GENERATE_CODEC_CODE pn_data_clear(msg->data); +#endif pn_data_clear(msg->instructions); pn_data_clear(msg->annotations); pn_data_clear(msg->properties); @@ -652,6 +660,7 @@ int pn_message_set_reply_to_group_id(pn_message_t *msg, const char *reply_to_gro return pn_string_set(msg->reply_to_group_id, reply_to_group_id); } +#ifdef GENERATE_CODEC_CODE int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size) { assert(msg && bytes && size); @@ -729,31 +738,36 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size) } break; case DELIVERY_ANNOTATIONS: - pn_data_narrow(msg->data); - err = pn_data_copy(msg->instructions, msg->data); + pn_data_clear(msg->instructions); + err = pn_data_scan(msg->data, "D.C", msg->instructions); + pn_data_rewind(msg->instructions); if (err) return err; break; case MESSAGE_ANNOTATIONS: - pn_data_narrow(msg->data); - err = pn_data_copy(msg->annotations, msg->data); + pn_data_clear(msg->annotations); + err = pn_data_scan(msg->data, "D.C", msg->annotations); + pn_data_rewind(msg->annotations); if (err) return err; break; case APPLICATION_PROPERTIES: - pn_data_narrow(msg->data); - err = pn_data_copy(msg->properties, msg->data); + pn_data_clear(msg->properties); + err = pn_data_scan(msg->data, "D.C", msg->properties); + pn_data_rewind(msg->properties); if (err) return err; break; case DATA: case AMQP_SEQUENCE: msg->inferred = true; - pn_data_narrow(msg->data); - err = pn_data_copy(msg->body, msg->data); + pn_data_clear(msg->body); + err = pn_data_scan(msg->data, "D.C", msg->body); + pn_data_rewind(msg->body); if (err) return err; break; case AMQP_VALUE: msg->inferred = false; - pn_data_narrow(msg->data); - err = pn_data_copy(msg->body, msg->data); + pn_data_clear(msg->body); + err = pn_data_scan(msg->data, "D.C", msg->body); + pn_data_rewind(msg->body); if (err) return err; break; case FOOTER: @@ -768,6 +782,110 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size) pn_data_clear(msg->data); return 0; } +#else +int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size) +{ + assert(msg && bytes && size); + + pn_bytes_t msg_bytes = {.size=size, .start=bytes}; + while (msg_bytes.size) { + bool scanned; + uint64_t desc; + size_t section_size = pn_amqp_decode_DQLq(msg_bytes, &scanned, &desc); + if (!scanned) { + desc = 0; + } + + switch (desc) { + case HEADER: { + bool priority_q; + uint8_t priority; + pn_amqp_decode_DqEoQBIoIe(msg_bytes, + &msg->durable, + &priority_q, &priority, + &msg->ttl, + &msg->first_acquirer, + &msg->delivery_count); + msg->priority = priority_q ? priority : HEADER_PRIORITY_DEFAULT; + break; + } + case PROPERTIES: { + pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding, + group_id, reply_to_group_id; + pn_data_clear(msg->id); + pn_data_clear(msg->correlation_id); + pn_amqp_decode_DqECzSSSCssttSISe(msg_bytes, msg->id, + &user_id, &address, &subject, &reply_to, + msg->correlation_id, &ctype, &cencoding, + &msg->expiry_time, &msg->creation_time, &group_id, + &msg->group_sequence, &reply_to_group_id); + int err = pn_string_set_bytes(msg->user_id, user_id); + if (err) return pn_error_format(msg->error, err, "error setting user_id"); + err = pn_string_setn(msg->address, address.start, address.size); + if (err) return pn_error_format(msg->error, err, "error setting address"); + err = pn_string_setn(msg->subject, subject.start, subject.size); + if (err) return pn_error_format(msg->error, err, "error setting subject"); + err = pn_string_setn(msg->reply_to, reply_to.start, reply_to.size); + if (err) return pn_error_format(msg->error, err, "error setting reply_to"); + err = pn_string_setn(msg->content_type, ctype.start, ctype.size); + if (err) return pn_error_format(msg->error, err, "error setting content_type"); + err = pn_string_setn(msg->content_encoding, cencoding.start, + cencoding.size); + if (err) return pn_error_format(msg->error, err, "error setting content_encoding"); + err = pn_string_setn(msg->group_id, group_id.start, group_id.size); + if (err) return pn_error_format(msg->error, err, "error setting group_id"); + err = pn_string_setn(msg->reply_to_group_id, reply_to_group_id.start, + reply_to_group_id.size); + if (err) return pn_error_format(msg->error, err, "error setting reply_to_group_id"); + break; + } + case DELIVERY_ANNOTATIONS: { + pn_data_clear(msg->instructions); + pn_amqp_decode_DqC(msg_bytes, msg->instructions); + pn_data_rewind(msg->instructions); + break; + } + case MESSAGE_ANNOTATIONS: { + pn_data_clear(msg->annotations); + pn_amqp_decode_DqC(msg_bytes, msg->annotations); + pn_data_rewind(msg->annotations); + break; + } + case APPLICATION_PROPERTIES: { + pn_data_clear(msg->properties); + pn_amqp_decode_DqC(msg_bytes, msg->properties); + pn_data_rewind(msg->properties); + break; + } + case DATA: + case AMQP_SEQUENCE: { + msg->inferred = true; + pn_data_clear(msg->body); + pn_amqp_decode_DqC(msg_bytes, msg->body); + pn_data_rewind(msg->body); + break; + } + case AMQP_VALUE: { + msg->inferred = false; + pn_data_clear(msg->body); + pn_amqp_decode_DqC(msg_bytes, msg->body); + pn_data_rewind(msg->body); + break; + } + case FOOTER: + break; + default: { + pn_data_clear(msg->body); + pn_data_decode(msg->body, msg_bytes.start, msg_bytes.size); + pn_data_rewind(msg->body); + break; + } + } + msg_bytes = (pn_bytes_t){.size=msg_bytes.size-section_size, .start=msg_bytes.start+section_size}; + } + return 0; +} +#endif #ifdef GENERATE_CODEC_CODE int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size) diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 9c5bf99..bebb5b8 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -23,6 +23,7 @@ #include "framing.h" #ifndef GENERATE_CODEC_CODE #include "core/frame_generators.h" +#include "core/frame_consumers.h" #endif #include "memory.h" #include "platform/platform.h" @@ -1079,6 +1080,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_clear(transport->remote_desired_capabilities); pn_data_clear(transport->remote_properties); +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; @@ -1092,6 +1094,17 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, transport->remote_desired_capabilities, transport->remote_properties); if (err) return err; +#else + pn_amqp_decode_DqEQSQSQIQHIqqCCCe(payload, + &container_q, &remote_container, + &hostname_q, &remote_hostname, + &remote_max_frame_q, &remote_max_frame, + &remote_channel_max_q, &remote_channel_max, + &transport->remote_idle_timeout, + transport->remote_offered_capabilities, + transport->remote_desired_capabilities, + transport->remote_properties); +#endif /* * The default value is already stored in the variable. * But the scanner zeroes out values if it does not @@ -1128,16 +1141,19 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { - bool reply; uint16_t remote_channel; pn_sequence_t next; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next); if (err) return err; +#else + pn_amqp_decode_DqEQHIe(payload, &reply, &remote_channel, &next); +#endif // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max -- // express our displeasure by closing the connection with a framing error. @@ -1256,6 +1272,7 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel uint64_t max_msgsz; bool has_props; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; @@ -1271,6 +1288,17 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel pn_free(rem_props); return err; } +#else + pn_data_t *rem_props = pn_data(0); + pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQCe(payload, + &name, &handle, + &is_sender, + &snd_settle, &snd_settle_mode, + &rcv_settle, &rcv_settle_mode, + &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode, + &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic, + &idc, &max_msgsz, &has_props, rem_props); +#endif char strbuf[128]; // avoid malloc for most link names char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL; char *strname = strheap ? strheap : strbuf; @@ -1333,8 +1361,12 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel pn_terminus_set_dynamic(rtgt, tgt_dynamic); } else { uint64_t code = 0; +#ifdef GENERATE_CODEC_CODE err = pn_data_scan(args, "D.[.....D..DL....]", &code); if (err) return err; +#else + pn_amqp_decode_DqEqqqqqDqqDLqqqqe(payload, &code); +#endif if (code == COORDINATOR) { pn_terminus_set_type(rtgt, PN_COORDINATOR); } else if (code == TARGET) { @@ -1354,6 +1386,7 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel pn_data_clear(link->remote_source.outcomes); pn_data_clear(link->remote_source.capabilities); +#ifdef GENERATE_CODEC_CODE err = pn_data_scan(args, "D.[.....D.[.....C.C.CC]", link->remote_source.properties, link->remote_source.filter, @@ -1361,6 +1394,13 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel link->remote_source.capabilities); if (err) return err; +#else + pn_amqp_decode_DqEqqqqqDqEqqqqqCqCqCCee(payload, + link->remote_source.properties, + link->remote_source.filter, + link->remote_source.outcomes, + link->remote_source.capabilities); +#endif pn_data_rewind(link->remote_source.properties); pn_data_rewind(link->remote_source.filter); @@ -1372,14 +1412,25 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) { // coordinator target only has a capabilities field +#ifdef GENERATE_CODEC_CODE err = pn_data_scan(args, "D.[.....D..D.[C]...]", link->remote_target.capabilities); if (err) return err; +#else + pn_amqp_decode_DqEqqqqqDqqDqECeqqqe(payload, + link->remote_target.capabilities); +#endif } else { +#ifdef GENERATE_CODEC_CODE err = pn_data_scan(args, "D.[.....D..D.[.....CC]", link->remote_target.properties, link->remote_target.capabilities); if (err) return err; +#else + pn_amqp_decode_DqEqqqqqDqqDqEqqqqqCCee(payload, + link->remote_target.properties, + link->remote_target.capabilities); +#endif } pn_data_rewind(link->remote_target.properties); @@ -1421,15 +1472,23 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann bool has_type, settled_set; bool resume, aborted, batchable; uint64_t type; - pn_data_clear(transport->disp_data); +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; + pn_data_clear(transport->disp_data); int err = pn_data_scan(args, "D.[I?Iz.?oo.D?LCooo]", &handle, &id_present, &id, &tag, &settled_set, &settled, &more, &has_type, &type, transport->disp_data, &resume, &aborted, &batchable); if (err) return err; +#else + pn_bytes_t disp_data; + size_t dsize = + pn_amqp_decode_DqEIQIzqQooqDQLRoooe(payload, &handle, &id_present, &id, &tag, + &settled_set, &settled, &more, &has_type, &type, &disp_data, + &resume, &aborted, &batchable); +#endif payload.size -= dsize; payload.start += dsize; @@ -1491,11 +1550,18 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann "sequencing error, expected delivery-id %u, got %u", state->id, id); } +#ifdef GENERATE_CODEC_CODE if (has_type) { delivery->remote.type = type; pn_data_copy(delivery->remote.data, transport->disp_data); } - +#else + if (has_type) { + delivery->remote.type = type; + pn_data_clear(delivery->remote.data); + pn_data_decode(delivery->remote.data, disp_data.start, disp_data.size); + } +#endif link->state.delivery_count++; link->state.link_credit--; link->queued++; @@ -1551,6 +1617,7 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, uint32_t handle; bool inext_init, handle_init, dcount_init, drain; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; @@ -1558,6 +1625,11 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, &onext, &owin, &handle_init, &handle, &dcount_init, &delivery_count, &link_credit, &drain); if (err) return err; +#else + pn_amqp_decode_DqEQIIIIQIQIIqoe(payload, &inext_init, &inext, &iwin, + &onext, &owin, &handle_init, &handle, &dcount_init, + &delivery_count, &link_credit, &drain); +#endif pn_session_t *ssn = pni_channel_state(transport, channel); if (!ssn) { @@ -1605,6 +1677,19 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, return 0; } +static void pn_condition_set(pn_condition_t *condition, pn_bytes_t cond, pn_bytes_t desc) +{ + if (condition->name == NULL) { + condition->name = pn_string(NULL); + } + pn_string_setn(condition->name, cond.start, cond.size); + if (condition->description == NULL) { + condition->description = pn_string(NULL); + } + pn_string_setn(condition->description, desc.start, desc.size); +} + +#ifdef GENERATE_CODEC_CODE #define SCAN_ERROR_DEFAULT ("D.[D.[sSC]") #define SCAN_ERROR_DETACH ("D.[..D.[sSC]") #define SCAN_ERROR_DISP ("[D.[sSC]") @@ -1616,22 +1701,17 @@ static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char pn_condition_clear(condition); int err = pn_data_scan(data, fmt, &cond, &desc, pn_condition_info(condition)); if (err) return err; - if (condition->name == NULL) { - condition->name = pn_string(NULL); - } - pn_string_setn(condition->name, cond.start, cond.size); - if (condition->description == NULL) { - condition->description = pn_string(NULL); - } - pn_string_setn(condition->description, desc.start, desc.size); + pn_condition_set(condition, cond,desc); pn_data_rewind(pn_condition_info(condition)); return 0; } +#endif static inline bool sequence_lte(pn_sequence_t a, pn_sequence_t b) { return b-a <= INT32_MAX; } +#ifdef GENERATE_CODEC_CODE static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t *delivery, bool settled, bool remote_data, bool type_init, uint64_t type) { pn_disposition_t *remote = &delivery->remote; @@ -1694,15 +1774,81 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); return 0; } +#else +static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t *delivery, bool settled, bool remote_data, bool type_init, uint64_t type, pn_bytes_t disp_data) { + pn_disposition_t *remote = &delivery->remote; + + if (type_init) remote->type = type; + + if (remote_data) { + switch (type) { + case PN_RECEIVED: { + bool qnumber; + uint32_t number; + bool qoffset; + uint64_t offset; + pn_amqp_decode_DqEQIQLe(disp_data, &qnumber, &number, &qoffset, &offset); + + if (qnumber) { + remote->section_number = number; + } + if (qoffset) { + remote->section_offset = offset; + } + break; + } + case PN_ACCEPTED: + break; + + case PN_REJECTED: { + pn_bytes_t cond; + pn_bytes_t desc; + pn_amqp_decode_DqEDqEsSCee(disp_data, &cond, &desc, pn_condition_info(&remote->condition)); + pn_condition_set(&remote->condition, cond, desc); + + break; + } + case PN_RELEASED: + break; + + case PN_MODIFIED: { + bool qfailed; + bool failed; + bool qundeliverable; + bool undeliverable; + pn_amqp_decode_DqEQoQoCe(disp_data, &qfailed, &failed, &qundeliverable, &undeliverable, remote->annotations); + + if (qfailed) { + remote->failed = failed; + } + if (qundeliverable) { + remote->undeliverable = undeliverable; + } + break; + } + default: + pn_amqp_decode_DqC(disp_data, remote->data); + break; + } + } + + remote->settled = settled; + delivery->updated = true; + pn_work_update(transport->connection, delivery); + + pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); + return 0; +} +#endif int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { bool role; pn_sequence_t first, last; - uint64_t type = 0; bool last_init, settled; - +#ifdef GENERATE_CODEC_CODE bool type_init; + uint64_t type = 0; pn_data_clear(transport->disp_data); pn_data_t *args = transport->args; @@ -1712,6 +1858,11 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch &last, &settled, &type_init, &type, transport->disp_data); if (err) return err; +#else + pn_bytes_t disp_data; + pn_amqp_decode_DqEoIQIoRe(payload, &role, &first, &last_init, + &last, &settled, &disp_data); +#endif if (!last_init) last = first; pn_session_t *ssn = pni_channel_state(transport, channel); @@ -1730,6 +1881,7 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch deliveries = &ssn->state.incoming; } +#ifdef GENERATE_CODEC_CODE pn_data_rewind(transport->disp_data); bool remote_data = (pn_data_next(transport->disp_data) && pn_data_get_list(transport->disp_data) > 0); @@ -1747,7 +1899,7 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch pn_sequence_t key = pn_hash_key(dh, entry); if (sequence_lte(first, key) && sequence_lte(key, last)) { pn_delivery_t *delivery = (pn_delivery_t*) pn_hash_value(dh, entry); - err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type); + int err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type); if (err) return err; } } @@ -1755,12 +1907,44 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch for (pn_sequence_t id = first; sequence_lte(id, last); ++id) { pn_delivery_t *delivery = pni_delivery_map_get(deliveries, id); if (delivery) { - err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type); + int err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type); if (err) return err; } } } +#else + bool type_init; + uint64_t type; + bool remote_data; + pn_amqp_decode_DQLQq(disp_data, &type_init, &type, &remote_data); + + + // Do some validation of received first and last values + // TODO: We should really also clamp the first value here, but we're not keeping track of the earliest + // unsettled delivery sequence no + last = sequence_lte(last, deliveries->next) ? last : deliveries->next; + + // If there are fewer deliveries in the session than the range then look at every delivery in the session + // otherwise look at every delivery_id in the disposition performative + pn_hash_t *dh = deliveries->deliveries; + if (last-first+1 >= pn_hash_size(dh)) { + for (pn_handle_t entry = pn_hash_head(dh); entry!=0 ; entry = pn_hash_next(dh, entry)) { + pn_sequence_t key = pn_hash_key(dh, entry); + if (sequence_lte(first, key) && sequence_lte(key, last)) { + pn_delivery_t *delivery = (pn_delivery_t*) pn_hash_value(dh, entry); + pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type, disp_data); + } + } + } else { + for (pn_sequence_t id = first; sequence_lte(id, last); ++id) { + pn_delivery_t *delivery = pni_delivery_map_get(deliveries, id); + if (delivery) { + pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type, disp_data); + } + } + } +#endif return 0; } @@ -1769,11 +1953,16 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel uint32_t handle; bool closed; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[Io]", &handle, &closed); if (err) return err; +#else + pn_bytes_t error_condition; + pn_amqp_decode_DqEIoRe(payload, &handle, &closed, &error_condition); +#endif pn_session_t *ssn = pni_channel_state(transport, channel); if (!ssn) { @@ -1784,8 +1973,18 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); } +#ifdef GENERATE_CODEC_CODE err = pn_scan_error(args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH); if (err) return err; +#else + pn_bytes_t cond; + pn_bytes_t desc; + pn_condition_t* condition = &link->endpoint.remote_condition; + pn_condition_clear(condition); + pn_amqp_decode_DqEsSCe(error_condition, &cond, &desc, pn_condition_info(condition)); + pn_condition_set(condition, cond, desc); + pn_data_rewind(pn_condition_info(condition)); +#endif if (closed) { @@ -1806,11 +2005,21 @@ int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, p return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); } +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; +#else + pn_bytes_t cond; + pn_bytes_t desc; + pn_condition_t* condition = &ssn->endpoint.remote_condition; + pn_condition_clear(condition); + pn_amqp_decode_DqEDqEsSCee(payload, &cond, &desc, pn_condition_info(condition)); + pn_condition_set(condition, cond, desc); + pn_data_rewind(pn_condition_info(condition)); +#endif PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_CLOSE); pni_unmap_remote_channel(ssn); @@ -1821,11 +2030,21 @@ int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, { pn_connection_t *conn = transport->connection; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT); if (err) return err; +#else + pn_bytes_t cond; + pn_bytes_t desc; + pn_condition_t* condition = &transport->remote_condition; + pn_condition_clear(condition); + pn_amqp_decode_DqEDqEsSCee(payload, &cond, &desc, pn_condition_info(condition)); + pn_condition_set(condition, cond, desc); + pn_data_rewind(pn_condition_info(condition)); +#endif transport->close_rcvd = true; PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED); pn_collector_put(transport->connection->collector, PN_OBJECT, conn, PN_CONNECTION_REMOTE_CLOSE); diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c index 1e853b8..65571d8 100644 --- a/c/src/sasl/sasl.c +++ b/c/src/sasl/sasl.c @@ -24,7 +24,9 @@ #include "core/autodetect.h" #include "core/framing.h" #ifndef GENERATE_CODEC_CODE +#include "core/consumers.h" #include "core/frame_generators.h" +#include "core/frame_consumers.h" #endif #include "core/engine-internal.h" #include "core/util.h" @@ -922,12 +924,16 @@ int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t mech; pn_bytes_t recv; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; int err = pn_data_scan(args, "D.[sz]", &mech, &recv); if (err) return err; +#else + pn_amqp_decode_DqEsze(payload, &mech, &recv); +#endif sasl->selected_mechanism = pn_strndup(mech.start, mech.size); // We need to filter out a supplied mech in in the inclusion list @@ -956,10 +962,9 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha // We should only receive this if we are a sasl client if (!sasl->client) return PN_ERR; - // This scanning relies on pn_data_scan leaving the pn_data_t cursors - // where they are after finishing the scan pn_string_t *mechs = pn_string(""); +#ifdef GENERATE_CODEC_CODE // Try array of symbols for mechanism list bool array = false; @@ -967,6 +972,8 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; + // This scanning relies on pn_data_scan leaving the pn_data_t cursors + // where they are after finishing the scan int err = pn_data_scan(args, "D.[?@[", &array); if (err) return err; @@ -975,7 +982,7 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha while(pn_data_next(args)) { pn_bytes_t s = pn_data_get_symbol(args); if (pni_sasl_client_included_mech(sasl->included_mechanisms, s)) { - pn_string_addf(mechs, "%*s ", (int)s.size, s.start); + pn_string_addf(mechs, "%.*s ", (int)s.size, s.start); } } @@ -993,6 +1000,50 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha pn_string_setn(mechs, symbol.start, symbol.size); } } +#else + pn_bytes_t subpayload; + pn_amqp_decode_DqERe(payload, &subpayload); + pni_consumer_t consumer = make_consumer_from_bytes(subpayload); + + uint8_t element_type; + uint32_t element_count; + pni_consumer_t subconsumer; + if (consume_array(&consumer, &subconsumer, &element_count, &element_type) && (element_type==PNE_SYM32 || element_type==PNE_SYM8)) { + // If this is an array of symbols decode each symbol + pn_bytes_t symbol; + switch (element_type) { + case PNE_SYM8: + while (element_count) { + pni_consumer_readv8(&subconsumer, &symbol); + if (pni_sasl_client_included_mech(sasl->included_mechanisms, symbol)) { + pn_string_addf(mechs, "%.*s ", (int)symbol.size, symbol.start); + } + --element_count; + } + break; + case PNE_SYM32: + while (element_count) { + pni_consumer_readv32(&subconsumer, &symbol); + if (pni_sasl_client_included_mech(sasl->included_mechanisms, symbol)) { + pn_string_addf(mechs, "%.*s ", (int)symbol.size, symbol.start); + } + --element_count; + } + break; + } + if (pn_string_size(mechs)) { + pn_string_buffer(mechs)[pn_string_size(mechs)-1] = 0; + } + } else { + // If not then see if it is a single symbol + pn_bytes_t symbol; + pn_amqp_decode_DqEse(payload, &symbol); + + if (pni_sasl_client_included_mech(sasl->included_mechanisms, symbol)) { + pn_string_setn(mechs, symbol.start, symbol.size); + } + } +#endif if (!(pni_sasl_impl_init_client(transport) && pn_string_size(mechs) && @@ -1019,12 +1070,15 @@ int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t chan pn_bytes_t recv; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; - int err = pn_data_scan(args, "D.[z]", &recv); if (err) return err; +#else + pn_amqp_decode_DqEze(payload, &recv); +#endif pni_sasl_impl_process_challenge(transport, &recv); @@ -1045,12 +1099,15 @@ int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t chann pn_bytes_t recv; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; - int err = pn_data_scan(args, "D.[z]", &recv); if (err) return err; +#else + pn_amqp_decode_DqEze(payload, &recv); +#endif pni_sasl_impl_process_response(transport, &recv); @@ -1072,13 +1129,15 @@ int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channe uint8_t outcome; pn_bytes_t recv; +#ifdef GENERATE_CODEC_CODE pn_data_t *args = transport->args; ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload); if (dsize < 0) return dsize; - int err = pn_data_scan(args, "D.[Bz]", &outcome, &recv); if (err) return err; - +#else + pn_amqp_decode_DqEBze(payload, &outcome, &recv); +#endif // Preset the outcome to what the server sent us - the plugin can alter this. // In practise the plugin processing here should only fail because it fails // to authenticate the server id after the server authenticates our user. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
