This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit ace3feea6314c49ffc188fc6732a3f6b67c26e28
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Jun 22 15:51:45 2021 -0400

    PROTON-2451: Refactor AMQP frame to allow code generation for fill
---
 c/CMakeLists.txt              |   1 +
 c/src/core/dispatch_actions.h |   3 -
 c/src/core/engine-internal.h  |   2 +-
 c/src/core/framing.c          |  43 ++++++++++++
 c/src/core/framing.h          |   7 ++
 c/src/core/post_frame.c       |  77 ++++++++++++++++++++++
 c/src/core/transport.c        | 149 +++++++++++++-----------------------------
 c/src/sasl/sasl.c             |  24 ++++---
 8 files changed, 188 insertions(+), 118 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index bba89ef..580f83f 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -227,6 +227,7 @@ set (qpid-proton-core
   src/core/types.c
 
   src/core/framing.c
+  src/core/post_frame.c
   src/core/value_dump.c
 
   src/core/codec.c
diff --git a/c/src/core/dispatch_actions.h b/c/src/core/dispatch_actions.h
index ea2d8b2..916ff55 100644
--- a/c/src/core/dispatch_actions.h
+++ b/c/src/core/dispatch_actions.h
@@ -24,9 +24,6 @@
 
 #include "dispatcher.h"
 
-#define AMQP_FRAME_TYPE (0)
-#define SASL_FRAME_TYPE (1)
-
 
 /* AMQP actions */
 int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t 
channel, pn_data_t *args, const pn_bytes_t *payload);
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 471f597..4313ab4 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -378,7 +378,7 @@ void pn_link_unbound(pn_link_t* link);
 void pn_ep_incref(pn_endpoint_t *endpoint);
 void pn_ep_decref(pn_endpoint_t *endpoint);
 
-int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const 
char *fmt, ...);
+pn_bytes_t pn_fill_performative(pn_transport_t *transport, const char *fmt, 
...);
 
 #if __cplusplus
 }
diff --git a/c/src/core/framing.c b/c/src/core/framing.c
index 547938a..79ead83 100644
--- a/c/src/core/framing.c
+++ b/c/src/core/framing.c
@@ -115,3 +115,46 @@ size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t 
frame, pn_logger_t *logger
     return 0;
   }
 }
+
+static inline void pn_post_frame(pn_buffer_t *output, pn_logger_t *logger, 
uint8_t type, uint16_t ch, pn_bytes_t performative, pn_bytes_t payload)
+{ // Assemble a frame of the given type and channel and hand it to pn_write_frame() for the output buffer.
+  pn_frame_t frame = {
+    .type = type,
+    .channel = ch,
+    .frame_payload0 = performative, // the encoded performative
+    .frame_payload1 = payload       // body bytes; callers in this file pass {0, NULL} when there are none
+  }; // members not named above (such as .extended) are zero-initialized by the designated initializer
+  pn_buffer_ensure(output, 
AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size); // request room for the header plus all three frame parts
+  pn_write_frame(output, frame, logger); // NOTE(review): the size_t result of pn_write_frame() is discarded here
+}
+
+int pn_framing_send_amqp(pn_transport_t *transport, uint16_t ch, pn_bytes_t 
performative)
+{ // Post an AMQP_FRAME_TYPE frame holding only the performative on channel ch; returns 0, or PN_ERR when performative.start is NULL.
+  if (!performative.start) // presumably the pn_bytes_null that pn_fill_performative() returns on failure - TODO confirm
+    return PN_ERR;
+
+  pn_post_frame(transport->output_buffer, &transport->logger, AMQP_FRAME_TYPE, 
ch, performative, (pn_bytes_t){0, NULL}); // {0, NULL}: no payload follows the performative
+  transport->output_frames_ct += 1; // one more frame posted on this transport
+  return 0;
+}
+
+int pn_framing_send_amqp_with_payload(pn_transport_t *transport, uint16_t ch, 
pn_bytes_t performative, pn_bytes_t payload)
+{ // Post an AMQP_FRAME_TYPE frame holding the performative plus payload on channel ch; returns 0, or PN_ERR when performative.start is NULL.
+  if (!performative.start) // only the performative is checked; payload is passed through as given
+    return PN_ERR;
+
+  pn_post_frame(transport->output_buffer, &transport->logger, AMQP_FRAME_TYPE, 
ch, performative, payload);
+  transport->output_frames_ct += 1; // one more frame posted on this transport
+  return 0;
+}
+
+int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative)
+{ // Post a SASL_FRAME_TYPE frame holding only the performative; returns 0, or PN_ERR when performative.start is NULL.
+  if (!performative.start) // presumably the pn_bytes_null that pn_fill_performative() returns on failure - TODO confirm
+    return PN_ERR;
+
+  // All SASL frames go on channel 0
+  pn_post_frame(transport->output_buffer, &transport->logger, SASL_FRAME_TYPE, 
0, performative, (pn_bytes_t){0, NULL}); // {0, NULL}: no payload follows the performative
+  transport->output_frames_ct += 1; // one more frame posted on this transport
+  return 0;
+}
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
index c5fcaac..795db83 100644
--- a/c/src/core/framing.h
+++ b/c/src/core/framing.h
@@ -33,6 +33,9 @@
 #define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
 #define AMQP_MAX_WINDOW_SIZE (2147483647)
 
+#define AMQP_FRAME_TYPE (0)
+#define SASL_FRAME_TYPE (1)
+
 typedef struct {
   uint8_t type;
   uint16_t channel;
@@ -44,4 +47,8 @@ typedef struct {
 ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, 
uint32_t max, pn_logger_t *logger);
 size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame, pn_logger_t 
*logger);
 
+int pn_framing_send_amqp(pn_transport_t *transport, uint16_t ch, pn_bytes_t 
performative);
+int pn_framing_send_amqp_with_payload(pn_transport_t *transport, uint16_t ch, 
pn_bytes_t performative, pn_bytes_t payload);
+int pn_framing_send_sasl(pn_transport_t *transport, pn_bytes_t performative);
+
 #endif /* framing.h */
diff --git a/c/src/core/post_frame.c b/c/src/core/post_frame.c
new file mode 100644
index 0000000..5125816
--- /dev/null
+++ b/c/src/core/post_frame.c
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/codec.h"
+#include "proton/logger.h"
+#include "proton/object.h"
+#include "proton/type_compat.h"
+
+#include "buffer.h"
+#include "engine-internal.h"
+#include "framing.h"
+#include "dispatch_actions.h"
+
+static inline struct out {int err; pn_bytes_t bytes;} 
pn_vfill_performative(pn_buffer_t *frame_buf, pn_data_t *output_args, const 
char *fmt, va_list ap)
+{ // Fill output_args from fmt/ap, then encode it into frame_buf; returns {0, encoded bytes} on success or {error code, pn_bytes_null} on failure.
+  pn_data_clear(output_args);
+  int err = pn_data_vfill(output_args, fmt, ap);
+  if (err) {
+    return (struct out){err, pn_bytes_null}; // fill failed: nothing is encoded
+  }
+
+encode_performatives: // re-entered after frame_buf has been asked to grow
+  pn_buffer_clear( frame_buf );
+  pn_rwbytes_t buf = pn_buffer_memory( frame_buf );
+  buf.size = pn_buffer_available( frame_buf ); // encode into the space the buffer reports as available
+
+  ssize_t wr = pn_data_encode( output_args, buf.start, buf.size );
+  if (wr < 0) {
+    if (wr == PN_OVERFLOW) {
+      pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 ); // did not fit: request twice the available size and encode again
+      goto encode_performatives;
+    }
+    return (struct out){wr, pn_bytes_null}; // any other negative result is handed back as the error code
+  }
+  return (struct out){0, {.size = wr, .start = buf.start}}; // start is the pointer obtained from pn_buffer_memory(frame_buf); nothing is copied
+}
+
+static inline void pn_log_fill_error(pn_logger_t *logger, pn_data_t *data, int 
error, const char *fmt) { // Log a failed fill/encode at PN_LEVEL_ERROR under PN_SUBSYSTEM_AMQP.
+  if (pn_error_code(pn_data_error(data))) { // data has an error recorded: log fmt, the code name and the error text
+    pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR,
+                   "error posting frame: %s, %s: %s", fmt, pn_code(error),
+                   pn_error_text(pn_data_error(data)));
+  } else { // no error recorded on data: log only the code name
+    pn_logger_logf(logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR,
+                   "error posting frame: %s", pn_code(error));
+  }
+}
+
+pn_bytes_t pn_fill_performative(pn_transport_t *transport, const char *fmt, 
...)
+{ // Variadic entry point: fill transport->output_args from fmt and the arguments, encode into transport->frame, and return the encoded bytes.
+  va_list ap;
+  va_start(ap, fmt);
+  struct out out = pn_vfill_performative(transport->frame, 
transport->output_args, fmt, ap);
+  va_end(ap);
+  if (out.err){ // the error is logged here; callers only see it through the returned bytes
+    pn_log_fill_error(&transport->logger, transport->output_args, out.err, 
fmt);
+  }
+  return out.bytes; // pn_bytes_null when pn_vfill_performative() reported an error
+}
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index fb1e02e..0e0f62e 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -881,53 +881,10 @@ static int pni_disposition_encode(pn_disposition_t 
*disposition, pn_data_t *data
   }
 }
 
-
-int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const 
char *fmt, ...)
-{
-  pn_buffer_t *frame_buf = transport->frame;
-  va_list ap;
-  va_start(ap, fmt);
-  pn_data_clear(transport->output_args);
-  int err = pn_data_vfill(transport->output_args, fmt, ap);
-  va_end(ap);
-  if (err) {
-    pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR,
-                      "error posting frame: %s, %s: %s", fmt, pn_code(err),
-                      pn_error_text(pn_data_error(transport->output_args)));
-    return PN_ERR;
-  }
-
- encode_performatives:
-  pn_buffer_clear( frame_buf );
-  pn_rwbytes_t buf = pn_buffer_memory( frame_buf );
-  buf.size = pn_buffer_available( frame_buf );
-
-  ssize_t wr = pn_data_encode( transport->output_args, buf.start, buf.size );
-  if (wr < 0) {
-    if (wr == PN_OVERFLOW) {
-      pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 );
-      goto encode_performatives;
-    }
-    pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR,
-                      "error posting frame: %s", pn_code(wr));
-    return PN_ERR;
-  }
-
-  pn_frame_t frame = {AMQP_FRAME_TYPE};
-  frame.type = type;
-  frame.channel = ch;
-  frame.frame_payload0 = (pn_bytes_t){.size=wr, .start=buf.start};
-  pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size);
-  pn_write_frame(transport->output_buffer, frame, &transport->logger);
-  transport->output_frames_ct += 1;
-
-  return 0;
-}
-
 static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch,
                                         uint32_t handle,
                                         pn_sequence_t id,
-                                        pn_bytes_t *payload,
+                                        pn_bytes_t *full_payload,
                                         const pn_bytes_t *tag,
                                         uint32_t message_format,
                                         bool settled,
@@ -941,13 +898,11 @@ static int pni_post_amqp_transfer_frame(pn_transport_t 
*transport, uint16_t ch,
 {
   bool more_flag = more;
   unsigned framecount = 0;
-  pn_buffer_t *frame = transport->frame;
 
-  // create performatives, assuming 'more' flag need not change
-
- compute_performatives:
-  pn_data_clear(transport->output_args);
-  int err = pn_data_fill(transport->output_args, "DL[IIzI?o?on?DLC?o?o?o]", 
TRANSFER,
+  // create performative, assuming 'more' flag need not change
+ compute_performatives:;
+  pn_bytes_t performative =
+    pn_fill_performative(transport, "DL[IIzI?o?on?DLC?o?o?o]", TRANSFER,
                          handle,
                          id,
                          tag->size, tag->start,
@@ -958,36 +913,19 @@ static int pni_post_amqp_transfer_frame(pn_transport_t 
*transport, uint16_t ch,
                          resume, resume,
                          aborted, aborted,
                          batchable, batchable);
-  if (err) {
-    pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR,
-                      "error posting transfer frame: %s: %s", pn_code(err),
-                      pn_error_text(pn_data_error(transport->output_args)));
+  if (!performative.start) {
     return PN_ERR;
   }
 
-  do { // send as many frames as possible without changing the 'more' flag...
+  // At this point the side effect of the fill is to encode the performative 
into transport->frame
 
-  encode_performatives:
-    pn_buffer_clear( frame );
-    pn_rwbytes_t buf = pn_buffer_memory( frame );
-    buf.size = pn_buffer_available( frame );
-
-    ssize_t wr = pn_data_encode(transport->output_args, buf.start, buf.size);
-    if (wr < 0) {
-      if (wr == PN_OVERFLOW) {
-        pn_buffer_ensure( frame, pn_buffer_available( frame ) * 2 );
-        goto encode_performatives;
-      }
-      pn_logger_logf(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_ERROR, 
"error posting frame: %s", pn_code(wr));
-      return PN_ERR;
-    }
-    buf.size = wr;
+  do { // send as many frames as possible without changing the 'more' flag...
 
     // check if we need to break up the outbound frame
-    size_t available = payload->size;
+    size_t available = full_payload->size;
     if (transport->remote_max_frame) {
-      if ((available + buf.size) > transport->remote_max_frame - 8) {
-        available = transport->remote_max_frame - 8 - buf.size;
+      if ((available + performative.size) > transport->remote_max_frame - 
AMQP_HEADER_SIZE) {
+        available = transport->remote_max_frame - AMQP_HEADER_SIZE - 
performative.size;
         if (more_flag == false) {
           more_flag = true;
           goto compute_performatives;  // deal with flag change
@@ -998,20 +936,13 @@ static int pni_post_amqp_transfer_frame(pn_transport_t 
*transport, uint16_t ch,
         goto compute_performatives;
       }
     }
+    pn_bytes_t payload = {.size = available, .start = full_payload->start};
+    pn_framing_send_amqp_with_payload(transport, ch, performative, payload);
 
-    pn_frame_t frame = {AMQP_FRAME_TYPE};
-    frame.channel = ch;
-    frame.frame_payload0 = (pn_bytes_t){.size=buf.size, .start=buf.start};
-    frame.frame_payload1 = (pn_bytes_t){.size=available, 
.start=payload->start};
-
-    payload->start += available;
-    payload->size -= available;
-
-    pn_buffer_ensure(transport->output_buffer, 
AMQP_HEADER_SIZE+frame.extended.size+frame.frame_payload0.size+frame.frame_payload1.size);
-    pn_write_frame(transport->output_buffer, frame, &transport->logger);
-    transport->output_frames_ct += 1;
+    full_payload->start += available;
+    full_payload->size -= available;
     framecount++;
-  } while (payload->size > 0 && framecount < frame_limit);
+  } while (full_payload->size > 0 && framecount < frame_limit);
 
   return framecount;
 }
@@ -1030,8 +961,9 @@ static int pni_post_close(pn_transport_t *transport, 
pn_condition_t *cond)
     info = pn_condition_info(cond);
   }
 
-  return pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[?DL[sSC]]", CLOSE,
+  pn_bytes_t buf = pn_fill_performative(transport, "DL[?DL[sSC]]", CLOSE,
                        (bool) condition, ERROR, condition, description, info);
+  return pn_framing_send_amqp(transport, 0, buf);
 }
 
 static pn_collector_t *pni_transport_collector(pn_transport_t *transport)
@@ -1917,7 +1849,7 @@ static int pni_process_conn_setup(pn_transport_t 
*transport, pn_endpoint_t *endp
       pn_connection_t *connection = (pn_connection_t *) endpoint;
       const char *cid = pn_string_get(connection->container);
       pni_calculate_channel_max(transport);
-      int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, 
"DL[SS?I?H?InnMMC]", OPEN,
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[SS?I?H?InnMMC]", 
OPEN,
                               cid ? cid : "",
                               pn_string_get(connection->hostname),
                               // TODO: This is messy, because we also have to 
allow local_max_frame_ to be 0 to mean unlimited
@@ -1929,6 +1861,7 @@ static int pni_process_conn_setup(pn_transport_t 
*transport, pn_endpoint_t *endp
                               connection->offered_capabilities,
                               connection->desired_capabilities,
                               connection->properties);
+      int err = pn_framing_send_amqp(transport, 0, buf);
       if (err) return err;
       transport->open_sent = true;
     }
@@ -1968,7 +1901,7 @@ static size_t pni_session_incoming_window(pn_session_t 
*ssn)
     pn_condition_format(
       pn_transport_condition(t),
       "amqp:internal-error",
-      "session capacity %" PN_ZU " is less than frame size %" PRIu32,
+      "session capacity %zu is less than frame size %" PRIu32,
       capacity, size);
     pn_transport_close_tail(t);
     return 0;
@@ -2004,11 +1937,12 @@ static int pni_process_ssn_setup(pn_transport_t 
*transport, pn_endpoint_t *endpo
       }
       state->incoming_window = pni_session_incoming_window(ssn);
       state->outgoing_window = pni_session_outgoing_window(ssn);
-      pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, 
"DL[?HIII]", BEGIN,
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[?HIII]", BEGIN,
                     ((int16_t) state->remote_channel >= 0), 
state->remote_channel,
                     state->outgoing_transfer_count,
                     state->incoming_window,
                     state->outgoing_window);
+      pn_framing_send_amqp(transport, state->local_channel, buf);
     }
   }
 
@@ -2060,8 +1994,7 @@ static int pni_process_link_setup(pn_transport_t 
*transport, pn_endpoint_t *endp
       pni_map_local_handle(link);
       const pn_distribution_mode_t dist_mode = (pn_distribution_mode_t) 
link->source.distribution_mode;
       if (link->target.type == PN_COORDINATOR) {
-        int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
ssn_state->local_channel,
-                                "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH,
+        pn_bytes_t buf = pn_fill_performative(transport, 
"DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH,
                                 pn_string_get(link->name),
                                 state->local_handle,
                                 endpoint->type == RECEIVER,
@@ -2080,10 +2013,10 @@ static int pni_process_link_setup(pn_transport_t 
*transport, pn_endpoint_t *endp
                                 link->source.capabilities,
                                 COORDINATOR, link->target.capabilities,
                                 0);
+        int err = pn_framing_send_amqp(transport, ssn_state->local_channel, 
buf);
         if (err) return err;
       } else {
-        int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
ssn_state->local_channel,
-                                
"DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", ATTACH,
+        pn_bytes_t buf = pn_fill_performative(transport, 
"DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", ATTACH,
                                 pn_string_get(link->name),
                                 state->local_handle,
                                 endpoint->type == RECEIVER,
@@ -2114,6 +2047,7 @@ static int pni_process_link_setup(pn_transport_t 
*transport, pn_endpoint_t *endp
                                 0,
                                 link->max_message_size,
                                 link->properties);
+        int err = pn_framing_send_amqp(transport, ssn_state->local_channel, 
buf);
         if (err) return err;
       }
     }
@@ -2128,7 +2062,7 @@ static int pni_post_flow(pn_transport_t *transport, 
pn_session_t *ssn, pn_link_t
   ssn->state.outgoing_window = pni_session_outgoing_window(ssn);
   bool linkq = (bool) link;
   pn_link_state_t *state = &link->state;
-  return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, 
"DL[?IIII?I?I?In?o]", FLOW,
+  pn_bytes_t buf = pn_fill_performative(transport, "DL[?IIII?I?I?In?o]", FLOW,
                        (int16_t) ssn->state.remote_channel >= 0, 
ssn->state.incoming_transfer_count,
                        ssn->state.incoming_window,
                        ssn->state.outgoing_transfer_count,
@@ -2137,6 +2071,7 @@ static int pni_post_flow(pn_transport_t *transport, 
pn_session_t *ssn, pn_link_t
                        linkq, linkq ? state->delivery_count : 0,
                        linkq, linkq ? state->link_credit : 0,
                        linkq, linkq ? link->drain : false);
+  return pn_framing_send_amqp(transport, ssn->state.local_channel, buf);
 }
 
 static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t 
*endpoint)
@@ -2162,12 +2097,13 @@ static int pni_flush_disp(pn_transport_t *transport, 
pn_session_t *ssn)
   uint64_t code = ssn->state.disp_code;
   bool settled = ssn->state.disp_settled;
   if (ssn->state.disp) {
-    int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
ssn->state.local_channel, "DL[oI?I?o?DL[]]", DISPOSITION,
+    pn_bytes_t buf = pn_fill_performative(transport, "DL[oI?I?o?DL[]]", 
DISPOSITION,
                             ssn->state.disp_type,
                             ssn->state.disp_first,
                             ssn->state.disp_last!=ssn->state.disp_first, 
ssn->state.disp_last,
                             settled, settled,
                             (bool)code, code);
+    int err = pn_framing_send_amqp(transport, ssn->state.local_channel, buf);
     if (err) return err;
     ssn->state.disp_type = 0;
     ssn->state.disp_code = 0;
@@ -2197,11 +2133,11 @@ static int pni_post_disp(pn_transport_t *transport, 
pn_delivery_t *delivery)
   if (!pni_disposition_batchable(&delivery->local)) {
     pn_data_clear(transport->disp_data);
     PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, 
transport->disp_data));
-    return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel,
-      "DL[oIn?o?DLC]", DISPOSITION,
+    pn_bytes_t buf = pn_fill_performative(transport, "DL[oIn?o?DLC]", 
DISPOSITION,
       role, state->id,
       delivery->local.settled, delivery->local.settled,
       (bool)code, code, transport->disp_data);
+    return pn_framing_send_amqp(transport, ssn->state.local_channel, buf);
   }
 
   if (ssn_state->disp && code == ssn_state->disp_code &&
@@ -2435,11 +2371,11 @@ static int pni_process_link_teardown(pn_transport_t 
*transport, pn_endpoint_t *e
         info = pn_condition_info(&endpoint->condition);
       }
 
-      int err =
-          pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel,
-                        "DL[I?o?DL[sSC]]", DETACH, state->local_handle,
-                        !link->detached, !link->detached,
-                        (bool)name, ERROR, name, description, info);
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[I?o?DL[sSC]]", 
DETACH,
+                                            state->local_handle,
+                                            !link->detached, !link->detached,
+                                            (bool)name, ERROR, name, 
description, info);
+      int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf);
       if (err) return err;
       pni_unmap_local_handle(link);
     }
@@ -2511,8 +2447,9 @@ static int pni_process_ssn_teardown(pn_transport_t 
*transport, pn_endpoint_t *en
         info = pn_condition_info(&endpoint->condition);
       }
 
-      int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
state->local_channel, "DL[?DL[sSC]]", END,
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[?DL[sSC]]", END,
                               (bool) name, ERROR, name, description, info);
+      int err = pn_framing_send_amqp(transport, state->local_channel, buf);
       if (err) return err;
       pni_unmap_local_channel(session);
     }
@@ -2586,7 +2523,8 @@ static void pn_error_amqp(pn_transport_t* transport, 
unsigned int layer)
 {
   if (!transport->close_sent) {
     if (!transport->open_sent) {
-      pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[S]", OPEN, "");
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[S]", OPEN, "");
+      pn_framing_send_amqp(transport, 0, buf);
     }
 
     pni_post_close(transport, &transport->condition);
@@ -2683,7 +2621,8 @@ static int64_t pn_tick_amqp(pn_transport_t* transport, 
unsigned int layer, int64
       transport->keepalive_deadline = now + 
(pn_timestamp_t)(transport->remote_idle_timeout/2.0);
       if (pn_buffer_size(transport->output_buffer) == 0) {    // no outbound 
data pending
         // so send empty frame (and account for it!)
-        pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "");
+        pn_bytes_t buf = pn_bytes(0,"");
+        pn_framing_send_amqp(transport, 0, buf);
         transport->last_bytes_output += 
pn_buffer_size(transport->output_buffer);
       }
     }
diff --git a/c/src/sasl/sasl.c b/c/src/sasl/sasl.c
index dfb4530..d9fd79b 100644
--- a/c/src/sasl/sasl.c
+++ b/c/src/sasl/sasl.c
@@ -22,7 +22,7 @@
 #include "sasl-internal.h"
 
 #include "core/autodetect.h"
-#include "core/dispatch_actions.h"
+#include "core/framing.h"
 #include "core/engine-internal.h"
 #include "core/util.h"
 #include "platform/platform_fmt.h"
@@ -484,11 +484,12 @@ static void pni_post_sasl_frame(pn_transport_t *transport)
   enum pnx_sasl_state desired_state = sasl->desired_state;
   while (sasl->desired_state > sasl->last_state) {
     switch (desired_state) {
-    case SASL_POSTED_INIT:
-      pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[szS]", SASL_INIT, 
sasl->selected_mechanism,
-                    out.size, out.start, sasl->local_fqdn);
+    case SASL_POSTED_INIT: {
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[szS]", SASL_INIT, 
sasl->selected_mechanism, out.size, out.start, sasl->local_fqdn);
+      pn_framing_send_sasl(transport, buf);
       pni_emit(transport);
       break;
+    }
     case SASL_POSTED_MECHANISMS: {
       // TODO(PROTON-2122) Replace magic number 32 with dynamically sized 
memory
       char *mechs[32];
@@ -499,14 +500,16 @@ static void pni_post_sasl_frame(pn_transport_t *transport)
         pni_split_mechs(mechlist, sasl->included_mechanisms, mechs, &count);
       }
 
-      pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[@T[*s]]", 
SASL_MECHANISMS, PN_SYMBOL, count, mechs);
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[@T[*s]]", 
SASL_MECHANISMS, PN_SYMBOL, count, mechs);
       free(mechlist);
+      pn_framing_send_sasl(transport, buf);
       pni_emit(transport);
       break;
     }
     case SASL_POSTED_RESPONSE:
       if (sasl->last_state != SASL_POSTED_RESPONSE) {
-        pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[Z]", SASL_RESPONSE, 
out.size, out.start);
+        pn_bytes_t buf = pn_fill_performative(transport, "DL[Z]", 
SASL_RESPONSE, out.size, out.start);
+        pn_framing_send_sasl(transport, buf);
         pni_emit(transport);
       }
       break;
@@ -515,16 +518,18 @@ static void pni_post_sasl_frame(pn_transport_t *transport)
         desired_state = SASL_POSTED_MECHANISMS;
         continue;
       } else if (sasl->last_state != SASL_POSTED_CHALLENGE) {
-        pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[Z]", SASL_CHALLENGE, 
out.size, out.start);
+        pn_bytes_t buf = pn_fill_performative(transport, "DL[Z]", 
SASL_CHALLENGE, out.size, out.start);
+        pn_framing_send_sasl(transport, buf);
         pni_emit(transport);
       }
       break;
-    case SASL_POSTED_OUTCOME:
+    case SASL_POSTED_OUTCOME: {
       if (sasl->last_state < SASL_POSTED_MECHANISMS) {
         desired_state = SASL_POSTED_MECHANISMS;
         continue;
       }
-      pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[Bz]", SASL_OUTCOME, 
sasl->outcome, out.size, out.start);
+      pn_bytes_t buf = pn_fill_performative(transport, "DL[Bz]", SASL_OUTCOME, 
sasl->outcome, out.size, out.start);
+      pn_framing_send_sasl(transport, buf);
       pni_emit(transport);
       if (sasl->outcome!=PN_SASL_OK) {
         pn_do_error(transport, "amqp:unauthorized-access", "Failed to 
authenticate client [mech=%s]",
@@ -532,6 +537,7 @@ static void pni_post_sasl_frame(pn_transport_t *transport)
         desired_state = SASL_ERROR;
       }
       break;
+    }
     case SASL_RECVED_SUCCESS:
       if (sasl->last_state < SASL_POSTED_INIT) {
         desired_state = SASL_POSTED_INIT;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to