Repository: qpid-proton
Updated Branches:
  refs/heads/master a0e7cced9 -> d042c4ed0


PROTON-770: Eliminate pn_dispatcher_t:
- Moved all useful state from pn_dispatcher_t to pn_transport_t
- Only left input frame dispatching in dispatcher.[ch]


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/117ac209
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/117ac209
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/117ac209

Branch: refs/heads/master
Commit: 117ac209e084d44b565212745d5c7e478df9c9d0
Parents: d8e99db
Author: Andrew Stitcher <[email protected]>
Authored: Wed Dec 10 18:23:52 2014 -0500
Committer: Andrew Stitcher <[email protected]>
Committed: Wed Dec 10 18:23:52 2014 -0500

----------------------------------------------------------------------
 proton-c/src/dispatcher/dispatcher.c  | 270 +++--------------------------
 proton-c/src/dispatcher/dispatcher.h  |  41 +----
 proton-c/src/engine/engine-internal.h |  21 ++-
 proton-c/src/sasl/sasl.c              |  37 ++--
 proton-c/src/transport/transport.c    | 270 +++++++++++++++++++++++++----
 5 files changed, 299 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/dispatcher/dispatcher.c
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatcher/dispatcher.c 
b/proton-c/src/dispatcher/dispatcher.c
index 3b96a67..6485198 100644
--- a/proton-c/src/dispatcher/dispatcher.c
+++ b/proton-c/src/dispatcher/dispatcher.c
@@ -19,11 +19,10 @@
  *
  */
 
-#include "framing/framing.h"
 #include "dispatcher.h"
+
+#include "framing/framing.h"
 #include "protocol.h"
-#include "util.h"
-#include "platform_fmt.h"
 #include "engine/engine-internal.h"
 
 #include "dispatch_actions.h"
@@ -75,81 +74,21 @@ static inline int pni_dispatch_action(pn_transport_t* 
transport, uint64_t lcode,
   return action(transport, frame_type, channel, args, payload);
 }
 
-pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport)
-{
-  pn_dispatcher_t *disp = (pn_dispatcher_t *) calloc(sizeof(pn_dispatcher_t), 
1);
-
-  disp->frame_type = frame_type;
-  disp->transport = transport;
-
-  disp->args = pn_data(16);
-
-  disp->output_args = pn_data(16);
-  disp->frame = pn_buffer( 4*1024 );
-  // XXX
-  disp->capacity = 4*1024;
-  disp->output = (char *) malloc(disp->capacity);
-  disp->available = 0;
-
-  disp->halt = false;
-  disp->batch = true;
-
-  disp->scratch = pn_string(NULL);
-
-  return disp;
-}
-
-void pn_dispatcher_free(pn_dispatcher_t *disp)
-{
-  if (disp) {
-    pn_data_free(disp->args);
-    pn_data_free(disp->output_args);
-    pn_buffer_free(disp->frame);
-    free(disp->output);
-    pn_free(disp->scratch);
-    free(disp);
-  }
-}
-
-typedef enum {IN, OUT} pn_dir_t;
-
-static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
-                        pn_data_t *args, const char *payload, size_t size)
-{
-  if (disp->transport->trace & PN_TRACE_FRM) {
-    pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-");
-    pn_inspect(args, disp->scratch);
-
-    if (pn_data_size(args)==0) {
-        pn_string_addf(disp->scratch, "(EMPTY FRAME)");
-    }
-
-    if (size) {
-      char buf[1024];
-      int e = pn_quote_data(buf, 1024, payload, size);
-      pn_string_addf(disp->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf,
-                     e == PN_OVERFLOW ? "... (truncated)" : "");
-    }
-
-    pn_transport_log(disp->transport, pn_string_get(disp->scratch));
-  }
-}
-
-static int pni_dispatch_frame(pn_dispatcher_t *disp, pn_data_t *args, 
pn_frame_t frame)
+static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, 
pn_frame_t frame)
 {
   if (frame.size == 0) { // ignore null frames
-    if (disp->transport->trace & PN_TRACE_FRM)
-      pn_transport_logf(disp->transport, "%u <- (EMPTY FRAME)\n", 
frame.channel);
+    if (transport->trace & PN_TRACE_FRM)
+      pn_transport_logf(transport, "%u <- (EMPTY FRAME)\n", frame.channel);
     return 0;
   }
 
   ssize_t dsize = pn_data_decode(args, frame.payload, frame.size);
   if (dsize < 0) {
-    pn_string_format(disp->scratch,
+    pn_string_format(transport->scratch,
                      "Error decoding frame: %s %s\n", pn_code(dsize),
                      pn_error_text(pn_data_error(args)));
-    pn_quote(disp->scratch, frame.payload, frame.size);
-    pn_transport_log(disp->transport, pn_string_get(disp->scratch));
+    pn_quote(transport->scratch, frame.payload, frame.size);
+    pn_transport_log(transport, pn_string_get(transport->scratch));
     return dsize;
   }
 
@@ -161,217 +100,56 @@ static int pni_dispatch_frame(pn_dispatcher_t *disp, 
pn_data_t *args, pn_frame_t
   bool scanned;
   int e = pn_data_scan(args, "D?L.", &scanned, &lcode);
   if (e) {
-    pn_transport_log(disp->transport, "Scan error");
+    pn_transport_log(transport, "Scan error");
     return e;
   }
   if (!scanned) {
-    pn_transport_log(disp->transport, "Error dispatching frame");
+    pn_transport_log(transport, "Error dispatching frame");
     return PN_ERR;
   }
   size_t payload_size = frame.size - dsize;
   const char *payload_mem = payload_size ? frame.payload + dsize : NULL;
   pn_bytes_t payload = {payload_size, payload_mem};
 
-  pn_do_trace(disp, channel, IN, args, payload_mem, payload_size);
+  pn_do_trace(transport, channel, IN, args, payload_mem, payload_size);
 
-  int err = pni_dispatch_action(disp->transport, lcode, frame_type, channel, 
args, &payload);
+  int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, 
&payload);
 
   pn_data_clear(args);
 
   return err;
 }
 
-ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t 
available)
+ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, 
size_t available, bool batch, bool *halt)
 {
   size_t read = 0;
 
-  while (available && !disp->halt) {
+  while (available && !*halt) {
     pn_frame_t frame;
 
     size_t n = pn_read_frame(&frame, bytes + read, available);
     if (n) {
       read += n;
       available -= n;
-      disp->input_frames_ct += 1;
-      int e = pni_dispatch_frame(disp, disp->args, frame);
+      transport->input_frames_ct += 1;
+      int e = pni_dispatch_frame(transport, transport->args, frame);
       if (e) return e;
     } else {
       break;
     }
 
-    if (!disp->batch) break;
+    if (!batch) break;
   }
 
   return read;
 }
 
-void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size)
+ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t 
size)
 {
-  disp->output_payload = data;
-  disp->output_size = size;
-}
-
-int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...)
-{
-  va_list ap;
-  va_start(ap, fmt);
-  pn_data_clear(disp->output_args);
-  int err = pn_data_vfill(disp->output_args, fmt, ap);
-  va_end(ap);
-  if (err) {
-    pn_transport_logf(disp->transport,
-                      "error posting frame: %s, %s: %s", fmt, pn_code(err),
-                      pn_error_text(pn_data_error(disp->output_args)));
-    return PN_ERR;
-  }
-
-  pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, 
disp->output_size);
-
- encode_performatives:
-  pn_buffer_clear( disp->frame );
-  pn_buffer_memory_t buf = pn_buffer_memory( disp->frame );
-  buf.size = pn_buffer_available( disp->frame );
-
-  ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size );
-  if (wr < 0) {
-    if (wr == PN_OVERFLOW) {
-      pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 );
-      goto encode_performatives;
-    }
-    pn_transport_logf(disp->transport,
-                      "error posting frame: %s", pn_code(wr));
-    return PN_ERR;
-  }
-
-  pn_frame_t frame = {disp->frame_type};
-  frame.channel = ch;
-  frame.payload = buf.start;
-  frame.size = wr;
-  size_t n;
-  while (!(n = pn_write_frame(disp->output + disp->available,
-                              disp->capacity - disp->available, frame))) {
-    disp->capacity *= 2;
-    disp->output = (char *) realloc(disp->output, disp->capacity);
-  }
-  disp->output_frames_ct += 1;
-  if (disp->transport->trace & PN_TRACE_RAW) {
-    pn_string_set(disp->scratch, "RAW: \"");
-    pn_quote(disp->scratch, disp->output + disp->available, n);
-    pn_string_addf(disp->scratch, "\"");
-    pn_transport_log(disp->transport, pn_string_get(disp->scratch));
-  }
-  disp->available += n;
-
-  return 0;
-}
-
-ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size)
-{
-  int n = disp->available < size ? disp->available : size;
-  memmove(bytes, disp->output, n);
-  memmove(disp->output, disp->output + n, disp->available - n);
-  disp->available -= n;
-  // XXX: need to check for errors
-  return n;
-}
-
-
-int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch,
-                           uint32_t handle,
-                           pn_sequence_t id,
-                           const pn_bytes_t *tag,
-                           uint32_t message_format,
-                           bool settled,
-                           bool more,
-                           pn_sequence_t frame_limit)
-{
-  bool more_flag = more;
-  int framecount = 0;
-
-  // create preformatives, assuming 'more' flag need not change
-
- compute_performatives:
-  pn_data_clear(disp->output_args);
-  int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
-                         handle, id, tag->size, tag->start,
-                         message_format,
-                         settled, more_flag);
-  if (err) {
-    pn_transport_logf(disp->transport,
-                      "error posting transfer frame: %s: %s", pn_code(err),
-                      pn_error_text(pn_data_error(disp->output_args)));
-    return PN_ERR;
-  }
-
-  do { // send as many frames as possible without changing the 'more' flag...
-
-  encode_performatives:
-    pn_buffer_clear( disp->frame );
-    pn_buffer_memory_t buf = pn_buffer_memory( disp->frame );
-    buf.size = pn_buffer_available( disp->frame );
-
-    ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size);
-    if (wr < 0) {
-      if (wr == PN_OVERFLOW) {
-        pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 
);
-        goto encode_performatives;
-      }
-      pn_transport_logf(disp->transport, "error posting frame: %s", 
pn_code(wr));
-      return PN_ERR;
-    }
-    buf.size = wr;
-
-    // check if we need to break up the outbound frame
-    size_t available = disp->output_size;
-    if (disp->remote_max_frame) {
-      if ((available + buf.size) > disp->remote_max_frame - 8) {
-        available = disp->remote_max_frame - 8 - buf.size;
-        if (more_flag == false) {
-          more_flag = true;
-          goto compute_performatives;  // deal with flag change
-        }
-      } else if (more_flag == true && more == false) {
-        // caller has no more, and this is the last frame
-        more_flag = false;
-        goto compute_performatives;
-      }
-    }
-
-    if (pn_buffer_available( disp->frame ) < (available + buf.size)) {
-      // not enough room for payload - try again...
-      pn_buffer_ensure( disp->frame, available + buf.size );
-      goto encode_performatives;
-    }
-
-    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, 
available);
-
-    memmove( buf.start + buf.size, disp->output_payload, available);
-    disp->output_payload += available;
-    disp->output_size -= available;
-    buf.size += available;
-
-    pn_frame_t frame = {disp->frame_type};
-    frame.channel = ch;
-    frame.payload = buf.start;
-    frame.size = buf.size;
-
-    size_t n;
-    while (!(n = pn_write_frame(disp->output + disp->available,
-                                disp->capacity - disp->available, frame))) {
-      disp->capacity *= 2;
-      disp->output = (char *) realloc(disp->output, disp->capacity);
-    }
-    disp->output_frames_ct += 1;
-    framecount++;
-    if (disp->transport->trace & PN_TRACE_RAW) {
-      pn_string_set(disp->scratch, "RAW: \"");
-      pn_quote(disp->scratch, disp->output + disp->available, n);
-      pn_string_addf(disp->scratch, "\"");
-      pn_transport_log(disp->transport, pn_string_get(disp->scratch));
-    }
-    disp->available += n;
-  } while (disp->output_size > 0 && framecount < frame_limit);
-
-  disp->output_payload = NULL;
-  return framecount;
+    int n = transport->available < size ? transport->available : size;
+    memmove(bytes, transport->output, n);
+    memmove(transport->output, transport->output + n, transport->available - 
n);
+    transport->available -= n;
+    // XXX: need to check for errors
+    return n;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/dispatcher/dispatcher.h
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatcher/dispatcher.h 
b/proton-c/src/dispatcher/dispatcher.h
index b9bfa2b..752a71e 100644
--- a/proton-c/src/dispatcher/dispatcher.h
+++ b/proton-c/src/dispatcher/dispatcher.h
@@ -27,45 +27,12 @@
 #include <stdbool.h>
 #endif
 
-#include "proton/transport.h"
-#include "buffer.h"
-
-typedef struct pn_dispatcher_t pn_dispatcher_t;
+#include "proton/codec.h"
+#include "proton/types.h"
 
 typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, 
uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
 
-struct pn_dispatcher_t {
-  pn_data_t *args;
-  pn_data_t *output_args;
-  const char *output_payload;
-  size_t output_size;
-  size_t remote_max_frame;
-  pn_buffer_t *frame;  // frame under construction
-  size_t capacity;
-  size_t available; /* number of raw bytes pending output */
-  char *output;
-  pn_transport_t *transport; // TODO: We keep this to get access to logging - 
perhaps move logging
-  uint64_t output_frames_ct;
-  uint64_t input_frames_ct;
-  pn_string_t *scratch;
-  uint8_t frame_type; // Used when constructing outgoing frames
-  bool halt;
-  bool batch;
-};
+ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, 
size_t available, bool batch, bool* halt);
+ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t 
size);
 
-pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport);
-void pn_dispatcher_free(pn_dispatcher_t *disp);
-void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size);
-int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
-ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t 
available);
-ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
-int pn_post_transfer_frame(pn_dispatcher_t *disp,
-                           uint16_t local_channel,
-                           uint32_t handle,
-                           pn_sequence_t delivery_id,
-                           const pn_bytes_t *delivery_tag,
-                           uint32_t message_format,
-                           bool settled,
-                           bool more,
-                           pn_sequence_t frame_limit);
 #endif /* dispatcher.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h 
b/proton-c/src/engine/engine-internal.h
index 4ee9a69..bd9f952 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -120,7 +120,6 @@ struct pn_transport_t {
   pni_sasl_t *sasl;
   pni_ssl_t *ssl;
   pn_connection_t *connection;  // reference counted
-  pn_dispatcher_t *disp;
   char *remote_container;
   char *remote_hostname;
   pn_data_t *remote_offered_capabilities;
@@ -150,11 +149,23 @@ struct pn_transport_t {
 
   pn_hash_t *local_channels;
   pn_hash_t *remote_channels;
+
+
+  /* scratch area */
   pn_string_t *scratch;
+  pn_data_t *args;
+  pn_data_t *output_args;
+  pn_buffer_t *frame;  // frame under construction
+  // Temporary
+  size_t capacity;
+  size_t available; /* number of raw bytes pending output */
+  char *output;
 
   /* statistics */
   uint64_t bytes_input;
   uint64_t bytes_output;
+  uint64_t output_frames_ct;
+  uint64_t input_frames_ct;
 
   /* output buffered for send */
   size_t output_size;
@@ -182,6 +193,7 @@ struct pn_transport_t {
   bool done_processing; // if true, don't call pn_process again
   bool posted_idle_timeout;
   bool server;
+  bool halt;
 };
 
 struct pn_connection_t {
@@ -318,4 +330,11 @@ int pn_do_error(pn_transport_t *transport, const char 
*condition, const char *fm
 void pn_session_unbound(pn_session_t* ssn);
 void pn_link_unbound(pn_link_t* link);
 
+int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const 
char *fmt, ...);
+
+typedef enum {IN, OUT} pn_dir_t;
+
+void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
+                 pn_data_t *args, const char *payload, size_t size);
+
 #endif /* engine-internal.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index 5a174e2..5e68233 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -37,7 +37,6 @@
 
 
 struct pni_sasl_t {
-  pn_dispatcher_t *disp;
   char *mechanisms;
   char *remote_mechanisms;
   pn_buffer_t *send_data;
@@ -49,6 +48,7 @@ struct pni_sasl_t {
   bool rcvd_init;
   bool sent_done;
   bool rcvd_done;
+  bool halt;
   bool input_bypass;
   bool output_bypass;
 };
@@ -102,8 +102,6 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
 {
   if (!transport->sasl) {
     pni_sasl_t *sasl = (pni_sasl_t *) malloc(sizeof(pni_sasl_t));
-    sasl->disp = pn_dispatcher(1, transport);
-    sasl->disp->batch = false;
 
     sasl->client = !transport->server;
     sasl->mechanisms = NULL;
@@ -118,6 +116,7 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport)
     sasl->rcvd_done = false;
     sasl->input_bypass = false;
     sasl->output_bypass = false;
+    sasl->halt = false;
 
     transport->sasl = sasl;
   }
@@ -274,22 +273,23 @@ void pn_sasl_free(pn_transport_t *transport)
       free(sasl->remote_mechanisms);
       pn_buffer_free(sasl->send_data);
       pn_buffer_free(sasl->recv_data);
-      pn_dispatcher_free(sasl->disp);
       free(sasl);
     }
   }
 }
 
-void pn_client_init(pni_sasl_t *sasl)
+void pn_client_init(pn_transport_t *transport)
 {
+  pni_sasl_t *sasl = transport->sasl;
   pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data);
-  pn_post_frame(sasl->disp, 0, "DL[sz]", SASL_INIT, sasl->mechanisms,
+  pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[sz]", SASL_INIT, 
sasl->mechanisms,
                 bytes.size, bytes.start);
   pn_buffer_clear(sasl->send_data);
 }
 
-void pn_server_init(pni_sasl_t *sasl)
+void pn_server_init(pn_transport_t *transport)
 {
+  pni_sasl_t *sasl = transport->sasl;
   // XXX
   char *mechs[16];
   int count = 0;
@@ -316,13 +316,14 @@ void pn_server_init(pni_sasl_t *sasl)
     }
   }
 
-  pn_post_frame(sasl->disp, 0, "DL[@T[*s]]", SASL_MECHANISMS, PN_SYMBOL, 
count, mechs);
+  pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[@T[*s]]", SASL_MECHANISMS, 
PN_SYMBOL, count, mechs);
 }
 
 void pn_server_done(pn_sasl_t *sasl0)
 {
-  pni_sasl_t *sasl = get_sasl_internal(sasl0);
-  pn_post_frame(sasl->disp, 0, "DL[B]", SASL_OUTCOME, sasl->outcome);
+  pn_transport_t *transport = get_transport_internal(sasl0);
+  pni_sasl_t *sasl = transport->sasl;
+  pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[B]", SASL_OUTCOME, 
sasl->outcome);
 }
 
 void pn_sasl_process(pn_transport_t *transport)
@@ -330,16 +331,16 @@ void pn_sasl_process(pn_transport_t *transport)
   pni_sasl_t *sasl = transport->sasl;
   if (!sasl->sent_init) {
     if (sasl->client) {
-      pn_client_init(sasl);
+      pn_client_init(transport);
     } else {
-      pn_server_init(sasl);
+      pn_server_init(transport);
     }
     sasl->sent_init = true;
   }
 
   if (pn_buffer_size(sasl->send_data)) {
     pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data);
-    pn_post_frame(sasl->disp, 0, "DL[z]", sasl->client ? SASL_RESPONSE : 
SASL_CHALLENGE,
+    pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[z]", sasl->client ? 
SASL_RESPONSE : SASL_CHALLENGE,
                   bytes.size, bytes.start);
     pn_buffer_clear(sasl->send_data);
   }
@@ -355,14 +356,14 @@ void pn_sasl_process(pn_transport_t *transport)
   //      or challenges) from client
   if (!sasl->client && sasl->sent_done && sasl->rcvd_init) {
     sasl->rcvd_done = true;
-    sasl->disp->halt = true;
+    sasl->halt = true;
   }
 }
 
 ssize_t pn_sasl_input(pn_transport_t *transport, const char *bytes, size_t 
available)
 {
   pni_sasl_t *sasl = transport->sasl;
-  ssize_t n = pn_dispatcher_input(sasl->disp, bytes, available);
+  ssize_t n = pn_dispatcher_input(transport, bytes, available, false, 
&sasl->halt);
   if (n < 0) return n;
 
   pn_sasl_process(transport);
@@ -388,7 +389,7 @@ ssize_t pn_sasl_output(pn_transport_t *transport, char 
*bytes, size_t size)
   pn_sasl_process(transport);
 
   pni_sasl_t *sasl = transport->sasl;
-  if (sasl->disp->available == 0 && sasl->sent_done) {
+  if (transport->available == 0 && sasl->sent_done) {
     if (pn_sasl_state((pn_sasl_t *)transport) == PN_SASL_PASS) {
       return PN_EOS;
     } else {
@@ -396,7 +397,7 @@ ssize_t pn_sasl_output(pn_transport_t *transport, char 
*bytes, size_t size)
       return PN_ERR;
     }
   } else {
-    return pn_dispatcher_output(sasl->disp, bytes, size);
+    return pn_dispatcher_output(transport, bytes, size);
   }
 }
 
@@ -449,7 +450,7 @@ int pn_do_outcome(pn_transport_t *transport, uint8_t 
frame_type, uint16_t channe
   sasl->outcome = (pn_sasl_outcome_t) outcome;
   sasl->rcvd_done = true;
   sasl->sent_done = true;
-  sasl->disp->halt = true;
+  sasl->halt = true;
   return 0;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c 
b/proton-c/src/transport/transport.c
index 097d863..9c48def 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -305,8 +305,12 @@ static void pn_transport_initialize(void *object)
   transport->tracer = pni_default_tracer;
   transport->sasl = NULL;
   transport->ssl = NULL;
+
   transport->scratch = pn_string(NULL);
-  transport->disp = pn_dispatcher(0, transport);
+  transport->args = pn_data(16);
+  transport->output_args = pn_data(16);
+  transport->frame = pn_buffer(4*1024);
+
   transport->connection = NULL;
   transport->context = pn_record();
 
@@ -357,6 +361,7 @@ static void pn_transport_initialize(void *object)
   transport->posted_idle_timeout = false;
 
   transport->server = false;
+  transport->halt = false;
 
   transport->trace = (pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : 
PN_TRACE_OFF) |
     (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
@@ -413,6 +418,15 @@ pn_transport_t *pn_transport(void)
     pn_transport_free(transport);
     return NULL;
   }
+
+  transport->capacity = 4*1024;
+  transport->available = 0;
+  transport->output = (char *) malloc(transport->capacity);
+  if (!transport->output) {
+    pn_transport_free(transport);
+    return NULL;
+  }
+
   return transport;
 }
 
@@ -439,7 +453,6 @@ static void pn_transport_finalize(void *object)
   pn_free(transport->context);
   pn_ssl_free(transport);
   pn_sasl_free(transport);
-  pn_dispatcher_free(transport->disp);
   free(transport->remote_container);
   free(transport->remote_hostname);
   pn_free(transport->remote_offered_capabilities);
@@ -454,6 +467,10 @@ static void pn_transport_finalize(void *object)
   if (transport->input_buf) free(transport->input_buf);
   if (transport->output_buf) free(transport->output_buf);
   pn_free(transport->scratch);
+  pn_data_free(transport->args);
+  pn_data_free(transport->output_args);
+  pn_buffer_free(transport->frame);
+  free(transport->output);
 }
 
 int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
@@ -474,7 +491,7 @@ int pn_transport_bind(pn_transport_t *transport, 
pn_connection_t *connection)
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     pn_collector_put(connection->collector, PN_OBJECT, connection, 
PN_CONNECTION_REMOTE_OPEN);
-    transport->disp->halt = false;
+    transport->halt = false;
     transport_consume(transport);        // blech - testBindAfterOpen
   }
 
@@ -623,6 +640,185 @@ void pni_disposition_encode(pn_disposition_t 
*disposition, pn_data_t *data)
   }
 }
 
+
+void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
+                 pn_data_t *args, const char *payload, size_t size)
+{
+  if (transport->trace & PN_TRACE_FRM) {
+    pn_string_format(transport->scratch, "%u %s ", ch, dir == OUT ? "->" : 
"<-");
+    pn_inspect(args, transport->scratch);
+
+    if (pn_data_size(args)==0) {
+        pn_string_addf(transport->scratch, "(EMPTY FRAME)");
+    }
+
+    if (size) {
+      char buf[1024];
+      int e = pn_quote_data(buf, 1024, payload, size);
+      pn_string_addf(transport->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf,
+                     e == PN_OVERFLOW ? "... (truncated)" : "");
+    }
+
+    pn_transport_log(transport, pn_string_get(transport->scratch));
+  }
+}
+
+int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const 
char *fmt, ...)
+{
+  pn_buffer_t *frame_buf = transport->frame;
+  va_list ap;
+  va_start(ap, fmt);
+  pn_data_clear(transport->output_args);
+  int err = pn_data_vfill(transport->output_args, fmt, ap);
+  va_end(ap);
+  if (err) {
+    pn_transport_logf(transport,
+                      "error posting frame: %s, %s: %s", fmt, pn_code(err),
+                      pn_error_text(pn_data_error(transport->output_args)));
+    return PN_ERR;
+  }
+
+  pn_do_trace(transport, ch, OUT, transport->output_args, NULL, 0);
+
+ encode_performatives:
+  pn_buffer_clear( frame_buf );
+  pn_buffer_memory_t buf = pn_buffer_memory( frame_buf );
+  buf.size = pn_buffer_available( frame_buf );
+
+  ssize_t wr = pn_data_encode( transport->output_args, buf.start, buf.size );
+  if (wr < 0) {
+    if (wr == PN_OVERFLOW) {
+      pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 );
+      goto encode_performatives;
+    }
+    pn_transport_logf(transport,
+                      "error posting frame: %s", pn_code(wr));
+    return PN_ERR;
+  }
+
+  pn_frame_t frame = {type};
+  frame.channel = ch;
+  frame.payload = buf.start;
+  frame.size = wr;
+  size_t n;
+  while (!(n = pn_write_frame(transport->output + transport->available,
+                              transport->capacity - transport->available, 
frame))) {
+    transport->capacity *= 2;
+    transport->output = (char *) realloc(transport->output, 
transport->capacity);
+  }
+  transport->output_frames_ct += 1;
+  if (transport->trace & PN_TRACE_RAW) {
+    pn_string_set(transport->scratch, "RAW: \"");
+    pn_quote(transport->scratch, transport->output + transport->available, n);
+    pn_string_addf(transport->scratch, "\"");
+    pn_transport_log(transport, pn_string_get(transport->scratch));
+  }
+  transport->available += n;
+
+  return 0;
+}
+
+int pn_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch,
+                                uint32_t handle,
+                                pn_sequence_t id,
+                                pn_bytes_t *payload,
+                                const pn_bytes_t *tag,
+                                uint32_t message_format,
+                                bool settled,
+                                bool more,
+                                pn_sequence_t frame_limit)
+{
+  bool more_flag = more;
+  int framecount = 0;
+  pn_buffer_t *frame = transport->frame;
+
+  // create preformatives, assuming 'more' flag need not change
+
+ compute_performatives:
+  pn_data_clear(transport->output_args);
+  int err = pn_data_fill(transport->output_args, "DL[IIzIoo]", TRANSFER,
+                         handle, id, tag->size, tag->start,
+                         message_format,
+                         settled, more_flag);
+  if (err) {
+    pn_transport_logf(transport,
+                      "error posting transfer frame: %s: %s", pn_code(err),
+                      pn_error_text(pn_data_error(transport->output_args)));
+    return PN_ERR;
+  }
+
+  do { // send as many frames as possible without changing the 'more' flag...
+
+  encode_performatives:
+    pn_buffer_clear( frame );
+    pn_buffer_memory_t buf = pn_buffer_memory( frame );
+    buf.size = pn_buffer_available( frame );
+
+    ssize_t wr = pn_data_encode(transport->output_args, buf.start, buf.size);
+    if (wr < 0) {
+      if (wr == PN_OVERFLOW) {
+        pn_buffer_ensure( frame, pn_buffer_available( frame ) * 2 );
+        goto encode_performatives;
+      }
+      pn_transport_logf(transport, "error posting frame: %s", pn_code(wr));
+      return PN_ERR;
+    }
+    buf.size = wr;
+
+    // check if we need to break up the outbound frame
+    size_t available = payload->size;
+    if (transport->remote_max_frame) {
+      if ((available + buf.size) > transport->remote_max_frame - 8) {
+        available = transport->remote_max_frame - 8 - buf.size;
+        if (more_flag == false) {
+          more_flag = true;
+          goto compute_performatives;  // deal with flag change
+        }
+      } else if (more_flag == true && more == false) {
+        // caller has no more, and this is the last frame
+        more_flag = false;
+        goto compute_performatives;
+      }
+    }
+
+    if (pn_buffer_available( frame ) < (available + buf.size)) {
+      // not enough room for payload - try again...
+      pn_buffer_ensure( frame, available + buf.size );
+      goto encode_performatives;
+    }
+
+    pn_do_trace(transport, ch, OUT, transport->output_args, payload->start, 
available);
+
+    memmove( buf.start + buf.size, payload->start, available);
+    payload->start += available;
+    payload->size -= available;
+    buf.size += available;
+
+    pn_frame_t frame = {AMQP_FRAME_TYPE};
+    frame.channel = ch;
+    frame.payload = buf.start;
+    frame.size = buf.size;
+
+    size_t n;
+    while (!(n = pn_write_frame(transport->output + transport->available,
+                                transport->capacity - transport->available, 
frame))) {
+      transport->capacity *= 2;
+      transport->output = (char *) realloc(transport->output, 
transport->capacity);
+    }
+    transport->output_frames_ct += 1;
+    framecount++;
+    if (transport->trace & PN_TRACE_RAW) {
+      pn_string_set(transport->scratch, "RAW: \"");
+      pn_quote(transport->scratch, transport->output + transport->available, 
n);
+      pn_string_addf(transport->scratch, "\"");
+      pn_transport_log(transport, pn_string_get(transport->scratch));
+    }
+    transport->available += n;
+  } while (payload->size > 0 && framecount < frame_limit);
+
+  return framecount;
+}
+
 int pn_post_close(pn_transport_t *transport, const char *condition, const char 
*description)
 {
   pn_condition_t *cond = NULL;
@@ -636,7 +832,7 @@ int pn_post_close(pn_transport_t *transport, const char 
*condition, const char *
     info = pn_condition_info(cond);
   }
 
-  return pn_post_frame(transport->disp, 0, "DL[?DL[sSC]]", CLOSE,
+  return pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[?DL[sSC]]", CLOSE,
                        (bool) condition, ERROR, condition, description, info);
 }
 
@@ -677,13 +873,13 @@ int pn_do_error(pn_transport_t *transport, const char 
*condition, const char *fm
   va_end(ap);
   if (!transport->close_sent) {
     if (!transport->open_sent) {
-      pn_post_frame(transport->disp, 0, "DL[S]", OPEN, "");
+      pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[S]", OPEN, "");
     }
 
     pn_post_close(transport, condition, buf);
     transport->close_sent = true;
   }
-  transport->disp->halt = true;
+  transport->halt = true;
   pn_condition_set_name(&transport->condition, condition);
   pn_condition_set_description(&transport->condition, buf);
   pn_collector_t *collector = pni_transport_collector(transport);
@@ -722,8 +918,6 @@ int pn_do_open(pn_transport_t *transport, uint8_t 
frame_type, uint16_t channel,
                         transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
       transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE;
     }
-    transport->disp->remote_max_frame = transport->remote_max_frame;
-    pn_buffer_clear( transport->disp->frame );
   }
   if (container_q) {
     transport->remote_container = pn_bytes_strdup(remote_container);
@@ -740,7 +934,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t 
frame_type, uint16_t channel,
     PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
     pn_collector_put(conn->collector, PN_OBJECT, conn, 
PN_CONNECTION_REMOTE_OPEN);
   } else {
-    transport->disp->halt = true;
+    transport->halt = true;
   }
   transport->open_rcvd = true;
   return 0;
@@ -1325,7 +1519,7 @@ static ssize_t pn_input_read_amqp(pn_transport_t* 
transport, unsigned int layer,
   }
 
 
-  ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
+  ssize_t n = pn_dispatcher_input(transport, bytes, available, true, 
&transport->halt);
   if (n < 0) {
     //return pn_error_set(transport->error, n, "dispatch error");
     return PN_EOS;
@@ -1365,10 +1559,10 @@ static pn_timestamp_t pn_tick_amqp(pn_transport_t* 
transport, unsigned int layer
       transport->last_bytes_output = transport->bytes_output;
     } else if (transport->keepalive_deadline <= now) {
       transport->keepalive_deadline = now + 
(pn_timestamp_t)(transport->remote_idle_timeout/2.0);
-      if (transport->disp->available == 0) {    // no outbound data pending
+      if (transport->available == 0) {    // no outbound data pending
         // so send empty frame (and account for it!)
-        pn_post_frame(transport->disp, 0, "");
-        transport->last_bytes_output += transport->disp->available;
+        pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "");
+        transport->last_bytes_output += transport->available;
       }
     }
     timeout = pn_timestamp_min( timeout, transport->keepalive_deadline );
@@ -1390,7 +1584,7 @@ int pn_process_conn_setup(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
           : 0;
       pn_connection_t *connection = (pn_connection_t *) endpoint;
       const char *cid = pn_string_get(connection->container);
-      int err = pn_post_frame(transport->disp, 0, "DL[SS?I?H?InnCCC]", OPEN,
+      int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, 
"DL[SS?I?H?InnCCC]", OPEN,
                               cid ? cid : "",
                               pn_string_get(connection->hostname),
                               // if not zero, advertise our max frame size and 
idle timeout
@@ -1464,7 +1658,7 @@ int pn_process_ssn_setup(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
       pni_map_local_channel(ssn);
       state->incoming_window = pn_session_incoming_window(ssn);
       state->outgoing_window = pn_session_outgoing_window(ssn);
-      pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN,
+      pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, 
"DL[?HIII]", BEGIN,
                     ((int16_t) state->remote_channel >= 0), 
state->remote_channel,
                     state->outgoing_transfer_count,
                     state->incoming_window,
@@ -1512,7 +1706,7 @@ int pn_process_link_setup(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
       pni_map_local_handle(link);
       const pn_distribution_mode_t dist_mode = link->source.distribution_mode;
       if (link->target.type == PN_COORDINATOR) {
-        int err = pn_post_frame(transport->disp, ssn_state->local_channel,
+        int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
ssn_state->local_channel,
                                 "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH,
                                 pn_string_get(link->name),
                                 state->local_handle,
@@ -1534,7 +1728,7 @@ int pn_process_link_setup(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
                                 0);
         if (err) return err;
       } else {
-        int err = pn_post_frame(transport->disp, ssn_state->local_channel,
+        int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
ssn_state->local_channel,
                                 "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", 
ATTACH,
                                 pn_string_get(link->name),
                                 state->local_handle,
@@ -1575,7 +1769,7 @@ int pn_post_flow(pn_transport_t *transport, pn_session_t 
*ssn, pn_link_t *link)
   ssn->state.outgoing_window = pn_session_outgoing_window(ssn);
   bool linkq = (bool) link;
   pn_link_state_t *state = &link->state;
-  return pn_post_frame(transport->disp, ssn->state.local_channel, 
"DL[?IIII?I?I?In?o]", FLOW,
+  return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, 
"DL[?IIII?I?I?In?o]", FLOW,
                        (int16_t) ssn->state.remote_channel >= 0, 
ssn->state.incoming_transfer_count,
                        ssn->state.incoming_window,
                        ssn->state.outgoing_transfer_count,
@@ -1609,7 +1803,7 @@ int pn_flush_disp(pn_transport_t *transport, pn_session_t 
*ssn)
   uint64_t code = ssn->state.disp_code;
   bool settled = ssn->state.disp_settled;
   if (ssn->state.disp) {
-    int err = pn_post_frame(transport->disp, ssn->state.local_channel, 
"DL[oIIo?DL[]]", DISPOSITION,
+    int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
ssn->state.local_channel, "DL[oIIo?DL[]]", DISPOSITION,
                             ssn->state.disp_type, ssn->state.disp_first, 
ssn->state.disp_last,
                             settled, (bool)code, code);
     if (err) return err;
@@ -1641,7 +1835,7 @@ int pn_post_disp(pn_transport_t *transport, pn_delivery_t 
*delivery)
   if (!pni_disposition_batchable(&delivery->local)) {
     pn_data_clear(transport->disp_data);
     pni_disposition_encode(&delivery->local, transport->disp_data);
-    return pn_post_frame(transport->disp, ssn->state.local_channel,
+    return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel,
                          "DL[oIIo?DLC]", DISPOSITION,
                          role, state->id, state->id, delivery->local.settled,
                          (bool)code, code, transport->disp_data);
@@ -1690,22 +1884,22 @@ int pn_process_tpwork_sender(pn_transport_t *transport, 
pn_delivery_t *delivery,
       }
 
       pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
-      pn_set_payload(transport->disp, bytes.start, bytes.size);
+      size_t full_size = bytes.size;
       pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
-      int count = pn_post_transfer_frame(transport->disp,
-                                         ssn_state->local_channel,
-                                         link_state->local_handle,
-                                         state->id, &tag,
-                                         0, // message-format
-                                         delivery->local.settled,
-                                         !delivery->done,
-                                         ssn_state->remote_incoming_window);
+      int count = pn_post_amqp_transfer_frame(transport,
+                                              ssn_state->local_channel,
+                                              link_state->local_handle,
+                                              state->id, &bytes, &tag,
+                                              0, // message-format
+                                              delivery->local.settled,
+                                              !delivery->done,
+                                              
ssn_state->remote_incoming_window);
       if (count < 0) return count;
       xfr_posted = true;
       ssn_state->outgoing_transfer_count += count;
       ssn_state->remote_incoming_window -= count;
 
-      int sent = bytes.size - transport->disp->output_size;
+      int sent = full_size - bytes.size;
       pn_buffer_trim(delivery->bytes, sent, 0);
       link->session->outgoing_bytes -= sent;
       if (!pn_buffer_size(delivery->bytes) && delivery->done) {
@@ -1860,7 +2054,7 @@ int pn_process_link_teardown(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
       }
 
       int err =
-          pn_post_frame(transport->disp, ssn_state->local_channel,
+          pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel,
                         "DL[Io?DL[sSC]]", DETACH, state->local_handle, 
!link->detached,
                         (bool)name, ERROR, name, description, info);
       if (err) return err;
@@ -1931,7 +2125,7 @@ int pn_process_ssn_teardown(pn_transport_t *transport, 
pn_endpoint_t *endpoint)
         info = pn_condition_info(&endpoint->condition);
       }
 
-      int err = pn_post_frame(transport->disp, state->local_channel, 
"DL[?DL[sSC]]", END,
+      int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 
state->local_channel, "DL[?DL[sSC]]", END,
                               (bool) name, ERROR, name, description, info);
       if (err) return err;
       pni_unmap_local_channel(session);
@@ -2029,11 +2223,11 @@ static ssize_t pn_output_write_amqp(pn_transport_t* 
transport, unsigned int laye
   // write out any buffered data _before_ returning PN_EOS, else we
   // could truncate an outgoing Close frame containing a useful error
   // status
-  if (!transport->disp->available && transport->close_sent) {
+  if (!transport->available && transport->close_sent) {
     return PN_EOS;
   }
 
-  return pn_dispatcher_output(transport->disp, bytes, available);
+  return pn_dispatcher_output(transport, bytes, available);
 }
 
 static void pni_close_head(pn_transport_t *transport)
@@ -2233,15 +2427,15 @@ pn_timestamp_t pn_transport_tick(pn_transport_t 
*transport, pn_timestamp_t now)
 
 uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
 {
-  if (transport && transport->disp)
-    return transport->disp->output_frames_ct;
+  if (transport)
+    return transport->output_frames_ct;
   return 0;
 }
 
 uint64_t pn_transport_get_frames_input(const pn_transport_t *transport)
 {
-  if (transport && transport->disp)
-    return transport->disp->input_frames_ct;
+  if (transport)
+    return transport->input_frames_ct;
   return 0;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to