This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit a3d71b6defa17a4ac59a72f45077ed07d1c30d81
Author: Andrew Stitcher <[email protected]>
AuthorDate: Wed Jun 23 11:59:06 2021 -0400

    PROTON-2451: Consume functions for use in receiving amqp frames and decoding messages
    
    Also modify dispatch code to use the finished consume_descriptor() function.
---
 c/src/core/consumers.h  | 387 ++++++++++++++++++++++++++++++++++++++++++++++--
 c/src/core/dispatcher.c |   3 +-
 2 files changed, 375 insertions(+), 15 deletions(-)

diff --git a/c/src/core/consumers.h b/c/src/core/consumers.h
index d2542b1..df240c8 100644
--- a/c/src/core/consumers.h
+++ b/c/src/core/consumers.h
@@ -105,13 +105,39 @@ static inline bool pni_consumer_readf64(pni_consumer_t *consumer, uint64_t* resu
 
 static inline bool pni_consumer_readf128(pni_consumer_t *consumer, void *dst)
 {
-  if (consumer->position+16 > consumer->size) return false;
-
+  if (consumer->position+16 > consumer->size) {
+    consumer->position = consumer->size;
+    return false;
+  }
   memcpy(dst, &consumer->output_start[consumer->position], 16);
   consumer->position += 16;
   return true;
 }
 
+static inline bool pni_consumer_readv8(pni_consumer_t *consumer, pn_bytes_t* 
bytes){
+  uint8_t size;
+  if (!pni_consumer_readf8(consumer, &size)) return false;
+  if (consumer->position+size > consumer->size) {
+    consumer->position = consumer->size;
+    return false;
+  }
+  *bytes = (pn_bytes_t){.size=size,.start=(const char 
*)consumer->output_start+consumer->position};
+  consumer->position += size;
+  return true;
+}
+
+static inline bool pni_consumer_readv32(pni_consumer_t *consumer, pn_bytes_t* 
bytes){
+  uint32_t size;
+  if (!pni_consumer_readf32(consumer, &size)) return false;
+  if (consumer->position+size > consumer->size) {
+    consumer->position = consumer->size;
+    return false;
+  }
+  *bytes = (pn_bytes_t){.size=size,.start=(const char 
*)consumer->output_start+consumer->position};
+  consumer->position += size;
+  return true;
+}
+
 static inline bool pni_consumer_read_value_not_described(pni_consumer_t* 
consumer, uint8_t type, pn_bytes_t *value) {
   uint8_t subcategory = type >> 4;
   switch (subcategory) {
@@ -180,15 +206,69 @@ static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consume
   return false;
 }
 
+static inline bool pni_consumer_skip_value_not_described(pni_consumer_t* 
consumer, uint8_t type) {
+  pn_bytes_t value;
+  return pni_consumer_read_value_not_described(consumer, type, &value);
+}
+
+static inline bool pni_consumer_skip_value(pni_consumer_t* consumer, uint8_t 
type) {
+  // Check for described type
+  if (type==0) {
+    // Skip descriptor
+    if (!pni_consumer_readf8(consumer, &type)) return false;
+    if (!pni_consumer_skip_value_not_described(consumer, type)) return false;
+    if (!pni_consumer_readf8(consumer, &type)) return false;
+    return pni_consumer_skip_value_not_described(consumer, type);
+  }
+  return pni_consumer_skip_value_not_described(consumer, type);
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 
-static inline bool consume_expected_ubyte(pni_consumer_t* consumer, uint8_t 
expected)
-{
-    uint8_t e;
-    return pni_consumer_readf8(consumer, &e) && e==expected;
+static inline bool consume_single_value_not_described(pni_consumer_t* 
consumer, uint8_t* type) {
+  uint8_t t;
+  if (!pni_consumer_readf8(consumer, &t)) return false;
+  if (!pni_consumer_skip_value_not_described(consumer, t)) return false;
+  if (t==0) return false;
+  *type = t;
+  return true;
+}
+
+static inline bool consume_single_value(pni_consumer_t* consumer, uint8_t* 
type) {
+  uint8_t t;
+  if (!pni_consumer_readf8(consumer, &t)) return false;
+  *type = t;
+  if (t==0) {
+    uint8_t dummy;
+    // Descriptor
+    bool dq = consume_single_value_not_described(consumer, &dummy);
+    // Value
+    bool vq = consume_single_value_not_described(consumer, &dummy);
+    return dq && vq;
+  } else {
+    return pni_consumer_skip_value(consumer, t);
+  }
+}
+
+static inline bool consume_raw(pni_consumer_t* consumer, pn_bytes_t* raw) {
+  size_t start = consumer->position;
+  uint8_t dummy;
+  bool succeed = consume_single_value(consumer, &dummy);
+  if (succeed) {
+    *raw = (pn_bytes_t){.size=consumer->position-start, .start=(const 
char*)consumer->output_start+start};
+  } else {
+    *raw = (pn_bytes_t) {0, NULL};
+  }
+  return succeed;
+}
+
+static inline bool consume_anything(pni_consumer_t* consumer) {
+  uint8_t dummy;
+  return consume_single_value(consumer, &dummy);
 }
 
 static inline bool consume_ulong(pni_consumer_t* consumer, uint64_t *ulong) {
+  *ulong = 0;
   uint8_t type;
   if (!pni_consumer_readf8(consumer, &type)) return false;
   switch (type) {
@@ -196,30 +276,309 @@ static inline bool consume_ulong(pni_consumer_t* consumer, uint64_t *ulong) {
       uint8_t ul;
       if (!pni_consumer_readf8(consumer, &ul)) return false;
       *ulong = ul;
-      break;
+      return true;
     }
     case PNE_ULONG: {
       uint64_t ul;
       if (!pni_consumer_readf64(consumer, &ul)) return false;
       *ulong = ul;
-      break;
+      return true;
     }
     case PNE_ULONG0: {
       *ulong = 0;
-      break;
+      return true;
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_uint(pni_consumer_t* consumer, uint32_t *uint) {
+  *uint = 0;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_SMALLUINT: {
+      uint8_t ui;
+      if (!pni_consumer_readf8(consumer, &ui)) return false;
+      *uint = ui;
+      return true;
+    }
+    case PNE_UINT: {
+      uint32_t ui;
+      if (!pni_consumer_readf32(consumer, &ui)) return false;
+      *uint = ui;
+      return true;
+    }
+    case PNE_UINT0: {
+      *uint = 0;
+      return true;
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_ushort(pni_consumer_t* consumer, uint16_t *ushort) {
+  *ushort = 0;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_USHORT: {
+      uint16_t us;
+      if (!pni_consumer_readf16(consumer, &us)) return false;
+      *ushort = us;
+      return true;
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_ubyte(pni_consumer_t* consumer, uint8_t *ubyte) {
+  *ubyte = 0;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_UBYTE: {
+      uint8_t ub;
+      if (!pni_consumer_readf8(consumer, &ub)) return false;
+      *ubyte = ub;
+      return true;
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_bool(pni_consumer_t* consumer, bool *b) {
+  *b = false;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_BOOLEAN: {
+      uint8_t ub;
+      if (!pni_consumer_readf8(consumer, &ub)) return false;
+      *b = ub;
+      return true;
+    }
+    case PNE_FALSE:
+      *b = false;
+      return true;
+    case PNE_TRUE:
+      *b = true;
+      return true;
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_timestamp(pni_consumer_t* consumer, pn_timestamp_t 
*timestamp) {
+  *timestamp = 0;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_MS64: {
+      return pni_consumer_readf64(consumer, (uint64_t*)timestamp);
     }
     default:
       return false;
   }
-  return true;
 }
 
 // XXX: assuming numeric -
 // if we get a symbol we should map it to the numeric value and dispatch on that
-static inline bool consume_descriptor(pni_consumer_t* consumer, uint64_t 
*descriptor) {
-  return
-    consume_expected_ubyte(consumer, PNE_DESCRIPTOR) &&
-    consume_ulong(consumer, descriptor);
+static inline bool consume_descriptor(pni_consumer_t* consumer, pni_consumer_t 
*subconsumer, uint64_t *descriptor) {
+  *descriptor = 0;
+  *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=0};
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_DESCRIPTOR: {
+      bool lq = consume_ulong(consumer, descriptor);
+      size_t sposition = consumer->position;
+      uint8_t type;
+      consume_single_value_not_described(consumer, &type);
+      *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+sposition, .position=0, 
.size=consumer->position-sposition};
+      return lq;
+    }
+    default:
+      pni_consumer_skip_value_not_described(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_list(pni_consumer_t* consumer, pni_consumer_t 
*subconsumer, uint32_t *count) {
+  *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=0};
+  *count = 0;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_LIST32: {
+      uint32_t s;
+      if (!pni_consumer_readf32(consumer, &s)) return false;
+      *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=s};
+      consumer->position += s;
+      return pni_consumer_readf32(subconsumer, count);
+    }
+    case PNE_LIST8: {
+      uint8_t s;
+      if (!pni_consumer_readf8(consumer, &s)) return false;
+      *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=s};
+      consumer->position += s;
+      uint8_t c;
+      if (!pni_consumer_readf8(subconsumer, &c)) return false;
+      *count = c;
+      return true;
+    }
+    case PNE_LIST0:
+      return true;
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+// TODO: This is currently a placeholder - maybe not actually needed
+static inline bool consume_end_list(pni_consumer_t *consumer) {
+  return true;
+}
+
+static inline bool consume_array(pni_consumer_t* consumer, pni_consumer_t 
*subconsumer, uint32_t *count, uint8_t *element_type) {
+  *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=0};
+  *count = 0;
+  *element_type = 0;
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_ARRAY32: {
+      uint32_t s;
+      if (!pni_consumer_readf32(consumer, &s)) return false;
+      *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=s};
+      consumer->position += s;
+      if (!pni_consumer_readf32(subconsumer, count)) return false;
+      return pni_consumer_readf8(subconsumer, element_type);
+    }
+    case PNE_ARRAY8: {
+      uint8_t s;
+      if (!pni_consumer_readf8(consumer, &s)) return false;
+      *subconsumer = 
(pni_consumer_t){.output_start=consumer->output_start+consumer->position, 
.position=0, .size=s};
+      consumer->position += s;
+      uint8_t c;
+      if (!pni_consumer_readf8(subconsumer, &c)) return false;
+      *count = c;
+      return pni_consumer_readf8(subconsumer, element_type);
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_described_anything(pni_consumer_t* consumer) {
+  uint64_t type;
+  pni_consumer_t subconsumer;
+  return consume_descriptor(consumer, &subconsumer, &type);
+}
+
+static inline bool consume_described_type_anything(pni_consumer_t* consumer, 
uint64_t *type) {
+  pni_consumer_t subconsumer;
+  return consume_descriptor(consumer, &subconsumer, type);
+}
+
+static inline bool consume_described_maybe_type_anything(pni_consumer_t* 
consumer, bool *qtype, uint64_t *type) {
+  pni_consumer_t subconsumer;
+  *qtype = consume_descriptor(consumer, &subconsumer, type);
+  return *qtype;
+}
+
+static inline bool consume_copy(pni_consumer_t *consumer, pn_data_t *data) {
+  size_t iposition = consumer->position;
+  uint8_t type;
+  bool tq = consume_single_value(consumer, &type);
+  if (!tq || type==PNE_NULL) return false;
+
+  pn_bytes_t value = {.size = consumer->position-iposition, .start = (const 
char*)consumer->output_start+iposition};
+  ssize_t err = pn_data_decode(data, value.start, value.size);
+  return err>=0 && err==(ssize_t)value.size;
+}
+
+static inline bool consume_described_maybe_type_raw(pni_consumer_t *consumer, 
bool *qtype, uint64_t *type, pn_bytes_t *raw) {
+  pni_consumer_t subconsumer;
+  *qtype = consume_descriptor(consumer, &subconsumer, type);
+  return *qtype && consume_raw(&subconsumer, raw);
+}
+
+static inline bool consume_described_maybe_type_maybe_anything(pni_consumer_t 
*consumer, bool *qtype, uint64_t *type, bool *qanything) {
+  pni_consumer_t subconsumer;
+  *qtype = consume_descriptor(consumer, &subconsumer, type);
+  *qanything = consume_anything(&subconsumer);
+  return *qtype && *qanything;
+}
+
+static inline bool consume_described_copy(pni_consumer_t *consumer, pn_data_t 
*data) {
+  pni_consumer_t subconsumer;
+  uint64_t type;
+  return consume_descriptor(consumer, &subconsumer, &type) && 
consume_copy(&subconsumer, data);
+}
+
+static inline bool consume_string(pni_consumer_t *consumer, pn_bytes_t 
*string) {
+  uint8_t type;
+  *string = (pn_bytes_t){.size=0, .start=0};
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_STR32_UTF8: {
+      return pni_consumer_readv32(consumer, string);
+    }
+    case PNE_STR8_UTF8: {
+      return pni_consumer_readv8(consumer, string);
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_symbol(pni_consumer_t *consumer, pn_bytes_t 
*symbol) {
+  uint8_t type;
+  *symbol = (pn_bytes_t){.size=0, .start=0};
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_SYM32:{
+      return pni_consumer_readv32(consumer, symbol);
+    }
+    case PNE_SYM8:{
+      return pni_consumer_readv8(consumer, symbol);
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
+}
+
+static inline bool consume_binaryornull(pni_consumer_t *consumer, pn_bytes_t 
*binary) {
+  uint8_t type;
+  *binary  = (pn_bytes_t){.size=0, .start=0};
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_NULL:{
+      return true;
+    }
+    case PNE_VBIN32:{
+      return pni_consumer_readv32(consumer, binary);
+    }
+    case PNE_VBIN8:{
+      return pni_consumer_readv8(consumer, binary);
+    }
+    default:
+      pni_consumer_skip_value(consumer, type);
+      return false;
+  }
 }
 
 #endif // PROTON_CONSUMERS_H
diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c
index 97e8c0a..2498943 100644
--- a/c/src/core/dispatcher.c
+++ b/c/src/core/dispatcher.c
@@ -86,7 +86,8 @@ static int pni_dispatch_frame(pn_frame_t frame, pn_logger_t *logger, pn_transpor
 
   uint64_t lcode;
   pni_consumer_t consumer = make_consumer_from_bytes(frame_payload);
-  if (!consume_descriptor(&consumer, &lcode)) {
+  pni_consumer_t subconsumer;
+  if (!consume_descriptor(&consumer, &subconsumer, &lcode)) {
     PN_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching 
frame");
     return PN_ERR;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to