Repository: qpid-proton Updated Branches: refs/heads/master a0e7cced9 -> d042c4ed0
PROTON-770: Eliminate pn_dispatcher_t: - Moved all useful state from pn_dispatcher_t to pn_transport_t - Only left input frame dispatching in dispatcher.[ch] Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/117ac209 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/117ac209 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/117ac209 Branch: refs/heads/master Commit: 117ac209e084d44b565212745d5c7e478df9c9d0 Parents: d8e99db Author: Andrew Stitcher <[email protected]> Authored: Wed Dec 10 18:23:52 2014 -0500 Committer: Andrew Stitcher <[email protected]> Committed: Wed Dec 10 18:23:52 2014 -0500 ---------------------------------------------------------------------- proton-c/src/dispatcher/dispatcher.c | 270 +++-------------------------- proton-c/src/dispatcher/dispatcher.h | 41 +---- proton-c/src/engine/engine-internal.h | 21 ++- proton-c/src/sasl/sasl.c | 37 ++-- proton-c/src/transport/transport.c | 270 +++++++++++++++++++++++++---- 5 files changed, 299 insertions(+), 340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/dispatcher/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c index 3b96a67..6485198 100644 --- a/proton-c/src/dispatcher/dispatcher.c +++ b/proton-c/src/dispatcher/dispatcher.c @@ -19,11 +19,10 @@ * */ -#include "framing/framing.h" #include "dispatcher.h" + +#include "framing/framing.h" #include "protocol.h" -#include "util.h" -#include "platform_fmt.h" #include "engine/engine-internal.h" #include "dispatch_actions.h" @@ -75,81 +74,21 @@ static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, return action(transport, frame_type, channel, args, payload); } -pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport) -{ - pn_dispatcher_t *disp = (pn_dispatcher_t *) calloc(sizeof(pn_dispatcher_t), 1); - - disp->frame_type = frame_type; - disp->transport = transport; - - disp->args = pn_data(16); - - disp->output_args = pn_data(16); - disp->frame = pn_buffer( 4*1024 ); - // XXX - disp->capacity = 4*1024; - disp->output = (char *) malloc(disp->capacity); - disp->available = 0; - - disp->halt = false; - disp->batch = true; - - disp->scratch = pn_string(NULL); - - return disp; -} - -void pn_dispatcher_free(pn_dispatcher_t *disp) -{ - if (disp) { - pn_data_free(disp->args); - pn_data_free(disp->output_args); - pn_buffer_free(disp->frame); - free(disp->output); - pn_free(disp->scratch); - free(disp); - } -} - -typedef enum {IN, OUT} pn_dir_t; - -static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir, - pn_data_t *args, const char *payload, size_t size) -{ - if (disp->transport->trace & PN_TRACE_FRM) { - pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-"); - pn_inspect(args, disp->scratch); - - if (pn_data_size(args)==0) { - pn_string_addf(disp->scratch, "(EMPTY FRAME)"); - } - - if (size) { - char buf[1024]; - int e = pn_quote_data(buf, 1024, payload, size); - pn_string_addf(disp->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf, - e == PN_OVERFLOW ? "... (truncated)" : ""); - } - - pn_transport_log(disp->transport, pn_string_get(disp->scratch)); - } -} - -static int pni_dispatch_frame(pn_dispatcher_t *disp, pn_data_t *args, pn_frame_t frame) +static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame) { if (frame.size == 0) { // ignore null frames - if (disp->transport->trace & PN_TRACE_FRM) - pn_transport_logf(disp->transport, "%u <- (EMPTY FRAME)\n", frame.channel); + if (transport->trace & PN_TRACE_FRM) + pn_transport_logf(transport, "%u <- (EMPTY FRAME)\n", frame.channel); return 0; } ssize_t dsize = pn_data_decode(args, frame.payload, frame.size); if (dsize < 0) { - pn_string_format(disp->scratch, + pn_string_format(transport->scratch, "Error decoding frame: %s %s\n", pn_code(dsize), pn_error_text(pn_data_error(args))); - pn_quote(disp->scratch, frame.payload, frame.size); - pn_transport_log(disp->transport, pn_string_get(disp->scratch)); + pn_quote(transport->scratch, frame.payload, frame.size); + pn_transport_log(transport, pn_string_get(transport->scratch)); return dsize; } @@ -161,217 +100,56 @@ static int pni_dispatch_frame(pn_dispatcher_t *disp, pn_data_t *args, pn_frame_t bool scanned; int e = pn_data_scan(args, "D?L.", &scanned, &lcode); if (e) { - pn_transport_log(disp->transport, "Scan error"); + pn_transport_log(transport, "Scan error"); return e; } if (!scanned) { - pn_transport_log(disp->transport, "Error dispatching frame"); + pn_transport_log(transport, "Error dispatching frame"); return PN_ERR; } size_t payload_size = frame.size - dsize; const char *payload_mem = payload_size ? frame.payload + dsize : NULL; pn_bytes_t payload = {payload_size, payload_mem}; - pn_do_trace(disp, channel, IN, args, payload_mem, payload_size); + pn_do_trace(transport, channel, IN, args, payload_mem, payload_size); - int err = pni_dispatch_action(disp->transport, lcode, frame_type, channel, args, &payload); + int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload); pn_data_clear(args); return err; } -ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available) +ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t available, bool batch, bool *halt) { size_t read = 0; - while (available && !disp->halt) { + while (available && !*halt) { pn_frame_t frame; size_t n = pn_read_frame(&frame, bytes + read, available); if (n) { read += n; available -= n; - disp->input_frames_ct += 1; - int e = pni_dispatch_frame(disp, disp->args, frame); + transport->input_frames_ct += 1; + int e = pni_dispatch_frame(transport, transport->args, frame); if (e) return e; } else { break; } - if (!disp->batch) break; + if (!batch) break; } return read; } -void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size) +ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size) { - disp->output_payload = data; - disp->output_size = size; -} - -int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - pn_data_clear(disp->output_args); - int err = pn_data_vfill(disp->output_args, fmt, ap); - va_end(ap); - if (err) { - pn_transport_logf(disp->transport, - "error posting frame: %s, %s: %s", fmt, pn_code(err), - pn_error_text(pn_data_error(disp->output_args))); - return PN_ERR; - } - - pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, disp->output_size); - - encode_performatives: - pn_buffer_clear( disp->frame ); - pn_buffer_memory_t buf = pn_buffer_memory( disp->frame ); - buf.size = pn_buffer_available( disp->frame ); - - ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size ); - if (wr < 0) { - if (wr == PN_OVERFLOW) { - pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 ); - goto encode_performatives; - } - pn_transport_logf(disp->transport, - "error posting frame: %s", pn_code(wr)); - return PN_ERR; - } - - pn_frame_t frame = {disp->frame_type}; - frame.channel = ch; - frame.payload = buf.start; - frame.size = wr; - size_t n; - while (!(n = pn_write_frame(disp->output + disp->available, - disp->capacity - disp->available, frame))) { - disp->capacity *= 2; - disp->output = (char *) realloc(disp->output, disp->capacity); - } - disp->output_frames_ct += 1; - if (disp->transport->trace & PN_TRACE_RAW) { - pn_string_set(disp->scratch, "RAW: \""); - pn_quote(disp->scratch, disp->output + disp->available, n); - pn_string_addf(disp->scratch, "\""); - pn_transport_log(disp->transport, pn_string_get(disp->scratch)); - } - disp->available += n; - - return 0; -} - -ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size) -{ - int n = disp->available < size ? disp->available : size; - memmove(bytes, disp->output, n); - memmove(disp->output, disp->output + n, disp->available - n); - disp->available -= n; - // XXX: need to check for errors - return n; -} - - -int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch, - uint32_t handle, - pn_sequence_t id, - const pn_bytes_t *tag, - uint32_t message_format, - bool settled, - bool more, - pn_sequence_t frame_limit) -{ - bool more_flag = more; - int framecount = 0; - - // create preformatives, assuming 'more' flag need not change - - compute_performatives: - pn_data_clear(disp->output_args); - int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER, - handle, id, tag->size, tag->start, - message_format, - settled, more_flag); - if (err) { - pn_transport_logf(disp->transport, - "error posting transfer frame: %s: %s", pn_code(err), - pn_error_text(pn_data_error(disp->output_args))); - return PN_ERR; - } - - do { // send as many frames as possible without changing the 'more' flag... - - encode_performatives: - pn_buffer_clear( disp->frame ); - pn_buffer_memory_t buf = pn_buffer_memory( disp->frame ); - buf.size = pn_buffer_available( disp->frame ); - - ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size); - if (wr < 0) { - if (wr == PN_OVERFLOW) { - pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 ); - goto encode_performatives; - } - pn_transport_logf(disp->transport, "error posting frame: %s", pn_code(wr)); - return PN_ERR; - } - buf.size = wr; - - // check if we need to break up the outbound frame - size_t available = disp->output_size; - if (disp->remote_max_frame) { - if ((available + buf.size) > disp->remote_max_frame - 8) { - available = disp->remote_max_frame - 8 - buf.size; - if (more_flag == false) { - more_flag = true; - goto compute_performatives; // deal with flag change - } - } else if (more_flag == true && more == false) { - // caller has no more, and this is the last frame - more_flag = false; - goto compute_performatives; - } - } - - if (pn_buffer_available( disp->frame ) < (available + buf.size)) { - // not enough room for payload - try again... - pn_buffer_ensure( disp->frame, available + buf.size ); - goto encode_performatives; - } - - pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, available); - - memmove( buf.start + buf.size, disp->output_payload, available); - disp->output_payload += available; - disp->output_size -= available; - buf.size += available; - - pn_frame_t frame = {disp->frame_type}; - frame.channel = ch; - frame.payload = buf.start; - frame.size = buf.size; - - size_t n; - while (!(n = pn_write_frame(disp->output + disp->available, - disp->capacity - disp->available, frame))) { - disp->capacity *= 2; - disp->output = (char *) realloc(disp->output, disp->capacity); - } - disp->output_frames_ct += 1; - framecount++; - if (disp->transport->trace & PN_TRACE_RAW) { - pn_string_set(disp->scratch, "RAW: \""); - pn_quote(disp->scratch, disp->output + disp->available, n); - pn_string_addf(disp->scratch, "\""); - pn_transport_log(disp->transport, pn_string_get(disp->scratch)); - } - disp->available += n; - } while (disp->output_size > 0 && framecount < frame_limit); - - disp->output_payload = NULL; - return framecount; + int n = transport->available < size ? transport->available : size; + memmove(bytes, transport->output, n); + memmove(transport->output, transport->output + n, transport->available - n); + transport->available -= n; + // XXX: need to check for errors + return n; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/dispatcher/dispatcher.h ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.h b/proton-c/src/dispatcher/dispatcher.h index b9bfa2b..752a71e 100644 --- a/proton-c/src/dispatcher/dispatcher.h +++ b/proton-c/src/dispatcher/dispatcher.h @@ -27,45 +27,12 @@ #include <stdbool.h> #endif -#include "proton/transport.h" -#include "buffer.h" - -typedef struct pn_dispatcher_t pn_dispatcher_t; +#include "proton/codec.h" +#include "proton/types.h" typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); -struct pn_dispatcher_t { - pn_data_t *args; - pn_data_t *output_args; - const char *output_payload; - size_t output_size; - size_t remote_max_frame; - pn_buffer_t *frame; // frame under construction - size_t capacity; - size_t available; /* number of raw bytes pending output */ - char *output; - pn_transport_t *transport; // TODO: We keep this to get access to logging - perhaps move logging - uint64_t output_frames_ct; - uint64_t input_frames_ct; - pn_string_t *scratch; - uint8_t frame_type; // Used when constructing outgoing frames - bool halt; - bool batch; -}; +ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt); +ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size); -pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport); -void pn_dispatcher_free(pn_dispatcher_t *disp); -void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size); -int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...); -ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available); -ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size); -int pn_post_transfer_frame(pn_dispatcher_t *disp, - uint16_t local_channel, - uint32_t handle, - pn_sequence_t delivery_id, - const pn_bytes_t *delivery_tag, - uint32_t message_format, - bool settled, - bool more, - pn_sequence_t frame_limit); #endif /* dispatcher.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 4ee9a69..bd9f952 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -120,7 +120,6 @@ struct pn_transport_t { pni_sasl_t *sasl; pni_ssl_t *ssl; pn_connection_t *connection; // reference counted - pn_dispatcher_t *disp; char *remote_container; char *remote_hostname; pn_data_t *remote_offered_capabilities; @@ -150,11 +149,23 @@ struct pn_transport_t { pn_hash_t *local_channels; pn_hash_t *remote_channels; + + + /* scratch area */ pn_string_t *scratch; + pn_data_t *args; + pn_data_t *output_args; + pn_buffer_t *frame; // frame under construction + // Temporary + size_t capacity; + size_t available; /* number of raw bytes pending output */ + char *output; /* statistics */ uint64_t bytes_input; uint64_t bytes_output; + uint64_t output_frames_ct; + uint64_t input_frames_ct; /* output buffered for send */ size_t output_size; @@ -182,6 +193,7 @@ struct pn_transport_t { bool done_processing; // if true, don't call pn_process again bool posted_idle_timeout; bool server; + bool halt; }; struct pn_connection_t { @@ -318,4 +330,11 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm void pn_session_unbound(pn_session_t* ssn); void pn_link_unbound(pn_link_t* link); +int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...); + +typedef enum {IN, OUT} pn_dir_t; + +void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir, + pn_data_t *args, const char *payload, size_t size); + #endif /* engine-internal.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/sasl/sasl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c index 5a174e2..5e68233 100644 --- a/proton-c/src/sasl/sasl.c +++ b/proton-c/src/sasl/sasl.c @@ -37,7 +37,6 @@ struct pni_sasl_t { - pn_dispatcher_t *disp; char *mechanisms; char *remote_mechanisms; pn_buffer_t *send_data; @@ -49,6 +48,7 @@ struct pni_sasl_t { bool rcvd_init; bool sent_done; bool rcvd_done; + bool halt; bool input_bypass; bool output_bypass; }; @@ -102,8 +102,6 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport) { if (!transport->sasl) { pni_sasl_t *sasl = (pni_sasl_t *) malloc(sizeof(pni_sasl_t)); - sasl->disp = pn_dispatcher(1, transport); - sasl->disp->batch = false; sasl->client = !transport->server; sasl->mechanisms = NULL; @@ -118,6 +116,7 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport) sasl->rcvd_done = false; sasl->input_bypass = false; sasl->output_bypass = false; + sasl->halt = false; transport->sasl = sasl; } @@ -274,22 +273,23 @@ void pn_sasl_free(pn_transport_t *transport) free(sasl->remote_mechanisms); pn_buffer_free(sasl->send_data); pn_buffer_free(sasl->recv_data); - pn_dispatcher_free(sasl->disp); free(sasl); } } } -void pn_client_init(pni_sasl_t *sasl) +void pn_client_init(pn_transport_t *transport) { + pni_sasl_t *sasl = transport->sasl; pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data); - pn_post_frame(sasl->disp, 0, "DL[sz]", SASL_INIT, sasl->mechanisms, + pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[sz]", SASL_INIT, sasl->mechanisms, bytes.size, bytes.start); pn_buffer_clear(sasl->send_data); } -void pn_server_init(pni_sasl_t *sasl) +void pn_server_init(pn_transport_t *transport) { + pni_sasl_t *sasl = transport->sasl; // XXX char *mechs[16]; int count = 0; @@ -316,13 +316,14 @@ void pn_server_init(pni_sasl_t *sasl) } } - pn_post_frame(sasl->disp, 0, "DL[@T[*s]]", SASL_MECHANISMS, PN_SYMBOL, count, mechs); + pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[@T[*s]]", SASL_MECHANISMS, PN_SYMBOL, count, mechs); } void pn_server_done(pn_sasl_t *sasl0) { - pni_sasl_t *sasl = get_sasl_internal(sasl0); - pn_post_frame(sasl->disp, 0, "DL[B]", SASL_OUTCOME, sasl->outcome); + pn_transport_t *transport = get_transport_internal(sasl0); + pni_sasl_t *sasl = transport->sasl; + pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[B]", SASL_OUTCOME, sasl->outcome); } void pn_sasl_process(pn_transport_t *transport) @@ -330,16 +331,16 @@ void pn_sasl_process(pn_transport_t *transport) pni_sasl_t *sasl = transport->sasl; if (!sasl->sent_init) { if (sasl->client) { - pn_client_init(sasl); + pn_client_init(transport); } else { - pn_server_init(sasl); + pn_server_init(transport); } sasl->sent_init = true; } if (pn_buffer_size(sasl->send_data)) { pn_buffer_memory_t bytes = pn_buffer_memory(sasl->send_data); - pn_post_frame(sasl->disp, 0, "DL[z]", sasl->client ? SASL_RESPONSE : SASL_CHALLENGE, + pn_post_frame(transport, SASL_FRAME_TYPE, 0, "DL[z]", sasl->client ? SASL_RESPONSE : SASL_CHALLENGE, bytes.size, bytes.start); pn_buffer_clear(sasl->send_data); } @@ -355,14 +356,14 @@ void pn_sasl_process(pn_transport_t *transport) // or challenges) from client if (!sasl->client && sasl->sent_done && sasl->rcvd_init) { sasl->rcvd_done = true; - sasl->disp->halt = true; + sasl->halt = true; } } ssize_t pn_sasl_input(pn_transport_t *transport, const char *bytes, size_t available) { pni_sasl_t *sasl = transport->sasl; - ssize_t n = pn_dispatcher_input(sasl->disp, bytes, available); + ssize_t n = pn_dispatcher_input(transport, bytes, available, false, &sasl->halt); if (n < 0) return n; pn_sasl_process(transport); @@ -388,7 +389,7 @@ ssize_t pn_sasl_output(pn_transport_t *transport, char *bytes, size_t size) pn_sasl_process(transport); pni_sasl_t *sasl = transport->sasl; - if (sasl->disp->available == 0 && sasl->sent_done) { + if (transport->available == 0 && sasl->sent_done) { if (pn_sasl_state((pn_sasl_t *)transport) == PN_SASL_PASS) { return PN_EOS; } else { @@ -396,7 +397,7 @@ ssize_t pn_sasl_output(pn_transport_t *transport, char *bytes, size_t size) return PN_ERR; } } else { - return pn_dispatcher_output(sasl->disp, bytes, size); + return pn_dispatcher_output(transport, bytes, size); } } @@ -449,7 +450,7 @@ int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channe sasl->outcome = (pn_sasl_outcome_t) outcome; sasl->rcvd_done = true; sasl->sent_done = true; - sasl->disp->halt = true; + sasl->halt = true; return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/117ac209/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index 097d863..9c48def 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -305,8 +305,12 @@ static void pn_transport_initialize(void *object) transport->tracer = pni_default_tracer; transport->sasl = NULL; transport->ssl = NULL; + transport->scratch = pn_string(NULL); - transport->disp = pn_dispatcher(0, transport); + transport->args = pn_data(16); + transport->output_args = pn_data(16); + transport->frame = pn_buffer(4*1024); + transport->connection = NULL; transport->context = pn_record(); @@ -357,6 +361,7 @@ static void pn_transport_initialize(void *object) transport->posted_idle_timeout = false; transport->server = false; + transport->halt = false; transport->trace = (pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | @@ -413,6 +418,15 @@ pn_transport_t *pn_transport(void) pn_transport_free(transport); return NULL; } + + transport->capacity = 4*1024; + transport->available = 0; + transport->output = (char *) malloc(transport->capacity); + if (!transport->output) { + pn_transport_free(transport); + return NULL; + } + return transport; } @@ -439,7 +453,6 @@ static void pn_transport_finalize(void *object) pn_free(transport->context); pn_ssl_free(transport); pn_sasl_free(transport); - pn_dispatcher_free(transport->disp); free(transport->remote_container); free(transport->remote_hostname); pn_free(transport->remote_offered_capabilities); @@ -454,6 +467,10 @@ static void pn_transport_finalize(void *object) if (transport->input_buf) free(transport->input_buf); if (transport->output_buf) free(transport->output_buf); pn_free(transport->scratch); + pn_data_free(transport->args); + pn_data_free(transport->output_args); + pn_buffer_free(transport->frame); + free(transport->output); } int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) @@ -474,7 +491,7 @@ int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) if (transport->open_rcvd) { PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE); pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_REMOTE_OPEN); - transport->disp->halt = false; + transport->halt = false; transport_consume(transport); // blech - testBindAfterOpen } @@ -623,6 +640,185 @@ void pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data) } } + +void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir, + pn_data_t *args, const char *payload, size_t size) +{ + if (transport->trace & PN_TRACE_FRM) { + pn_string_format(transport->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-"); + pn_inspect(args, transport->scratch); + + if (pn_data_size(args)==0) { + pn_string_addf(transport->scratch, "(EMPTY FRAME)"); + } + + if (size) { + char buf[1024]; + int e = pn_quote_data(buf, 1024, payload, size); + pn_string_addf(transport->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf, + e == PN_OVERFLOW ? "... (truncated)" : ""); + } + + pn_transport_log(transport, pn_string_get(transport->scratch)); + } +} + +int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...) +{ + pn_buffer_t *frame_buf = transport->frame; + va_list ap; + va_start(ap, fmt); + pn_data_clear(transport->output_args); + int err = pn_data_vfill(transport->output_args, fmt, ap); + va_end(ap); + if (err) { + pn_transport_logf(transport, + "error posting frame: %s, %s: %s", fmt, pn_code(err), + pn_error_text(pn_data_error(transport->output_args))); + return PN_ERR; + } + + pn_do_trace(transport, ch, OUT, transport->output_args, NULL, 0); + + encode_performatives: + pn_buffer_clear( frame_buf ); + pn_buffer_memory_t buf = pn_buffer_memory( frame_buf ); + buf.size = pn_buffer_available( frame_buf ); + + ssize_t wr = pn_data_encode( transport->output_args, buf.start, buf.size ); + if (wr < 0) { + if (wr == PN_OVERFLOW) { + pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 ); + goto encode_performatives; + } + pn_transport_logf(transport, + "error posting frame: %s", pn_code(wr)); + return PN_ERR; + } + + pn_frame_t frame = {type}; + frame.channel = ch; + frame.payload = buf.start; + frame.size = wr; + size_t n; + while (!(n = pn_write_frame(transport->output + transport->available, + transport->capacity - transport->available, frame))) { + transport->capacity *= 2; + transport->output = (char *) realloc(transport->output, transport->capacity); + } + transport->output_frames_ct += 1; + if (transport->trace & PN_TRACE_RAW) { + pn_string_set(transport->scratch, "RAW: \""); + pn_quote(transport->scratch, transport->output + transport->available, n); + pn_string_addf(transport->scratch, "\""); + pn_transport_log(transport, pn_string_get(transport->scratch)); + } + transport->available += n; + + return 0; +} + +int pn_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, + uint32_t handle, + pn_sequence_t id, + pn_bytes_t *payload, + const pn_bytes_t *tag, + uint32_t message_format, + bool settled, + bool more, + pn_sequence_t frame_limit) +{ + bool more_flag = more; + int framecount = 0; + pn_buffer_t *frame = transport->frame; + + // create preformatives, assuming 'more' flag need not change + + compute_performatives: + pn_data_clear(transport->output_args); + int err = pn_data_fill(transport->output_args, "DL[IIzIoo]", TRANSFER, + handle, id, tag->size, tag->start, + message_format, + settled, more_flag); + if (err) { + pn_transport_logf(transport, + "error posting transfer frame: %s: %s", pn_code(err), + pn_error_text(pn_data_error(transport->output_args))); + return PN_ERR; + } + + do { // send as many frames as possible without changing the 'more' flag... + + encode_performatives: + pn_buffer_clear( frame ); + pn_buffer_memory_t buf = pn_buffer_memory( frame ); + buf.size = pn_buffer_available( frame ); + + ssize_t wr = pn_data_encode(transport->output_args, buf.start, buf.size); + if (wr < 0) { + if (wr == PN_OVERFLOW) { + pn_buffer_ensure( frame, pn_buffer_available( frame ) * 2 ); + goto encode_performatives; + } + pn_transport_logf(transport, "error posting frame: %s", pn_code(wr)); + return PN_ERR; + } + buf.size = wr; + + // check if we need to break up the outbound frame + size_t available = payload->size; + if (transport->remote_max_frame) { + if ((available + buf.size) > transport->remote_max_frame - 8) { + available = transport->remote_max_frame - 8 - buf.size; + if (more_flag == false) { + more_flag = true; + goto compute_performatives; // deal with flag change + } + } else if (more_flag == true && more == false) { + // caller has no more, and this is the last frame + more_flag = false; + goto compute_performatives; + } + } + + if (pn_buffer_available( frame ) < (available + buf.size)) { + // not enough room for payload - try again... + pn_buffer_ensure( frame, available + buf.size ); + goto encode_performatives; + } + + pn_do_trace(transport, ch, OUT, transport->output_args, payload->start, available); + + memmove( buf.start + buf.size, payload->start, available); + payload->start += available; + payload->size -= available; + buf.size += available; + + pn_frame_t frame = {AMQP_FRAME_TYPE}; + frame.channel = ch; + frame.payload = buf.start; + frame.size = buf.size; + + size_t n; + while (!(n = pn_write_frame(transport->output + transport->available, + transport->capacity - transport->available, frame))) { + transport->capacity *= 2; + transport->output = (char *) realloc(transport->output, transport->capacity); + } + transport->output_frames_ct += 1; + framecount++; + if (transport->trace & PN_TRACE_RAW) { + pn_string_set(transport->scratch, "RAW: \""); + pn_quote(transport->scratch, transport->output + transport->available, n); + pn_string_addf(transport->scratch, "\""); + pn_transport_log(transport, pn_string_get(transport->scratch)); + } + transport->available += n; + } while (payload->size > 0 && framecount < frame_limit); + + return framecount; +} + int pn_post_close(pn_transport_t *transport, const char *condition, const char *description) { pn_condition_t *cond = NULL; @@ -636,7 +832,7 @@ int pn_post_close(pn_transport_t *transport, const char *condition, const char * info = pn_condition_info(cond); } - return pn_post_frame(transport->disp, 0, "DL[?DL[sSC]]", CLOSE, + return pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[?DL[sSC]]", CLOSE, (bool) condition, ERROR, condition, description, info); } @@ -677,13 +873,13 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm va_end(ap); if (!transport->close_sent) { if (!transport->open_sent) { - pn_post_frame(transport->disp, 0, "DL[S]", OPEN, ""); + pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[S]", OPEN, ""); } pn_post_close(transport, condition, buf); transport->close_sent = true; } - transport->disp->halt = true; + transport->halt = true; pn_condition_set_name(&transport->condition, condition); pn_condition_set_description(&transport->condition, buf); pn_collector_t *collector = pni_transport_collector(transport); @@ -722,8 +918,6 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE); transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE; } - transport->disp->remote_max_frame = transport->remote_max_frame; - pn_buffer_clear( transport->disp->frame ); } if (container_q) { transport->remote_container = pn_bytes_strdup(remote_container); @@ -740,7 +934,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE); pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_REMOTE_OPEN); } else { - transport->disp->halt = true; + transport->halt = true; } transport->open_rcvd = true; return 0; @@ -1325,7 +1519,7 @@ static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer, } - ssize_t n = pn_dispatcher_input(transport->disp, bytes, available); + ssize_t n = pn_dispatcher_input(transport, bytes, available, true, &transport->halt); if (n < 0) { //return pn_error_set(transport->error, n, "dispatch error"); return PN_EOS; @@ -1365,10 +1559,10 @@ static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer transport->last_bytes_output = transport->bytes_output; } else if (transport->keepalive_deadline <= now) { transport->keepalive_deadline = now + (pn_timestamp_t)(transport->remote_idle_timeout/2.0); - if (transport->disp->available == 0) { // no outbound data pending + if (transport->available == 0) { // no outbound data pending // so send empty frame (and account for it!) - pn_post_frame(transport->disp, 0, ""); - transport->last_bytes_output += transport->disp->available; + pn_post_frame(transport, AMQP_FRAME_TYPE, 0, ""); + transport->last_bytes_output += transport->available; } } timeout = pn_timestamp_min( timeout, transport->keepalive_deadline ); @@ -1390,7 +1584,7 @@ int pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) : 0; pn_connection_t *connection = (pn_connection_t *) endpoint; const char *cid = pn_string_get(connection->container); - int err = pn_post_frame(transport->disp, 0, "DL[SS?I?H?InnCCC]", OPEN, + int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[SS?I?H?InnCCC]", OPEN, cid ? cid : "", pn_string_get(connection->hostname), // if not zero, advertise our max frame size and idle timeout @@ -1464,7 +1658,7 @@ int pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) pni_map_local_channel(ssn); state->incoming_window = pn_session_incoming_window(ssn); state->outgoing_window = pn_session_outgoing_window(ssn); - pn_post_frame(transport->disp, state->local_channel, "DL[?HIII]", BEGIN, + pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?HIII]", BEGIN, ((int16_t) state->remote_channel >= 0), state->remote_channel, state->outgoing_transfer_count, state->incoming_window, @@ -1512,7 +1706,7 @@ int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) pni_map_local_handle(link); const pn_distribution_mode_t dist_mode = link->source.distribution_mode; if (link->target.type == PN_COORDINATOR) { - int err = pn_post_frame(transport->disp, ssn_state->local_channel, + int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH, pn_string_get(link->name), state->local_handle, @@ -1534,7 +1728,7 @@ int pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) 0); if (err) return err; } else { - int err = pn_post_frame(transport->disp, ssn_state->local_channel, + int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, pn_string_get(link->name), state->local_handle, @@ -1575,7 +1769,7 @@ int pn_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link) ssn->state.outgoing_window = pn_session_outgoing_window(ssn); bool linkq = (bool) link; pn_link_state_t *state = &link->state; - return pn_post_frame(transport->disp, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW, + return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW, (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count, ssn->state.incoming_window, ssn->state.outgoing_transfer_count, @@ -1609,7 +1803,7 @@ int pn_flush_disp(pn_transport_t *transport, pn_session_t *ssn) uint64_t code = ssn->state.disp_code; bool settled = ssn->state.disp_settled; if (ssn->state.disp) { - int err = pn_post_frame(transport->disp, ssn->state.local_channel, "DL[oIIo?DL[]]", DISPOSITION, + int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[oIIo?DL[]]", DISPOSITION, ssn->state.disp_type, ssn->state.disp_first, ssn->state.disp_last, settled, (bool)code, code); if (err) return err; @@ -1641,7 +1835,7 @@ int pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) if (!pni_disposition_batchable(&delivery->local)) { pn_data_clear(transport->disp_data); pni_disposition_encode(&delivery->local, transport->disp_data); - return pn_post_frame(transport->disp, ssn->state.local_channel, + return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[oIIo?DLC]", DISPOSITION, role, state->id, state->id, delivery->local.settled, (bool)code, code, transport->disp_data); @@ -1690,22 +1884,22 @@ int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, } pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes); - pn_set_payload(transport->disp, bytes.start, bytes.size); + size_t full_size = bytes.size; pn_bytes_t tag = pn_buffer_bytes(delivery->tag); - int count = pn_post_transfer_frame(transport->disp, - ssn_state->local_channel, - link_state->local_handle, - state->id, &tag, - 0, // message-format - delivery->local.settled, - !delivery->done, - ssn_state->remote_incoming_window); + int count = pn_post_amqp_transfer_frame(transport, + ssn_state->local_channel, + link_state->local_handle, + state->id, &bytes, &tag, + 0, // message-format + delivery->local.settled, + !delivery->done, + ssn_state->remote_incoming_window); if (count < 0) return count; xfr_posted = true; ssn_state->outgoing_transfer_count += count; ssn_state->remote_incoming_window -= count; - int sent = bytes.size - transport->disp->output_size; + int sent = full_size - bytes.size; pn_buffer_trim(delivery->bytes, sent, 0); link->session->outgoing_bytes -= sent; if (!pn_buffer_size(delivery->bytes) && delivery->done) { @@ -1860,7 +2054,7 @@ int pn_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) } int err = - pn_post_frame(transport->disp, ssn_state->local_channel, + pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, "DL[Io?DL[sSC]]", DETACH, state->local_handle, !link->detached, (bool)name, ERROR, name, description, info); if (err) return err; @@ -1931,7 +2125,7 @@ int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) info = pn_condition_info(&endpoint->condition); } - int err = pn_post_frame(transport->disp, state->local_channel, "DL[?DL[sSC]]", END, + int err = pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?DL[sSC]]", END, (bool) name, ERROR, name, description, info); if (err) return err; pni_unmap_local_channel(session); @@ -2029,11 +2223,11 @@ static ssize_t pn_output_write_amqp(pn_transport_t* transport, unsigned int laye // write out any buffered data _before_ returning PN_EOS, else we // could truncate an outgoing Close frame containing a useful error // status - if (!transport->disp->available && transport->close_sent) { + if (!transport->available && transport->close_sent) { return PN_EOS; } - return pn_dispatcher_output(transport->disp, bytes, available); + return pn_dispatcher_output(transport, bytes, available); } static void pni_close_head(pn_transport_t *transport) @@ -2233,15 +2427,15 @@ pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now) uint64_t pn_transport_get_frames_output(const pn_transport_t *transport) { - if (transport && transport->disp) - return transport->disp->output_frames_ct; + if (transport) + return transport->output_frames_ct; return 0; } uint64_t pn_transport_get_frames_input(const pn_transport_t *transport) { - if (transport && transport->disp) - return transport->disp->input_frames_ct; + if (transport) + return transport->input_frames_ct; return 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
