This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 8ad74c6cde0217a5984947e19bdd6421a4c8c91b
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri Jun 11 17:52:18 2021 -0400

    PROTON-2451: Refactor to separate out extracting performative code
    
    ... From rest of performative frame structure
---
 c/src/core/consumers.h        | 61 +++++++++++++++++++++++++++++++++
 c/src/core/dispatch_actions.h | 28 +++++++--------
 c/src/core/dispatcher.c       | 43 +++++++----------------
 c/src/core/dispatcher.h       |  2 +-
 c/src/core/framing.c          | 12 +++++++
 c/src/core/framing.h          |  3 ++
 c/src/core/logger.c           |  6 ++--
 c/src/core/transport.c        | 80 +++++++++++++++++++++++++++++++++----------
 c/src/sasl/sasl.c             | 35 ++++++++++++++++---
 9 files changed, 197 insertions(+), 73 deletions(-)

diff --git a/c/src/core/consumers.h b/c/src/core/consumers.h
index 5bba0c0..d2542b1 100644
--- a/c/src/core/consumers.h
+++ b/c/src/core/consumers.h
@@ -93,6 +93,25 @@ static inline bool pni_consumer_readf32(pni_consumer_t *consumer, uint32_t* resu
   return true;
 }
 
+static inline bool pni_consumer_readf64(pni_consumer_t *consumer, uint64_t* result)
+{
+  uint32_t a;
+  if (!pni_consumer_readf32(consumer, &a)) return false;
+  uint32_t b;
+  if (!pni_consumer_readf32(consumer, &b)) return false;
+  *result = (uint64_t)a << 32 | (uint64_t)b;
+  return true;
+}
+
+static inline bool pni_consumer_readf128(pni_consumer_t *consumer, void *dst)
+{
+  if (consumer->position+16 > consumer->size) return false;
+
+  memcpy(dst, &consumer->output_start[consumer->position], 16);
+  consumer->position += 16;
+  return true;
+}
+
 static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consumer, uint8_t type, pn_bytes_t *value) {
   uint8_t subcategory = type >> 4;
   switch (subcategory) {
@@ -161,4 +180,46 @@ static inline bool pni_consumer_read_value_not_described(pni_consumer_t* consume
   return false;
 }
 
+///////////////////////////////////////////////////////////////////////////////
+
+static inline bool consume_expected_ubyte(pni_consumer_t* consumer, uint8_t expected)
+{
+    uint8_t e;
+    return pni_consumer_readf8(consumer, &e) && e==expected;
+}
+
+static inline bool consume_ulong(pni_consumer_t* consumer, uint64_t *ulong) {
+  uint8_t type;
+  if (!pni_consumer_readf8(consumer, &type)) return false;
+  switch (type) {
+    case PNE_SMALLULONG: {
+      uint8_t ul;
+      if (!pni_consumer_readf8(consumer, &ul)) return false;
+      *ulong = ul;
+      break;
+    }
+    case PNE_ULONG: {
+      uint64_t ul;
+      if (!pni_consumer_readf64(consumer, &ul)) return false;
+      *ulong = ul;
+      break;
+    }
+    case PNE_ULONG0: {
+      *ulong = 0;
+      break;
+    }
+    default:
+      return false;
+  }
+  return true;
+}
+
+// XXX: assuming numeric -
+// if we get a symbol we should map it to the numeric value and dispatch on that
+static inline bool consume_descriptor(pni_consumer_t* consumer, uint64_t *descriptor) {
+  return
+    consume_expected_ubyte(consumer, PNE_DESCRIPTOR) &&
+    consume_ulong(consumer, descriptor);
+}
+
 #endif // PROTON_CONSUMERS_H
diff --git a/c/src/core/dispatch_actions.h b/c/src/core/dispatch_actions.h
index 916ff55..5551364 100644
--- a/c/src/core/dispatch_actions.h
+++ b/c/src/core/dispatch_actions.h
@@ -26,21 +26,21 @@
 
 
 /* AMQP actions */
-int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
 
 /* SASL actions */
-int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
-int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
+int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload);
 
 #endif // _PROTON_DISPATCH_ACTIONS_H
diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c
index 7df9ade..97e8c0a 100644
--- a/c/src/core/dispatcher.c
+++ b/c/src/core/dispatcher.c
@@ -21,26 +21,27 @@
 
 #include "dispatcher.h"
 
+#include "consumers.h"
+#include "dispatch_actions.h"
 #include "engine-internal.h"
 #include "framing.h"
 #include "logger_private.h"
 #include "protocol.h"
 
-#include "dispatch_actions.h"
 
-int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) {
+int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) {
   PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame: type: %d: Unknown performative", frame_type);
   return PN_ERR;
 }
 
-int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) {
+int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) {
   PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame: Unknown frame type: %d", frame_type);
   return PN_ERR;
 }
 
 // We could use a table based approach here if we needed to dynamically
 // add new performatives
-static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_bytes_t frame_payload)
 {
   pn_action_t *action;
   switch (frame_type) {
@@ -72,46 +73,28 @@ static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode,
     break;
   default:              action = pni_bad_frame_type; break;
   };
-  return action(transport, frame_type, channel, args, payload);
+  return action(transport, frame_type, channel, frame_payload);
 }
 
-
-static int pni_dispatch_frame(pn_frame_t frame, pn_logger_t *logger, pn_transport_t * transport, pn_data_t *args)
+static int pni_dispatch_frame(pn_frame_t frame, pn_logger_t *logger, pn_transport_t * transport)
 {
   pn_bytes_t frame_payload = frame.frame_payload0;
 
   if (frame_payload.size == 0) { // ignore null frames
     return 0;
   }
-  ssize_t dsize = pn_data_decode(args, frame_payload.start, frame_payload.size);
-  if (dsize < 0) {
-    PN_LOG_MSG_DATA(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, frame_payload,
-                    "Error decoding frame: %s %s\n", pn_code(dsize), pn_error_text(pn_data_error(args)));
-    return dsize;
-  }
 
-  uint8_t frame_type = frame.type;
-  uint16_t channel = frame.channel;
-  // XXX: assuming numeric -
-  // if we get a symbol we should map it to the numeric value and dispatch on that
   uint64_t lcode;
-  bool scanned;
-  int e = pn_data_scan(args, "D?L.", &scanned, &lcode);
-  if (e) {
-    PN_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Scan error");
-    return e;
-  }
-  if (!scanned) {
+  pni_consumer_t consumer = make_consumer_from_bytes(frame_payload);
+  if (!consume_descriptor(&consumer, &lcode)) {
     PN_LOG(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, "Error dispatching frame");
     return PN_ERR;
   }
-  size_t payload_size = frame_payload.size - dsize;
-  const char *payload_mem = payload_size ? frame_payload.start + dsize : NULL;
-  pn_bytes_t payload = {payload_size, payload_mem};
 
-  int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload);
+  uint8_t frame_type = frame.type;
+  uint16_t channel = frame.channel;
 
-  pn_data_clear(args);
+  int err = pni_dispatch_action(transport, lcode, frame_type, channel, frame_payload);
 
   return err;
 }
@@ -128,7 +111,7 @@ ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t
       read += n;
       available -= n;
       transport->input_frames_ct += 1;
-      int e = pni_dispatch_frame(frame, &transport->logger, transport, transport->args);
+      int e = pni_dispatch_frame(frame, &transport->logger, transport);
       if (e) return e;
     } else if (n < 0) {
       pn_do_error(transport, "amqp:connection:framing-error", "malformed frame");
diff --git a/c/src/core/dispatcher.h b/c/src/core/dispatcher.h
index 29881b5..2d3cf21 100644
--- a/c/src/core/dispatcher.h
+++ b/c/src/core/dispatcher.h
@@ -29,7 +29,7 @@
 #include "proton/codec.h"
 #include "proton/types.h"
 
-typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, const pn_bytes_t frame_payload);
 
 ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt);
 ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size);
diff --git a/c/src/core/framing.c b/c/src/core/framing.c
index 79ead83..1f791d5 100644
--- a/c/src/core/framing.c
+++ b/c/src/core/framing.c
@@ -158,3 +158,15 @@ int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative)
   transport->output_frames_ct += 1;
   return 0;
 }
+
+ssize_t pn_framing_recv_amqp(pn_data_t *args, pn_logger_t *logger, const pn_bytes_t bytes)
+{
+  pn_data_clear(args);
+  ssize_t dsize = pn_data_decode(args, bytes.start, bytes.size);
+  if (dsize < 0) {
+    PN_LOG_MSG_DATA(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, bytes,
+                    "Error decoding frame: %s %s\n", pn_code(dsize), pn_error_text(pn_data_error(args)));
+  }
+
+  return dsize;
+}
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
index 795db83..1c44ed4 100644
--- a/c/src/core/framing.h
+++ b/c/src/core/framing.h
@@ -25,6 +25,7 @@
 #include "buffer.h"
 #include "logger_private.h"
 
+#include "proton/codec.h"
 #include "proton/types.h"
 
 #include <stddef.h>
@@ -51,4 +52,6 @@ int pn_framing_send_amqp(pn_transport_t *transport, uint16_t ch, pn_bytes_t perf
 int pn_framing_send_amqp_with_payload(pn_transport_t *transport, uint16_t ch, pn_bytes_t performative, pn_bytes_t payload);
 int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative);
 
+ssize_t pn_framing_recv_amqp(pn_data_t *args, pn_logger_t  *logger, const pn_bytes_t frame_payload);
+
 #endif /* framing.h */
diff --git a/c/src/core/logger.c b/c/src/core/logger.c
index 4f9df7e..cc794fd 100644
--- a/c/src/core/logger.c
+++ b/c/src/core/logger.c
@@ -191,11 +191,9 @@ void pni_logger_log_data(pn_logger_t *logger, pn_log_subsystem_t subsystem, pn_l
   char buf[256];
   ssize_t n = pn_quote_data(buf, 256, bytes, size);
   if (n >= 0) {
-    pn_logger_logf(logger, subsystem, severity, "%s: %s", msg, buf);
+    pn_logger_logf(logger, subsystem, severity, "%s: \"%s\"", msg, buf);
   } else if (n == PN_OVERFLOW) {
-    pn_logger_logf(logger, subsystem, severity, "%s: %s (truncated)", msg, buf);
-  } else {
-    pn_logger_logf(logger, subsystem, severity, "%s: cannot log data: %s", msg, pn_code(n));
+    pn_logger_logf(logger, subsystem, severity, "%s: \"%s\"... (truncated)", msg, buf);
   }
 }
 
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 0e0f62e..de63431 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1045,9 +1045,8 @@ static char *pn_bytes_strdup(pn_bytes_t str)
   return pn_strndup(str.start, str.size);
 }
 
-int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
-  pn_connection_t *conn = transport->connection;
   bool container_q, hostname_q, remote_channel_max_q, remote_max_frame_q;
   uint16_t remote_channel_max;
   uint32_t remote_max_frame;
@@ -1055,6 +1054,10 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   pn_data_clear(transport->remote_offered_capabilities);
   pn_data_clear(transport->remote_desired_capabilities);
   pn_data_clear(transport->remote_properties);
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[?S?S?I?HI..CCC]",
                          &container_q, &remote_container,
                          &hostname_q, &remote_hostname,
@@ -1087,6 +1090,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   pni_mem_deallocate(PN_CLASSCLASS(pn_strdup), transport->remote_hostname);
   transport->remote_hostname = hostname_q ? pn_bytes_strdup(remote_hostname) : NULL;
 
+  pn_connection_t *conn = transport->connection;
   if (conn) {
     PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
     pni_post_remote_open_events(transport, conn);
@@ -1098,11 +1102,16 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   return 0;
 }
 
-int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
+
   bool reply;
   uint16_t remote_channel;
   pn_sequence_t next;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next);
   if (err) return err;
 
@@ -1206,7 +1215,7 @@ int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address)
   return pn_string_setn(terminus->address, address.start, address.size);
 }
 
-int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pn_bytes_t name;
   uint32_t handle;
@@ -1222,6 +1231,10 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   uint8_t snd_settle_mode, rcv_settle_mode;
   uint64_t max_msgsz;
   bool has_props;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   pn_data_t *rem_props = pn_data(0);
   int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?C]", &name, &handle,
                          &is_sender,
@@ -1231,8 +1244,8 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
                          &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
                          &idc, &max_msgsz, &has_props, rem_props);
   if (err) {
-      pn_free(rem_props);
-      return err;
+    pn_free(rem_props);
+    return err;
   }
   char strbuf[128];      // avoid malloc for most link names
   char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL;
@@ -1242,10 +1255,10 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
 
   pn_session_t *ssn = pni_channel_state(transport, channel);
   if (!ssn) {
-      pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
-      if (strheap) free(strheap);
-      pn_free(rem_props);
-      return PN_EOS;
+    pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
+    if (strheap) free(strheap);
+    pn_free(rem_props);
+    return PN_EOS;
   }
   pn_link_t *link = pni_find_link(ssn, name, is_sender);
   if (link && (int32_t)link->state.remote_handle >= 0) {
@@ -1372,7 +1385,7 @@ static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
   pn_decref(delivery);
 }
 
-int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   // XXX: multi transfer
   uint32_t handle;
@@ -1385,10 +1398,17 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   bool resume, aborted, batchable;
   uint64_t type;
   pn_data_clear(transport->disp_data);
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[I?Iz.?oo.D?LCooo]", &handle, &id_present, &id, &tag,
                          &settled_set, &settled, &more, &has_type, &type, transport->disp_data,
                          &resume, &aborted, &batchable);
   if (err) return err;
+  payload.size -= dsize;
+  payload.start += dsize;
+
   pn_session_t *ssn = pni_channel_state(transport, channel);
   if (!ssn) {
     return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
@@ -1458,7 +1478,7 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   }
 
   if (delivery) {
-    pn_buffer_append(delivery->bytes, payload->start, payload->size);
+    pn_buffer_append(delivery->bytes, payload.start, payload.size);
     if (more) {
       if (!link->more_pending) {
         // First frame of a multi-frame transfer. Remember at link level.
@@ -1488,7 +1508,7 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
     pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
   }
 
-  ssn->incoming_bytes += payload->size;
+  ssn->incoming_bytes += payload.size;
   ssn->state.incoming_transfer_count++;
   ssn->state.incoming_window--;
 
@@ -1500,12 +1520,16 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   return 0;
 }
 
-int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pn_sequence_t onext, inext, delivery_count;
   uint32_t iwin, owin, link_credit;
   uint32_t handle;
   bool inext_init, handle_init, dcount_init, drain;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin,
                          &onext, &owin, &handle_init, &handle, &dcount_init,
                          &delivery_count, &link_credit, &drain);
@@ -1647,13 +1671,19 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t
   return 0;
 }
 
-int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   bool role;
   pn_sequence_t first, last;
   uint64_t type = 0;
-  bool last_init, settled, type_init;
+  bool last_init, settled;
+
+  bool type_init;
   pn_data_clear(transport->disp_data);
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[oI?IoD?LC]", &role, &first, &last_init,
                          &last, &settled, &type_init, &type,
                          transport->disp_data);
@@ -1710,10 +1740,14 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch
   return 0;
 }
 
-int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   uint32_t handle;
   bool closed;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_data_scan(args, "D.[Io]", &handle, &closed);
   if (err) return err;
 
@@ -1741,12 +1775,16 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel
   return 0;
 }
 
-int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pn_session_t *ssn = pni_channel_state(transport, channel);
   if (!ssn) {
     return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
   }
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
@@ -1755,9 +1793,13 @@ int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, p
   return 0;
 }
 
-int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pn_connection_t *conn = transport->connection;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
   int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
   transport->close_rcvd = true;
diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c
index d9fd79b..80ffad8 100644
--- a/c/src/sasl/sasl.c
+++ b/c/src/sasl/sasl.c
@@ -881,7 +881,7 @@ pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl0)
 }
 
 // Received Server side
-int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pni_sasl_t *sasl = transport->sasl;
 
@@ -894,6 +894,11 @@ int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
 
   pn_bytes_t mech;
   pn_bytes_t recv;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
+
   int err = pn_data_scan(args, "D.[sz]", &mech, &recv);
   if (err) return err;
   sasl->selected_mechanism = pn_strndup(mech.start, mech.size);
@@ -913,7 +918,7 @@ int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
 }
 
 // Received client side
-int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pni_sasl_t *sasl = transport->sasl;
 
@@ -930,6 +935,11 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha
 
   // Try array of symbols for mechanism list
   bool array = false;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
+
   int err = pn_data_scan(args, "D.[?@[", &array);
   if (err) return err;
 
@@ -969,7 +979,7 @@ int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t cha
 }
 
 // Received client side
-int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pni_sasl_t *sasl = transport->sasl;
 
@@ -981,6 +991,11 @@ int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t chan
   if (!sasl->client) return PN_ERR;
 
   pn_bytes_t recv;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
+
   int err = pn_data_scan(args, "D.[z]", &recv);
   if (err) return err;
 
@@ -990,7 +1005,7 @@ int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t chan
 }
 
 // Received server side
-int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pni_sasl_t *sasl = transport->sasl;
 
@@ -1002,6 +1017,11 @@ int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   if (sasl->client) return PN_ERR;
 
   pn_bytes_t recv;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
+
   int err = pn_data_scan(args, "D.[z]", &recv);
   if (err) return err;
 
@@ -1011,7 +1031,7 @@ int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
 }
 
 // Received client side
-int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
 {
   pni_sasl_t *sasl = transport->sasl;
 
@@ -1024,6 +1044,11 @@ int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channe
 
   uint8_t outcome;
   pn_bytes_t recv;
+
+  pn_data_t *args = transport->args;
+  ssize_t dsize = pn_framing_recv_amqp(args, &transport->logger, payload);
+  if (dsize < 0) return dsize;
+
   int err = pn_data_scan(args, "D.[Bz]", &outcome, &recv);
   if (err) return err;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to