PROTON-749: Refactoring of io layers: - Eliminate some unnecessary stuff. - Make pn_io_layer_t a pure interface. - Simplify amqp header code; remove header_count member from pn_transport_t
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c814d5c3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c814d5c3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c814d5c3 Branch: refs/heads/examples Commit: c814d5c39147afa642e95e1b1ad51650b04d9739 Parents: 2794da5 Author: Andrew Stitcher <[email protected]> Authored: Wed Aug 6 17:57:56 2014 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Mon Nov 17 14:55:03 2014 -0500 ---------------------------------------------------------------------- proton-c/src/engine/engine-internal.h | 20 ++- proton-c/src/sasl/sasl.c | 128 +++++++++++------- proton-c/src/ssl/openssl.c | 143 +++++++++++--------- proton-c/src/transport/transport.c | 204 ++++++++++++++--------------- proton-c/src/windows/schannel.c | 155 ++++++++++++---------- 5 files changed, 354 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index dd4c44e..86f5161 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -100,18 +100,16 @@ typedef struct { #include <proton/ssl.h> typedef struct pn_io_layer_t { - void *context; - struct pn_io_layer_t *next; - ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t); - ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t); - pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t); - size_t (*buffered_output)(struct pn_io_layer_t *); // how much output is held - size_t (*buffered_input)(struct pn_io_layer_t *); // how much input is held + ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t); + ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t); + pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t); + size_t (*buffered_output)(struct pn_transport_t *); // how much output is held } pn_io_layer_t; +extern const pn_io_layer_t pni_passthru_layer; + struct pn_transport_t { pn_tracer_t tracer; - size_t header_count; pn_sasl_t *sasl; pn_ssl_t *ssl; pn_connection_t *connection; // reference counted @@ -134,7 +132,7 @@ struct pn_transport_t { #define PN_IO_SASL 1 #define PN_IO_AMQP 2 #define PN_IO_LAYER_CT (PN_IO_AMQP+1) - pn_io_layer_t io_layers[PN_IO_LAYER_CT]; + const pn_io_layer_t *io_layers[PN_IO_LAYER_CT]; /* dead remote detection */ pn_millis_t local_idle_timeout; @@ -302,9 +300,7 @@ void pn_link_dump(pn_link_t *link); void pn_dump(pn_connection_t *conn); void pn_transport_sasl_init(pn_transport_t *transport); -ssize_t pn_io_layer_input_passthru(pn_io_layer_t *, const char *, size_t ); -ssize_t pn_io_layer_output_passthru(pn_io_layer_t *, char *, size_t ); -pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *, pn_timestamp_t); +pn_timestamp_t pn_io_layer_tick_passthru(pn_transport_t *, unsigned int, pn_timestamp_t); void pn_condition_init(pn_condition_t *condition); void pn_condition_tini(pn_condition_t *condition); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/sasl/sasl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c index a82ec02..97bead4 100644 --- a/proton-c/src/sasl/sasl.c +++ b/proton-c/src/sasl/sasl.c @@ -36,7 +36,6 @@ struct pn_sasl_t { pn_transport_t *transport; - pn_io_layer_t *io_layer; pn_dispatcher_t *disp; char *mechanisms; char *remote_mechanisms; @@ -50,12 +49,42 @@ struct pn_sasl_t { bool rcvd_init; bool sent_done; bool rcvd_done; + bool input_bypass; + bool output_bypass; }; -static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available); -static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available); -static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available); -static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available); +static ssize_t pn_input_read_sasl_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available); +static ssize_t pn_input_read_sasl(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); +static ssize_t pn_output_write_sasl_header(pn_transport_t* transport, unsigned int layer, char* bytes, size_t size); +static ssize_t pn_output_write_sasl(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); + +const pn_io_layer_t sasl_headers_layer = { + pn_input_read_sasl_header, + pn_output_write_sasl_header, + pn_io_layer_tick_passthru, + NULL +}; + +const pn_io_layer_t sasl_write_header_layer = { + pn_input_read_sasl, + pn_output_write_sasl_header, + pn_io_layer_tick_passthru, + NULL +}; + +const pn_io_layer_t sasl_read_header_layer = { + pn_input_read_sasl_header, + pn_output_write_sasl, + pn_io_layer_tick_passthru, + NULL +}; + +const pn_io_layer_t sasl_layer = { + pn_input_read_sasl, + pn_output_write_sasl, + pn_io_layer_tick_passthru, + NULL +}; pn_sasl_t *pn_sasl(pn_transport_t *transport) { @@ -76,14 +105,12 @@ pn_sasl_t *pn_sasl(pn_transport_t *transport) sasl->rcvd_init = false; sasl->sent_done = false; sasl->rcvd_done = false; + sasl->input_bypass = false; + sasl->output_bypass = false; transport->sasl = sasl; sasl->transport = transport; - sasl->io_layer = &transport->io_layers[PN_IO_SASL]; - sasl->io_layer->context = sasl; - sasl->io_layer->process_input = pn_input_read_sasl_header; - sasl->io_layer->process_output = pn_output_write_sasl_header; - sasl->io_layer->process_tick = pn_io_layer_tick_passthru; + transport->io_layers[PN_IO_SASL] = &sasl_headers_layer; } return transport->sasl; @@ -404,9 +431,9 @@ int pn_do_outcome(pn_dispatcher_t *disp) #define AMQP_HEADER ("AMQP\x00\x01\x00\x00") #define SASL_HEADER_LEN 8 -static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available) +static ssize_t pn_input_read_sasl_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available) { - pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; + pn_sasl_t *sasl = transport->sasl; if (available > 0) { if (available < SASL_HEADER_LEN) { if (memcmp(bytes, SASL_HEADER, available) == 0 || @@ -414,20 +441,22 @@ static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *by return 0; } else { if (memcmp(bytes, SASL_HEADER, SASL_HEADER_LEN) == 0) { - sasl->io_layer->process_input = pn_input_read_sasl; + if (transport->io_layers[layer] == &sasl_read_header_layer) { + transport->io_layers[layer] = &sasl_layer; + } else { + transport->io_layers[layer] = &sasl_write_header_layer; + } if (sasl->disp->trace & PN_TRACE_FRM) - pn_transport_logf(sasl->transport, " <- %s", "SASL"); + pn_transport_logf(transport, " <- %s", "SASL"); return SASL_HEADER_LEN; } if (memcmp(bytes, AMQP_HEADER, SASL_HEADER_LEN) == 0) { if (sasl->allow_skip) { sasl->outcome = PN_SASL_SKIPPED; - sasl->io_layer->process_input = pn_io_layer_input_passthru; - sasl->io_layer->process_output = pn_io_layer_output_passthru; - pn_io_layer_t *io_next = sasl->io_layer->next; - return io_next->process_input( io_next, bytes, available ); + transport->io_layers[layer] = &pni_passthru_layer; + return pni_passthru_layer.process_input(transport, layer, bytes, available); } else { - pn_do_error(sasl->transport, "amqp:connection:policy-error", + pn_do_error(transport, "amqp:connection:policy-error", "Client skipped SASL exchange - forbidden"); return PN_EOS; } @@ -436,50 +465,57 @@ static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *by } char quoted[1024]; pn_quote_data(quoted, 1024, bytes, available); - pn_do_error(sasl->transport, "amqp:connection:framing-error", + pn_do_error(transport, "amqp:connection:framing-error", "%s header mismatch: '%s'", "SASL", quoted); return PN_EOS; } -static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available) +static ssize_t pn_input_read_sasl(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available) { - pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; - ssize_t n = pn_sasl_input(sasl, bytes, available); - if (n == PN_EOS) { - sasl->io_layer->process_input = pn_io_layer_input_passthru; - pn_io_layer_t *io_next = sasl->io_layer->next; - return io_next->process_input( io_next, bytes, available ); + pn_sasl_t *sasl = transport->sasl; + if (!sasl->input_bypass) { + ssize_t n = pn_sasl_input(sasl, bytes, available); + if (n != PN_EOS) return n; + + sasl->input_bypass = true; + if (sasl->output_bypass) + transport->io_layers[layer] = &pni_passthru_layer; } - return n; + return pni_passthru_layer.process_input(transport, layer, bytes, available ); } -static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t size) +static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t size) { - pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; + pn_sasl_t *sasl = transport->sasl; if (sasl->disp->trace & PN_TRACE_FRM) - pn_transport_logf(sasl->transport, " -> %s", "SASL"); + pn_transport_logf(transport, " -> %s", "SASL"); assert(size >= SASL_HEADER_LEN); memmove(bytes, SASL_HEADER, SASL_HEADER_LEN); - sasl->io_layer->process_output = pn_output_write_sasl; + if (transport->io_layers[layer]==&sasl_write_header_layer) { + transport->io_layers[layer] = &sasl_layer; + } else { + transport->io_layers[layer] = &sasl_read_header_layer; + } return SASL_HEADER_LEN; } -static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size) +static ssize_t pn_output_write_sasl(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available) { - pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; - // this accounts for when pn_do_error is invoked, e.g. by idle timeout - ssize_t n; - if (sasl->transport->close_sent) { - n = PN_EOS; - } else { - n = pn_sasl_output(sasl, bytes, size); - } + pn_sasl_t *sasl = transport->sasl; + if (!sasl->output_bypass) { + // this accounts for when pn_do_error is invoked, e.g. by idle timeout + ssize_t n; + if (transport->close_sent) { + n = PN_EOS; + } else { + n = pn_sasl_output(sasl, bytes, available); + } + if (n != PN_EOS) return n; - if (n == PN_EOS) { - sasl->io_layer->process_output = pn_io_layer_output_passthru; - pn_io_layer_t *io_next = sasl->io_layer->next; - return io_next->process_output( io_next, bytes, size ); + sasl->output_bypass = true; + if (sasl->input_bypass) + transport->io_layers[layer] = &pni_passthru_layer; } - return n; + return pni_passthru_layer.process_output(transport, layer, bytes, available ); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/ssl/openssl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c index dd1b88b..a763cfb 100644 --- a/proton-c/src/ssl/openssl.c +++ b/proton-c/src/ssl/openssl.c @@ -87,9 +87,7 @@ struct pn_ssl_domain_t { struct pn_ssl_t { - pn_transport_t *transport; - pn_io_layer_t *io_layer; pn_ssl_domain_t *domain; const char *session_id; const char *peer_hostname; @@ -134,19 +132,18 @@ struct pn_ssl_session_t { /* */ static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata); -static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len); -static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len); -static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len); -static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len); -static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len); -static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len); +static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len); +static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len); +static ssize_t process_input_unknown( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len); +static ssize_t process_output_unknown( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len); +static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len); +static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len); static connection_mode_t check_for_ssl_connection( const char *data, size_t len ); static int init_ssl_socket( pn_ssl_t * ); static void release_ssl_socket( pn_ssl_t * ); static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * ); static void ssl_session_free( pn_ssl_session_t *); -static size_t buffered_output( pn_io_layer_t *io_layer ); -static size_t buffered_input( pn_io_layer_t *io_layer ); +static size_t buffered_output( pn_transport_t *transport ); // @todo: used to avoid littering the code with calls to printf... static void _log_error(pn_ssl_t *ssl, const char *fmt, ...) @@ -670,6 +667,40 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain, return 0; } +const pn_io_layer_t unknown_layer = { + process_input_unknown, + process_output_unknown, + pn_io_layer_tick_passthru, + NULL +}; + +const pn_io_layer_t ssl_layer = { + process_input_ssl, + process_output_ssl, + pn_io_layer_tick_passthru, + buffered_output +}; + +const pn_io_layer_t ssl_input_closed_layer = { + process_input_done, + process_output_ssl, + pn_io_layer_tick_passthru, + buffered_output +}; + +const pn_io_layer_t ssl_output_closed_layer = { + process_input_ssl, + process_output_done, + pn_io_layer_tick_passthru, + buffered_output +}; + +const pn_io_layer_t ssl_closed_layer = { + process_input_done, + process_output_done, + pn_io_layer_tick_passthru, + buffered_output +}; int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id) { @@ -678,13 +709,10 @@ int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id) ssl->domain = domain; domain->ref_count++; if (domain->allow_unsecured) { - ssl->io_layer->process_input = process_input_unknown; - ssl->io_layer->process_output = process_output_unknown; + ssl->transport->io_layers[PN_IO_SSL] = &unknown_layer; } else { - ssl->io_layer->process_input = process_input_ssl; - ssl->io_layer->process_output = process_output_ssl; + ssl->transport->io_layers[PN_IO_SSL] = &ssl_layer; } - if (session_id && domain->mode == PN_SSL_MODE_CLIENT) ssl->session_id = pn_strdup(session_id); @@ -773,13 +801,7 @@ pn_ssl_t *pn_ssl(pn_transport_t *transport) ssl->transport = transport; transport->ssl = ssl; - ssl->io_layer = &transport->io_layers[PN_IO_SSL]; - ssl->io_layer->context = ssl; - ssl->io_layer->process_input = pn_io_layer_input_passthru; - ssl->io_layer->process_output = pn_io_layer_output_passthru; - ssl->io_layer->process_tick = pn_io_layer_tick_passthru; - ssl->io_layer->buffered_output = buffered_output; - ssl->io_layer->buffered_input = buffered_input; + transport->io_layers[PN_IO_SSL] = &pni_passthru_layer; ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF; @@ -823,11 +845,9 @@ static int start_ssl_shutdown( pn_ssl_t *ssl ) -static int setup_ssl_connection( pn_ssl_t *ssl ) +static int setup_ssl_connection(pn_transport_t *transport, unsigned int layer) { - _log( ssl, "SSL connection detected."); - ssl->io_layer->process_input = process_input_ssl; - ssl->io_layer->process_output = process_output_ssl; + transport->io_layers[layer] = &ssl_layer; return 0; } @@ -836,9 +856,9 @@ static int setup_ssl_connection( pn_ssl_t *ssl ) // take data from the network, and pass it into SSL. Attempt to read decrypted data from // SSL socket and pass it to the application. -static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available) +static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t available) { - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS; _log( ssl, "process_input_ssl( data size=%d )",available ); @@ -910,8 +930,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat if (!ssl->app_input_closed) { if (ssl->in_count > 0 || ssl->ssl_closed) { /* if ssl_closed, send 0 count */ - pn_io_layer_t *io_next = ssl->io_layer->next; - ssize_t consumed = io_next->process_input( io_next, ssl->inbuf, ssl->in_count); + ssize_t consumed = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->inbuf, ssl->in_count); if (consumed > 0) { ssl->in_count -= consumed; if (ssl->in_count) @@ -973,15 +992,19 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat //} if (ssl->app_input_closed && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) ) { consumed = ssl->app_input_closed; - ssl->io_layer->process_input = process_input_done; + if (transport->io_layers[layer]==&ssl_output_closed_layer) { + transport->io_layers[layer] = &ssl_closed_layer; + } else { + transport->io_layers[layer] = &ssl_input_closed_layer; + } } _log(ssl, "process_input_ssl() returning %d", (int) consumed); return consumed; } -static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len) +static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len) { - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; if (!ssl) return PN_EOS; if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS; @@ -993,8 +1016,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t // first, get any pending application output, if possible if (!ssl->app_output_closed && ssl->out_count < ssl->out_size) { - pn_io_layer_t *io_next = ssl->io_layer->next; - ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count); + ssize_t app_bytes = transport->io_layers[layer+1]->process_output(transport, layer+1, &ssl->outbuf[ssl->out_count], ssl->out_size - ssl->out_count); if (app_bytes > 0) { ssl->out_count += app_bytes; work_pending = true; @@ -1086,7 +1108,11 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t //} if (written == 0 && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) && BIO_pending(ssl->bio_net_io) == 0) { written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS; - ssl->io_layer->process_output = process_output_done; + if (transport->io_layers[layer]==&ssl_input_closed_layer) { + transport->io_layers[layer] = &ssl_closed_layer; + } else { + transport->io_layers[layer] = &ssl_output_closed_layer; + } } _log(ssl, "process_output_ssl() returning %d", (int) written); return written; @@ -1169,33 +1195,34 @@ static void release_ssl_socket( pn_ssl_t *ssl ) } -static int setup_cleartext_connection( pn_ssl_t *ssl ) +static int setup_cleartext_connection(pn_transport_t *transport, unsigned int layer) { - _log( ssl, "Cleartext connection detected."); - ssl->io_layer->process_input = pn_io_layer_input_passthru; - ssl->io_layer->process_output = pn_io_layer_output_passthru; + transport->io_layers[layer] = &pni_passthru_layer; return 0; } // until we determine if the client is using SSL or not: -static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len) +static ssize_t process_input_unknown(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len) { - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; switch (check_for_ssl_connection( input_data, len )) { case SSL_CONNECTION: - setup_ssl_connection( ssl ); - return ssl->io_layer->process_input( ssl->io_layer, input_data, len ); + _log( ssl, "SSL connection detected.\n"); + setup_ssl_connection(transport, layer); + break; case CLEAR_CONNECTION: - setup_cleartext_connection( ssl ); - return ssl->io_layer->process_input( ssl->io_layer, input_data, len ); + _log( ssl, "Cleartext connection detected.\n"); + setup_cleartext_connection(transport, layer); + break; default: return 0; } + return transport->io_layers[layer]->process_input(transport, layer, input_data, len ); } -static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len) +static ssize_t process_output_unknown(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len) { // do not do output until we know if SSL is used or not return 0; @@ -1307,20 +1334,20 @@ int pn_ssl_get_peer_hostname( pn_ssl_t *ssl, char *hostname, size_t *bufsize ) return 0; } -static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len) +static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len) { return PN_EOS; } -static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len) +static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len) { return PN_EOS; } // return # output bytes sitting in this layer -static size_t buffered_output(pn_io_layer_t *io_layer) +static size_t buffered_output(pn_transport_t *transport) { size_t count = 0; - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; if (ssl) { count += ssl->out_count; if (ssl->bio_net_io) { // pick up any bytes waiting for network io @@ -1329,17 +1356,3 @@ static size_t buffered_output(pn_io_layer_t *io_layer) } return count; } - -// return # input bytes sitting in this layer -static size_t buffered_input( pn_io_layer_t *io_layer ) -{ - size_t count = 0; - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; - if (ssl) { - count += ssl->in_count; - if (ssl->bio_ssl) { // pick up any bytes waiting to be read - count += BIO_ctrl_pending(ssl->bio_ssl); - } - } - return count; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index 601d6a2..d93e16f 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -92,17 +92,55 @@ void pn_delivery_map_clear(pn_delivery_map_t *dm) dm->next = 0; } -static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available); -static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available); -static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available); -static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available); -static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now); +static ssize_t pn_io_layer_input_passthru(pn_transport_t *, unsigned int, const char *, size_t ); +static ssize_t pn_io_layer_output_passthru(pn_transport_t *, unsigned int, char *, size_t ); + +static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); +static ssize_t pn_input_read_amqp(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); +static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); +static ssize_t pn_output_write_amqp(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); +static pn_timestamp_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now); static void pni_default_tracer(pn_transport_t *transport, const char *message) { fprintf(stderr, "[%p]:%s\n", (void *) transport, message); } +const pn_io_layer_t pni_passthru_layer = { + pn_io_layer_input_passthru, + pn_io_layer_output_passthru, + pn_io_layer_tick_passthru, + NULL +}; + +const pn_io_layer_t amqp_header_layer = { + pn_input_read_amqp_header, + pn_output_write_amqp_header, + pn_tick_amqp, + NULL +}; + +const pn_io_layer_t amqp_write_header_layer = { + pn_input_read_amqp, + pn_output_write_amqp_header, + pn_tick_amqp, + NULL +}; + +const pn_io_layer_t amqp_read_header_layer = { + pn_input_read_amqp_header, + pn_output_write_amqp, + pn_tick_amqp, + NULL +}; + +const pn_io_layer_t amqp_layer = { + pn_input_read_amqp, + pn_output_write_amqp, + pn_tick_amqp, + NULL +}; + static void pn_transport_initialize(void *object) { pn_transport_t *transport = (pn_transport_t *)object; @@ -112,33 +150,16 @@ static void pn_transport_initialize(void *object) transport->input_buf = NULL; transport->input_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024; transport->tracer = pni_default_tracer; - transport->header_count = 0; transport->sasl = NULL; transport->ssl = NULL; transport->scratch = pn_string(NULL); transport->disp = pn_dispatcher(0, transport); transport->connection = NULL; - pn_io_layer_t *io_layer = transport->io_layers; - while (io_layer != &transport->io_layers[PN_IO_AMQP]) { - io_layer->context = NULL; - io_layer->next = io_layer + 1; - io_layer->process_input = pn_io_layer_input_passthru; - io_layer->process_output = pn_io_layer_output_passthru; - io_layer->process_tick = pn_io_layer_tick_passthru; - io_layer->buffered_output = NULL; - io_layer->buffered_input = NULL; - ++io_layer; - } - - pn_io_layer_t *amqp = &transport->io_layers[PN_IO_AMQP]; - amqp->context = transport; - amqp->process_input = pn_input_read_amqp_header; - amqp->process_output = pn_output_write_amqp_header; - amqp->process_tick = pn_io_layer_tick_passthru; - amqp->buffered_output = NULL; - amqp->buffered_input = NULL; - amqp->next = NULL; + for (int layer=0; layer<PN_IO_LAYER_CT; ++layer) { + transport->io_layers[layer] = &pni_passthru_layer; + } + transport->io_layers[PN_IO_AMQP] = &amqp_header_layer; transport->open_sent = false; transport->open_rcvd = false; @@ -550,8 +571,6 @@ int pn_do_open(pn_dispatcher_t *disp) } else { transport->disp->halt = true; } - if (transport->remote_idle_timeout) - transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp; // enable timeouts transport->open_rcvd = true; return 0; } @@ -1072,14 +1091,14 @@ ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t // process pending input until none remaining or EOS static ssize_t transport_consume(pn_transport_t *transport) { - pn_io_layer_t *io_layer = transport->io_layers; size_t consumed = 0; while (transport->input_pending || transport->tail_closed) { ssize_t n; - n = io_layer->process_input( io_layer, - transport->input_buf + consumed, - transport->input_pending ); + n = transport->io_layers[PN_IO_SSL]-> + process_input( transport, PN_IO_SSL, + transport->input_buf + consumed, + transport->input_pending ); if (n > 0) { consumed += n; transport->input_pending -= n; @@ -1101,44 +1120,34 @@ static ssize_t transport_consume(pn_transport_t *transport) return consumed; } -static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available, - const char *header, size_t size, const char *protocol, - ssize_t (*next)(pn_io_layer_t *, const char *, size_t)) +#define AMQP_HEADER ("AMQP\x00\x01\x00\x00") + +static ssize_t pn_input_read_amqp_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available) { - const char *point = header + transport->header_count; - int delta = pn_min(available, size - transport->header_count); - if (!available || memcmp(bytes, point, delta)) { + unsigned readable = pn_min(8, available); + bool eos = pn_transport_capacity(transport)==PN_EOS; + if (memcmp(bytes, AMQP_HEADER, readable) || (readable<8 && eos) ) { char quoted[1024]; pn_quote_data(quoted, 1024, bytes, available); pn_do_error(transport, "amqp:connection:framing-error", - "%s header mismatch: '%s'%s", protocol, quoted, - available ? "" : " (connection aborted)"); + "%s header mismatch: '%s'%s", "AMQP", quoted, + !eos ? "" : " (connection aborted)"); return PN_EOS; - } else { - transport->header_count += delta; - if (transport->header_count == size) { - transport->header_count = 0; - transport->io_layers[PN_IO_AMQP].process_input = next; - - if (transport->disp->trace & PN_TRACE_FRM) - pn_transport_logf(transport, " <- %s", protocol); + } else if (readable==8) { + if (transport->io_layers[layer] == &amqp_read_header_layer) { + transport->io_layers[layer] = &amqp_layer; + } else { + transport->io_layers[layer] = &amqp_write_header_layer; } - return delta; + if (transport->disp->trace & PN_TRACE_FRM) + pn_transport_logf(transport, " <- %s", "AMQP"); + return 8; } + return 0; } -#define AMQP_HEADER ("AMQP\x00\x01\x00\x00") - -static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available) -{ - pn_transport_t *transport = (pn_transport_t *)io_layer->context; - return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8, - "AMQP", pn_input_read_amqp); -} - -static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available) +static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available) { - pn_transport_t *transport = (pn_transport_t *)io_layer->context; if (transport->close_rcvd) { if (available > 0) { pn_do_error(transport, "amqp:connection:framing-error", "data after close"); @@ -1164,10 +1173,9 @@ static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, si } /* process AMQP related timer events */ -static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now) +static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, pn_timestamp_t now) { pn_timestamp_t timeout = 0; - pn_transport_t *transport = (pn_transport_t *)io_layer->context; if (transport->local_idle_timeout) { if (transport->dead_remote_deadline == 0 || @@ -1827,30 +1835,22 @@ int pn_process(pn_transport_t *transport) return 0; } -static ssize_t pn_output_write_header(pn_transport_t *transport, - char *bytes, size_t size, - const char *header, size_t hdrsize, - const char *protocol, - ssize_t (*next)(pn_io_layer_t *, char *, size_t)) +static ssize_t pn_output_write_amqp_header(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available) { if (transport->disp->trace & PN_TRACE_FRM) - pn_transport_logf(transport, " -> %s", protocol); - assert(size >= hdrsize); - memmove(bytes, header, hdrsize); - transport->io_layers[PN_IO_AMQP].process_output = next; - return hdrsize; -} - -static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size) -{ - pn_transport_t *transport = (pn_transport_t *)io_layer->context; - return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP", - pn_output_write_amqp); + pn_transport_logf(transport, " -> %s", "AMQP"); + assert(available >= 8); + memmove(bytes, AMQP_HEADER, 8); + if (transport->io_layers[layer] == &amqp_write_header_layer) { + transport->io_layers[layer] = &amqp_layer; + } else { + transport->io_layers[layer] = &amqp_read_header_layer; + } + return 8; } -static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size) +static ssize_t pn_output_write_amqp(pn_transport_t* transport, unsigned int layer, char* bytes, size_t available) { - pn_transport_t *transport = (pn_transport_t *)io_layer->context; if (transport->connection && !transport->done_processing) { int err = pn_process(transport); if (err) { @@ -1866,7 +1866,7 @@ static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t return PN_EOS; } - return pn_dispatcher_output(transport->disp, bytes, size); + return pn_dispatcher_output(transport->disp, bytes, available); } static void pni_close_head(pn_transport_t *transport) @@ -1884,7 +1884,6 @@ static ssize_t transport_produce(pn_transport_t *transport) { if (transport->head_closed) return PN_EOS; - pn_io_layer_t *io_layer = transport->io_layers; ssize_t space = transport->output_size - transport->output_pending; if (space <= 0) { // can we expand the buffer? @@ -1905,9 +1904,10 @@ static ssize_t transport_produce(pn_transport_t *transport) while (space > 0) { ssize_t n; - n = io_layer->process_output( io_layer, - &transport->output_buf[transport->output_pending], - space ); + n = transport->io_layers[PN_IO_SSL]-> + process_output( transport, PN_IO_SSL, + &transport->output_buf[transport->output_pending], + space ); if (n > 0) { space -= n; transport->output_pending += n; @@ -2043,7 +2043,6 @@ pn_millis_t pn_transport_get_idle_timeout(pn_transport_t *transport) void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout) { transport->local_idle_timeout = timeout; - transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp; } pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport) @@ -2053,8 +2052,7 @@ pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport) pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now) { - pn_io_layer_t *io_layer = transport->io_layers; - return io_layer->process_tick( io_layer, now ); + return transport->io_layers[PN_IO_SSL]->process_tick(transport, PN_IO_SSL, now); } uint64_t pn_transport_get_frames_output(const pn_transport_t *transport) @@ -2072,29 +2070,26 @@ uint64_t pn_transport_get_frames_input(const pn_transport_t *transport) } /** Pass through input handler */ -ssize_t pn_io_layer_input_passthru(pn_io_layer_t *io_layer, const char *data, size_t available) +ssize_t pn_io_layer_input_passthru(pn_transport_t *transport, unsigned int layer, const char *data, size_t available) { - pn_io_layer_t *next = io_layer->next; - if (next) - return next->process_input( next, data, available ); + if (layer+1<PN_IO_LAYER_CT) + return transport->io_layers[layer+1]->process_input(transport, layer+1, data, available); return PN_EOS; } /** Pass through output handler */ -ssize_t pn_io_layer_output_passthru(pn_io_layer_t *io_layer, char *bytes, size_t size) +ssize_t pn_io_layer_output_passthru(pn_transport_t *transport, unsigned int layer, char *data, size_t available) { - pn_io_layer_t *next = io_layer->next; - if (next) - return next->process_output( next, bytes, size ); + if (layer+1<PN_IO_LAYER_CT) + return transport->io_layers[layer+1]->process_output(transport, layer+1, data, available); return PN_EOS; } /** Pass through tick handler */ -pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *io_layer, pn_timestamp_t now) +pn_timestamp_t pn_io_layer_tick_passthru(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now) { - pn_io_layer_t *next = io_layer->next; - if (next) - return next->process_tick( next, now ); + if (layer+1<PN_IO_LAYER_CT) + return transport->io_layers[layer+1]->process_tick(transport, layer+1, now); return 0; } @@ -2253,11 +2248,10 @@ bool pn_transport_quiesced(pn_transport_t *transport) if (pending < 0) return true; // output done else if (pending > 0) return false; // no pending at transport, but check if data is buffered in I/O layers - pn_io_layer_t *io_layer = transport->io_layers; - while (io_layer != &transport->io_layers[PN_IO_LAYER_CT]) { - if (io_layer->buffered_output && io_layer->buffered_output( io_layer )) + for (int layer = 0; layer<PN_IO_LAYER_CT; ++layer) { + if (transport->io_layers[layer]->buffered_output && + transport->io_layers[layer]->buffered_output( transport )) return false; - ++io_layer; } return true; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c814d5c3/proton-c/src/windows/schannel.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/schannel.c b/proton-c/src/windows/schannel.c index de6e117..397fa21 100644 --- a/proton-c/src/windows/schannel.c +++ b/proton-c/src/windows/schannel.c @@ -82,7 +82,6 @@ typedef enum { CREATED, CLIENT_HELLO, NEGOTIATING, struct pn_ssl_t { pn_transport_t *transport; - pn_io_layer_t *io_layer; pn_ssl_domain_t *domain; const char *session_id; const char *peer_hostname; @@ -136,17 +135,16 @@ struct pn_ssl_session_t { }; -static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len); -static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len); -static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len); -static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len); -static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len); -static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len); +static ssize_t process_input_ssl( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len); +static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len); +static ssize_t process_input_unknown( pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len); +static ssize_t process_output_unknown( pn_transport_t *transport, unsigned int layer, char *input_data, size_t len); +static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len); +static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len); static connection_mode_t check_for_ssl_connection( const char *data, size_t len ); static pn_ssl_session_t *ssn_cache_find( pn_ssl_domain_t *, const char * ); static void ssl_session_free( pn_ssl_session_t *); -static size_t buffered_output( pn_io_layer_t *io_layer ); -static size_t buffered_input( pn_io_layer_t *io_layer ); +static size_t buffered_output( pn_transport_t *transport ); static void start_ssl_shutdown(pn_ssl_t *ssl); static void rewind_sc_inbuf(pn_ssl_t *ssl); static bool grow_inbuf2(pn_ssl_t *ssl, size_t minimum_size); @@ -350,6 +348,41 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain, return 0; } +const pn_io_layer_t unknown_layer = { + process_input_unknown, + process_output_unknown, + pn_io_layer_tick_passthru, + NULL +}; + +const pn_io_layer_t ssl_layer = { + process_input_ssl, + process_output_ssl, + pn_io_layer_tick_passthru, + buffered_output +}; + +const pn_io_layer_t ssl_input_closed_layer = { + process_input_done, + process_output_ssl, + pn_io_layer_tick_passthru, + buffered_output +}; + +const pn_io_layer_t ssl_output_closed_layer = { + process_input_ssl, + process_output_done, + pn_io_layer_tick_passthru, + buffered_output +}; + +const pn_io_layer_t ssl_closed_layer = { + process_input_done, + process_output_done, + pn_io_layer_tick_passthru, + buffered_output +}; + int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id) { if (!ssl || !domain || ssl->domain) return -1; @@ -358,13 +391,11 @@ int pn_ssl_init(pn_ssl_t *ssl, pn_ssl_domain_t *domain, const char *session_id) ssl->domain = domain; domain->ref_count++; if (domain->allow_unsecured) { - ssl->io_layer->process_input = process_input_unknown; - ssl->io_layer->process_output = process_output_unknown; - } else { - ssl->io_layer->process_input = process_input_ssl; - ssl->io_layer->process_output = process_output_ssl; + ssl->transport->io_layers[PN_IO_SSL] = &unknown_layer; + } + else { + ssl->transport->io_layers[PN_IO_SSL] = &ssl_layer; } - if (session_id && domain->mode == PN_SSL_MODE_CLIENT) ssl->session_id = pn_strdup(session_id); @@ -460,13 +491,7 @@ pn_ssl_t *pn_ssl(pn_transport_t *transport) ssl->transport = transport; transport->ssl = ssl; - ssl->io_layer = &transport->io_layers[PN_IO_SSL]; - ssl->io_layer->context = ssl; - ssl->io_layer->process_input = pn_io_layer_input_passthru; - ssl->io_layer->process_output = pn_io_layer_output_passthru; - ssl->io_layer->process_tick = pn_io_layer_tick_passthru; - ssl->io_layer->buffered_output = buffered_output; - ssl->io_layer->buffered_input = buffered_input; + transport->io_layers[PN_IO_SSL] = &pni_passthru_layer; ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF; SecInvalidateHandle(&ssl->cred_handle); @@ -849,11 +874,9 @@ static void start_ssl_shutdown(pn_ssl_t *ssl) ssl_handshake(ssl); } -static int setup_ssl_connection(pn_ssl_t *ssl) +static int setup_ssl_connection(pn_transport_t *transport, unsigned int layer) { - ssl_log( ssl, "SSL connection detected.\n"); - ssl->io_layer->process_input = process_input_ssl; - ssl->io_layer->process_output = process_output_ssl; + transport->io_layers[layer] = &ssl_layer; return 0; } @@ -976,14 +999,14 @@ static void app_inbytes_advance(pn_ssl_t *ssl, size_t consumed) app_inbytes_progress(ssl, 0); } -static void read_closed(pn_ssl_t *ssl, ssize_t error) +static void read_closed(pn_transport_t *transport, unsigned int layer, ssize_t error) { + pn_ssl_t *ssl = transport->ssl; if (ssl->app_input_closed) return; if (ssl->state == RUNNING && !error) { - pn_io_layer_t *io_next = ssl->io_layer->next; // Signal end of stream - ssl->app_input_closed = io_next->process_input(io_next, ssl->app_inbytes.start, 0); + ssl->app_input_closed = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->app_inbytes.start, 0); } if (!ssl->app_input_closed) ssl->app_input_closed = error ? error : PN_ERR; @@ -1000,9 +1023,9 @@ static void read_closed(pn_ssl_t *ssl, ssize_t error) // Read up to "available" bytes from the network, decrypt it and pass plaintext to application. -static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data, size_t available) +static ssize_t process_input_ssl(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t available) { - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; ssl_log( ssl, "process_input_ssl( data size=%d )\n",available ); ssize_t consumed = 0; ssize_t forwarded = 0; @@ -1010,14 +1033,14 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data if (available == 0) { // No more inbound network data - read_closed(ssl,0); + read_closed(transport, layer, 0); return 0; } do { if (ssl->sc_input_shutdown) { // TLS protocol shutdown detected on input - read_closed(ssl,0); + read_closed(transport, layer, 0); return consumed; } @@ -1097,8 +1120,7 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data // present app_inbytes to io_next only if it has new content while (ssl->app_inbytes.size > 0) { if (!ssl->app_input_closed) { - pn_io_layer_t *io_next = ssl->io_layer->next; - ssize_t count = io_next->process_input(io_next, ssl->app_inbytes.start, ssl->app_inbytes.size); + ssize_t count = transport->io_layers[layer+1]->process_input(transport, layer+1, ssl->app_inbytes.start, ssl->app_inbytes.size); if (count > 0) { forwarded += count; // advance() can increase app_inbytes.size if double buffered @@ -1115,7 +1137,7 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data ssl_log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n", (int) count, (int)ssl->app_inbytes.size); app_inbytes_advance(ssl, ssl->app_inbytes.size); // discard - read_closed(ssl, count); + read_closed(transport, layer, count); } } else { ssl_log(ssl, "Input closed discard %d bytes\n", @@ -1128,15 +1150,19 @@ static ssize_t process_input_ssl(pn_io_layer_t *io_layer, const char *input_data if (ssl->app_input_closed && ssl->state >= SHUTTING_DOWN) { consumed = ssl->app_input_closed; - ssl->io_layer->process_input = process_input_done; + if (transport->io_layers[layer]==&ssl_output_closed_layer) { + transport->io_layers[layer] = &ssl_closed_layer; + } else { + transport->io_layers[layer] = &ssl_input_closed_layer; + } } ssl_log(ssl, "process_input_ssl() returning %d, forwarded %d\n", (int) consumed, (int) forwarded); return consumed; } -static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len) +static ssize_t process_output_ssl( pn_transport_t *transport, unsigned int layer, char *buffer, size_t max_len) { - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; if (!ssl) return PN_EOS; ssl_log( ssl, "process_output_ssl( max_len=%d )\n",max_len ); @@ -1173,8 +1199,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t size_t remaining = ssl->max_data_size; ssize_t app_bytes; do { - pn_io_layer_t *io_next = ssl->io_layer->next; - app_bytes = io_next->process_output(io_next, app_outp, remaining); + app_bytes = transport->io_layers[layer+1]->process_output(transport, layer+1, app_outp, remaining); if (app_bytes > 0) { app_outp += app_bytes; remaining -= app_bytes; @@ -1212,40 +1237,45 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t if (written == 0 && ssl->state == SSL_CLOSED) { written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS; - ssl->io_layer->process_output = process_output_done; + if (transport->io_layers[layer]==&ssl_input_closed_layer) { + transport->io_layers[layer] = &ssl_closed_layer; + } else { + transport->io_layers[layer] = &ssl_output_closed_layer; + } } ssl_log(ssl, "process_output_ssl() returning %d\n", (int) written); return written; } -static int setup_cleartext_connection( pn_ssl_t *ssl ) +static int setup_cleartext_connection(pn_transport_t *transport, unsigned int layer) { - ssl_log( ssl, "Cleartext connection detected.\n"); - ssl->io_layer->process_input = pn_io_layer_input_passthru; - ssl->io_layer->process_output = pn_io_layer_output_passthru; + transport->io_layers[layer] = &pni_passthru_layer; return 0; } // until we determine if the client is using SSL or not: -static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len) +static ssize_t process_input_unknown(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len) { - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; switch (check_for_ssl_connection( input_data, len )) { case SSL_CONNECTION: - setup_ssl_connection( ssl ); - return ssl->io_layer->process_input( ssl->io_layer, input_data, len ); + ssl_log(ssl, "SSL connection detected.\n"); + setup_ssl_connection(transport, layer); + break; case CLEAR_CONNECTION: - setup_cleartext_connection( ssl ); - return ssl->io_layer->process_input( ssl->io_layer, input_data, len ); + ssl_log(ssl, "Cleartext connection detected.\n"); + setup_cleartext_connection(transport, layer); + break; default: return 0; } + return transport->io_layers[layer]->process_input(transport, layer, input_data, len); } -static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len) +static ssize_t process_output_unknown(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len) { // do not do output until we know if SSL is used or not return 0; @@ -1304,21 +1334,21 @@ static connection_mode_t check_for_ssl_connection( const char *data, size_t len return UNKNOWN_CONNECTION; } -static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len) +static ssize_t process_input_done(pn_transport_t *transport, unsigned int layer, const char *input_data, size_t len) { return PN_EOS; } -static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len) +static ssize_t process_output_done(pn_transport_t *transport, unsigned int layer, char *input_data, size_t len) { return PN_EOS; } // return # output bytes sitting in this layer -static size_t buffered_output(pn_io_layer_t *io_layer) +static size_t buffered_output(pn_transport_t *transport) { size_t count = 0; - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; + pn_ssl_t *ssl = transport->ssl; if (ssl) { count += ssl->network_out_pending; if (count == 0 && ssl->state == SHUTTING_DOWN && ssl->queued_shutdown) @@ -1326,14 +1356,3 @@ static size_t buffered_output(pn_io_layer_t *io_layer) } return count; } - -// return # input bytes sitting in this layer -static size_t buffered_input( pn_io_layer_t *io_layer ) -{ - size_t count = 0; - pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; - if (ssl) { - count += ssl->in_data_count; - } - return count; -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
