This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit a3d71b6defa17a4ac59a72f45077ed07d1c30d81 Author: Andrew Stitcher <[email protected]> AuthorDate: Wed Jun 23 11:59:06 2021 -0400 PROTON-2451: Consume functions for use in receiving amqp frames and decoding messages Also modify dispatch code to use the finished consume_descriptor() function. --- c/src/core/consumers.h | 387 ++++++++++++++++++++++++++++++++++++++++++++++-- c/src/core/dispatcher.c | 3 +- 2 files changed, 375 insertions(+), 15 deletions(-) diff --git a/c/src/core/consumers.h b/c/src/core/consumers.h index d2542b1..df240c8 100644 --- a/c/src/core/consumers.h +++ b/c/src/core/consumers.h @@ -105,13 +105,39 @@ static inline bool pni_consumer_readf64(pni_consumer_t *consumer, uint64_t* resu static inline bool pni_consumer_readf128(pni_consumer_t *consumer, void *dst) { - if (consumer->position+16 > consumer->size) return false; - + if (consumer->position+16 > consumer->size) { + consumer->position = consumer->size; + return false; + } memcpy(dst, &consumer->output_start[consumer->position], 16); consumer->position += 16; return true; } +static inline bool pni_consumer_readv8(pni_consumer_t *consumer, pn_bytes_t* bytes){ + uint8_t size; + if (!pni_consumer_readf8(consumer, &size)) return false; + if (consumer->position+size > consumer->size) { + consumer->position = consumer->size; + return false; + } + *bytes = (pn_bytes_t){.size=size,.start=(const char *)consumer->output_start+consumer->position}; + consumer->position += size; + return true; +} + +static inline bool pni_consumer_readv32(pni_consumer_t *consumer, pn_bytes_t* bytes){ + uint32_t size; + if (!pni_consumer_readf32(consumer, &size)) return false; + if (consumer->position+size > consumer->size) { + consumer->position = consumer->size; + return false; + } + *bytes = (pn_bytes_t){.size=size,.start=(const char *)consumer->output_start+consumer->position}; + consumer->position += size; + return true; +} + static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consumer, uint8_t type, pn_bytes_t *value) { uint8_t subcategory = type >> 4; switch (subcategory) { @@ -180,15 +206,69 @@ static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consume return false; } +static inline bool pni_consumer_skip_value_not_described(pni_consumer_t* consumer, uint8_t type) { + pn_bytes_t value; + return pni_consumer_read_value_not_described(consumer, type, &value); +} + +static inline bool pni_consumer_skip_value(pni_consumer_t* consumer, uint8_t type) { + // Check for described type + if (type==0) { + // Skip descriptor + if (!pni_consumer_readf8(consumer, &type)) return false; + if (!pni_consumer_skip_value_not_described(consumer, type)) return false; + if (!pni_consumer_readf8(consumer, &type)) return false; + return pni_consumer_skip_value_not_described(consumer, type); + } + return pni_consumer_skip_value_not_described(consumer, type); +} + /////////////////////////////////////////////////////////////////////////////// -static inline bool consume_expected_ubyte(pni_consumer_t* consumer, uint8_t expected) -{ - uint8_t e; - return pni_consumer_readf8(consumer, &e) && e==expected; +static inline bool consume_single_value_not_described(pni_consumer_t* consumer, uint8_t* type) { + uint8_t t; + if (!pni_consumer_readf8(consumer, &t)) return false; + if (!pni_consumer_skip_value_not_described(consumer, t)) return false; + if (t==0) return false; + *type = t; + return true; +} + +static inline bool consume_single_value(pni_consumer_t* consumer, uint8_t* type) { + uint8_t t; + if (!pni_consumer_readf8(consumer, &t)) return false; + *type = t; + if (t==0) { + uint8_t dummy; + // Descriptor + bool dq = consume_single_value_not_described(consumer, &dummy); + // Value + bool vq = consume_single_value_not_described(consumer, &dummy); + return dq && vq; + } else { + return pni_consumer_skip_value(consumer, t); + } +} + +static inline bool consume_raw(pni_consumer_t* consumer, pn_bytes_t* raw) { + size_t start = consumer->position; + uint8_t dummy; + bool succeed = consume_single_value(consumer, &dummy); + if (succeed) { + *raw = (pn_bytes_t){.size=consumer->position-start, .start=(const char*)consumer->output_start+start}; + } else { + *raw = (pn_bytes_t) {0, NULL}; + } + return succeed; +} + +static inline bool consume_anything(pni_consumer_t* consumer) { + uint8_t dummy; + return consume_single_value(consumer, &dummy); } static inline bool consume_ulong(pni_consumer_t* consumer, uint64_t *ulong) { + *ulong = 0; uint8_t type; if (!pni_consumer_readf8(consumer, &type)) return false; switch (type) { @@ -196,30 +276,309 @@ static inline bool consume_ulong(pni_consumer_t* consumer, uint64_t *ulong) { uint8_t ul; if (!pni_consumer_readf8(consumer, &ul)) return false; *ulong = ul; - break; + return true; } case PNE_ULONG: { uint64_t ul; if (!pni_consumer_readf64(consumer, &ul)) return false; *ulong = ul; - break; + return true; } case PNE_ULONG0: { *ulong = 0; - break; + return true; + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_uint(pni_consumer_t* consumer, uint32_t *uint) { + *uint = 0; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_SMALLUINT: { + uint8_t ui; + if (!pni_consumer_readf8(consumer, &ui)) return false; + *uint = ui; + return true; + } + case PNE_UINT: { + uint32_t ui; + if (!pni_consumer_readf32(consumer, &ui)) return false; + *uint = ui; + return true; + } + case PNE_UINT0: { + *uint = 0; + return true; + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_ushort(pni_consumer_t* consumer, uint16_t *ushort) { + *ushort = 0; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_USHORT: { + uint16_t us; + if (!pni_consumer_readf16(consumer, &us)) return false; + *ushort = us; + return true; + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_ubyte(pni_consumer_t* consumer, uint8_t *ubyte) { + *ubyte = 0; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_UBYTE: { + uint8_t ub; + if (!pni_consumer_readf8(consumer, &ub)) return false; + *ubyte = ub; + return true; + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_bool(pni_consumer_t* consumer, bool *b) { + *b = false; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_BOOLEAN: { + uint8_t ub; + if (!pni_consumer_readf8(consumer, &ub)) return false; + *b = ub; + return true; + } + case PNE_FALSE: + *b = false; + return true; + case PNE_TRUE: + *b = true; + return true; + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_timestamp(pni_consumer_t* consumer, pn_timestamp_t *timestamp) { + *timestamp = 0; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_MS64: { + return pni_consumer_readf64(consumer, (uint64_t*)timestamp); } default: return false; } - return true; } // XXX: assuming numeric - // if we get a symbol we should map it to the numeric value and dispatch on that -static inline bool consume_descriptor(pni_consumer_t* consumer, uint64_t *descriptor) { - return - consume_expected_ubyte(consumer, PNE_DESCRIPTOR) && - consume_ulong(consumer, descriptor); +static inline bool consume_descriptor(pni_consumer_t* consumer, pni_consumer_t *subconsumer, uint64_t *descriptor) { + *descriptor = 0; + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=0}; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_DESCRIPTOR: { + bool lq = consume_ulong(consumer, descriptor); + size_t sposition = consumer->position; + uint8_t type; + consume_single_value_not_described(consumer, &type); + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+sposition, .position=0, .size=consumer->position-sposition}; + return lq; + } + default: + pni_consumer_skip_value_not_described(consumer, type); + return false; + } +} + +static inline bool consume_list(pni_consumer_t* consumer, pni_consumer_t *subconsumer, uint32_t *count) { + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=0}; + *count = 0; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_LIST32: { + uint32_t s; + if (!pni_consumer_readf32(consumer, &s)) return false; + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=s}; + consumer->position += s; + return pni_consumer_readf32(subconsumer, count); + } + case PNE_LIST8: { + uint8_t s; + if (!pni_consumer_readf8(consumer, &s)) return false; + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=s}; + consumer->position += s; + uint8_t c; + if (!pni_consumer_readf8(subconsumer, &c)) return false; + *count = c; + return true; + } + case PNE_LIST0: + return true; + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +// TODO: This is currently a placeholder - maybe not actually needed +static inline bool consume_end_list(pni_consumer_t *consumer) { + return true; +} + +static inline bool consume_array(pni_consumer_t* consumer, pni_consumer_t *subconsumer, uint32_t *count, uint8_t *element_type) { + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=0}; + *count = 0; + *element_type = 0; + uint8_t type; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_ARRAY32: { + uint32_t s; + if (!pni_consumer_readf32(consumer, &s)) return false; + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=s}; + consumer->position += s; + if (!pni_consumer_readf32(subconsumer, count)) return false; + return pni_consumer_readf8(subconsumer, element_type); + } + case PNE_ARRAY8: { + uint8_t s; + if (!pni_consumer_readf8(consumer, &s)) return false; + *subconsumer = (pni_consumer_t){.output_start=consumer->output_start+consumer->position, .position=0, .size=s}; + consumer->position += s; + uint8_t c; + if (!pni_consumer_readf8(subconsumer, &c)) return false; + *count = c; + return pni_consumer_readf8(subconsumer, element_type); + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_described_anything(pni_consumer_t* consumer) { + uint64_t type; + pni_consumer_t subconsumer; + return consume_descriptor(consumer, &subconsumer, &type); +} + +static inline bool consume_described_type_anything(pni_consumer_t* consumer, uint64_t *type) { + pni_consumer_t subconsumer; + return consume_descriptor(consumer, &subconsumer, type); +} + +static inline bool consume_described_maybe_type_anything(pni_consumer_t* consumer, bool *qtype, uint64_t *type) { + pni_consumer_t subconsumer; + *qtype = consume_descriptor(consumer, &subconsumer, type); + return *qtype; +} + +static inline bool consume_copy(pni_consumer_t *consumer, pn_data_t *data) { + size_t iposition = consumer->position; + uint8_t type; + bool tq = consume_single_value(consumer, &type); + if (!tq || type==PNE_NULL) return false; + + pn_bytes_t value = {.size = consumer->position-iposition, .start = (const char*)consumer->output_start+iposition}; + ssize_t err = pn_data_decode(data, value.start, value.size); + return err>=0 && err==(ssize_t)value.size; +} + +static inline bool consume_described_maybe_type_raw(pni_consumer_t *consumer, bool *qtype, uint64_t *type, pn_bytes_t *raw) { + pni_consumer_t subconsumer; + *qtype = consume_descriptor(consumer, &subconsumer, type); + return *qtype && consume_raw(&subconsumer, raw); +} + +static inline bool consume_described_maybe_type_maybe_anything(pni_consumer_t *consumer, bool *qtype, uint64_t *type, bool *qanything) { + pni_consumer_t subconsumer; + *qtype = consume_descriptor(consumer, &subconsumer, type); + *qanything = consume_anything(&subconsumer); + return *qtype && *qanything; +} + +static inline bool consume_described_copy(pni_consumer_t *consumer, pn_data_t *data) { + pni_consumer_t subconsumer; + uint64_t type; + return consume_descriptor(consumer, &subconsumer, &type) && consume_copy(&subconsumer, data); +} + +static inline bool consume_string(pni_consumer_t *consumer, pn_bytes_t *string) { + uint8_t type; + *string = (pn_bytes_t){.size=0, .start=0}; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_STR32_UTF8: { + return pni_consumer_readv32(consumer, string); + } + case PNE_STR8_UTF8: { + return pni_consumer_readv8(consumer, string); + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_symbol(pni_consumer_t *consumer, pn_bytes_t *symbol) { + uint8_t type; + *symbol = (pn_bytes_t){.size=0, .start=0}; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_SYM32:{ + return pni_consumer_readv32(consumer, symbol); + } + case PNE_SYM8:{ + return pni_consumer_readv8(consumer, symbol); + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } +} + +static inline bool consume_binaryornull(pni_consumer_t *consumer, pn_bytes_t *binary) { + uint8_t type; + *binary = (pn_bytes_t){.size=0, .start=0}; + if (!pni_consumer_readf8(consumer, &type)) return false; + switch (type) { + case PNE_NULL:{ + return true; + } + case PNE_VBIN32:{ + return pni_consumer_readv32(consumer, binary); + } + case PNE_VBIN8:{ + return pni_consumer_readv8(consumer, binary); + } + default: + pni_consumer_skip_value(consumer, type); + return false; + } } #endif // PROTON_CONSUMERS_H diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c index 97e8c0a..2498943 100644 --- a/c/src/core/dispatcher.c +++ b/c/src/core/dispatcher.c @@ -86,7 +86,8 @@ static int pni_dispatch_frame(pn_frame_t frame, pn_logger_t *logger, pn_transpor uint64_t lcode; pni_consumer_t consumer = make_consumer_from_bytes(frame_payload); - if (!consume_descriptor(&consumer, &lcode)) { + pni_consumer_t subconsumer; + if (!consume_descriptor(&consumer, &subconsumer, &lcode)) { PN_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame"); return PN_ERR; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
