This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 72c87537a136dbfc63a0c811d159631ecd0e3986
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri Jul 30 13:37:41 2021 -0400

    PROTON-2451: Use generated code in transport/message/sasl receive code
---
 c/src/core/message.c   | 142 +++++++++++++++++++++++++---
 c/src/core/transport.c | 249 ++++++++++++++++++++++++++++++++++++++++++++++---
 c/src/sasl/sasl.c      |  73 +++++++++++++--
 3 files changed, 430 insertions(+), 34 deletions(-)

diff --git a/c/src/core/message.c b/c/src/core/message.c
index 55bbb18..a94c034 100644
--- a/c/src/core/message.c
+++ b/c/src/core/message.c
@@ -28,6 +28,7 @@
 
 #ifndef GENERATE_CODEC_CODE
 #include "core/frame_generators.h"
+#include "core/frame_consumers.h"
 #endif
 
 #include <proton/link.h>
@@ -55,12 +56,13 @@ struct pn_message_t {
   pn_string_t *group_id;
   pn_string_t *reply_to_group_id;
 
-  pn_data_t *data;
   pn_data_t *instructions;
   pn_data_t *annotations;
   pn_data_t *properties;
   pn_data_t *body;
-
+#ifdef GENERATE_CODEC_CODE
+  pn_data_t *data;
+#endif
   pn_error_t *error;
 
   pn_sequence_t group_sequence;
@@ -87,7 +89,9 @@ void pn_message_finalize(void *obj)
   pn_free(msg->reply_to_group_id);
   pn_data_free(msg->id);
   pn_data_free(msg->correlation_id);
+#ifdef GENERATE_CODEC_CODE
   pn_data_free(msg->data);
+#endif
   pn_data_free(msg->instructions);
   pn_data_free(msg->annotations);
   pn_data_free(msg->properties);
@@ -333,7 +337,9 @@ static pn_message_t *pni_message_new(size_t size)
   msg->reply_to_group_id = pn_string(NULL);
 
   msg->inferred = false;
+#ifdef GENERATE_CODEC_CODE
   msg->data = pn_data(16);
+#endif
   msg->instructions = pn_data(16);
   msg->annotations = pn_data(16);
   msg->properties = pn_data(16);
@@ -387,7 +393,9 @@ void pn_message_clear(pn_message_t *msg)
   msg->group_sequence = 0;
   pn_string_clear(msg->reply_to_group_id);
   msg->inferred = false;
+#ifdef GENERATE_CODEC_CODE
   pn_data_clear(msg->data);
+#endif
   pn_data_clear(msg->instructions);
   pn_data_clear(msg->annotations);
   pn_data_clear(msg->properties);
@@ -652,6 +660,7 @@ int pn_message_set_reply_to_group_id(pn_message_t *msg, const char *reply_to_gro
   return pn_string_set(msg->reply_to_group_id, reply_to_group_id);
 }
 
+#ifdef GENERATE_CODEC_CODE
 int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
 {
   assert(msg && bytes && size);
@@ -729,31 +738,36 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
       }
       break;
     case DELIVERY_ANNOTATIONS:
-      pn_data_narrow(msg->data);
-      err = pn_data_copy(msg->instructions, msg->data);
+      pn_data_clear(msg->instructions);
+      err = pn_data_scan(msg->data, "D.C", msg->instructions);
+      pn_data_rewind(msg->instructions);
       if (err) return err;
       break;
     case MESSAGE_ANNOTATIONS:
-      pn_data_narrow(msg->data);
-      err = pn_data_copy(msg->annotations, msg->data);
+      pn_data_clear(msg->annotations);
+      err = pn_data_scan(msg->data, "D.C", msg->annotations);
+      pn_data_rewind(msg->annotations);
       if (err) return err;
       break;
     case APPLICATION_PROPERTIES:
-      pn_data_narrow(msg->data);
-      err = pn_data_copy(msg->properties, msg->data);
+      pn_data_clear(msg->properties);
+      err = pn_data_scan(msg->data, "D.C", msg->properties);
+      pn_data_rewind(msg->properties);
       if (err) return err;
       break;
     case DATA:
     case AMQP_SEQUENCE:
       msg->inferred = true;
-      pn_data_narrow(msg->data);
-      err = pn_data_copy(msg->body, msg->data);
+      pn_data_clear(msg->body);
+      err = pn_data_scan(msg->data, "D.C", msg->body);
+      pn_data_rewind(msg->body);
       if (err) return err;
       break;
     case AMQP_VALUE:
       msg->inferred = false;
-      pn_data_narrow(msg->data);
-      err = pn_data_copy(msg->body, msg->data);
+      pn_data_clear(msg->body);
+      err = pn_data_scan(msg->data, "D.C", msg->body);
+      pn_data_rewind(msg->body);
       if (err) return err;
       break;
     case FOOTER:
@@ -768,6 +782,110 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
   pn_data_clear(msg->data);
   return 0;
 }
+#else
+int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
+{
+  assert(msg && bytes && size);
+
+  pn_bytes_t msg_bytes = {.size=size, .start=bytes};
+  while (msg_bytes.size) {
+    bool scanned;
+    uint64_t desc;
+    size_t section_size = pn_amqp_decode_DQLq(msg_bytes, &scanned, &desc);
+    if (!scanned) {
+      desc = 0;
+    }
+
+    switch (desc) {
+      case HEADER: {
+        bool priority_q;
+        uint8_t priority;
+        pn_amqp_decode_DqEoQBIoIe(msg_bytes,
+                                  &msg->durable,
+                                  &priority_q, &priority,
+                                  &msg->ttl,
+                                  &msg->first_acquirer,
+                                  &msg->delivery_count);
+        msg->priority = priority_q ? priority : HEADER_PRIORITY_DEFAULT;
+        break;
+      }
+      case PROPERTIES: {
+        pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
+        group_id, reply_to_group_id;
+        pn_data_clear(msg->id);
+        pn_data_clear(msg->correlation_id);
+        pn_amqp_decode_DqECzSSSCssttSISe(msg_bytes,  msg->id,
+                           &user_id, &address, &subject, &reply_to,
+                           msg->correlation_id, &ctype, &cencoding,
+                           &msg->expiry_time, &msg->creation_time, &group_id,
+                           &msg->group_sequence, &reply_to_group_id);
+        int err = pn_string_set_bytes(msg->user_id, user_id);
+        if (err) return pn_error_format(msg->error, err, "error setting user_id");
+        err = pn_string_setn(msg->address, address.start, address.size);
+        if (err) return pn_error_format(msg->error, err, "error setting address");
+        err = pn_string_setn(msg->subject, subject.start, subject.size);
+        if (err) return pn_error_format(msg->error, err, "error setting subject");
+        err = pn_string_setn(msg->reply_to, reply_to.start, reply_to.size);
+        if (err) return pn_error_format(msg->error, err, "error setting reply_to");
+        err = pn_string_setn(msg->content_type, ctype.start, ctype.size);
+        if (err) return pn_error_format(msg->error, err, "error setting content_type");
+        err = pn_string_setn(msg->content_encoding, cencoding.start,
+                             cencoding.size);
+        if (err) return pn_error_format(msg->error, err, "error setting content_encoding");
+        err = pn_string_setn(msg->group_id, group_id.start, group_id.size);
+        if (err) return pn_error_format(msg->error, err, "error setting group_id");
+        err = pn_string_setn(msg->reply_to_group_id, reply_to_group_id.start,
+                             reply_to_group_id.size);
+        if (err) return pn_error_format(msg->error, err, "error setting reply_to_group_id");
+        break;
+      }
+      case DELIVERY_ANNOTATIONS: {
+        pn_data_clear(msg->instructions);
+        pn_amqp_decode_DqC(msg_bytes, msg->instructions);
+        pn_data_rewind(msg->instructions);
+        break;
+      }
+      case MESSAGE_ANNOTATIONS: {
+        pn_data_clear(msg->annotations);
+        pn_amqp_decode_DqC(msg_bytes, msg->annotations);
+        pn_data_rewind(msg->annotations);
+        break;
+      }
+      case APPLICATION_PROPERTIES: {
+        pn_data_clear(msg->properties);
+        pn_amqp_decode_DqC(msg_bytes, msg->properties);
+        pn_data_rewind(msg->properties);
+        break;
+      }
+      case DATA:
+      case AMQP_SEQUENCE: {
+        msg->inferred = true;
+        pn_data_clear(msg->body);
+        pn_amqp_decode_DqC(msg_bytes, msg->body);
+        pn_data_rewind(msg->body);
+        break;
+      }
+      case AMQP_VALUE: {
+        msg->inferred = false;
+        pn_data_clear(msg->body);
+        pn_amqp_decode_DqC(msg_bytes, msg->body);
+        pn_data_rewind(msg->body);
+        break;
+      }
+      case FOOTER:
+        break;
+      default: {
+        pn_data_clear(msg->body);
+        pn_data_decode(msg->body, msg_bytes.start, msg_bytes.size);
+        pn_data_rewind(msg->body);
+        break;
+      }
+    }
+    msg_bytes = (pn_bytes_t){.size=msg_bytes.size-section_size, .start=msg_bytes.start+section_size};
+  }
+  return 0;
+}
+#endif
 
 #ifdef GENERATE_CODEC_CODE
 int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size)
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 9c5bf99..bebb5b8 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -23,6 +23,7 @@
 #include "framing.h"
 #ifndef GENERATE_CODEC_CODE
 #include "core/frame_generators.h"
+#include "core/frame_consumers.h"
 #endif
 #include "memory.h"
 #include "platform/platform.h"
@@ -1079,6 +1080,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   pn_data_clear(transport->remote_desired_capabilities);
   pn_data_clear(transport->remote_properties);
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
@@ -1092,6 +1094,17 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
                          transport->remote_desired_capabilities,
                          transport->remote_properties);
   if (err) return err;
+#else
+  pn_amqp_decode_DqEQSQSQIQHIqqCCCe(payload,
+                                    &container_q, &remote_container,
+                                    &hostname_q, &remote_hostname,
+                                    &remote_max_frame_q, &remote_max_frame,
+                                    &remote_channel_max_q, &remote_channel_max,
+                                    &transport->remote_idle_timeout,
+                                    transport->remote_offered_capabilities,
+                                    transport->remote_desired_capabilities,
+                                    transport->remote_properties);
+#endif
   /*
    * The default value is already stored in the variable.
    * But the scanner zeroes out values if it does not
@@ -1128,16 +1141,19 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
 
 int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
-
   bool reply;
   uint16_t remote_channel;
   pn_sequence_t next;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next);
   if (err) return err;
+#else
+  pn_amqp_decode_DqEQHIe(payload, &reply, &remote_channel, &next);
+#endif
 
   // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max --
   // express our displeasure by closing the connection with a framing error.
@@ -1256,6 +1272,7 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   uint64_t max_msgsz;
   bool has_props;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
@@ -1271,6 +1288,17 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
     pn_free(rem_props);
     return err;
   }
+#else
+  pn_data_t *rem_props = pn_data(0);
+  pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQCe(payload,
+                                                         &name, &handle,
+                                                         &is_sender,
+                                                         &snd_settle, &snd_settle_mode,
+                                                         &rcv_settle, &rcv_settle_mode,
+                                                         &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
+                                                         &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
+                                                         &idc, &max_msgsz, &has_props, rem_props);
+#endif
   char strbuf[128];      // avoid malloc for most link names
   char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL;
   char *strname = strheap ? strheap : strbuf;
@@ -1333,8 +1361,12 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
     pn_terminus_set_dynamic(rtgt, tgt_dynamic);
   } else {
     uint64_t code = 0;
+#ifdef GENERATE_CODEC_CODE
     err = pn_data_scan(args, "D.[.....D..DL....]", &code);
     if (err) return err;
+#else
+    pn_amqp_decode_DqEqqqqqDqqDLqqqqe(payload, &code);
+#endif
     if (code == COORDINATOR) {
       pn_terminus_set_type(rtgt, PN_COORDINATOR);
     } else if (code == TARGET) {
@@ -1354,6 +1386,7 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   pn_data_clear(link->remote_source.outcomes);
   pn_data_clear(link->remote_source.capabilities);
 
+#ifdef GENERATE_CODEC_CODE
   err = pn_data_scan(args, "D.[.....D.[.....C.C.CC]",
                      link->remote_source.properties,
                      link->remote_source.filter,
@@ -1361,6 +1394,13 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
                      link->remote_source.capabilities);
 
   if (err) return err;
+#else
+  pn_amqp_decode_DqEqqqqqDqEqqqqqCqCqCCee(payload,
+                                          link->remote_source.properties,
+                                          link->remote_source.filter,
+                                          link->remote_source.outcomes,
+                                          link->remote_source.capabilities);
+#endif
 
   pn_data_rewind(link->remote_source.properties);
   pn_data_rewind(link->remote_source.filter);
@@ -1372,14 +1412,25 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
 
   if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) {
     // coordinator target only has a capabilities field
+#ifdef GENERATE_CODEC_CODE
     err = pn_data_scan(args, "D.[.....D..D.[C]...]",
                        link->remote_target.capabilities);
     if (err) return err;
+#else
+    pn_amqp_decode_DqEqqqqqDqqDqECeqqqe(payload,
+                                        link->remote_target.capabilities);
+#endif
   } else {
+#ifdef GENERATE_CODEC_CODE
     err = pn_data_scan(args, "D.[.....D..D.[.....CC]",
                        link->remote_target.properties,
                        link->remote_target.capabilities);
     if (err) return err;
+#else
+    pn_amqp_decode_DqEqqqqqDqqDqEqqqqqCCee(payload,
+                                           link->remote_target.properties,
+                                           link->remote_target.capabilities);
+#endif
   }
 
   pn_data_rewind(link->remote_target.properties);
@@ -1421,15 +1472,23 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   bool has_type, settled_set;
   bool resume, aborted, batchable;
   uint64_t type;
-  pn_data_clear(transport->disp_data);
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
+  pn_data_clear(transport->disp_data);
   int err = pn_data_scan(args, "D.[I?Iz.?oo.D?LCooo]", &handle, &id_present, &id, &tag,
                          &settled_set, &settled, &more, &has_type, &type, transport->disp_data,
                          &resume, &aborted, &batchable);
   if (err) return err;
+#else
+  pn_bytes_t disp_data;
+  size_t dsize =
+    pn_amqp_decode_DqEIQIzqQooqDQLRoooe(payload, &handle, &id_present, &id, &tag,
+                                        &settled_set, &settled, &more, &has_type, &type, &disp_data,
+                                        &resume, &aborted, &batchable);
+#endif
   payload.size -= dsize;
   payload.start += dsize;
 
@@ -1491,11 +1550,18 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
                          "sequencing error, expected delivery-id %u, got %u",
                          state->id, id);
     }
+#ifdef GENERATE_CODEC_CODE
     if (has_type) {
       delivery->remote.type = type;
       pn_data_copy(delivery->remote.data, transport->disp_data);
     }
-
+#else
+    if (has_type) {
+      delivery->remote.type = type;
+      pn_data_clear(delivery->remote.data);
+      pn_data_decode(delivery->remote.data, disp_data.start, disp_data.size);
+    }
+#endif
     link->state.delivery_count++;
     link->state.link_credit--;
     link->queued++;
@@ -1551,6 +1617,7 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   uint32_t handle;
   bool inext_init, handle_init, dcount_init, drain;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
@@ -1558,6 +1625,11 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
                          &onext, &owin, &handle_init, &handle, &dcount_init,
                          &delivery_count, &link_credit, &drain);
   if (err) return err;
+#else
+  pn_amqp_decode_DqEQIIIIQIQIIqoe(payload, &inext_init, &inext, &iwin,
+                                  &onext, &owin, &handle_init, &handle, &dcount_init,
+                                  &delivery_count, &link_credit, &drain);
+#endif
 
   pn_session_t *ssn = pni_channel_state(transport, channel);
   if (!ssn) {
@@ -1605,6 +1677,19 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   return 0;
 }
 
+static void pn_condition_set(pn_condition_t *condition, pn_bytes_t cond, pn_bytes_t desc)
+{
+  if (condition->name == NULL) {
+    condition->name = pn_string(NULL);
+  }
+  pn_string_setn(condition->name, cond.start, cond.size);
+  if (condition->description == NULL) {
+    condition->description = pn_string(NULL);
+  }
+  pn_string_setn(condition->description, desc.start, desc.size);
+}
+
+#ifdef GENERATE_CODEC_CODE
 #define SCAN_ERROR_DEFAULT ("D.[D.[sSC]")
 #define SCAN_ERROR_DETACH ("D.[..D.[sSC]")
 #define SCAN_ERROR_DISP ("[D.[sSC]")
@@ -1616,22 +1701,17 @@ static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char
   pn_condition_clear(condition);
   int err = pn_data_scan(data, fmt, &cond, &desc, pn_condition_info(condition));
   if (err) return err;
-  if (condition->name == NULL) {
-    condition->name = pn_string(NULL);
-  }
-  pn_string_setn(condition->name, cond.start, cond.size);
-  if (condition->description == NULL) {
-    condition->description = pn_string(NULL);
-  }
-  pn_string_setn(condition->description, desc.start, desc.size);
+  pn_condition_set(condition, cond,desc);
   pn_data_rewind(pn_condition_info(condition));
   return 0;
 }
+#endif
 
 static inline bool sequence_lte(pn_sequence_t a, pn_sequence_t b) {
   return b-a <= INT32_MAX;
 }
 
+#ifdef GENERATE_CODEC_CODE
 static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t *delivery, bool settled, bool remote_data, bool type_init, uint64_t type) {
   pn_disposition_t *remote = &delivery->remote;
 
@@ -1694,15 +1774,81 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t
   pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
   return 0;
 }
+#else
+static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t *delivery, bool settled, bool remote_data, bool type_init, uint64_t type, pn_bytes_t disp_data) {
+  pn_disposition_t *remote = &delivery->remote;
+
+  if (type_init) remote->type = type;
+
+  if (remote_data) {
+    switch (type) {
+      case PN_RECEIVED: {
+        bool qnumber;
+        uint32_t number;
+        bool qoffset;
+        uint64_t offset;
+        pn_amqp_decode_DqEQIQLe(disp_data, &qnumber, &number, &qoffset, &offset);
+
+        if (qnumber) {
+          remote->section_number = number;
+        }
+        if (qoffset) {
+          remote->section_offset = offset;
+        }
+        break;
+      }
+      case PN_ACCEPTED:
+        break;
+
+      case PN_REJECTED: {
+        pn_bytes_t cond;
+        pn_bytes_t desc;
+        pn_amqp_decode_DqEDqEsSCee(disp_data, &cond, &desc, pn_condition_info(&remote->condition));
+        pn_condition_set(&remote->condition, cond, desc);
+
+        break;
+      }
+      case PN_RELEASED:
+        break;
+
+      case PN_MODIFIED: {
+        bool qfailed;
+        bool failed;
+        bool qundeliverable;
+        bool undeliverable;
+        pn_amqp_decode_DqEQoQoCe(disp_data, &qfailed, &failed, &qundeliverable, &undeliverable, remote->annotations);
+
+        if (qfailed) {
+          remote->failed = failed;
+        }
+        if (qundeliverable) {
+          remote->undeliverable = undeliverable;
+        }
+        break;
+      }
+      default:
+        pn_amqp_decode_DqC(disp_data, remote->data);
+        break;
+    }
+  }
+
+  remote->settled = settled;
+  delivery->updated = true;
+  pn_work_update(transport->connection, delivery);
+
+  pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
+  return 0;
+}
+#endif
 
 int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   bool role;
   pn_sequence_t first, last;
-  uint64_t type = 0;
   bool last_init, settled;
-
+#ifdef GENERATE_CODEC_CODE
   bool type_init;
+  uint64_t type = 0;
   pn_data_clear(transport->disp_data);
 
   pn_data_t *args = transport->args;
@@ -1712,6 +1858,11 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch
                          &last, &settled, &type_init, &type,
                          transport->disp_data);
   if (err) return err;
+#else
+  pn_bytes_t disp_data;
+  pn_amqp_decode_DqEoIQIoRe(payload, &role, &first, &last_init,
+                            &last, &settled, &disp_data);
+#endif
   if (!last_init) last = first;
 
   pn_session_t *ssn = pni_channel_state(transport, channel);
@@ -1730,6 +1881,7 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch
     deliveries = &ssn->state.incoming;
   }
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_rewind(transport->disp_data);
   bool remote_data = (pn_data_next(transport->disp_data) &&
                       pn_data_get_list(transport->disp_data) > 0);
@@ -1747,7 +1899,7 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch
       pn_sequence_t key = pn_hash_key(dh, entry);
       if (sequence_lte(first, key) && sequence_lte(key, last)) {
         pn_delivery_t *delivery = (pn_delivery_t*) pn_hash_value(dh, entry);
-        err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type);
+        int err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type);
         if (err) return err;
       }
     }
@@ -1755,12 +1907,44 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch
     for (pn_sequence_t id = first; sequence_lte(id, last); ++id) {
       pn_delivery_t *delivery = pni_delivery_map_get(deliveries, id);
       if (delivery) {
-        err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type);
+        int err = pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type);
         if (err) return err;
       }
     }
   }
 
+#else
+  bool type_init;
+  uint64_t type;
+  bool remote_data;
+  pn_amqp_decode_DQLQq(disp_data, &type_init, &type, &remote_data);
+
+
+  // Do some validation of received first and last values
+  // TODO: We should really also clamp the first value here, but we're not keeping track of the earliest
+  // unsettled delivery sequence no
+  last = sequence_lte(last, deliveries->next) ? last : deliveries->next;
+
+  // If there are fewer deliveries in the session than the range then look at every delivery in the session
+  // otherwise look at every delivery_id in the disposition performative
+  pn_hash_t *dh = deliveries->deliveries;
+  if (last-first+1 >= pn_hash_size(dh)) {
+    for (pn_handle_t entry = pn_hash_head(dh); entry!=0 ; entry = pn_hash_next(dh, entry)) {
+      pn_sequence_t key = pn_hash_key(dh, entry);
+      if (sequence_lte(first, key) && sequence_lte(key, last)) {
+        pn_delivery_t *delivery = (pn_delivery_t*) pn_hash_value(dh, entry);
+        pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type, disp_data);
+      }
+    }
+  } else {
+    for (pn_sequence_t id = first; sequence_lte(id, last); ++id) {
+      pn_delivery_t *delivery = pni_delivery_map_get(deliveries, id);
+      if (delivery) {
+        pni_do_delivery_disposition(transport, delivery, settled, remote_data, type_init, type, disp_data);
+      }
+    }
+  }
+#endif
   return 0;
 }
 
@@ -1769,11 +1953,16 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   uint32_t handle;
   bool closed;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[Io]", &handle, &closed);
   if (err) return err;
+#else
+  pn_bytes_t error_condition;
+  pn_amqp_decode_DqEIoRe(payload, &handle, &closed, &error_condition);
+#endif
 
   pn_session_t *ssn = pni_channel_state(transport, channel);
   if (!ssn) {
@@ -1784,8 +1973,18 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
     return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
   }
 
+#ifdef GENERATE_CODEC_CODE
   err = pn_scan_error(args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH);
   if (err) return err;
+#else
+  pn_bytes_t cond;
+  pn_bytes_t desc;
+  pn_condition_t* condition = &link->endpoint.remote_condition;
+  pn_condition_clear(condition);
+  pn_amqp_decode_DqEsSCe(error_condition, &cond, &desc, pn_condition_info(condition));
+  pn_condition_set(condition, cond, desc);
+  pn_data_rewind(pn_condition_info(condition));
+#endif
 
   if (closed)
   {
@@ -1806,11 +2005,21 @@ int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, p
     return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
   }
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
   int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
+#else
+  pn_bytes_t cond;
+  pn_bytes_t desc;
+  pn_condition_t* condition = &ssn->endpoint.remote_condition;
+  pn_condition_clear(condition);
+  pn_amqp_decode_DqEDqEsSCee(payload, &cond, &desc, pn_condition_info(condition));
+  pn_condition_set(condition, cond, desc);
+  pn_data_rewind(pn_condition_info(condition));
+#endif
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
   pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_CLOSE);
   pni_unmap_remote_channel(ssn);
@@ -1821,11 +2030,21 @@ int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
 {
   pn_connection_t *conn = transport->connection;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
   int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
+#else
+  pn_bytes_t cond;
+  pn_bytes_t desc;
+  pn_condition_t* condition = &transport->remote_condition;
+  pn_condition_clear(condition);
+  pn_amqp_decode_DqEDqEsSCee(payload, &cond, &desc, pn_condition_info(condition));
+  pn_condition_set(condition, cond, desc);
+  pn_data_rewind(pn_condition_info(condition));
+#endif
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
   pn_collector_put(transport->connection->collector, PN_OBJECT, conn, PN_CONNECTION_REMOTE_CLOSE);
diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c
index 1e853b8..65571d8 100644
--- a/c/src/sasl/sasl.c
+++ b/c/src/sasl/sasl.c
@@ -24,7 +24,9 @@
 #include "core/autodetect.h"
 #include "core/framing.h"
 #ifndef GENERATE_CODEC_CODE
+#include "core/consumers.h"
 #include "core/frame_generators.h"
+#include "core/frame_consumers.h"
 #endif
 #include "core/engine-internal.h"
 #include "core/util.h"
@@ -922,12 +924,16 @@ int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   pn_bytes_t mech;
   pn_bytes_t recv;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
 
   int err = pn_data_scan(args, "D.[sz]", &mech, &recv);
   if (err) return err;
+#else
+  pn_amqp_decode_DqEsze(payload, &mech, &recv);
+#endif
   sasl->selected_mechanism = pn_strndup(mech.start, mech.size);
 
   // We need to filter out a supplied mech in in the inclusion list
@@ -956,10 +962,9 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha
   // We should only receive this if we are a sasl client
   if (!sasl->client) return PN_ERR;
 
-  // This scanning relies on pn_data_scan leaving the pn_data_t cursors
-  // where they are after finishing the scan
   pn_string_t *mechs = pn_string("");
 
+#ifdef GENERATE_CODEC_CODE
   // Try array of symbols for mechanism list
   bool array = false;
 
@@ -967,6 +972,8 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
 
+  // This scanning relies on pn_data_scan leaving the pn_data_t cursors
+  // where they are after finishing the scan
   int err = pn_data_scan(args, "D.[?@[", &array);
   if (err) return err;
 
@@ -975,7 +982,7 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha
     while(pn_data_next(args)) {
       pn_bytes_t s = pn_data_get_symbol(args);
       if (pni_sasl_client_included_mech(sasl->included_mechanisms, s)) {
-        pn_string_addf(mechs, "%*s ", (int)s.size, s.start);
+        pn_string_addf(mechs, "%.*s ", (int)s.size, s.start);
       }
     }
 
@@ -993,6 +1000,50 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha
       pn_string_setn(mechs, symbol.start, symbol.size);
     }
   }
+#else
+  pn_bytes_t subpayload;
+  pn_amqp_decode_DqERe(payload, &subpayload);
+  pni_consumer_t consumer = make_consumer_from_bytes(subpayload);
+
+  uint8_t element_type;
+  uint32_t element_count;
+  pni_consumer_t subconsumer;
+  if (consume_array(&consumer, &subconsumer, &element_count, &element_type) && (element_type==PNE_SYM32 || element_type==PNE_SYM8)) {
+    // If this is an array of symbols decode each symbol
+    pn_bytes_t symbol;
+    switch (element_type) {
+      case PNE_SYM8:
+        while (element_count) {
+          pni_consumer_readv8(&subconsumer, &symbol);
+          if (pni_sasl_client_included_mech(sasl->included_mechanisms, symbol)) {
+            pn_string_addf(mechs, "%.*s ", (int)symbol.size, symbol.start);
+          }
+          --element_count;
+        }
+        break;
+      case PNE_SYM32:
+        while (element_count) {
+          pni_consumer_readv32(&subconsumer, &symbol);
+          if (pni_sasl_client_included_mech(sasl->included_mechanisms, symbol)) {
+            pn_string_addf(mechs, "%.*s ", (int)symbol.size, symbol.start);
+          }
+          --element_count;
+        }
+        break;
+    }
+    if (pn_string_size(mechs)) {
+      pn_string_buffer(mechs)[pn_string_size(mechs)-1] = 0;
+    }
+  } else {
+    // If not then see if it is a single symbol
+    pn_bytes_t symbol;
+    pn_amqp_decode_DqEse(payload, &symbol);
+
+    if (pni_sasl_client_included_mech(sasl->included_mechanisms, symbol)) {
+      pn_string_setn(mechs, symbol.start, symbol.size);
+    }
+  }
+#endif
 
   if (!(pni_sasl_impl_init_client(transport) &&
         pn_string_size(mechs) &&
@@ -1019,12 +1070,15 @@ int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t chan
 
   pn_bytes_t recv;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
-
   int err = pn_data_scan(args, "D.[z]", &recv);
   if (err) return err;
+#else
+  pn_amqp_decode_DqEze(payload, &recv);
+#endif
 
   pni_sasl_impl_process_challenge(transport, &recv);
 
@@ -1045,12 +1099,15 @@ int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
 
   pn_bytes_t recv;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
-
   int err = pn_data_scan(args, "D.[z]", &recv);
   if (err) return err;
+#else
+  pn_amqp_decode_DqEze(payload, &recv);
+#endif
 
   pni_sasl_impl_process_response(transport, &recv);
 
@@ -1072,13 +1129,15 @@ int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channe
   uint8_t outcome;
   pn_bytes_t recv;
 
+#ifdef GENERATE_CODEC_CODE
   pn_data_t *args = transport->args;
   ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
   if (dsize < 0) return dsize;
-
   int err = pn_data_scan(args, "D.[Bz]", &outcome, &recv);
   if (err) return err;
-
+#else
+  pn_amqp_decode_DqEBze(payload, &outcome, &recv);
+#endif
   // Preset the outcome to what the server sent us - the plugin can alter this.
   // In practise the plugin processing here should only fail because it fails
   // to authenticate the server id after the server authenticates our user.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to