http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index 848cfcb..15a2aef 100644 --- a/src/server.c +++ b/src/server.c @@ -24,6 +24,12 @@ #include <qpid/dispatch/amqp.h> #include <qpid/dispatch/server.h> #include <qpid/dispatch/failoverlist.h> + +#include <proton/event.h> +#include <proton/listener.h> +#include <proton/proactor.h> +#include <proton/sasl.h> + #include "qpid/dispatch/python_embedded.h" #include "entity.h" #include "entity_cache.h" @@ -34,94 +40,33 @@ #include "alloc.h" #include "config.h" #include <stdio.h> -#include <time.h> #include <string.h> #include <errno.h> #include <inttypes.h> -typedef struct qd_thread_t { - qd_server_t *qd_server; - int thread_id; - volatile int running; - volatile int canceled; - int using_thread; - sys_thread_t *thread; -} qd_thread_t; - - -typedef struct qd_work_item_t { - DEQ_LINKS(struct qd_work_item_t); - qdpn_connector_t *cxtr; -} qd_work_item_t; - -DEQ_DECLARE(qd_work_item_t, qd_work_list_t); - - struct qd_server_t { qd_dispatch_t *qd; - int thread_count; + const int thread_count; /* Immutable */ const char *container_name; const char *sasl_config_path; const char *sasl_config_name; - qdpn_driver_t *driver; + pn_proactor_t *proactor; + qd_container_t *container; qd_log_source_t *log_source; - qd_conn_handler_cb_t conn_handler; - qd_pn_event_handler_cb_t pn_event_handler; - qd_pn_event_complete_cb_t pn_event_complete_handler; void *start_context; - void *conn_handler_context; sys_cond_t *cond; sys_mutex_t *lock; - qd_thread_t **threads; - qd_work_list_t work_queue; - qd_timer_list_t pending_timers; - bool a_thread_is_waiting; int pause_requests; int threads_paused; int pause_next_sequence; int pause_now_serving; - qd_signal_handler_cb_t signal_handler; - bool signal_handler_running; - void *signal_context; - int pending_signal; - qd_timer_t *heartbeat_timer; uint64_t next_connection_id; void *py_displayname_obj; qd_http_server_t *http; }; -/** - * Listener objects represent the desire to accept incoming transport connections. - */ -struct qd_listener_t { - qd_server_t *server; - const qd_server_config_t *config; - void *context; - qdpn_listener_t *pn_listener; - qd_http_listener_t *http; -}; - - -/** - * Connector objects represent the desire to create and maintain an outgoing transport connection. - */ -struct qd_connector_t { - qd_server_t *server; - cxtr_state_t state; - const qd_server_config_t *config; - void *context; - qd_connection_t *ctx; - qd_timer_t *timer; - long delay; -}; - - - -static __thread qd_server_t *thread_server = 0; - #define HEARTBEAT_INTERVAL 1000 -ALLOC_DEFINE(qd_work_item_t); ALLOC_DEFINE(qd_listener_t); ALLOC_DEFINE(qd_connector_t); ALLOC_DEFINE(qd_deferred_call_t); @@ -142,49 +87,32 @@ const char CERT_FINGERPRINT_SHA256 = '2'; const char CERT_FINGERPRINT_SHA512 = '5'; char *COMPONENT_SEPARATOR = ";"; -static void setup_ssl_sasl_and_open(qd_connection_t *ctx); +static const int BACKLOG = 50; /* Listening backlog */ -static qd_thread_t *thread(qd_server_t *qd_server, int id) -{ - qd_thread_t *thread = NEW(qd_thread_t); - if (!thread) - return 0; - - thread->qd_server = qd_server; - thread->thread_id = id; - thread->running = 0; - thread->canceled = 0; - thread->using_thread = 0; - - return thread; -} +static void setup_ssl_sasl_and_open(qd_connection_t *ctx); -static void free_qd_connection(qd_connection_t *ctx) -{ - if (ctx->policy_settings) { - if (ctx->policy_settings->sources) - free(ctx->policy_settings->sources); - if (ctx->policy_settings->targets) - free(ctx->policy_settings->targets); - free (ctx->policy_settings); - ctx->policy_settings = 0; - } - if (ctx->pn_conn) { - pn_connection_set_context(ctx->pn_conn, 0); - pn_decref(ctx->pn_conn); - ctx->pn_conn = NULL; - } - if (ctx->collector) { - pn_collector_free(ctx->collector); - ctx->collector = NULL; +/* Construct a new qd_connectoin. */ +static qd_connection_t *qd_connection(qd_server_t *server, const char *role) { + qd_connection_t *ctx = new_qd_connection_t(); + if (!ctx) return NULL; + ZERO(ctx); + ctx->pn_conn = pn_connection(); + ctx->deferred_call_lock = sys_mutex(); + ctx->role = strdup(role); + if (!ctx->pn_conn || !ctx->deferred_call_lock || !role) { + if (ctx->pn_conn) pn_connection_free(ctx->pn_conn); + if (ctx->deferred_call_lock) sys_mutex_free(ctx->deferred_call_lock); + free(ctx->role); + return NULL; } - - if (ctx->free_user_id) - free((char*)ctx->user_id); - - free(ctx->role); - - free_qd_connection_t(ctx); + ctx->server = server; + DEQ_ITEM_INIT(ctx); + DEQ_INIT(ctx->deferred_calls); + DEQ_INIT(ctx->free_link_session_list); + sys_mutex_lock(server->lock); + ctx->connection_id = server->next_connection_id++; + sys_mutex_unlock(server->lock); + return ctx; } /** @@ -222,7 +150,7 @@ qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *displayname static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tport) { const qd_server_config_t *config = - conn->connector ? conn->connector->config : conn->listener->config; + conn->connector ? &conn->connector->config : &conn->listener->config; if (config->ssl_uid_format) { // The ssl_uid_format length cannot be greater that 7 @@ -418,22 +346,6 @@ static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tpo } -/** - * Allocate a new qd_connection - * with DEQ items initialized, call lock allocated, and all other fields cleared. - */ -static qd_connection_t *connection_allocate() -{ - qd_connection_t *ctx = new_qd_connection_t(); - ZERO(ctx); - DEQ_ITEM_INIT(ctx); - DEQ_INIT(ctx->deferred_calls); - ctx->deferred_call_lock = sys_mutex(); - DEQ_INIT(ctx->free_link_session_list); - return ctx; -} - - void qd_connection_set_user(qd_connection_t *conn) { pn_transport_t *tport = pn_connection_transport(conn->pn_conn); @@ -507,20 +419,6 @@ static qd_error_t listener_setup_ssl(qd_connection_t *ctx, const qd_server_confi return QD_ERROR_NONE; } -// Format the identity of an incoming connection to buf for logging -static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr) -{ - qd_listener_t *qd_listener = qdpn_listener_context(qdpn_connector_listener(cxtr)); - assert(qd_listener); - const char *cname = qdpn_connector_name(cxtr); - const char *host = qd_listener->config->host; - const char *port = qd_listener->config->port; - const char *protocol = qd_listener->config->http ? "HTTP" : "AMQP"; - snprintf(buf, size, "incoming %s connection from %s to %s:%s", - protocol, cname, host, port); - return buf; -} - static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config) { @@ -602,637 +500,383 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c } -static void thread_process_listeners_LH(qd_server_t *qd_server) +static void on_accept(pn_event_t *e) { - qdpn_driver_t *driver = qd_server->driver; - qdpn_listener_t *listener; - qdpn_connector_t *cxtr; - qd_connection_t *ctx; - - for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) { - qd_listener_t *li = qdpn_listener_context(listener); - bool policy_counted = false; - cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept, &policy_counted); - if (!cxtr) - continue; - - char logbuf[qd_log_max_len()]; - - ctx = connection_allocate(); - ctx->server = qd_server; - ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER; - ctx->pn_cxtr = cxtr; - ctx->listener = qdpn_listener_context(listener); - ctx->context = ctx->listener->context; - ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it - ctx->policy_counted = policy_counted; - - // Copy the role from the listener config - int role_length = strlen(ctx->listener->config->role) + 1; - ctx->role = (char*) malloc(role_length); - strcpy(ctx->role, ctx->listener->config->role); - - pn_connection_t *conn = pn_connection(); - ctx->collector = pn_collector(); - pn_connection_collect(conn, ctx->collector); - decorate_connection(qd_server, conn, ctx->listener->config); - qdpn_connector_set_connection(cxtr, conn); - pn_connection_set_context(conn, ctx); - ctx->pn_conn = conn; - ctx->owner_thread = CONTEXT_NO_OWNER; - qdpn_connector_set_context(cxtr, ctx); - - qd_log(qd_server->log_source, QD_LOG_INFO, "Accepting %s with connection id [%"PRIu64"]", - log_incoming(logbuf, sizeof(logbuf), cxtr), ctx->connection_id); + assert(pn_event_type(e) == PN_LISTENER_ACCEPT); + pn_listener_t *pn_listener = pn_event_listener(e); + qd_listener_t *listener = pn_listener_get_context(pn_listener); + qd_connection_t *ctx = qd_connection(listener->server, listener->config.role); + if (!ctx) { + qd_log(listener->server->log_source, QD_LOG_CRITICAL, + "Allocation failure during accept to %s", listener->config.host_port); + return; + } + pn_connection_set_context(ctx->pn_conn, ctx); + ctx->listener = listener; + decorate_connection(listener->server, ctx->pn_conn, &ctx->listener->config); + qd_log(listener->server->log_source, QD_LOG_TRACE, + "[%"PRIu64"] Accepting incoming connection from %s to %s", + ctx->connection_id, qd_connection_name(ctx), ctx->listener->config.host_port); + /* Asynchronous accept, configure the transport on PN_CONNECTION_BOUND */ + pn_listener_accept(pn_listener, ctx->pn_conn); + } + + +/* Log the description, set the transport condition (name, description) close the transport tail. */ +void connect_fail(qd_connection_t *ctx, const char *name, const char *description, ...) +{ + va_list ap; + va_start(ap, description); + qd_verror(QD_ERROR_RUNTIME, description, ap); + va_end(ap); + if (ctx->pn_conn) { + pn_transport_t *t = pn_connection_transport(ctx->pn_conn); + /* Normally this is closing the transport but if not bound close the connection. */ + pn_condition_t *cond = t ? pn_transport_condition(t) : pn_connection_condition(ctx->pn_conn); + if (cond && !pn_condition_is_set(cond)) { + va_start(ap, description); + pn_condition_vformat(cond, name, description, ap); + va_end(ap); + } + if (t) { + pn_transport_close_tail(t); + } else { + pn_connection_close(ctx->pn_conn); + } + } +} - // - // Get a pointer to the transport so we can insert security components into it - // - pn_transport_t *tport = qdpn_connector_transport(cxtr); - const qd_server_config_t *config = ctx->listener->config; - // - // Configure the transport. - // +/* Get the host IP address for the remote end */ +static int set_remote_host_port(qd_connection_t *ctx) { + pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); + const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport)); + int err = 0; + if (!addr) { + err = -1; + qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s"); + } else { + char rport[NI_MAXSERV] = ""; + int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr), + ctx->rhost, sizeof(ctx->rhost), rport, sizeof(rport), + NI_NUMERICHOST | NI_NUMERICSERV); + if (!err) { + snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s", ctx->rhost, rport); + } else { + qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s"); + } + } + return err; +} + + +/* Configure the transport once it is bound to the connection */ +static void on_connection_bound(qd_server_t *server, pn_event_t *e) { + pn_connection_t *pn_conn = pn_event_connection(e); + qd_connection_t *ctx = pn_connection_get_context(pn_conn); + pn_transport_t *tport = pn_connection_transport(pn_conn); + pn_transport_set_context(tport, ctx); /* for transport_tracer */ + + // + // Proton pushes out its trace to transport_tracer() which in turn writes a trace + // message to the qdrouter log If trace level logging is enabled on the router set + // PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport + // + if (qd_log_enabled(ctx->server->log_source, QD_LOG_TRACE)) { + pn_transport_trace(tport, PN_TRACE_FRM); + pn_transport_set_tracer(tport, transport_tracer); + } + + const qd_server_config_t *config = NULL; + if (ctx->listener) { /* Accepting an incoming connection */ + config = &ctx->listener->config; + const char *name = config->host_port; pn_transport_set_server(tport); - pn_transport_set_max_frame(tport, config->max_frame_size); - pn_transport_set_channel_max(tport, config->max_sessions - 1); - pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); - // - // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log - // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport - // - pn_transport_set_context(tport, ctx); - if (qd_log_enabled(qd_server->log_source, QD_LOG_TRACE)) { - pn_transport_trace(tport, PN_TRACE_FRM); - pn_transport_set_tracer(tport, transport_tracer); + if (set_remote_host_port(ctx) == 0 && + qd_policy_socket_accept(server->qd->policy, ctx->rhost)) + { + ctx->policy_counted = true; + } else { + pn_transport_close_tail(tport); + pn_transport_close_head(tport); + return; } - if (li->http) { - // Set up HTTP if configured, HTTP provides its own SSL. - qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring HTTP%s on %s", - config->ssl_profile ? "S" : "", - log_incoming(logbuf, sizeof(logbuf), cxtr)); - qd_http_listener_accept(li->http, cxtr); - } else if (config->ssl_profile) { - // Set up SSL if configured and HTTP is not providing SSL. - qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s", - log_incoming(logbuf, sizeof(logbuf), cxtr)); + // Set up SSL + if (config->ssl_profile) { + qd_log(ctx->server->log_source, QD_LOG_TRACE, "Configuring SSL on %s", name); if (listener_setup_ssl(ctx, config, tport) != QD_ERROR_NONE) { - qd_log(qd_server->log_source, QD_LOG_ERROR, "%s on %s", - qd_error_message(), log_incoming(logbuf, sizeof(logbuf), cxtr)); - qdpn_connector_close(cxtr); - continue; + connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "%s on %s", qd_error_message(), name); + return; } } - // // Set up SASL // + sys_mutex_lock(ctx->server->lock); pn_sasl_t *sasl = pn_sasl(tport); - if (qd_server->sasl_config_path) - pn_sasl_config_path(sasl, qd_server->sasl_config_path); - pn_sasl_config_name(sasl, qd_server->sasl_config_name); + if (ctx->server->sasl_config_path) + pn_sasl_config_path(sasl, ctx->server->sasl_config_path); + pn_sasl_config_name(sasl, ctx->server->sasl_config_name); if (config->sasl_mechanisms) pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms); pn_transport_require_auth(tport, config->requireAuthentication); pn_transport_require_encryption(tport, config->requireEncryption); pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication); + sys_mutex_unlock(ctx->server->lock); + + qd_log(ctx->server->log_source, QD_LOG_INFO, "Accepted connection to %s from %s", + name, ctx->rhost_port); + } else if (ctx->connector) { /* Establishing an outgoing connection */ + config = &ctx->connector->config; + setup_ssl_sasl_and_open(ctx); + } else { /* No connector and no listener */ + connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "unknown Connection"); + return; } -} - - -static void handle_signals_LH(qd_server_t *qd_server) -{ - int signum = qd_server->pending_signal; - - if (signum && !qd_server->signal_handler_running) { - qd_server->signal_handler_running = true; - qd_server->pending_signal = 0; - if (qd_server->signal_handler) { - sys_mutex_unlock(qd_server->lock); - qd_server->signal_handler(qd_server->signal_context, signum); - sys_mutex_lock(qd_server->lock); - } - qd_server->signal_handler_running = false; - } -} - -static void block_if_paused_LH(qd_server_t *qd_server) -{ - if (qd_server->pause_requests > 0) { - qd_server->threads_paused++; - sys_cond_signal_all(qd_server->cond); - while (qd_server->pause_requests > 0) - sys_cond_wait(qd_server->cond, qd_server->lock); - qd_server->threads_paused--; - } + // + // Common transport configuration. + // + pn_transport_set_max_frame(tport, config->max_frame_size); + pn_transport_set_channel_max(tport, config->max_sessions - 1); + pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); } static void invoke_deferred_calls(qd_connection_t *conn, bool discard) { - qd_deferred_call_list_t calls; - qd_deferred_call_t *dc; - - // - // Copy the deferred calls out of the connection under lock. + // Lock access to deferred_calls, other threads may concurrently add to it. Invoke + // the calls outside of the critical section. // - DEQ_INIT(calls); sys_mutex_lock(conn->deferred_call_lock); - dc = DEQ_HEAD(conn->deferred_calls); - while (dc) { + qd_deferred_call_t *dc; + while ((dc = DEQ_HEAD(conn->deferred_calls))) { DEQ_REMOVE_HEAD(conn->deferred_calls); - DEQ_INSERT_TAIL(calls, dc); - dc = DEQ_HEAD(conn->deferred_calls); - } - sys_mutex_unlock(conn->deferred_call_lock); - - // - // Invoke the calls outside of the critical section. - // - dc = DEQ_HEAD(calls); - while (dc) { - DEQ_REMOVE_HEAD(calls); + sys_mutex_unlock(conn->deferred_call_lock); dc->call(dc->context, discard); free_qd_deferred_call_t(dc); - dc = DEQ_HEAD(calls); + sys_mutex_lock(conn->deferred_call_lock); } + sys_mutex_unlock(conn->deferred_call_lock); } - -static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr) -{ - qd_connection_t *ctx = qdpn_connector_context(cxtr); - int events = 0; - int passes = 0; - - if (ctx->closed) - return 0; - - do { - passes++; - - // - // If this connection is outbound and is just now opening, do the initial SSL/SASL setup - // - if (!ctx->opened && !!ctx->connector && !qdpn_connector_closed(cxtr)) - setup_ssl_sasl_and_open(ctx); - - // - // Step the engine for pre-handler processing - // - qdpn_connector_process(cxtr); - - // - // If the connector has closed, notify the client via callback. - // - if (qdpn_connector_closed(cxtr)) { - if (ctx->opened) - qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, - QD_CONN_EVENT_CLOSE, - (qd_connection_t*) qdpn_connector_context(cxtr)); - ctx->closed = true; - events = 0; - break; - } - - invoke_deferred_calls(ctx, false); - - qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr); - pn_collector_t *collector = qd_connection_collector(qd_conn); - pn_event_t *event; - - events = 0; - if (!ctx->event_stall) { - event = pn_collector_peek(collector); - while (event) { - // - // If we are transitioning to the open state, notify the client via callback. - // - if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) { - ctx->opened = true; - if (ctx->connector) { - ctx->connector->delay = 2000; // Delay on re-connect in case there is a recurring error - } else - assert(ctx->listener); - events = 1; - } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) { - if (ctx->connector) { - const qd_server_config_t *config = ctx->connector->config; - qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port); - } - } - - events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn); - pn_collector_pop(collector); - - event = ctx->event_stall ? 0 : pn_collector_peek(collector); +void qd_container_handle_event(qd_container_t *container, pn_event_t *event); + +static void handle_listener(pn_event_t *e, qd_server_t *qd_server) { + qd_log_source_t *log = qd_server->log_source; + + /* FIXME aconway 2017-02-20: HTTP support */ + qd_listener_t *li = (qd_listener_t*) pn_listener_get_context(pn_event_listener(e)); + const char *host_port = li->config.host_port; + switch (pn_event_type(e)) { + + case PN_LISTENER_OPEN: + qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port); + break; + + case PN_LISTENER_ACCEPT: + qd_log(log, QD_LOG_INFO, "Accepting connection on %s", host_port); + on_accept(e); + break; + + case PN_LISTENER_CLOSE: { + pn_condition_t *cond = pn_listener_condition(li->pn_listener); + if (pn_condition_is_set(cond)) { + qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port, + pn_condition_get_description(cond), + pn_condition_get_name(cond)); + if (li->exit_on_error) { + qd_log(log, QD_LOG_CRITICAL, "Shutting down, required listener failed %s", + host_port); + exit(1); } - - // - // Free up any links and sessions that need to be freed since all the events have been popped from the collector. - // - qd_server->pn_event_complete_handler(qd_server->conn_handler_context, qd_conn); - events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn); + } else { + qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port); } - } while (events > 0); - - return passes > 1; + qd_listener_decref(li); + break; + } + default: + break; + } } -// -// TEMPORARY FUNCTION PROTOTYPES -// -void qdpn_driver_wait_1(qdpn_driver_t *d); -int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout); -void qdpn_driver_wait_3(qdpn_driver_t *d); -// -// END TEMPORARY -// - -static void *thread_run(void *arg) +static void qd_connection_free(qd_connection_t *ctx) { - qd_thread_t *thread = (qd_thread_t*) arg; - qd_work_item_t *work; - qdpn_connector_t *cxtr; - qd_connection_t *ctx; - int error; - int poll_result; - - if (!thread) - return 0; + qd_server_t *qd_server = ctx->server; - qd_server_t *qd_server = thread->qd_server; - thread_server = qd_server; - thread->running = 1; + qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx); /* Removed management entity */ - if (thread->canceled) - return 0; - - // - // Main Loop - // - while (thread->running) { - sys_mutex_lock(qd_server->lock); - - // - // Check for pending signals to process - // - handle_signals_LH(qd_server); - if (!thread->running) { - sys_mutex_unlock(qd_server->lock); - break; - } - - // - // Check to see if the server is pausing. If so, block here. - // - block_if_paused_LH(qd_server); - if (!thread->running) { - sys_mutex_unlock(qd_server->lock); - break; - } - - // - // Service pending timers. - // - qd_timer_t *timer = DEQ_HEAD(qd_server->pending_timers); - if (timer) { - DEQ_REMOVE_HEAD(qd_server->pending_timers); - - // - // Mark the timer as idle in case it reschedules itself. - // - qd_timer_idle_LH(timer); + // If this is a dispatch connector, schedule the re-connect timer + if (ctx->connector) { + sys_mutex_lock(ctx->connector->lock); + ctx->connector->ctx = 0; + ctx->connector->state = CXTR_STATE_CONNECTING; + sys_mutex_unlock(ctx->connector->lock); + qd_timer_schedule(ctx->connector->timer, ctx->connector->delay); + } - // - // Release the lock and invoke the connection handler. - // - sys_mutex_unlock(qd_server->lock); - timer->handler(timer->context); - qdpn_driver_wakeup(qd_server->driver); - continue; - } + // If counted for policy enforcement, notify it has closed + sys_mutex_lock(qd_server->lock); + if (ctx->policy_counted) { + qd_policy_socket_close(qd_server->qd->policy, ctx); + } + sys_mutex_unlock(qd_server->lock); - // - // Check the work queue for connectors scheduled for processing. - // - work = DEQ_HEAD(qd_server->work_queue); - if (!work) { - // - // There is no pending work to do - // - if (qd_server->a_thread_is_waiting) { - // - // Another thread is waiting on the proton driver, this thread must - // wait on the condition variable until signaled. - // - sys_cond_wait(qd_server->cond, qd_server->lock); - } else { - // - // This thread elects itself to wait on the proton driver. Set the - // thread-is-waiting flag so other idle threads will not interfere. - // - qd_server->a_thread_is_waiting = true; - - // - // Ask the timer module when its next timer is scheduled to fire. We'll - // use this value in driver_wait as the timeout. If there are no scheduled - // timers, the returned value will be -1. - // - qd_timestamp_t duration = qd_timer_next_duration_LH(); - - // - // Invoke the proton driver's wait sequence. This is a bit of a hack for now - // and will be improved in the future. The wait process is divided into three parts, - // the first and third of which need to be non-reentrant, and the second of which - // must be reentrant (and blocks). - // - qdpn_driver_wait_1(qd_server->driver); - sys_mutex_unlock(qd_server->lock); - - do { - error = 0; - poll_result = qdpn_driver_wait_2(qd_server->driver, duration); - if (poll_result == -1) - error = errno; - } while (error == EINTR); - if (error) { - exit(-1); - } + invoke_deferred_calls(ctx, true); // Discard any pending deferred calls + if (ctx->deferred_call_lock) + sys_mutex_free(ctx->deferred_call_lock); - sys_mutex_lock(qd_server->lock); - qdpn_driver_wait_3(qd_server->driver); + if (ctx->policy_settings) { + if (ctx->policy_settings->sources) + free(ctx->policy_settings->sources); + if (ctx->policy_settings->targets) + free(ctx->policy_settings->targets); + free (ctx->policy_settings); + ctx->policy_settings = 0; + } - if (!thread->running) { - sys_mutex_unlock(qd_server->lock); - break; - } + if (ctx->free_user_id) free((char*)ctx->user_id); + free(ctx->role); + free_qd_connection_t(ctx); - // - // Visit the timer module. - // - struct timespec tv; - clock_gettime(CLOCK_REALTIME, &tv); - qd_timestamp_t milliseconds = ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000; - qd_timer_visit_LH(milliseconds); - - // - // Process listeners (incoming connections). - // - thread_process_listeners_LH(qd_server); - - // - // Traverse the list of connectors-needing-service from the proton driver. - // If the connector is not already in the work queue and it is not currently - // being processed by another thread, put it in the work queue and signal the - // condition variable. - // - cxtr = qdpn_driver_connector(qd_server->driver); - while (cxtr) { - ctx = qdpn_connector_context(cxtr); - if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { - ctx->enqueued = 1; - qd_work_item_t *workitem = new_qd_work_item_t(); - DEQ_ITEM_INIT(workitem); - workitem->cxtr = cxtr; - DEQ_INSERT_TAIL(qd_server->work_queue, workitem); - sys_cond_signal(qd_server->cond); - } - cxtr = qdpn_driver_connector(qd_server->driver); - } + /* Note: pn_conn is freed by the proactor */ +} - // - // Release our exclusive claim on qdpn_driver_wait. - // - qd_server->a_thread_is_waiting = false; - } - } - // - // If we were given a connector to work on from the work queue, mark it as - // owned by this thread and as no longer enqueued. - // - cxtr = 0; - if (work) { - DEQ_REMOVE_HEAD(qd_server->work_queue); - ctx = qdpn_connector_context(work->cxtr); - if (ctx->owner_thread == CONTEXT_NO_OWNER) { - ctx->owner_thread = thread->thread_id; - ctx->enqueued = 0; - cxtr = work->cxtr; - free_qd_work_item_t(work); - } else { - // - // This connector is being processed by another thread, re-queue it. - // - DEQ_INSERT_TAIL(qd_server->work_queue, work); +/* Events involving a connection or listener are serialized by the proactor so + * only one event per connection / listener will be processed at a time. + */ +static bool handle(pn_event_t *e, qd_server_t *qd_server) { + pn_connection_t *pn_conn = pn_event_connection(e); + qd_connection_t *ctx = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL; + + switch (pn_event_type(e)) { + + case PN_PROACTOR_INTERRUPT: + /* Stop the current thread */ + return false; + + case PN_PROACTOR_TIMEOUT: + qd_timer_visit(); + break; + + case PN_LISTENER_OPEN: + case PN_LISTENER_ACCEPT: + case PN_LISTENER_CLOSE: + handle_listener(e, qd_server); + break; + + case PN_CONNECTION_BOUND: + on_connection_bound(qd_server, e); + break; + + case PN_CONNECTION_REMOTE_OPEN: + // If we are transitioning to the open state, notify the client via callback. + if (!ctx->opened) { + ctx->opened = true; + if (ctx->connector) { + ctx->connector->delay = 2000; // Delay re-connect in case there is a recurring error } } - sys_mutex_unlock(qd_server->lock); - - // - // Process the connector that we now have exclusive access to. - // - if (cxtr) { - int work_done = 1; - - if (qdpn_connector_failed(cxtr)) - qdpn_connector_close(cxtr); - - // - // Even if the connector has failed there are still events that - // must be processed so that associated links will be cleaned up. - // - work_done = process_connector(qd_server, cxtr); + break; - // - // Check to see if the connector was closed during processing - // - if (qdpn_connector_closed(cxtr)) { - qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx); - // - // Connector is closed. Free the context and the connector. - // If this is a dispatch connector, schedule the re-connect timer - // - if (ctx->connector) { - ctx->connector->ctx = 0; - ctx->connector->state = CXTR_STATE_CONNECTING; - qd_timer_schedule(ctx->connector->timer, ctx->connector->delay); - } - - sys_mutex_lock(qd_server->lock); - - if (ctx->policy_counted) { - qd_policy_socket_close(qd_server->qd->policy, ctx); - } - - invoke_deferred_calls(ctx, true); // Discard any pending deferred calls - sys_mutex_free(ctx->deferred_call_lock); - qdpn_connector_free(cxtr); - free_qd_connection(ctx); - sys_mutex_unlock(qd_server->lock); - } else { - // - // The connector lives on. Mark it as no longer owned by this thread. - // - sys_mutex_lock(qd_server->lock); - ctx->owner_thread = CONTEXT_NO_OWNER; - sys_mutex_unlock(qd_server->lock); - } + case PN_CONNECTION_WAKE: + invoke_deferred_calls(ctx, false); + break; - // - // Wake up the proton driver to force it to reconsider its set of FDs - // in light of the processing that just occurred. - // - if (work_done) - qdpn_driver_wakeup(qd_server->driver); + case PN_TRANSPORT_ERROR: + if (ctx && ctx->connector) { /* Outgoing connection */ + const qd_server_config_t *config = &ctx->connector->config; + qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s failed", config->host_port); } - } - - return 0; -} - - -static void thread_start(qd_thread_t *thread) -{ - if (!thread) - return; - - thread->using_thread = 1; - thread->thread = sys_thread(thread_run, (void*) thread); -} - - -static void thread_cancel(qd_thread_t *thread) -{ - if (!thread) - return; - - thread->running = 0; - thread->canceled = 1; -} + break; + default: + break; + } // Switch event type -static void thread_join(qd_thread_t *thread) -{ - if (!thread) - return; + /* TODO aconway 2017-04-18: fold the container handler into the server */ + qd_container_handle_event(qd_server->container, e); - if (thread->using_thread) { - sys_thread_join(thread->thread); - sys_thread_free(thread->thread); + /* Free the connection after all other processing */ + if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) { + pn_connection_set_context(pn_conn, NULL); + qd_connection_free(ctx); } + return true; } - -static void thread_free(qd_thread_t *thread) +static void *thread_run(void *arg) { - if (!thread) - return; - - free(thread); + qd_server_t *qd_server = (qd_server_t*)arg; + bool running = true; + while (running) { + pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor); + pn_event_t * e; + while (running && (e = pn_event_batch_next(events))) { + running = handle(e, qd_server); + } + pn_proactor_done(qd_server->proactor, events); + } + return NULL; } -static void cxtr_try_open(void *context) +/* Timer callback to try/retry connection open */ +static void try_open_lh(qd_connector_t *ct) { - qd_connector_t *ct = (qd_connector_t*) context; - if (ct->state != CXTR_STATE_CONNECTING) + if (ct->state != CXTR_STATE_CONNECTING) { + /* No longer referenced by pn_connection or timer */ + qd_connector_decref(ct); return; + } - qd_connection_t *ctx = connection_allocate(); - ctx->server = ct->server; - ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER; - ctx->pn_conn = pn_connection(); - ctx->collector = pn_collector(); - ctx->connector = ct; - ctx->context = ct->context; - - // Copy the role from the connector config - int role_length = strlen(ctx->connector->config->role) + 1; - ctx->role = (char*) malloc(role_length); - strcpy(ctx->role, ctx->connector->config->role); - - qd_log(ct->server->log_source, QD_LOG_INFO, "Connecting to %s:%s", ct->config->host, ct->config->port); - - pn_connection_collect(ctx->pn_conn, ctx->collector); - decorate_connection(ctx->server, ctx->pn_conn, ct->config); - - // - // qdpn_connector is not thread safe - // - sys_mutex_lock(ct->server->lock); - // Increment the connection id so the next connection can use it - ctx->connection_id = ct->server->next_connection_id++; - ctx->pn_cxtr = qdpn_connector(ct->server->driver, ct->config->host, ct->config->port, ct->config->protocol_family, (void*) ctx); - sys_mutex_unlock(ct->server->lock); - - const qd_server_config_t *config = ct->config; - - if (ctx->pn_cxtr == 0) { - sys_mutex_free(ctx->deferred_call_lock); - free_qd_connection(ctx); + qd_connection_t *ctx = qd_connection(ct->server, ct->config.role); + if (!ctx) { /* Try again later */ + qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure connecting to %s", + ct->config.host_port); ct->delay = 10000; qd_timer_schedule(ct->timer, ct->delay); return; } + ctx->connector = ct; + decorate_connection(ctx->server, ctx->pn_conn, &ct->config); + const qd_server_config_t *config = &ct->config; // - // Set the hostname on the pn_connection. This hostname will be used by proton as the hostname in the open frame. + // Set the hostname on the pn_connection. This hostname will be used by proton as the + // hostname in the open frame. // pn_connection_set_hostname(ctx->pn_conn, config->host); - // Set the sasl user name and password on the proton connection object. This has to be done before the call to qdpn_connector_transport() which - // binds the transport to the connection + // Set the sasl user name and password on the proton connection object. This has to be + // done before pn_proactor_connect which will bind a transport to the connection if(config->sasl_username) pn_connection_set_user(ctx->pn_conn, config->sasl_username); if (config->sasl_password) pn_connection_set_password(ctx->pn_conn, config->sasl_password); - qdpn_connector_set_connection(ctx->pn_cxtr, ctx->pn_conn); pn_connection_set_context(ctx->pn_conn, ctx); ctx->connector->state = CXTR_STATE_OPEN; - ct->ctx = ctx; ct->delay = 5000; - // - // Set up the transport, SASL, and SSL for the connection. - // - pn_transport_t *tport = qdpn_connector_transport(ctx->pn_cxtr); - - // - // Configure the transport - // - pn_transport_set_max_frame(tport, config->max_frame_size); - pn_transport_set_channel_max(tport, config->max_sessions - 1); - pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); - - // - // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log - // - // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport - pn_transport_set_context(tport, ctx); - if (qd_log_enabled(ct->server->log_source, QD_LOG_TRACE)) { - pn_transport_trace(tport, PN_TRACE_FRM); - pn_transport_set_tracer(tport, transport_tracer); - } - - ctx->owner_thread = CONTEXT_NO_OWNER; + qd_log(ct->server->log_source, QD_LOG_TRACE, + "[%"PRIu64"] Connecting to %s", ctx->connection_id, config->host_port); + /* Note: the transport is configured in the PN_CONNECTION_BOUND event */ + pn_proactor_connect(ct->server->proactor, ctx->pn_conn, config->host_port); } - static void setup_ssl_sasl_and_open(qd_connection_t *ctx) { qd_connector_t *ct = ctx->connector; - const qd_server_config_t *config = ct->config; - pn_transport_t *tport = qdpn_connector_transport(ctx->pn_cxtr); + const qd_server_config_t *config = &ct->config; + pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); // // Set up SSL if appropriate @@ -1242,19 +886,16 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx) if (!domain) { qd_error(QD_ERROR_RUNTIME, "SSL domain failed for connection to %s:%s", - ct->config->host, ct->config->port); - /* TODO aconway 2014-07-15: Close the connection, clean up. */ + ct->config.host, ct->config.port); return; } - /* TODO aconway 2014-07-15: error handling on all SSL calls. */ - // set our trusted database for checking the peer's cert: if (config->ssl_trusted_certificate_db) { if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) { qd_log(ct->server->log_source, QD_LOG_ERROR, "SSL CA configuration failed for %s:%s", - ct->config->host, ct->config->port); + ct->config.host, ct->config.port); } } // should we force the peer to provide a cert? @@ -1267,7 +908,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx) trusted)) { qd_log(ct->server->log_source, QD_LOG_ERROR, "SSL peer auth configuration failed for %s:%s", - ct->config->host, ct->config->port); + config->host, config->port); } } @@ -1279,7 +920,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx) config->ssl_password)) { qd_log(ct->server->log_source, QD_LOG_ERROR, "SSL local configuration failed for %s:%s", - ct->config->host, ct->config->port); + config->host, config->port); } } @@ -1288,7 +929,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx) if (pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, NULL)) { qd_log(ct->server->log_source, QD_LOG_ERROR, "SSL peer host name verification failed for %s:%s", - ct->config->host, ct->config->port); + config->host, config->port); } } @@ -1310,58 +951,46 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx) pn_connection_open(ctx->pn_conn); } - -static void heartbeat_cb(void *context) -{ - qd_server_t *qd_server = (qd_server_t*) context; - qdpn_activate_all(qd_server->driver); - qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL); +static void try_open_cb(void *context) { + qd_connector_t *ct = (qd_connector_t*) context; + sys_mutex_lock(ct->lock); + try_open_lh(ct); + sys_mutex_unlock(ct->lock); } qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *container_name, const char *sasl_config_path, const char *sasl_config_name) { - int i; - + /* Initialize const members, 0 initialize all others. */ + qd_server_t tmp = { .thread_count = thread_count }; qd_server_t *qd_server = NEW(qd_server_t); if (qd_server == 0) return 0; + memcpy(qd_server, &tmp, sizeof(tmp)); qd_server->qd = qd; qd_server->log_source = qd_log_source("SERVER"); - qd_server->thread_count = thread_count; qd_server->container_name = container_name; qd_server->sasl_config_path = sasl_config_path; qd_server->sasl_config_name = sasl_config_name; - qd_server->driver = qdpn_driver(qd_server->log_source); - qd_server->conn_handler = 0; - qd_server->pn_event_handler = 0; - qd_server->signal_handler = 0; + qd_server->proactor = pn_proactor(); + qd_server->container = 0; qd_server->start_context = 0; - qd_server->signal_context = 0; qd_server->lock = sys_mutex(); qd_server->cond = sys_cond(); qd_timer_initialize(qd_server->lock); - qd_server->threads = NEW_PTR_ARRAY(qd_thread_t, thread_count); - for (i = 0; i < thread_count; i++) - qd_server->threads[i] = thread(qd_server, i); - - DEQ_INIT(qd_server->work_queue); - DEQ_INIT(qd_server->pending_timers); - qd_server->a_thread_is_waiting = false; qd_server->pause_requests = 0; qd_server->threads_paused = 0; qd_server->pause_next_sequence = 0; qd_server->pause_now_serving = 0; - qd_server->pending_signal = 0; - qd_server->signal_handler_running = false; - qd_server->heartbeat_timer = 0; qd_server->next_connection_id = 1; qd_server->py_displayname_obj = 0; - qd_server->http = qd_http_server(qd, qd_server->log_source); + + /* FIXME aconway 2017-01-20: restore HTTP support */ + qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name); return qd_server; @@ -1371,191 +1000,64 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe void qd_server_free(qd_server_t *qd_server) { if (!qd_server) return; - for (int i = 0; i < qd_server->thread_count; i++) - thread_free(qd_server->threads[i]); qd_http_server_free(qd_server->http); qd_timer_finalize(); - qdpn_driver_free(qd_server->driver); + pn_proactor_free(qd_server->proactor); sys_mutex_free(qd_server->lock); sys_cond_free(qd_server->cond); - free(qd_server->threads); Py_XDECREF((PyObject *)qd_server->py_displayname_obj); free(qd_server); } - -void qd_server_set_conn_handler(qd_dispatch_t *qd, - qd_conn_handler_cb_t handler, - qd_pn_event_handler_cb_t pn_event_handler, - qd_pn_event_complete_cb_t pn_event_complete_handler, - void *handler_context) -{ - qd->server->conn_handler = handler; - qd->server->pn_event_handler = pn_event_handler; - qd->server->pn_event_complete_handler = pn_event_complete_handler; - qd->server->conn_handler_context = handler_context; -} - - -void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t handler, void *context) +void qd_server_set_container(qd_dispatch_t *qd, qd_container_t *container) { - qd->server->signal_handler = handler; - qd->server->signal_context = context; -} - - -static void qd_server_announce(qd_server_t* qd_server) -{ - qd_log(qd_server->log_source, QD_LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count); -#ifndef NDEBUG - qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode"); -#endif + qd->server->container = container; } void qd_server_run(qd_dispatch_t *qd) { qd_server_t *qd_server = qd->server; - - int i; - if (!qd_server) - return; - - assert(qd_server->conn_handler); // Server can't run without a connection handler. - - for (i = 1; i < qd_server->thread_count; i++) - thread_start(qd_server->threads[i]); - - qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server); - qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL); - - qd_server_announce(qd_server); - - thread_run((void*) qd_server->threads[0]); - - for (i = 1; i < qd_server->thread_count; i++) - thread_join(qd_server->threads[i]); - - for (i = 0; i < qd_server->thread_count; i++) - qd_server->threads[i]->canceled = 0; - - qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down"); -} - - -void qd_server_start(qd_dispatch_t *qd) -{ - qd_server_t *qd_server = qd->server; int i; + assert(qd_server); + assert(qd_server->container); // Server can't run without a container + qd_log(qd_server->log_source, QD_LOG_NOTICE, "Operational, %d Threads Running", + qd_server->thread_count); +#ifndef NDEBUG + qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode"); +#endif + int n = qd_server->thread_count - 1; /* Start count-1 threads + use current thread */ + sys_thread_t **threads = (sys_thread_t **)calloc(n, sizeof(sys_thread_t*)); + for (i = 0; i < n; i++) { + threads[i] = sys_thread(thread_run, qd_server); + } + thread_run(qd_server); /* Use the current thread */ + for (i = 0; i < n; i++) { + sys_thread_join(threads[i]); + sys_thread_free(threads[i]); + } + free(threads); - if (!qd_server) - return; - - assert(qd_server->conn_handler); // Server can't run without a connection handler. - - for (i = 0; i < qd_server->thread_count; i++) - thread_start(qd_server->threads[i]); - - qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server); - qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL); - - qd_server_announce(qd_server); + qd_log(qd_server->log_source, QD_LOG_NOTICE, "Shut Down"); } void qd_server_stop(qd_dispatch_t *qd) { - qd_server_t *qd_server = qd->server; - int idx; - - sys_mutex_lock(qd_server->lock); - for (idx = 0; idx < qd_server->thread_count; idx++) - thread_cancel(qd_server->threads[idx]); - sys_cond_signal_all(qd_server->cond); - qdpn_driver_wakeup(qd_server->driver); - sys_mutex_unlock(qd_server->lock); - - if (thread_server != qd_server) { - for (idx = 0; idx < qd_server->thread_count; idx++) - thread_join(qd_server->threads[idx]); - qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down"); + /* Disconnect everything, interrupt threads */ + pn_proactor_disconnect(qd->server->proactor, NULL); + for (int i = 0; i < qd->server->thread_count; i++) { + pn_proactor_interrupt(qd->server->proactor); } } - -void qd_server_signal(qd_dispatch_t *qd, int signum) +void qd_server_activate(qd_connection_t *ctx) { - if (!qd) + if (!ctx || !ctx->pn_conn) return; - - qd_server_t *qd_server = qd->server; - - qd_server->pending_signal = signum; - sys_cond_signal_all(qd_server->cond); - qdpn_driver_wakeup(qd_server->driver); -} - - -void qd_server_pause(qd_dispatch_t *qd) -{ - qd_server_t *qd_server = qd->server; - - sys_mutex_lock(qd_server->lock); - - // - // Bump the request count to stop all the threads. - // - qd_server->pause_requests++; - int my_sequence = qd_server->pause_next_sequence++; - - // - // Awaken all threads that are currently blocking. - // - sys_cond_signal_all(qd_server->cond); - qdpn_driver_wakeup(qd_server->driver); - - // - // Wait for the paused thread count plus the number of threads requesting a pause to equal - // the total thread count. Also, don't exit the blocking loop until now_serving equals our - // sequence number. This ensures that concurrent pausers don't run at the same time. - // - while ((qd_server->threads_paused + qd_server->pause_requests < qd_server->thread_count) || - (my_sequence != qd_server->pause_now_serving)) - sys_cond_wait(qd_server->cond, qd_server->lock); - - sys_mutex_unlock(qd_server->lock); -} - - -void qd_server_resume(qd_dispatch_t *qd) -{ - qd_server_t *qd_server = qd->server; - - sys_mutex_lock(qd_server->lock); - qd_server->pause_requests--; - qd_server->pause_now_serving++; - sys_cond_signal_all(qd_server->cond); - sys_mutex_unlock(qd_server->lock); + pn_connection_wake(ctx->pn_conn); } - -void qd_server_activate(qd_connection_t *ctx, bool awaken) -{ - if (!ctx) - return; - - qdpn_connector_t *ctor = ctx->pn_cxtr; - if (!ctor) - return; - - if (!qdpn_connector_closed(ctor)) { - qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE); - if (awaken) - qdpn_driver_wakeup(ctx->server->driver); - } -} - - void qd_connection_set_context(qd_connection_t *conn, void *context) { conn->user_context = context; @@ -1598,11 +1100,6 @@ bool qd_connection_inbound(qd_connection_t *conn) } -pn_collector_t *qd_connection_collector(qd_connection_t *conn) -{ - return conn->collector; -} - uint64_t qd_connection_connection_id(qd_connection_t *conn) { return conn->connection_id; @@ -1612,8 +1109,8 @@ uint64_t qd_connection_connection_id(qd_connection_t *conn) const qd_server_config_t *qd_connection_config(const qd_connection_t *conn) { if (conn->listener) - return conn->listener->config; - return conn->connector->config; + return &conn->listener->config; + return &conn->connector->config; } @@ -1628,131 +1125,106 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo DEQ_INSERT_TAIL(conn->deferred_calls, dc); sys_mutex_unlock(conn->deferred_call_lock); - qd_server_activate(conn, true); + qd_server_activate(conn); } -void qd_connection_set_event_stall(qd_connection_t *conn, bool stall) +qd_listener_t *qd_server_listener(qd_server_t *server) { - conn->event_stall = stall; - if (!stall) - qd_server_activate(conn, true); -} + qd_listener_t *li = new_qd_listener_t(); + if (!li) return 0; + ZERO(li); -qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context) -{ - qd_server_t *qd_server = qd->server; - qd_listener_t *li = new_qd_listener_t(); - - if (!li) - return 0; - - li->server = qd_server; - li->config = config; - li->context = context; + sys_atomic_init(&li->ref_count, 1); + li->server = server; li->http = NULL; - - if (config->http) { - li->http = qd_http_listener(qd_server->http, config); - if (!li->http) { - free_qd_listener_t(li); - qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start HTTP listener on %s:%s", - config->host, config->port); - return NULL; - } - } - - li->pn_listener = qdpn_listener( - qd_server->driver, config->host, config->port, config->protocol_family, li); - - if (!li->pn_listener) { - free_qd_listener_t(li); - qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s", - config->host, config->port); - return NULL; - } - qd_log(qd_server->log_source, QD_LOG_TRACE, "Listening on %s:%s%s", config->host, config->port, - config->http ? (config->ssl_profile ? "(HTTPS)":"(HTTP)") : ""); - return li; } -void qd_server_listener_free(qd_listener_t* li) -{ - if (!li) - return; - if (li->http) qd_http_listener_free(li->http); - qdpn_listener_free(li->pn_listener); - free_qd_listener_t(li); +bool qd_listener_listen(qd_listener_t *li) { + if (!li->pn_listener) { /* Not already listening */ + li->pn_listener = pn_listener(); + if (!li->pn_listener) { + qd_log(li->server->log_source, QD_LOG_ERROR, "No memory listening on %s", + li->config.host_port); + return false; + } + pn_listener_set_context(li->pn_listener, li); + /* Listen is asynchronous, log listening on PN_LISTENER_OPEN */ + sys_atomic_inc(&li->ref_count); + pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, BACKLOG); + } + return true; } -void qd_server_listener_close(qd_listener_t* li) +void qd_listener_decref(qd_listener_t* li) { - if (li) - qdpn_listener_close(li->pn_listener); + if (li && sys_atomic_dec(&li->ref_count) == 1) { + qd_server_config_free(&li->config); + if (li->http) qd_http_listener_free(li->http); + free_qd_listener_t(li); + } } -qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context) +qd_connector_t *qd_server_connector(qd_server_t *server) { - qd_server_t *qd_server = qd->server; qd_connector_t *ct = new_qd_connector_t(); - - if (!ct) + if (!ct) return 0; + sys_atomic_init(&ct->ref_count, 1); + ct->server = server; + ct->lock = sys_mutex(); + ct->timer = qd_timer(ct->server->qd, try_open_cb, ct); + if (!ct->lock || !ct->timer) { + qd_connector_decref(ct); return 0; + } + return ct; +} - ct->server = qd_server; + +bool qd_connector_connect(qd_connector_t *ct) +{ + sys_mutex_lock(ct->lock); ct->state = CXTR_STATE_CONNECTING; - ct->config = config; - ct->context = context; ct->ctx = 0; - ct->timer = qd_timer(qd, cxtr_try_open, (void*) ct); ct->delay = 0; - + /* Referenced by timer */ + sys_atomic_inc(&ct->ref_count); qd_timer_schedule(ct->timer, ct->delay); - return ct; + sys_mutex_unlock(ct->lock); + return true; } -void qd_server_connector_free(qd_connector_t* ct) +void qd_connector_decref(qd_connector_t* ct) { - // Don't free the proton connector. This will be done by the connector - // processing/cleanup. - - if (!ct) - return; - - if (ct->ctx) { - qdpn_connector_close(ct->ctx->pn_cxtr); - ct->ctx->connector = 0; + if (ct && sys_atomic_dec(&ct->ref_count) == 1) { + sys_mutex_lock(ct->lock); + if (ct->ctx) { + ct->ctx->connector = 0; + } + sys_mutex_unlock(ct->lock); + qd_server_config_free(&ct->config); + qd_timer_free(ct->timer); + free_qd_connector_t(ct); } - - qd_timer_free(ct->timer); - free_qd_connector_t(ct); } -void qd_server_timer_pending_LH(qd_timer_t *timer) -{ - DEQ_INSERT_TAIL(timer->server->pending_timers, timer); - qdpn_driver_wakeup(timer->server->driver); -} - - -void qd_server_timer_cancel_LH(qd_timer_t *timer) -{ - DEQ_REMOVE(timer->server->pending_timers, timer); +void qd_server_timeout(qd_server_t *server, qd_duration_t duration) { + pn_proactor_set_timeout(server->proactor, duration); } qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; } const char* qd_connection_name(const qd_connection_t *c) { - return qdpn_connector_name(c->pn_cxtr); -} - -const char* qd_connection_hostip(const qd_connection_t *c) { - return qdpn_connector_hostip(c->pn_cxtr); + if (c->connector) { + return c->connector->config.host_port; + } else { + return c->rhost_port; + } } qd_connector_t* qd_connection_connector(const qd_connection_t *c) { @@ -1760,9 +1232,13 @@ qd_connector_t* qd_connection_connector(const qd_connection_t *c) { } const qd_server_config_t *qd_connector_config(const qd_connector_t *c) { - return c->config; + return &c->config; } -qd_http_listener_t *qd_listener_http(qd_listener_t *l) { - return l->http; +qd_http_listener_t *qd_listener_http(qd_listener_t *li) { + return li->http; +} + +const char* qd_connection_hostip(const qd_connection_t *c) { + return c->rhost; }
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/server_private.h ---------------------------------------------------------------------- diff --git a/src/server_private.h b/src/server_private.h index 9581196..a6543fa 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -19,34 +19,39 @@ * under the License. */ +#include <qpid/dispatch/atomic.h> #include <qpid/dispatch/enum.h> #include <qpid/dispatch/server.h> +#include <qpid/dispatch/threading.h> #include "alloc.h" #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/log.h> -#include <qpid/dispatch/driver.h> #include <proton/engine.h> #include <proton/event.h> +#include <proton/ssl.h> #include "dispatch_private.h" #include "timer_private.h" #include "http.h" -void qd_server_timer_pending_LH(qd_timer_t *timer); -void qd_server_timer_cancel_LH(qd_timer_t *timer); +#include <netdb.h> /* For NI_MAXHOST/NI_MAXSERV */ -struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server); +qd_dispatch_t* qd_server_dispatch(qd_server_t *server); +void qd_server_timeout(qd_server_t *server, qd_duration_t delay); const char* qd_connection_name(const qd_connection_t *c); -const char* qd_connection_hostip(const qd_connection_t *c); qd_connector_t* qd_connection_connector(const qd_connection_t *c); +const char* qd_connection_hostip(const qd_connection_t *c); const qd_server_config_t *qd_connector_config(const qd_connector_t *c); qd_http_listener_t *qd_listener_http(qd_listener_t *l); -#define CONTEXT_NO_OWNER -1 -#define CONTEXT_UNSPECIFIED_OWNER -2 +qd_listener_t *qd_server_listener(qd_server_t *server); +qd_connector_t *qd_server_connector(qd_server_t *server); +void qd_connector_decref(qd_connector_t* ct); +void qd_listener_decref(qd_listener_t* ct); +void qd_server_config_free(qd_server_config_t *cf); typedef enum { CXTR_STATE_CONNECTING = 0, @@ -55,8 +60,6 @@ typedef enum { } cxtr_state_t; - - typedef struct qd_deferred_call_t { DEQ_LINKS(struct qd_deferred_call_t); qd_deferred_t call; @@ -73,19 +76,63 @@ typedef struct qd_pn_free_link_session_t { DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t); +#ifndef NI_MAXHOST +# define NI_MAXHOST 1025 +#endif + +#ifndef NI_MAXSERV +# define NI_MAXSERV 32 +#endif + +/** + * Listener objects represent the desire to accept incoming transport connections. + */ +struct qd_listener_t { + /* May be referenced by connection_manager and pn_listener_t */ + sys_atomic_t ref_count; + qd_server_t *server; + qd_server_config_t config; + pn_listener_t *pn_listener; + qd_http_listener_t *http; + DEQ_LINKS(qd_listener_t); + bool exit_on_error; +}; + +DEQ_DECLARE(qd_listener_t, qd_listener_list_t); + + +/** + * Connector objects represent the desire to create and maintain an outgoing transport connection. + */ +struct qd_connector_t { + /* May be referenced by connection_manager, timer and pn_connection_t */ + sys_atomic_t ref_count; + qd_server_t *server; + qd_server_config_t config; + qd_timer_t *timer; + long delay; + + /* Connector state and ctx can be modified in proactor or management threads. */ + sys_mutex_t *lock; + cxtr_state_t state; + qd_connection_t *ctx; + DEQ_LINKS(qd_connector_t); +}; + +DEQ_DECLARE(qd_connector_t, qd_connector_list_t); + + /** * Connection objects wrap Proton connection objects. */ struct qd_connection_t { DEQ_LINKS(qd_connection_t); + char *name; qd_server_t *server; bool opened; // An open callback was invoked for this connection bool closed; - int owner_thread; int enqueued; - qdpn_connector_t *pn_cxtr; pn_connection_t *pn_conn; - pn_collector_t *collector; pn_ssl_t *ssl; qd_listener_t *listener; qd_connector_t *connector; @@ -102,10 +149,11 @@ struct qd_connection_t { void *open_container; qd_deferred_call_list_t deferred_calls; sys_mutex_t *deferred_call_lock; - bool event_stall; bool policy_counted; char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc. qd_pn_free_link_session_list_t free_link_session_list; + char rhost[NI_MAXHOST]; /* Remote host numeric IP for incoming connections */ + char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */ }; DEQ_DECLARE(qd_connection_t, qd_connection_list_t); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/timer.c ---------------------------------------------------------------------- diff --git a/src/timer.c b/src/timer.c index b2121ae..988c086 100644 --- a/src/timer.c +++ b/src/timer.c @@ -25,11 +25,15 @@ #include "alloc.h" #include <assert.h> #include <stdio.h> +#include <time.h> -static sys_mutex_t *lock; -static qd_timer_list_t idle_timers; -static qd_timer_list_t scheduled_timers; -static qd_timestamp_t time_base; +static sys_mutex_t *lock = NULL; +static qd_timer_list_t idle_timers = {0}; +static qd_timer_list_t scheduled_timers = {0}; +/* Timers have relative delta_time measured from the previous timer. + * The delta_time of the first timer on the queue is measured from timer_base. + */ +static qd_timestamp_t time_base = 0; ALLOC_DECLARE(qd_timer_t); ALLOC_DEFINE(qd_timer_t); @@ -41,30 +45,35 @@ sys_mutex_t* qd_timer_lock() { return lock; } // Private static functions //========================================================================= -static void qd_timer_cancel_LH(qd_timer_t *timer) +static void timer_cancel_LH(qd_timer_t *timer) { - switch (timer->state) { - case TIMER_FREE: - assert(0); - break; - - case TIMER_IDLE: - break; - - case TIMER_SCHEDULED: + if (timer->scheduled) { if (timer->next) timer->next->delta_time += timer->delta_time; DEQ_REMOVE(scheduled_timers, timer); DEQ_INSERT_TAIL(idle_timers, timer); - break; - - case TIMER_PENDING: - qd_server_timer_cancel_LH(timer); - DEQ_INSERT_TAIL(idle_timers, timer); - break; + timer->scheduled = false; } +} - timer->state = TIMER_IDLE; +/* Adjust timer's time_base and delays for the current time. */ +static void timer_adjust_now_LH() +{ + qd_timestamp_t now = qd_timer_now(); + if (time_base != 0 && now > time_base) { + qd_duration_t delta = now - time_base; + /* Adjust timer delays by removing duration delta, starting from timer. */ + for (qd_timer_t *timer = DEQ_HEAD(scheduled_timers); delta > 0 && timer; timer = DEQ_NEXT(timer)) { + if (timer->delta_time >= delta) { + timer->delta_time -= delta; + delta = 0; + } else { + delta -= timer->delta_time; + timer->delta_time = 0; /* Ready to fire */ + } + } + } + time_base = now; } @@ -72,6 +81,7 @@ static void qd_timer_cancel_LH(qd_timer_t *timer) // Public Functions from timer.h //========================================================================= + qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context) { qd_timer_t *timer = new_qd_timer_t(); @@ -84,8 +94,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context) timer->handler = cb; timer->context = context; timer->delta_time = 0; - timer->state = TIMER_IDLE; - + timer->scheduled = false; sys_mutex_lock(lock); DEQ_INSERT_TAIL(idle_timers, timer); sys_mutex_unlock(lock); @@ -98,73 +107,55 @@ void qd_timer_free(qd_timer_t *timer) { if (!timer) return; sys_mutex_lock(lock); - qd_timer_cancel_LH(timer); + timer_cancel_LH(timer); DEQ_REMOVE(idle_timers, timer); sys_mutex_unlock(lock); - - timer->state = TIMER_FREE; free_qd_timer_t(timer); } -void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t duration) -{ - qd_timer_t *ptr; - qd_timer_t *last; - qd_timestamp_t total_time; +qd_timestamp_t qd_timer_now() { + struct timespec tv; + clock_gettime(CLOCK_REALTIME, &tv); + return ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000; +} + +void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration) +{ sys_mutex_lock(lock); - qd_timer_cancel_LH(timer); // Timer is now on the idle list - assert(timer->state == TIMER_IDLE); + timer_cancel_LH(timer); // Timer is now on the idle list DEQ_REMOVE(idle_timers, timer); // - // Handle the special case of a zero-time scheduling. In this case, - // the timer doesn't go on the scheduled list. It goes straight to the - // pending list in the server. - // - if (duration == 0) { - timer->state = TIMER_PENDING; - qd_server_timer_pending_LH(timer); - sys_mutex_unlock(lock); - return; - } - - // // Find the insert point in the schedule. // - total_time = 0; - ptr = DEQ_HEAD(scheduled_timers); - assert(!ptr || ptr->prev == 0); - while (ptr) { - total_time += ptr->delta_time; - if (total_time > duration) - break; + timer_adjust_now_LH(); /* Adjust the timers for current time */ + + /* Invariant: time_before == total time up to but not including ptr */ + qd_timer_t *ptr = DEQ_HEAD(scheduled_timers); + qd_duration_t time_before = 0; + while (ptr && time_before + ptr->delta_time < duration) { + time_before += ptr->delta_time; ptr = ptr->next; } - - // - // Insert the timer into the schedule and adjust the delta time - // of the following timer if present. - // - if (total_time <= duration) { - assert(ptr == 0); - timer->delta_time = duration - total_time; + /* ptr is the first timer to exceed duration or NULL if we ran out */ + if (!ptr) { + timer->delta_time = duration - time_before; DEQ_INSERT_TAIL(scheduled_timers, timer); } else { - total_time -= ptr->delta_time; - timer->delta_time = duration - total_time; - assert(ptr->delta_time > timer->delta_time); + timer->delta_time = duration - time_before; ptr->delta_time -= timer->delta_time; - last = ptr->prev; - if (last) - DEQ_INSERT_AFTER(scheduled_timers, timer, last); + ptr = ptr->prev; + if (ptr) + DEQ_INSERT_AFTER(scheduled_timers, timer, ptr); else DEQ_INSERT_HEAD(scheduled_timers, timer); } + timer->scheduled = true; - timer->state = TIMER_SCHEDULED; - + qd_timer_t *first = DEQ_HEAD(scheduled_timers); + qd_server_timeout(first->server, first->delta_time); sys_mutex_unlock(lock); } @@ -172,7 +163,7 @@ void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t duration) void qd_timer_cancel(qd_timer_t *timer) { sys_mutex_lock(lock); - qd_timer_cancel_LH(timer); + timer_cancel_LH(timer); sys_mutex_unlock(lock); } @@ -181,6 +172,7 @@ void qd_timer_cancel(qd_timer_t *timer) // Private Functions from timer_private.h //========================================================================= + void qd_timer_initialize(sys_mutex_t *server_lock) { lock = server_lock; @@ -196,47 +188,22 @@ void qd_timer_finalize(void) } -qd_timestamp_t qd_timer_next_duration_LH(void) +/* Execute all timers that are ready and set up next timeout. */ +void qd_timer_visit() { + sys_mutex_lock(lock); + timer_adjust_now_LH(); qd_timer_t *timer = DEQ_HEAD(scheduled_timers); - if (timer) - return timer->delta_time; - return -1; -} - - -void qd_timer_visit_LH(qd_timestamp_t current_time) -{ - qd_timestamp_t delta; - qd_timer_t *timer = DEQ_HEAD(scheduled_timers); - - if (time_base == 0) { - time_base = current_time; - return; - } - - delta = current_time - time_base; - time_base = current_time; - - while (timer) { - assert(delta >= 0); - if (timer->delta_time > delta) { - timer->delta_time -= delta; - break; - } else { - DEQ_REMOVE_HEAD(scheduled_timers); - delta -= timer->delta_time; - timer->state = TIMER_PENDING; - qd_server_timer_pending_LH(timer); - - } + while (timer && timer->delta_time == 0) { + timer_cancel_LH(timer); /* Removes timer from scheduled_timers */ + sys_mutex_unlock(lock); + timer->handler(timer->context); /* Call the handler outside the lock, may re-schedule */ + sys_mutex_lock(lock); timer = DEQ_HEAD(scheduled_timers); } -} - - -void qd_timer_idle_LH(qd_timer_t *timer) -{ - timer->state = TIMER_IDLE; - DEQ_INSERT_TAIL(idle_timers, timer); + qd_timer_t *first = DEQ_HEAD(scheduled_timers); + if (first) { + qd_server_timeout(first->server, first->delta_time); + } + sys_mutex_unlock(lock); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/timer_private.h ---------------------------------------------------------------------- diff --git a/src/timer_private.h b/src/timer_private.h index 4acd988..537eb4b 100644 --- a/src/timer_private.h +++ b/src/timer_private.h @@ -23,30 +23,20 @@ #include <qpid/dispatch/timer.h> #include <qpid/dispatch/threading.h> -typedef enum { - TIMER_FREE, - TIMER_IDLE, - TIMER_SCHEDULED, - TIMER_PENDING -} qd_timer_state_t; - - struct qd_timer_t { DEQ_LINKS(qd_timer_t); qd_server_t *server; qd_timer_cb_t handler; void *context; qd_timestamp_t delta_time; - qd_timer_state_t state; + bool scheduled; /* true means on scheduled list, false on idle list */ }; DEQ_DECLARE(qd_timer_t, qd_timer_list_t); void qd_timer_initialize(sys_mutex_t *server_lock); void qd_timer_finalize(void); -qd_timestamp_t qd_timer_next_duration_LH(void); -void qd_timer_visit_LH(qd_timestamp_t current_time); -void qd_timer_idle_LH(qd_timer_t *timer); +void qd_timer_visit(); /// For tests only sys_mutex_t* qd_timer_lock(); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 314ad50..bc62232 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -29,9 +29,9 @@ set(unit_test_SOURCES compose_test.c policy_test.c run_unit_tests.c - timer_test.c tool_test.c failoverlist_test.c + timer_test.c ) if (USE_MEMORY_POOL) list(APPEND unit_test_SOURCES alloc_test.c) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/run_unit_tests.c ---------------------------------------------------------------------- diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c index e481f13..c8c4ef3 100644 --- a/tests/run_unit_tests.c +++ b/tests/run_unit_tests.c @@ -24,7 +24,7 @@ #include <stdio.h> int tool_tests(void); -int timer_tests(void); +int timer_tests(qd_dispatch_t*); int alloc_tests(void); int compose_tests(void); int policy_tests(void); @@ -52,7 +52,7 @@ int main(int argc, char** argv) printf("Config failed: %s\n", qd_error_message()); return 1; } - result += timer_tests(); + result += timer_tests(qd); result += tool_tests(); result += compose_tests(); #if USE_MEMORY_POOL http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/system_tests_management.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_management.py b/tests/system_tests_management.py index b9535a8..c26741f 100644 --- a/tests/system_tests_management.py +++ b/tests/system_tests_management.py @@ -169,7 +169,7 @@ class ManagementTest(system_test.TestCase): attributes = {'name':'foo', 'port':str(port), 'role':'normal', 'saslMechanisms': 'ANONYMOUS', 'authenticatePeer': False} entity = self.assert_create_ok(LISTENER, 'foo', attributes) self.assertEqual(entity['name'], 'foo') - self.assertEqual(entity['host'], '127.0.0.1') + self.assertEqual(entity['host'], '') # Connect via the new listener node3 = self.cleanup(Node.connect(Url(port=port))) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/system_tests_policy.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py index 2ce3778..120137d 100644 --- a/tests/system_tests_policy.py +++ b/tests/system_tests_policy.py @@ -275,66 +275,29 @@ class SenderReceiverLimits(TestCase): def test_verify_n_receivers(self): n = 4 addr = self.address() - - # connection should be ok - denied = False - try: - br1 = BlockingConnection(addr) - except ConnectionException: - denied = True - - self.assertFalse(denied) # assert if connections that should open did not open + br1 = BlockingConnection(addr) # n receivers OK - try: - r1 = br1.create_receiver(address="****YES_1of4***") - r2 = br1.create_receiver(address="****YES_20f4****") - r3 = br1.create_receiver(address="****YES_3of4****") - r4 = br1.create_receiver(address="****YES_4of4****") - except Exception: - denied = True - - self.assertFalse(denied) # n receivers should have worked + br1.create_receiver(address="****YES_1of4***") + br1.create_receiver(address="****YES_20f4****") + br1.create_receiver(address="****YES_3of4****") + br1.create_receiver(address="****YES_4of4****") # receiver n+1 should be denied - try: - r5 = br1.create_receiver("****NO****") - except Exception: - denied = True - - self.assertTrue(denied) # receiver n+1 should have failed + self.assertRaises(LinkDetached, br1.create_receiver, "****NO****") br1.close() def test_verify_n_senders(self): n = 2 addr = self.address() - - # connection should be ok - denied = False - try: - bs1 = BlockingConnection(addr) - except ConnectionException: - denied = True - - self.assertFalse(denied) # assert if connections that should open did not open + bs1 = BlockingConnection(addr) # n senders OK - try: - s1 = bs1.create_sender(address="****YES_1of2****") - s2 = bs1.create_sender(address="****YES_2of2****") - except Exception: - denied = True - - self.assertFalse(denied) # n senders should have worked - - # receiver n+1 should be denied - try: - s3 = bs1.create_sender("****NO****") - except Exception: - denied = True - - self.assertTrue(denied) # sender n+1 should have failed + bs1.create_sender(address="****YES_1of2****") + bs1.create_sender(address="****YES_2of2****") + # sender n+1 should be denied + self.assertRaises(LinkDetached, bs1.create_sender, "****NO****") bs1.close() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
