PROTON-740: fixed shutdown and event related issues with idle timeout during sasl
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a5d65452 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a5d65452 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a5d65452 Branch: refs/heads/examples Commit: a5d654521d47498355089cf93281028876244b3e Parents: 8a042a2 Author: Rafael Schloming <[email protected]> Authored: Sat Nov 8 08:22:39 2014 -0500 Committer: Rafael Schloming <[email protected]> Committed: Sat Nov 8 08:23:14 2014 -0500 ---------------------------------------------------------------------- proton-c/include/proton/transport.h | 13 +++ proton-c/src/engine/engine-internal.h | 6 +- proton-c/src/sasl/sasl.c | 9 +- proton-c/src/ssl/openssl.c | 133 ++++++++++++++--------------- proton-c/src/transport/transport.c | 94 +++++++++++--------- proton-c/src/windows/schannel.c | 1 - tests/python/proton_tests/engine.py | 27 ++++++ 7 files changed, 170 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/include/proton/transport.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h index 33bb3a5..2262e7c 100644 --- a/proton-c/include/proton/transport.h +++ b/proton-c/include/proton/transport.h @@ -209,6 +209,19 @@ PN_EXTERN void pn_transport_log(pn_transport_t *transport, const char *message); * * @param[in] transport a transport object * @param[in] fmt the printf formatted message to be logged + * @param[in] ap a vector containing the format arguments + */ +PN_EXTERN void pn_transport_vlogf(pn_transport_t *transport, const char *fmt, va_list ap); + +/** + * Log a printf formatted message using a transport's logging + * mechanism. + * + * This can be useful in a debugging context as the log message will + * be prefixed with the transport's identifier. + * + * @param[in] transport a transport object + * @param[in] fmt the printf formatted message to be logged */ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 97f7ead..dd4c44e 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -176,8 +176,7 @@ struct pn_transport_t { bool tail_closed; // input stream closed by driver bool head_closed; bool done_processing; // if true, don't call pn_process again - bool posted_head_closed; - bool posted_tail_closed; + bool posted_idle_timeout; }; struct pn_connection_t { @@ -319,7 +318,4 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm void pn_session_unbound(pn_session_t* ssn); void pn_link_unbound(pn_link_t* link); -void pni_close_tail(pn_transport_t *transport); - - #endif /* engine-internal.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/sasl/sasl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c index f926b1b..2cc77c2 100644 --- a/proton-c/src/sasl/sasl.c +++ b/proton-c/src/sasl/sasl.c @@ -467,7 +467,14 @@ static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size) { pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context; - ssize_t n = pn_sasl_output(sasl, bytes, size); + // this accounts for pn_do_error is invoked, e.g. by idle timeout + ssize_t n; + if (sasl->transport->close_sent) { + n = PN_EOS; + } else { + n = pn_sasl_output(sasl, bytes, size); + } + if (n == PN_EOS) { sasl->io_layer->process_output = pn_io_layer_output_passthru; pn_io_layer_t *io_next = sasl->io_layer->next; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/ssl/openssl.c ---------------------------------------------------------------------- diff --git a/proton-c/src/ssl/openssl.c b/proton-c/src/ssl/openssl.c index c9536e2..ea2bd5b 100644 --- a/proton-c/src/ssl/openssl.c +++ b/proton-c/src/ssl/openssl.c @@ -149,11 +149,11 @@ static size_t buffered_output( pn_io_layer_t *io_layer ); static size_t buffered_input( pn_io_layer_t *io_layer ); // @todo: used to avoid littering the code with calls to printf... -static void _log_error(const char *fmt, ...) +static void _log_error(pn_ssl_t *ssl, const char *fmt, ...) { va_list ap; va_start(ap, fmt); - vfprintf(stderr, fmt, ap); + pn_transport_vlogf(ssl ? ssl->transport : NULL, fmt, ap); va_end(ap); } @@ -163,27 +163,27 @@ static void _log(pn_ssl_t *ssl, const char *fmt, ...) if (PN_TRACE_DRV & ssl->trace) { va_list ap; va_start(ap, fmt); - vfprintf(stderr, fmt, ap); + pn_transport_vlogf(ssl->transport, fmt, ap); va_end(ap); } } // log an error and dump the SSL error stack -static void _log_ssl_error( const char *fmt, ...) +static void _log_ssl_error(pn_ssl_t *ssl, const char *fmt, ...) { char buf[128]; // see "man ERR_error_string_n()" va_list ap; if (fmt) { va_start(ap, fmt); - vfprintf(stderr, fmt, ap); + pn_transport_vlogf(ssl ? ssl->transport : NULL, fmt, ap); va_end(ap); } unsigned long err = ERR_get_error(); while (err) { ERR_error_string_n(err, buf, sizeof(buf)); - _log_error("%s\n", buf); + _log_error(ssl, "%s", buf); err = ERR_get_error(); } } @@ -211,8 +211,7 @@ static int ssl_failed(pn_ssl_t *ssl) if (ssl_err) { ERR_error_string_n( ssl_err, buf, sizeof(buf) ); } - _log_ssl_error(NULL); // spit out any remaining errors to the log file - pni_close_tail(ssl->transport); + _log_ssl_error(ssl, NULL); // spit out any remaining errors to the log file pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf); return PN_EOS; } @@ -284,23 +283,23 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) X509 *cert = X509_STORE_CTX_get_current_cert(ctx); SSL *ssn = (SSL *) X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); if (!ssn) { - _log_error("Error: unexpected error - SSL session info not available for peer verify!\n"); + _log_error(NULL, "Error: unexpected error - SSL session info not available for peer verify!"); return 0; // fail connection } pn_ssl_t *ssl = (pn_ssl_t *)SSL_get_ex_data(ssn, ssl_ex_data_index); if (!ssl) { - _log_error("Error: unexpected error - SSL context info not available for peer verify!\n"); + _log_error(NULL, "Error: unexpected error - SSL context info not available for peer verify!"); return 0; // fail connection } if (ssl->domain->verify_mode != PN_SSL_VERIFY_PEER_NAME) return preverify_ok; if (!ssl->peer_hostname) { - _log_error("Error: configuration error: PN_SSL_VERIFY_PEER_NAME configured, but no peer hostname set!\n"); + _log_error(ssl, "Error: configuration error: PN_SSL_VERIFY_PEER_NAME configured, but no peer hostname set!"); return 0; // fail connection } - _log( ssl, "Checking identifying name in peer cert against '%s'\n", ssl->peer_hostname); + _log( ssl, "Checking identifying name in peer cert against '%s'", ssl->peer_hostname); bool matched = false; @@ -317,7 +316,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) unsigned char *str; int len = ASN1_STRING_to_UTF8( &str, asn1 ); if (len >= 0) { - _log( ssl, "SubjectAltName (dns) from peer cert = '%.*s'\n", len, str ); + _log( ssl, "SubjectAltName (dns) from peer cert = '%.*s'", len, str ); matched = match_dns_pattern( ssl->peer_hostname, (const char *)str, len ); OPENSSL_free( str ); } @@ -337,7 +336,7 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) unsigned char *str; int len = ASN1_STRING_to_UTF8( &str, name_asn1); if (len >= 0) { - _log( ssl, "commonName from peer cert = '%.*s'\n", len, str ); + _log( ssl, "commonName from peer cert = '%.*s'", len, str ); matched = match_dns_pattern( ssl->peer_hostname, (const char *)str, len ); OPENSSL_free(str); } @@ -345,14 +344,14 @@ static int verify_callback(int preverify_ok, X509_STORE_CTX *ctx) } if (!matched) { - _log( ssl, "Error: no name matching %s found in peer cert - rejecting handshake.\n", + _log( ssl, "Error: no name matching %s found in peer cert - rejecting handshake.", ssl->peer_hostname); preverify_ok = 0; #ifdef X509_V_ERR_APPLICATION_VERIFICATION X509_STORE_CTX_set_error( ctx, X509_V_ERR_APPLICATION_VERIFICATION ); #endif } else { - _log( ssl, "Name from peer cert matched - peer is valid.\n" ); + _log( ssl, "Name from peer cert matched - peer is valid." ); } return preverify_ok; } @@ -459,7 +458,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) case PN_SSL_MODE_CLIENT: domain->ctx = SSL_CTX_new(SSLv23_client_method()); // and TLSv1+ if (!domain->ctx) { - _log_ssl_error( "Unable to initialize OpenSSL context.\n"); + _log_ssl_error(NULL, "Unable to initialize OpenSSL context."); free(domain); return NULL; } @@ -468,14 +467,14 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) case PN_SSL_MODE_SERVER: domain->ctx = SSL_CTX_new(SSLv23_server_method()); // and TLSv1+ if (!domain->ctx) { - _log_ssl_error("Unable to initialize OpenSSL context.\n"); + _log_ssl_error(NULL, "Unable to initialize OpenSSL context."); free(domain); return NULL; } break; default: - _log_error("Invalid value for pn_ssl_mode_t: %d\n", mode); + _log_error(NULL, "Invalid value for pn_ssl_mode_t: %d", mode); free(domain); return NULL; } @@ -488,7 +487,7 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) // by default, allow anonymous ciphers so certificates are not required 'out of the box' if (!SSL_CTX_set_cipher_list( domain->ctx, CIPHERS_ANONYMOUS )) { - _log_ssl_error("Failed to set cipher list to %s\n", CIPHERS_ANONYMOUS); + _log_ssl_error(NULL, "Failed to set cipher list to %s", CIPHERS_ANONYMOUS); pn_ssl_domain_free(domain); return NULL; } @@ -537,7 +536,7 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain, if (!domain || !domain->ctx) return -1; if (SSL_CTX_use_certificate_chain_file(domain->ctx, certificate_file) != 1) { - _log_ssl_error( "SSL_CTX_use_certificate_chain_file( %s ) failed\n", certificate_file); + _log_ssl_error(NULL, "SSL_CTX_use_certificate_chain_file( %s ) failed", certificate_file); return -3; } @@ -548,12 +547,12 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain, } if (SSL_CTX_use_PrivateKey_file(domain->ctx, private_key_file, SSL_FILETYPE_PEM) != 1) { - _log_ssl_error( "SSL_CTX_use_PrivateKey_file( %s ) failed\n", private_key_file); + _log_ssl_error(NULL, "SSL_CTX_use_PrivateKey_file( %s ) failed", private_key_file); return -4; } if (SSL_CTX_check_private_key(domain->ctx) != 1) { - _log_ssl_error( "The key file %s is not consistent with the certificate %s\n", + _log_ssl_error(NULL, "The key file %s is not consistent with the certificate %s", private_key_file, certificate_file); return -5; } @@ -564,7 +563,7 @@ int pn_ssl_domain_set_credentials( pn_ssl_domain_t *domain, // cipher was negotiated. TLSv1 will reject such a request. Hack: once a cert is // configured, allow only authenticated ciphers. if (!SSL_CTX_set_cipher_list( domain->ctx, CIPHERS_AUTHENTICATE )) { - _log_ssl_error( "Failed to set cipher list to %s\n", CIPHERS_AUTHENTICATE); + _log_ssl_error(NULL, "Failed to set cipher list to %s", CIPHERS_AUTHENTICATE); return -6; } @@ -581,7 +580,7 @@ int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain, // to SSL_CTX_load_verify_locations() struct stat sbuf; if (stat( certificate_db, &sbuf ) != 0) { - _log_error("stat(%s) failed: %s\n", certificate_db, strerror(errno)); + _log_error(NULL, "stat(%s) failed: %s", certificate_db, strerror(errno)); return -1; } @@ -596,7 +595,7 @@ int pn_ssl_domain_set_trusted_ca_db(pn_ssl_domain_t *domain, } if (SSL_CTX_load_verify_locations( domain->ctx, file, dir ) != 1) { - _log_ssl_error( "SSL_CTX_load_verify_locations( %s ) failed\n", certificate_db); + _log_ssl_error(NULL, "SSL_CTX_load_verify_locations( %s ) failed", certificate_db); return -1; } @@ -617,8 +616,8 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain, case PN_SSL_VERIFY_PEER_NAME: if (!domain->has_ca_db) { - _log_error("Error: cannot verify peer without a trusted CA configured.\n" - " Use pn_ssl_domain_set_trusted_ca_db()\n"); + _log_error(NULL, "Error: cannot verify peer without a trusted CA configured.\n" + " Use pn_ssl_domain_set_trusted_ca_db()"); return -1; } @@ -626,12 +625,12 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain, // openssl requires that server connections supply a list of trusted CAs which is // sent to the client if (!trusted_CAs) { - _log_error("Error: a list of trusted CAs must be provided.\n"); + _log_error(NULL, "Error: a list of trusted CAs must be provided."); return -1; } if (!domain->has_certificate) { - _log_error("Error: Server cannot verify peer without configuring a certificate.\n" - " Use pn_ssl_domain_set_credentials()\n"); + _log_error(NULL, "Error: Server cannot verify peer without configuring a certificate.\n" + " Use pn_ssl_domain_set_credentials()"); } if (domain->trusted_CAs) free(domain->trusted_CAs); @@ -641,7 +640,7 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain, if (cert_names != NULL) SSL_CTX_set_client_CA_list(domain->ctx, cert_names); else { - _log_error("Error: Unable to process file of trusted CAs: %s\n", trusted_CAs); + _log_error(NULL, "Error: Unable to process file of trusted CAs: %s", trusted_CAs); return -1; } } @@ -658,7 +657,7 @@ int pn_ssl_domain_set_peer_authentication(pn_ssl_domain_t *domain, break; default: - _log_error( "Invalid peer authentication mode given.\n" ); + _log_error(NULL, "Invalid peer authentication mode given." ); return -1; } @@ -692,7 +691,7 @@ int pn_ssl_domain_allow_unsecured_client(pn_ssl_domain_t *domain) { if (!domain) return -1; if (domain->mode != PN_SSL_MODE_SERVER) { - _log_error("Cannot permit unsecured clients - not a server.\n"); + _log_error(NULL, "Cannot permit unsecured clients - not a server."); return -1; } domain->allow_unsecured = true; @@ -734,7 +733,7 @@ bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *buffer, size_t size ) void pn_ssl_free( pn_ssl_t *ssl) { if (!ssl) return; - _log( ssl, "SSL socket freed.\n" ); + _log( ssl, "SSL socket freed." ); release_ssl_socket( ssl ); if (ssl->domain) pn_ssl_domain_free(ssl->domain); if (ssl->session_id) free((void *)ssl->session_id); @@ -796,7 +795,7 @@ static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata) static int start_ssl_shutdown( pn_ssl_t *ssl ) { if (!ssl->ssl_shutdown) { - _log(ssl, "Shutting down SSL connection...\n"); + _log(ssl, "Shutting down SSL connection..."); if (ssl->session_id) { // save the negotiated credentials before we close the connection pn_ssl_session_t *ssn = (pn_ssl_session_t *)calloc( 1, sizeof(pn_ssl_session_t)); @@ -804,7 +803,7 @@ static int start_ssl_shutdown( pn_ssl_t *ssl ) ssn->id = pn_strdup( ssl->session_id ); ssn->session = SSL_get1_session( ssl->ssl ); if (ssn->session) { - _log( ssl, "Saving SSL session as %s\n", ssl->session_id ); + _log( ssl, "Saving SSL session as %s", ssl->session_id ); LL_ADD( ssl->domain, ssn_cache, ssn ); } else { ssl_session_free( ssn ); @@ -821,7 +820,7 @@ static int start_ssl_shutdown( pn_ssl_t *ssl ) static int setup_ssl_connection( pn_ssl_t *ssl ) { - _log( ssl, "SSL connection detected.\n"); + _log( ssl, "SSL connection detected."); ssl->io_layer->process_input = process_input_ssl; ssl->io_layer->process_output = process_output_ssl; return 0; @@ -837,7 +836,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS; - _log( ssl, "process_input_ssl( data size=%d )\n",available ); + _log( ssl, "process_input_ssl( data size=%d )",available ); ssize_t consumed = 0; bool work_pending; @@ -856,12 +855,12 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat consumed += written; ssl->read_blocked = false; work_pending = (available > 0); - _log( ssl, "Wrote %d bytes to BIO Layer, %d left over\n", written, available ); + _log( ssl, "Wrote %d bytes to BIO Layer, %d left over", written, available ); } } else if (shutdown_input) { // lower layer (caller) has closed. Close the WRITE side of the BIO. This will cause // an EOF to be passed to SSL once all pending inbound data has been consumed. - _log( ssl, "Lower layer closed - shutting down BIO write side\n"); + _log( ssl, "Lower layer closed - shutting down BIO write side"); (void)BIO_shutdown_wr( ssl->bio_net_io ); shutdown_input = false; } @@ -871,7 +870,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat if (!ssl->ssl_closed && ssl->in_count < ssl->in_size) { int read = BIO_read( ssl->bio_ssl, &ssl->inbuf[ssl->in_count], ssl->in_size - ssl->in_count ); if (read > 0) { - _log( ssl, "Read %d bytes from SSL socket for app\n", read ); + _log( ssl, "Read %d bytes from SSL socket for app", read ); _log_clear_data( ssl, &ssl->inbuf[ssl->in_count], read ); ssl->in_count += read; work_pending = true; @@ -881,7 +880,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat switch (reason) { case SSL_ERROR_ZERO_RETURN: // SSL closed cleanly - _log(ssl, "SSL connection has closed\n"); + _log(ssl, "SSL connection has closed"); start_ssl_shutdown(ssl); // KAG: not sure - this may not be necessary ssl->ssl_closed = true; break; @@ -892,11 +891,11 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat } else { if (BIO_should_write( ssl->bio_ssl )) { ssl->write_blocked = true; - _log(ssl, "Detected write-blocked\n"); + _log(ssl, "Detected write-blocked"); } if (BIO_should_read( ssl->bio_ssl )) { ssl->read_blocked = true; - _log(ssl, "Detected read-blocked\n"); + _log(ssl, "Detected read-blocked"); } } } @@ -913,9 +912,9 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat if (ssl->in_count) memmove( ssl->inbuf, ssl->inbuf + consumed, ssl->in_count ); work_pending = true; - _log( ssl, "Application consumed %d bytes from peer\n", (int) consumed ); + _log( ssl, "Application consumed %d bytes from peer", (int) consumed ); } else if (consumed < 0) { - _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)\n", + _log(ssl, "Application layer closed its input, error=%d (discarding %d bytes)", (int) consumed, (int)ssl->in_count); ssl->in_count = 0; // discard any pending input ssl->app_input_closed = consumed; @@ -945,7 +944,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat // the application _must_ have enough data to process. If // this is an oversized frame, the app _must_ handle it // by returning an error code to SSL. - _log_error("Error: application unable to consume input.\n"); + _log_error(ssl, "Error: application unable to consume input."); } } } @@ -954,7 +953,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat } while (work_pending); - //_log(ssl, "ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d\n", + //_log(ssl, "ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d", // ssl->ssl_closed, ssl->in_count, ssl->app_input_closed, ssl->app_output_closed ); // PROTON-82: Instead, close the input side as soon as we've completed enough of the SSL @@ -971,7 +970,7 @@ static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_dat consumed = ssl->app_input_closed; ssl->io_layer->process_input = process_input_done; } - _log(ssl, "process_input_ssl() returning %d\n", (int) consumed); + _log(ssl, "process_input_ssl() returning %d", (int) consumed); return consumed; } @@ -994,10 +993,10 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t if (app_bytes > 0) { ssl->out_count += app_bytes; work_pending = true; - _log( ssl, "Gathered %d bytes from app to send to peer\n", app_bytes ); + _log( ssl, "Gathered %d bytes from app to send to peer", app_bytes ); } else { if (app_bytes < 0) { - _log(ssl, "Application layer closed its output, error=%d (%d bytes pending send)\n", + _log(ssl, "Application layer closed its output, error=%d (%d bytes pending send)", (int) app_bytes, (int) ssl->out_count); ssl->app_output_closed = app_bytes; } @@ -1014,14 +1013,14 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t data += wrote; ssl->out_count -= wrote; work_pending = true; - _log( ssl, "Wrote %d bytes from app to socket\n", wrote ); + _log( ssl, "Wrote %d bytes from app to socket", wrote ); } else { if (!BIO_should_retry(ssl->bio_ssl)) { int reason = SSL_get_error( ssl->ssl, wrote ); switch (reason) { case SSL_ERROR_ZERO_RETURN: // SSL closed cleanly - _log(ssl, "SSL connection has closed\n"); + _log(ssl, "SSL connection has closed"); start_ssl_shutdown(ssl); // KAG: not sure - this may not be necessary ssl->out_count = 0; // can no longer write to socket, so erase app output data ssl->ssl_closed = true; @@ -1033,11 +1032,11 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t } else { if (BIO_should_read( ssl->bio_ssl )) { ssl->read_blocked = true; - _log(ssl, "Detected read-blocked\n"); + _log(ssl, "Detected read-blocked"); } if (BIO_should_write( ssl->bio_ssl )) { ssl->write_blocked = true; - _log(ssl, "Detected write-blocked\n"); + _log(ssl, "Detected write-blocked"); } } } @@ -1063,13 +1062,13 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t written += available; ssl->write_blocked = false; work_pending = work_pending || max_len > 0; - _log( ssl, "Read %d bytes from BIO Layer\n", available ); + _log( ssl, "Read %d bytes from BIO Layer", available ); } } } while (work_pending); - //_log(ssl, "written=%d ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d bio_pend=%d\n", + //_log(ssl, "written=%d ssl_closed=%d in_count=%d app_input_closed=%d app_output_closed=%d bio_pend=%d", // written, ssl->ssl_closed, ssl->in_count, ssl->app_input_closed, ssl->app_output_closed, BIO_pending(ssl->bio_net_io) ); // PROTON-82: close the output side as soon as we've sent the SSL close_notify. @@ -1084,7 +1083,7 @@ static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS; ssl->io_layer->process_output = process_output_done; } - _log(ssl, "process_output_ssl() returning %d\n", (int) written); + _log(ssl, "process_output_ssl() returning %d", (int) written); return written; } @@ -1095,7 +1094,7 @@ static int init_ssl_socket( pn_ssl_t *ssl ) ssl->ssl = SSL_new(ssl->domain->ctx); if (!ssl->ssl) { - _log_error( "SSL socket setup failure.\n" ); + _log_error(ssl, "SSL socket setup failure." ); return -1; } @@ -1112,10 +1111,10 @@ static int init_ssl_socket( pn_ssl_t *ssl ) if (ssl->session_id) { pn_ssl_session_t *ssn = ssn_cache_find( ssl->domain, ssl->session_id ); if (ssn) { - _log( ssl, "Restoring previous session id=%s\n", ssn->id ); + _log( ssl, "Restoring previous session id=%s", ssn->id ); int rc = SSL_set_session( ssl->ssl, ssn->session ); if (rc != 1) { - _log( ssl, "Session restore failed, id=%s\n", ssn->id ); + _log( ssl, "Session restore failed, id=%s", ssn->id ); } LL_REMOVE( ssl->domain, ssn_cache, ssn ); ssl_session_free( ssn ); @@ -1125,14 +1124,14 @@ static int init_ssl_socket( pn_ssl_t *ssl ) // now layer a BIO over the SSL socket ssl->bio_ssl = BIO_new(BIO_f_ssl()); if (!ssl->bio_ssl) { - _log_error( "BIO setup failure.\n" ); + _log_error(ssl, "BIO setup failure." ); return -1; } (void)BIO_set_ssl(ssl->bio_ssl, ssl->ssl, BIO_NOCLOSE); // create the "lower" BIO "pipe", and attach it below the SSL layer if (!BIO_new_bio_pair(&ssl->bio_ssl_io, 0, &ssl->bio_net_io, 0)) { - _log_error( "BIO setup failure.\n" ); + _log_error(ssl, "BIO setup failure." ); return -1; } SSL_set_bio(ssl->ssl, ssl->bio_ssl_io, ssl->bio_ssl_io); @@ -1140,11 +1139,11 @@ static int init_ssl_socket( pn_ssl_t *ssl ) if (ssl->domain->mode == PN_SSL_MODE_SERVER) { SSL_set_accept_state(ssl->ssl); BIO_set_ssl_mode(ssl->bio_ssl, 0); // server mode - _log( ssl, "Server SSL socket created.\n" ); + _log( ssl, "Server SSL socket created." ); } else { // client mode SSL_set_connect_state(ssl->ssl); BIO_set_ssl_mode(ssl->bio_ssl, 1); // client mode - _log( ssl, "Client SSL socket created.\n" ); + _log( ssl, "Client SSL socket created." ); } return 0; } @@ -1167,7 +1166,7 @@ static void release_ssl_socket( pn_ssl_t *ssl ) static int setup_cleartext_connection( pn_ssl_t *ssl ) { - _log( ssl, "Cleartext connection detected.\n"); + _log( ssl, "Cleartext connection detected."); ssl->io_layer->process_input = pn_io_layer_input_passthru; ssl->io_layer->process_output = pn_io_layer_output_passthru; return 0; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index d91b55a..601d6a2 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -177,8 +177,7 @@ static void pn_transport_initialize(void *object) transport->done_processing = false; - transport->posted_head_closed = false; - transport->posted_tail_closed = false; + transport->posted_idle_timeout = false; } pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel) @@ -457,6 +456,24 @@ static pn_collector_t *pni_transport_collector(pn_transport_t *transport) } } +static void pni_maybe_post_closed(pn_transport_t *transport) +{ + pn_collector_t *collector = pni_transport_collector(transport); + if (transport->head_closed && transport->tail_closed) { + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED); + } +} + +static void pni_close_tail(pn_transport_t *transport) +{ + if (!transport->tail_closed) { + transport->tail_closed = true; + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED); + pni_maybe_post_closed(transport); + } +} + int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...) { va_list ap; @@ -479,6 +496,8 @@ int pn_do_error(pn_transport_t *transport, const char *condition, const char *fm pn_collector_t *collector = pni_transport_collector(transport); pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR); pn_transport_logf(transport, "ERROR %s %s", condition, buf); + transport->done_processing = true; + pni_close_tail(transport); return PN_ERR; } @@ -1050,14 +1069,6 @@ ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t return original - available; } -static void pni_maybe_post_closed(pn_transport_t *transport) -{ - pn_collector_t *collector = pni_transport_collector(transport); - if (transport->posted_head_closed && transport->posted_tail_closed) { - pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED); - } -} - // process pending input until none remaining or EOS static ssize_t transport_consume(pn_transport_t *transport) { @@ -1079,12 +1090,6 @@ static ssize_t transport_consume(pn_transport_t *transport) if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) pn_transport_log(transport, " <- EOS"); transport->input_pending = 0; // XXX ??? - if (!transport->posted_tail_closed) { - pn_collector_t *collector = pni_transport_collector(transport); - pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED); - transport->posted_tail_closed = true; - pni_maybe_post_closed(transport); - } return n; } } @@ -1171,8 +1176,11 @@ static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now) transport->last_bytes_input = transport->bytes_input; } else if (transport->dead_remote_deadline <= now) { transport->dead_remote_deadline = now + transport->local_idle_timeout; - // Note: AMQP-1.0 really should define a generic "timeout" error, but does not. - pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired"); + if (!transport->posted_idle_timeout) { + transport->posted_idle_timeout = true; + // Note: AMQP-1.0 really should define a generic "timeout" error, but does not. + pn_do_error(transport, "amqp:resource-limit-exceeded", "local-idle-timeout expired"); + } } timeout = transport->dead_remote_deadline; } @@ -1861,9 +1869,21 @@ static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t return pn_dispatcher_output(transport->disp, bytes, size); } +static void pni_close_head(pn_transport_t *transport) +{ + if (!transport->head_closed) { + transport->head_closed = true; + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED); + pni_maybe_post_closed(transport); + } +} + // generate outbound data, return amount of pending output else error static ssize_t transport_produce(pn_transport_t *transport) { + if (transport->head_closed) return PN_EOS; + pn_io_layer_t *io_layer = transport->io_layers; ssize_t space = transport->output_size - transport->output_pending; @@ -1900,13 +1920,12 @@ static ssize_t transport_produce(pn_transport_t *transport) if (n < 0) { pn_transport_log(transport, " -> EOS"); } - /*else - pn_transport_logf(transport, " -> EOS (%" PN_ZI ") %s", n, - pn_error_text(transport->error));*/ } + pni_close_head(transport); return n; } } + return transport->output_pending; } @@ -1963,15 +1982,24 @@ void pn_transport_log(pn_transport_t *transport, const char *message) transport->tracer(transport, message); } +void pn_transport_vlogf(pn_transport_t *transport, const char *fmt, va_list ap) +{ + if (transport) { + pn_string_vformat(transport->scratch, fmt, ap); + pn_transport_log(transport, pn_string_get(transport->scratch)); + } else { + vfprintf(stderr, fmt, ap); + fprintf(stderr, "\n"); + } +} + void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...) { va_list ap; va_start(ap, fmt); - pn_string_vformat(transport->scratch, fmt, ap); + pn_transport_vlogf(transport, fmt, ap); va_end(ap); - - pn_transport_log(transport, pn_string_get(transport->scratch)); } uint16_t pn_transport_get_channel_max(pn_transport_t *transport) @@ -2132,13 +2160,6 @@ ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t siz } } -void pni_close_tail(pn_transport_t *transport) -{ - if (!transport->tail_closed) { - transport->tail_closed = true; - } -} - int pn_transport_process(pn_transport_t *transport, size_t size) { assert(transport); @@ -2168,7 +2189,6 @@ int pn_transport_close_tail(pn_transport_t *transport) ssize_t pn_transport_pending(pn_transport_t *transport) /* <0 == done */ { assert(transport); - if (transport->head_closed) return PN_EOS; return transport_produce( transport ); } @@ -2211,12 +2231,8 @@ void pn_transport_pop(pn_transport_t *transport, size_t size) transport->output_pending ); } - if (!transport->output_pending && pn_transport_pending(transport) < 0 && - !transport->posted_head_closed) { - pn_collector_t *collector = pni_transport_collector(transport); - pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED); - transport->posted_head_closed = true; - pni_maybe_post_closed(transport); + if (!transport->output_pending && pn_transport_pending(transport) < 0) { + pni_close_head(transport); } } } @@ -2224,7 +2240,7 @@ void pn_transport_pop(pn_transport_t *transport, size_t size) int pn_transport_close_head(pn_transport_t *transport) { size_t pending = pn_transport_pending(transport); - transport->head_closed = true; + pni_close_head(transport); pn_transport_pop(transport, pending); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/proton-c/src/windows/schannel.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/schannel.c b/proton-c/src/windows/schannel.c index 373dc51..abf4b85 100644 --- a/proton-c/src/windows/schannel.c +++ b/proton-c/src/windows/schannel.c @@ -222,7 +222,6 @@ static int ssl_failed(pn_ssl_t *ssl, const char *reason) ssl->ssl_closed = true; ssl->app_input_closed = ssl->app_output_closed = PN_EOS; ssl->state = SSL_CLOSED; - pni_close_tail(ssl->transport); pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", reason); return PN_EOS; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5d65452/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index eec73d0..d17a57c 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -2413,3 +2413,30 @@ class TeardownLeakTest(PeerTest): def testLeak(self): self.doLeak(False, False) + +class IdleTimeoutEventTest(PeerTest): + + def half_pump(self): + p = self.transport.pending() + self.transport.pop(p) + + def testTimeoutWithZombieServer(self): + self.transport.idle_timeout = self.delay + self.connection.open() + self.half_pump() + self.transport.tick(time()) + sleep(self.delay*2) + self.transport.tick(time()) + self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, + Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, + Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED) + assert self.transport.capacity() < 0 + assert self.transport.pending() > 0 + self.half_pump() + self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) + assert self.transport.pending() < 0 + + def testTimeoutWithZombieServerAndSASL(self): + sasl = self.transport.sasl() + sasl.client() + self.testTimeoutWithZombieServer() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
