DISPATCH-390: Restore HTTP using libwebsockets - single HTTP thread uses libwebsockets standard polling features - works with released libwebsockets, no patches required
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/16980f67 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/16980f67 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/16980f67 Branch: refs/heads/master Commit: 16980f6703f0b2b03a320c89cad60cdf669770d7 Parents: 20b06ac Author: Alan Conway <[email protected]> Authored: Wed Apr 12 19:29:14 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Thu Apr 27 13:31:36 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 5 +- cmake/FindLibWebSockets.cmake | 20 +- include/qpid/dispatch/server.h | 11 + src/connection_manager.c | 3 +- src/container.c | 2 +- src/http-libwebsockets.c | 796 ++++++++++++++++++++-------------- src/http-none.c | 22 +- src/http.h | 19 +- src/policy.c | 8 +- src/server.c | 103 +++-- src/server_private.h | 14 +- tests/system_tests_http.py | 8 + tests/system_tests_one_router.py | 8 + 13 files changed, 584 insertions(+), 435 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index a06c67f..46f651e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,11 +103,8 @@ find_library(rt_lib rt) find_package(Proton 0.15 REQUIRED) ## Optional dependencies - include(FindLibWebSockets) -# FIXME aconway 2017-01-19: websockets disbled for temporary proactor work. 
-# option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND}) -set(USE_LIBWEBSOCKETS OFF) +option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND}) ## ## Find Valgrind http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/cmake/FindLibWebSockets.cmake ---------------------------------------------------------------------- diff --git a/cmake/FindLibWebSockets.cmake b/cmake/FindLibWebSockets.cmake index 18ef18a..a15d0d2 100644 --- a/cmake/FindLibWebSockets.cmake +++ b/cmake/FindLibWebSockets.cmake @@ -44,22 +44,10 @@ find_path(LIBWEBSOCKETS_INCLUDE_DIRS include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS) - -if(LIBWEBSOCKETS_FOUND) - # For the moment we need a patched version of LibWebSockets: - # https://github.com/alanconway/libwebsockets/tree/v2.1-stable-aconway-adopt-ssl - # This function check verifies we have it. 
- set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS}) - set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES}) - check_function_exists(lws_adopt_socket_vhost LWS_ADOPT_SOCKET_VHOST_FOUND) - if (NOT LWS_ADOPT_SOCKET_VHOST_FOUND) - message("Cannot use LibWebSockets, no function lws_adopt_socket_vhost") - unset(LIBWEBSOCKETS_FOUND) - endif() -endif() +find_package_handle_standard_args( + LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS) if(NOT LIBWEBSOCKETS_FOUND) - set(LIBWEBSOCKETS_LIBRARIES "") - set(LIBWEBSOCKETS_INCLUDE_DIRS "") + unset(LIBWEBSOCKETS_LIBRARIES) + unset(LIBWEBSOCKETS_INCLUDE_DIRS) endif() http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/include/qpid/dispatch/server.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h index ec885ae..eb5214b 100644 --- a/include/qpid/dispatch/server.h +++ b/include/qpid/dispatch/server.h @@ -519,6 +519,17 @@ bool qd_connector_connect(qd_connector_t *ct); qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *display_name_service); /** + * Get the name of the connection, based on its IP address. + */ +const char* qd_connection_name(const qd_connection_t *c); + + +/** + * Get the remote host IP address of the connection. 
+ */ +const char* qd_connection_remote_ip(const qd_connection_t *c); + +/** * @} */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/connection_manager.c ---------------------------------------------------------------------- diff --git a/src/connection_manager.c b/src/connection_manager.c index 4bc18ce..fefc961 100644 --- a/src/connection_manager.c +++ b/src/connection_manager.c @@ -607,8 +607,7 @@ void qd_connection_manager_start(qd_dispatch_t *qd) while (li) { if (!li->pn_listener) { - qd_listener_listen(li); - if (!li->pn_listener && first_start) { + if (!qd_listener_listen(li) && first_start) { qd_log(qd->connection_manager->log_source, QD_LOG_CRITICAL, "Listen on %s failed during initial config", li->config.host_port); exit(1); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index ddc0418..639ffa2 100644 --- a/src/container.c +++ b/src/container.c @@ -375,7 +375,7 @@ static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_ses /* - * FIXME aconway 2017-04-12: IMO this should not be necessary, we should + * TODO aconway 2017-04-12: IMO this should not be necessary, we should * be able to pn_*_free links and sessions directly the handler function. * They will not actually be freed from memory till the event, connection, * proactor etc. have all released their references. 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http-libwebsockets.c ---------------------------------------------------------------------- diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index 745b090..9f6fbe8 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -19,10 +19,11 @@ #include <qpid/dispatch/atomic.h> #include <qpid/dispatch/amqp.h> -#include <qpid/dispatch/driver.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/timer.h> +#include <proton/connection_driver.h> + #include <libwebsockets.h> #include <assert.h> @@ -33,432 +34,555 @@ #include "server_private.h" #include "config.h" +static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */ + +/* Log for LWS messages. For dispatch server messages use qd_http_server_t::log */ static qd_log_source_t* http_log; -static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; +static qd_log_level_t qd_level(int lll) { + switch (lll) { + case LLL_ERR: return QD_LOG_ERROR; + case LLL_WARN: return QD_LOG_WARNING; + /* LWS is noisy compared to dispatch on the informative levels, downgrade */ + case LLL_NOTICE: return QD_LOG_DEBUG; + default: return QD_LOG_TRACE; /* Everything else to trace */ + } +} + +static void logger(int lll, const char *line) { + size_t len = strlen(line); + while (len > 1 && isspace(line[len-1])) { /* Strip trailing newline */ + --len; + } + qd_log(http_log, qd_level(lll), "%.*s", len, line); +} + +static void log_init() { + http_log = qd_log_source("HTTP"); + int levels = 0; + for (int i = 0; i < LLL_COUNT; ++i) { + int lll = 1<<i; + levels |= qd_log_enabled(http_log, qd_level(lll)) ? lll : 0; + } + lws_set_log_level(levels, logger); +} + +/* Intermediate write buffer: LWS needs extra header space on write. 
*/ +typedef struct buffer_t { + char *start; + size_t size, cap; +} buffer_t; + +/* Ensure size bytes in buffer, make buf empty if alloc fails */ +static void buffer_set_size(buffer_t *buf, size_t size) { + if (size > buf->cap) { + buf->cap = (size > buf->cap * 2) ? size : buf->cap * 2; + buf->start = realloc(buf->start, buf->cap); + } + if (buf->start) { + buf->size = size; + } else { + buf->size = buf->cap = 0; + } +} -/* Associate file-descriptors, LWS instances and qdpn_connectors */ -typedef struct fd_data_t { - qdpn_connector_t *connector; +/* AMQPWS connection: set as lws user data and qd_conn->context */ +typedef struct connection_t { + pn_connection_driver_t driver; + qd_connection_t* qd_conn; + buffer_t wbuf; /* LWS requires allocated header space at start of buffer */ struct lws *wsi; -} fd_data_t; +} connection_t; -/* HTTP server state shared by all listeners */ +/* Navigating from WSI pointer to qd objects */ +static qd_http_server_t *wsi_server(struct lws *wsi); +static qd_http_listener_t *wsi_listener(struct lws *wsi); +static qd_log_source_t *wsi_log(struct lws *wsi); + + +/* Declare LWS callbacks and protocol list */ +static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len); +static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len); + +static struct lws_protocols protocols[] = { + /* HTTP only protocol comes first */ + { + "http-only", + callback_http, + 0, + }, + /* "amqp" is the official oasis AMQP over WebSocket protocol name */ + { + "amqp", + callback_amqpws, + sizeof(connection_t), + }, + /* "binary" is an alias for "amqp", for compatibility with clients designed + * to work with a WebSocket proxy + */ + { + "binary", + callback_amqpws, + sizeof(connection_t), + }, + { NULL, NULL, 0, 0 } /* terminator */ +}; + + +static inline int unexpected_close(struct lws *wsi, const char *msg) { + lws_close_reason(wsi, 
LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, + (unsigned char*)msg, strlen(msg)); + char peer[64]; + lws_get_peer_simple(wsi, peer, sizeof(peer)); + qd_log(wsi_log(wsi), QD_LOG_ERROR, "Error on HTTP connection from %s: %s", peer, msg); + return -1; +} + +static int handle_events(connection_t* c) { + if (!c->qd_conn) { + return unexpected_close(c->wsi, "not-established"); + } + pn_event_t *e; + while ((e = pn_connection_driver_next_event(&c->driver))) { + qd_connection_handle(c->qd_conn, e); + } + if (pn_connection_driver_write_buffer(&c->driver).size) { + lws_callback_on_writable(c->wsi); + } + if (pn_connection_driver_finished(&c->driver)) { + lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0); + return -1; + } + return 0; +} + +/* The server has a bounded, thread-safe queue for external work */ +typedef struct work_t { + enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type; + void *value; +} work_t; + +#define WORK_MAX 8 /* Just decouple threads, not a big buffer */ + +typedef struct work_queue_t { + sys_mutex_t *lock; + sys_cond_t *cond; + work_t work[WORK_MAX]; + size_t head, len; /* Ring buffer */ +} work_queue_t; + +/* HTTP Server runs in a single thread, communication from other threads via work_queue */ struct qd_http_server_t { - qd_dispatch_t *dispatch; + qd_server_t *server; + sys_thread_t *thread; + work_queue_t work; qd_log_source_t *log; - sys_mutex_t *lock; /* For now use LWS as a thread-unsafe library. 
*/ struct lws_context *context; - qd_timer_t *timer; - int vhost_id; /* unique identifier for vhost name */ - fd_data_t *fd; /* indexed by file descriptor */ - size_t fd_len; + pn_timestamp_t now; /* Cache current time in thread_run */ + pn_timestamp_t next_tick; /* Next requested tick service */ }; -/* Per-HTTP-listener */ +static void work_queue_destroy(work_queue_t *wq) { + if (wq->lock) sys_mutex_free(wq->lock); + if (wq->cond) sys_cond_free(wq->cond); +} + +static void work_queue_init(work_queue_t *wq) { + wq->lock = sys_mutex(); + wq->cond = sys_cond(); +} + + /* Block till there is space */ +static void work_push(qd_http_server_t *hs, work_t w) { + work_queue_t *wq = &hs->work; + sys_mutex_lock(wq->lock); + while (wq->len == WORK_MAX) { + lws_cancel_service(hs->context); /* Wake up the run thread to clear space */ + sys_cond_wait(wq->cond, wq->lock); + } + wq->work[(wq->head + wq->len) % WORK_MAX] = w; + ++wq->len; + sys_mutex_unlock(wq->lock); + lws_cancel_service(hs->context); /* Wake up the run thread to handle my work */ +} + +/* Non-blocking, return { W_NONE, NULL } if empty */ +static work_t work_pop(qd_http_server_t *hs) { + work_t w = { W_NONE, NULL }; + work_queue_t *wq = &hs->work; + sys_mutex_lock(wq->lock); + if (wq->len > 0) { + w = wq->work[wq->head]; + wq->head = (wq->head + 1) % WORK_MAX; + --wq->len; + sys_cond_signal(wq->cond); + } + sys_mutex_unlock(wq->lock); + return w; +} + +/* Each qd_http_listener_t is associated with an lws_vhost */ struct qd_http_listener_t { + qd_listener_t *listener; qd_http_server_t *server; struct lws_vhost *vhost; struct lws_http_mount mount; - char name[256]; /* vhost name */ }; -/* Get wsi/connector associated with fd or NULL if nothing on record. */ -static inline fd_data_t *fd_data(qd_http_server_t *s, int fd) { - fd_data_t *d = (fd < s->fd_len) ? &s->fd[fd] : NULL; - return (d && (d->connector || d->wsi)) ? 
d : NULL; +void qd_http_listener_free(qd_http_listener_t *hl) { + if (!hl) return; + if (hl->listener) { + hl->listener->http = NULL; + qd_listener_decref(hl->listener); + } + free(hl); } -static inline qd_http_server_t *wsi_http_server(struct lws *wsi) { - return (qd_http_server_t*)lws_context_user(lws_get_context(wsi)); +static qd_http_listener_t *qd_http_listener(qd_http_server_t *hs, qd_listener_t *li) { + qd_http_listener_t *hl = calloc(1, sizeof(*hl)); + if (hl) { + hl->server = hs; + hl->listener = li; + li->http = hl; + sys_atomic_inc(&li->ref_count); /* Keep it around till qd_http_server_free() */ + } else { + qd_log(hs->log, QD_LOG_CRITICAL, "No memory for HTTP listen on %s", + li->config.host_port); + } + return hl; } -static inline qdpn_connector_t *wsi_connector(struct lws *wsi) { - fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi)); - return d ? d->connector : NULL; -} +static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) { + log_init(); /* Update log flags at each listener */ -static inline fd_data_t *set_fd(qd_http_server_t *s, int fd, qdpn_connector_t *c, struct lws *wsi) { - if (!s->fd || fd >= s->fd_len) { - size_t oldlen = s->fd_len; - s->fd_len = fd + 16; /* Don't double, low-range FDs will be re-used first. */ - void *newfds = realloc(s->fd, s->fd_len*sizeof(*s->fd)); - if (!newfds) return NULL; - s->fd = newfds; - memset(s->fd + oldlen, 0, sizeof(*s->fd)*(s->fd_len - oldlen)); - } - fd_data_t *d = &s->fd[fd]; - d->connector = c; - d->wsi = wsi; - return d; -} + qd_server_config_t *config = &hl->listener->config; -/* Push read data into the transport. - * Return 0 on success, number of bytes un-pushed on failure. 
- */ -static int transport_push(pn_transport_t *t, pn_bytes_t buf) { - ssize_t cap; - while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) { - if (buf.size > cap) { - pn_transport_push(t, buf.start, cap); - buf.start += cap; - buf.size -= cap; - } else { - pn_transport_push(t, buf.start, buf.size); - buf.size = 0; - } + int port = qd_port_int(config->port); + if (port < 0) { + qd_log(hs->log, QD_LOG_ERROR, "HTTP listener %s has invalid port %s", + config->host_port, config->port); + goto error; } - return buf.size; -} + struct lws_http_mount *m = &hl->mount; + m->mountpoint = "/"; /* URL mount point */ + m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */ + m->origin = (config->http_root && *config->http_root) ? /* File system root */ + config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR; + m->def = "index.html"; /* Default file name */ + m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */ -static inline int normal_close(struct lws *wsi, const char *msg) { - lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg)); - return -1; + struct lws_context_creation_info info = {0}; + info.mounts = m; + info.port = port; + info.protocols = protocols; + info.keepalive_timeout = 1; + info.ssl_cipher_list = CIPHER_LIST; + info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8; + if (config->ssl_profile) { + info.ssl_cert_filepath = config->ssl_certificate_file; + info.ssl_private_key_filepath = config->ssl_private_key_file; + info.ssl_private_key_password = config->ssl_password; + info.ssl_ca_filepath = config->ssl_trusted_certificates; + info.options |= + LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | + (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) | + (config->requireAuthentication ? 
LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0); + } + info.vhost_name = hl->listener->config.host_port; + hl->vhost = lws_create_vhost(hs->context, &info); + if (hl->vhost) { + /* Store hl pointer in vhost */ + void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl)); + memcpy(vp, &hl, sizeof(hl)); + qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", config->host_port); + return; + } else { + qd_log(hs->log, QD_LOG_NOTICE, "Error listening for HTTP on %s", config->host_port); + goto error; + } + return; + + error: + if (hl->listener->exit_on_error) { + qd_log(hs->log, QD_LOG_CRITICAL, "Shutting down, required listener failed %s", + config->host_port); + exit(1); + } + qd_http_listener_free(hl); } -static inline int unexpected_close(struct lws *wsi, const char *msg) { - lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg)); - return -1; +static void listener_close(qd_http_listener_t *hl, qd_http_server_t *hs) { + /* TODO aconway 2017-04-13: can't easily stop listeners under libwebsockets */ + qd_log(hs->log, QD_LOG_ERROR, "Cannot close HTTP listener %s", + hl->listener->config.host_port); } /* - * Callback for un-promoted HTTP connections, and low-level external poll operations. + * LWS callback for un-promoted HTTP connections. * Note main HTTP file serving is handled by the "mount" struct below. - * Called with http lock held. */ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { switch (reason) { - case LWS_CALLBACK_HTTP: /* Called if file mount can't find the file */ - lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in); + case LWS_CALLBACK_PROTOCOL_DESTROY: + qd_http_listener_free(wsi_listener(wsi)); return -1; - case LWS_CALLBACK_ADD_POLL_FD: { - /* Record WSI against FD here, the connector will be recorded when lws_service returns. 
*/ - set_fd(wsi_http_server(wsi), lws_get_socket_fd(wsi), 0, wsi); - break; - } - case LWS_CALLBACK_DEL_POLL_FD: { - fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi)); - if (d) { - /* Tell dispatch to forget this FD, but let LWS do the actual close() */ - if (d->connector) qdpn_connector_mark_closed(d->connector); - memset(d, 0, sizeof(*d)); - } - break; - } - case LWS_CALLBACK_CHANGE_MODE_POLL_FD: { - struct lws_pollargs *p = (struct lws_pollargs*)in; - qdpn_connector_t *c = wsi_connector(wsi); - if (c) { - if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE); - if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE); - } - break; + case LWS_CALLBACK_HTTP: { + /* Called if file mount can't find the file */ + lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in); + return -1; } - /* NOTE: Not using LWS_CALLBACK_LOCK/UNLOCK_POLL as we are serializing all HTTP work for now. */ - default: - break; + return 0; } - - return 0; } -/* Buffer to allocate extra header space required by LWS. */ -typedef struct buffer_t { char *start; size_t size; size_t cap; } buffer_t; +/* Wake up a connection managed by the http server thread */ +static void connection_wake(qd_connection_t *qd_conn) +{ + connection_t *c = qd_conn->context; + if (c && qd_conn->listener->http) { + qd_http_server_t *hs = qd_conn->listener->http->server; + work_t w = { W_WAKE, c }; + work_push(hs, w); + } +} -/* Callbacks for promoted AMQP over WS connections. - * Called with http lock held. - */ +/* Callbacks for promoted AMQP over WS connections. */ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - qdpn_connector_t *c = wsi_connector(wsi); - pn_transport_t *t = c ? 
qdpn_connector_transport(c) : NULL; + qd_http_server_t *hs = wsi_server(wsi); + connection_t *c = (connection_t*)user; switch (reason) { case LWS_CALLBACK_ESTABLISHED: { - qd_log(wsi_http_server(wsi)->log, QD_LOG_DEBUG, - "Upgraded incoming HTTP connection from %s[%"PRIu64"] to AMQP over WebSocket", - qdpn_connector_name(c), - qd_connection_connection_id((qd_connection_t*)qdpn_connector_context(c))); - memset(user, 0, sizeof(buffer_t)); - break; + /* Upgrade accepted HTTP connection to AMQPWS */ + memset(c, 0, sizeof(*c)); + c->wsi = wsi; + qd_http_listener_t *hl = wsi_listener(wsi); + if (hl == NULL) { + return unexpected_close(c->wsi, "cannot-upgrade"); + } + c->qd_conn = qd_server_connection(hs->server, &hl->listener->config); + if (c->qd_conn == NULL) { + return unexpected_close(c->wsi, "out-of-memory"); + } + c->qd_conn->context = c; + c->qd_conn->wake = connection_wake; + c->qd_conn->listener = hl->listener; + lws_get_peer_simple(wsi, c->qd_conn->rhost, sizeof(c->qd_conn->rhost)); + int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL); + if (err) { + return unexpected_close(c->wsi, pn_code(err)); + } + strncpy(c->qd_conn->rhost_port, c->qd_conn->rhost, sizeof(c->qd_conn->rhost_port)); + qd_log(hs->log, QD_LOG_DEBUG, + "[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS", + qd_connection_connection_id(c->qd_conn), qd_connection_name(c->qd_conn)); + return handle_events(c); } case LWS_CALLBACK_SERVER_WRITEABLE: { - ssize_t size; - if (!t || (size = pn_transport_pending(t)) < 0) { - return normal_close(wsi, "write-closed"); - } - if (size > 0) { - const void *start = pn_transport_head(t); - /* lws_write() demands LWS_PRE bytes of free space before the data */ - size_t tmpsize = size + LWS_PRE; - buffer_t *wtmp = (buffer_t*)user; - if (wtmp->start == NULL || wtmp->cap < tmpsize) { - wtmp->start = realloc(wtmp->start, tmpsize); - wtmp->size = wtmp->cap = tmpsize; - } - if (wtmp->start == NULL) { - return unexpected_close(wsi, 
"out-of-memory"); + if (handle_events(c)) return -1; + pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver); + if (dbuf.size) { + /* lws_write() demands LWS_PRE bytes of free space before the data, + * so we must copy from the driver's buffer to larger temporary wbuf + */ + buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size); + if (c->wbuf.start == NULL) { + return unexpected_close(c->wsi, "out-of-memory"); } - void *tmpstart = wtmp->start + LWS_PRE; - memcpy(tmpstart, start, size); - ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY); + unsigned char* buf = (unsigned char*)c->wbuf.start + LWS_PRE; + memcpy(buf, dbuf.start, dbuf.size); + ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY); if (wrote < 0) { - pn_transport_close_head(t); - return normal_close(wsi, "write-error"); + pn_connection_driver_write_close(&c->driver); + return unexpected_close(c->wsi, "write-error"); } else { - pn_transport_pop(t, (size_t)wrote); + pn_connection_driver_write_done(&c->driver, wrote); } } - break; + return handle_events(c); } case LWS_CALLBACK_RECEIVE: { - if (!t || pn_transport_capacity(t) < 0) { - return normal_close(wsi, "read-closed"); - } - if (transport_push(t, pn_bytes(len, in))) { - return unexpected_close(wsi, "read-overflow"); + while (len > 0) { + if (handle_events(c)) return -1; + pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver); + if (dbuf.size == 0) { + return unexpected_close(c->wsi, "unexpected-data"); + } + size_t copy = (len < dbuf.size) ? 
len : dbuf.size; + memcpy(dbuf.start, in, copy); + pn_connection_driver_read_done(&c->driver, copy); + len -= copy; + in = (char*)in + copy; } - break; + return handle_events(c); } - case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: - if (t) { - pn_transport_close_tail(t); + case LWS_CALLBACK_USER: { + pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now); + if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) { + hs->next_tick = next_tick; } + return handle_events(c); + } - case LWS_CALLBACK_CLOSED: - break; - - default: - break; + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: { + pn_connection_driver_read_close(&c->driver); + return handle_events(c); } - return 0; -} -static void check_timer(void *void_http_server) { - qd_http_server_t *s = (qd_http_server_t*)void_http_server; - /* Run LWS global timer and forced-service checks. */ - sys_mutex_lock(s->lock); - lws_service_fd(s->context, NULL); - while (!lws_service_adjust_timeout(s->context, 1, 0)) { - /* -1 timeout means just do forced service */ - lws_plat_service_tsi(s->context, -1, 0); - } - if (!s->timer) { - s->timer = qd_timer(s->dispatch, check_timer, s); - } - sys_mutex_unlock(s->lock); - /* Timer is locked using server lock. 
*/ - qd_timer_cancel(s->timer); - qd_timer_schedule(s->timer, 1000); /* LWS wants per-second wakeups */ -} + case LWS_CALLBACK_CLOSED: { + if (c->driver.transport) { + pn_connection_driver_close(&c->driver); + handle_events(c); + } + pn_connection_driver_destroy(&c->driver); + free(c->wbuf.start); + return -1; + } -static qd_http_listener_t * qdpn_connector_http_listener(qdpn_connector_t* c) { - qd_listener_t* ql = (qd_listener_t*)qdpn_listener_context(qdpn_connector_listener(c)); - return qd_listener_http(ql); + default: + return 0; + } } -static void http_connector_process(qdpn_connector_t *c) { - qd_http_listener_t *hl = qdpn_connector_http_listener(c); - qd_http_server_t *s = hl->server; - sys_mutex_lock(s->lock); - int fd = qdpn_connector_get_fd(c); - fd_data_t *d = fd_data(s, fd); - /* Make sure we are still tracking this fd, could have been closed by timer */ - if (d) { - pn_transport_t *t = qdpn_connector_transport(c); - int flags = - (qdpn_connector_hangup(c) ? POLLHUP : 0) | - (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) | - (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0); - struct lws_pollfd pfd = { fd, flags, flags }; - if (pn_transport_pending(t) > 0) { - lws_callback_on_writable(d->wsi); +#define DEFAULT_TICK 1000 + +static void* http_thread_run(void* v) { + qd_http_server_t *hs = v; + qd_log(hs->log, QD_LOG_INFO, "HTTP server thread running"); + int result = 0; + while(result >= 0) { + /* Send a USER event to run transport ticks, may decrease hs->next_tick. */ + hs->now = qd_timer_now(); + hs->next_tick = hs->now + DEFAULT_TICK; + lws_callback_all_protocol(hs->context, &protocols[1], LWS_CALLBACK_USER); + lws_callback_all_protocol(hs->context, &protocols[2], LWS_CALLBACK_USER); + pn_millis_t timeout = (hs->next_tick > hs->now) ? 
hs->next_tick - hs->now : 1; + result = lws_service(hs->context, timeout); + + /* Process any work items on the queue */ + for (work_t w = work_pop(hs); w.type != W_NONE; w = work_pop(hs)) { + switch (w.type) { + case W_NONE: + break; + case W_STOP: + result = -1; + break; + case W_LISTEN: + listener_start((qd_http_listener_t*)w.value, hs); + break; + case W_CLOSE: + listener_close((qd_http_listener_t*)w.value, hs); + break; + case W_WAKE: { + connection_t *c = w.value; + pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection, + PN_CONNECTION_WAKE); + handle_events(c); + break; + } + } } - lws_service_fd(s->context, &pfd); - d = fd_data(s, fd); /* We may have stopped tracking during service */ - if (pn_transport_capacity(t) > 0) - qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE); - if (pn_transport_pending(t) > 0 || (d && lws_partial_buffered(d->wsi))) - qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE); - pn_timestamp_t wake = pn_transport_tick(t, qdpn_now(NULL)); - if (wake) qdpn_connector_wakeup(c, wake); - } - sys_mutex_unlock(s->lock); - check_timer(s); /* Make sure the timer is running */ + } + qd_log(hs->log, QD_LOG_INFO, "HTTP server thread exit"); + return NULL; } -/* Dispatch closes a connector because it is HUP, socket_error or transport_closed() */ -static void http_connector_close(qdpn_connector_t *c) { - int fd = qdpn_connector_get_fd(c); - qd_http_server_t *s = qdpn_connector_http_listener(c)->server; - sys_mutex_lock(s->lock); - fd_data_t *d = fd_data(s, fd); - if (d) { /* Only if we are still tracking fd */ - /* Shutdown but let LWS do the close(), possibly in later timer */ - shutdown(qdpn_connector_get_fd(c), SHUT_RDWR); - short flags = POLLIN|POLLOUT|POLLHUP; - struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags }; - lws_service_fd(s->context, &pfd); - qdpn_connector_mark_closed(c); - memset(d, 0 , sizeof(*d)); - } - sys_mutex_unlock(s->lock); +void qd_http_server_free(qd_http_server_t *hs) { + if (!hs) return; + 
if (hs->thread) { + /* Thread safe, stop via work queue then clean up */ + work_t work = { W_STOP, NULL }; + work_push(hs, work); + sys_thread_join(hs->thread); + sys_thread_free(hs->thread); + hs->thread = NULL; + } + work_queue_destroy(&hs->work); + if (hs->context) lws_context_destroy(hs->context); + free(hs); } -static struct qdpn_connector_methods_t http_methods = { - http_connector_process, - http_connector_close -}; - -void qd_http_listener_accept(qd_http_listener_t *hl, qdpn_connector_t *c) { - qd_http_server_t *s = hl->server; - sys_mutex_lock(s->lock); - int fd = qdpn_connector_get_fd(c); - struct lws *wsi = lws_adopt_socket_vhost(hl->vhost, fd); - fd_data_t *d = fd_data(s, fd); - if (d) { /* FD was adopted by LWS, so dispatch must not close it */ - qdpn_connector_set_methods(c, &http_methods); - if (wsi) d->connector = c; - } - sys_mutex_unlock(s->lock); - if (!wsi) { /* accept failed, dispatch should forget the FD. */ - qdpn_connector_mark_closed(c); +qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) { + log_init(); + qd_http_server_t *hs = calloc(1, sizeof(*hs)); + if (hs) { + work_queue_init(&hs->work); + struct lws_context_creation_info info = {0}; + info.gid = info.uid = -1; + info.user = hs; + info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE; + info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | + LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME | + LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.max_http_header_pool = 32; + info.timeout_secs = 1; + + hs->context = lws_create_context(&info); + hs->server = s; + hs->log = log; /* For messages from this file */ + if (!hs->context) { + qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server"); + qd_http_server_free(hs); + hs = NULL; + } } + return hs; } -static struct lws_protocols protocols[] = { - /* HTTP only protocol comes first */ - { - "http-only", - callback_http, - 0, - }, - /* "amqp" is the official oasis AMQP over WebSocket protocol name */ - { - "amqp", - 
callback_amqpws, - sizeof(buffer_t), - }, - /* "binary" is an alias for "amqp", for compatibility with clients designed - * to work with a WebSocket proxy - */ - { - "binary", - callback_amqpws, - sizeof(buffer_t), - }, - { NULL, NULL, 0, 0 } /* terminator */ -}; +/* Thread safe calls that put items on work queue */ -static qd_log_level_t qd_level(int lll) { - switch (lll) { - case LLL_ERR: return QD_LOG_ERROR; - case LLL_WARN: return QD_LOG_WARNING; - case LLL_NOTICE: return QD_LOG_INFO; - case LLL_INFO:return QD_LOG_DEBUG; - case LLL_DEBUG: return QD_LOG_TRACE; - default: return QD_LOG_NONE; +qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li) +{ + sys_mutex_lock(hs->work.lock); + if (!hs->thread) { + hs->thread = sys_thread(http_thread_run, hs); } + bool ok = hs->thread; + sys_mutex_unlock(hs->work.lock); + if (!ok) return NULL; + + qd_http_listener_t *hl = qd_http_listener(hs, li); + if (hl) { + work_t w = { W_LISTEN, hl }; + work_push(hs, w); + } + return hl; } -static void emit_lws_log(int lll, const char *line) { - size_t len = strlen(line); - while (len > 1 && isspace(line[len-1])) - --len; - qd_log(http_log, qd_level(lll), "%.*s", len, line); -} - -qd_http_server_t *qd_http_server(qd_dispatch_t *d, qd_log_source_t *log) { - if (!http_log) http_log = qd_log_source("HTTP"); - qd_http_server_t *s = calloc(1, sizeof(*s)); - if (!s) return NULL; - s->log = log; - s->lock = sys_mutex(); - s->dispatch = d; - int levels = - (qd_log_enabled(log, QD_LOG_ERROR) ? LLL_ERR : 0) | - (qd_log_enabled(log, QD_LOG_WARNING) ? LLL_WARN : 0) | - (qd_log_enabled(log, QD_LOG_INFO) ? LLL_NOTICE : 0) | - (qd_log_enabled(log, QD_LOG_DEBUG) ? LLL_INFO : 0) | - (qd_log_enabled(log, QD_LOG_TRACE) ? 
LLL_DEBUG : 0); - lws_set_log_level(levels, emit_lws_log); - - struct lws_context_creation_info info = {0}; - info.gid = info.uid = -1; - info.user = s; - info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE; - info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | - LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME | - LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; - info.max_http_header_pool = 32; - info.timeout_secs = 1; - s->context = lws_create_context(&info); - if (!s->context) { - free(s); - return NULL; - } - return s; +void qd_http_listener_close(qd_http_listener_t *hl) +{ + work_t w = { W_CLOSE, hl }; + work_push(hl->server, w); } -void qd_http_server_free(qd_http_server_t *s) { - sys_mutex_free(s->lock); - lws_context_destroy(s->context); - if (s->timer) qd_timer_free(s->timer); - if (s->fd) free(s->fd); - free(s); +static qd_http_server_t *wsi_server(struct lws *wsi) { + return (qd_http_server_t*)lws_context_user(lws_get_context(wsi)); } -qd_http_listener_t *qd_http_listener(qd_http_server_t *s, const qd_server_config_t *config) { - qd_http_listener_t *hl = calloc(1, sizeof(*hl)); - if (!hl) return NULL; - hl->server = s; - - struct lws_context_creation_info info = {0}; - - struct lws_http_mount *m = &hl->mount; - m->mountpoint = "/"; /* URL mount point */ - m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */ - m->origin = (config->http_root && *config->http_root) ? 
/* File system root */ - config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR; - m->def = "index.html"; /* Default file name */ - m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */ - info.mounts = m; - info.port = CONTEXT_PORT_NO_LISTEN_SERVER; /* Don't use LWS listener */ - info.protocols = protocols; - info.keepalive_timeout = 1; - info.ssl_cipher_list = CIPHER_LIST; - info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8; - if (config->ssl_profile) { - info.ssl_cert_filepath = config->ssl_certificate_file; - info.ssl_private_key_filepath = config->ssl_private_key_file; - info.ssl_private_key_password = config->ssl_password; - info.ssl_ca_filepath = config->ssl_trusted_certificates; - info.options |= - LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | - (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) | - (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0); - } - snprintf(hl->name, sizeof(hl->name), "vhost%x", s->vhost_id++); - info.vhost_name = hl->name; - hl->vhost = lws_create_vhost(s->context, &info); - if (!hl->vhost) { - free(hl); - return NULL; +static qd_http_listener_t *wsi_listener(struct lws *wsi) { + qd_http_listener_t *hl = NULL; + struct lws_vhost *vhost = lws_get_vhost(wsi); + if (vhost) { /* Get qd_http_listener from vhost data */ + void *vp = lws_protocol_vh_priv_get(vhost, &protocols[0]); + memcpy(&hl, vp, sizeof(hl)); } return hl; } -void qd_http_listener_free(qd_http_listener_t *hl) { - free(hl); +static qd_log_source_t *wsi_log(struct lws *wsi) { + return wsi_server(wsi)->log; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http-none.c ---------------------------------------------------------------------- diff --git a/src/http-none.c b/src/http-none.c index a8953e5..57869d5 100644 --- a/src/http-none.c +++ b/src/http-none.c @@ -20,29 +20,21 @@ #include <qpid/dispatch/log.h> #include "http.h" +struct qd_dispatch_t; + /* No HTTP 
implementation available. */ -qd_http_server_t *qd_http_server(struct qd_dispatch_t *d, qd_log_source_t *log) +qd_http_server_t *qd_http_server(struct qd_server_t *s, qd_log_source_t *log) { qd_log(log, QD_LOG_WARNING, "HTTP support is not available"); return 0; } -void qd_http_server_free(qd_http_server_t *h) -{ -} +void qd_http_server_free(qd_http_server_t *h) {} -qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s, - const struct qd_server_config_t *config) -{ - return 0; -} +void* qd_http_server_run(void* qd_http_server) { return 0; } + +qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li) { return 0; } -void qd_http_listener_free(qd_http_listener_t *hl) -{ -} -void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c) -{ -} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/http.h ---------------------------------------------------------------------- diff --git a/src/http.h b/src/http.h index fae3f1d..a169998 100644 --- a/src/http.h +++ b/src/http.h @@ -23,17 +23,18 @@ typedef struct qd_http_listener_t qd_http_listener_t; typedef struct qd_http_server_t qd_http_server_t; -struct qd_dispatch_t; -struct qd_log_source_t; +struct qd_server_t; struct qd_server_config_t; -struct qdpn_connector_t; +struct qd_listener_t; +struct qd_log_source_t; -qd_http_server_t *qd_http_server(struct qd_dispatch_t *dispatch, struct qd_log_source_t *log); +/* Create a HTTP server */ +qd_http_server_t *qd_http_server(struct qd_server_t *server, struct qd_log_source_t *log); + +/* Free the HTTP server */ void qd_http_server_free(qd_http_server_t*); -qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s, - const struct qd_server_config_t *config); -void qd_http_listener_free(qd_http_listener_t *hl); -/* On error, qdpn_connector_closed(c) is true. */ -void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c); + +/* Listening for HTTP, thread safe. 
*/ +qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li); #endif // QD_HTTP_H http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/policy.c ---------------------------------------------------------------------- diff --git a/src/policy.c b/src/policy.c index a55f245..2ddbf94 100644 --- a/src/policy.c +++ b/src/policy.c @@ -396,7 +396,7 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) pn_connection_t *conn = qd_connection_pn(qd_conn); qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server); qd_policy_t *policy = qd->policy; - const char *hostip = qd_connection_hostip(qd_conn); + const char *hostip = qd_connection_remote_ip(qd_conn); const char *vhost = pn_connection_remote_hostname(conn); if (result) { qd_log(policy->log_source, @@ -567,7 +567,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { - const char *hostip = qd_connection_hostip(qd_conn); + const char *hostip = qd_connection_remote_ip(qd_conn); const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); if (qd_conn->policy_settings->maxSenders) { @@ -618,7 +618,7 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { - const char *hostip = qd_connection_hostip(qd_conn); + const char *hostip = qd_connection_remote_ip(qd_conn); const char *vhost = pn_connection_remote_hostname(qd_connection_pn(qd_conn)); if (qd_conn->policy_settings->maxReceivers) { @@ -683,7 +683,7 @@ void qd_policy_amqp_open(qd_connection_t *qd_conn) { if (policy->enableVhostPolicy) { // Open connection or not based on policy. 
pn_transport_t *pn_trans = pn_connection_transport(conn); - const char *hostip = qd_connection_hostip(qd_conn); + const char *hostip = qd_connection_remote_ip(qd_conn); const char *pcrh = pn_connection_remote_hostname(conn); const char *vhost = (pcrh ? pcrh : ""); const char *conn_name = qd_connection_name(qd_conn); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index e7fa34d..066c862 100644 --- a/src/server.c +++ b/src/server.c @@ -475,9 +475,14 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c pn_data_exit(pn_connection_properties(conn)); } +/* Wake function for proactor-managed connections */ +static void connection_wake(qd_connection_t *ctx) { + if (ctx->pn_conn) pn_connection_wake(ctx->pn_conn); +} -/* Construct a new qd_connection. */ -static qd_connection_t *qd_connection(qd_server_t *server, qd_server_config_t *config) { +/* Construct a new qd_connection. Thread safe. 
*/ +qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config) +{ qd_connection_t *ctx = new_qd_connection_t(); if (!ctx) return NULL; ZERO(ctx); @@ -491,6 +496,7 @@ static qd_connection_t *qd_connection(qd_server_t *server, qd_server_config_t *c return NULL; } ctx->server = server; + ctx->wake = connection_wake; /* Default, over-ridden for HTTP connections */ pn_connection_set_context(ctx->pn_conn, ctx); DEQ_ITEM_INIT(ctx); DEQ_INIT(ctx->deferred_calls); @@ -508,7 +514,7 @@ static void on_accept(pn_event_t *e) assert(pn_event_type(e) == PN_LISTENER_ACCEPT); pn_listener_t *pn_listener = pn_event_listener(e); qd_listener_t *listener = pn_listener_get_context(pn_listener); - qd_connection_t *ctx = qd_connection(listener->server, &listener->config); + qd_connection_t *ctx = qd_server_connection(listener->server, &listener->config); if (!ctx) { qd_log(listener->server->log_source, QD_LOG_CRITICAL, "Allocation failure during accept to %s", listener->config.host_port); @@ -549,25 +555,19 @@ void connect_fail(qd_connection_t *ctx, const char *name, const char *descriptio /* Get the host IP address for the remote end */ -static int set_remote_host_port(qd_connection_t *ctx) { +static void set_rhost_port(qd_connection_t *ctx) { pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); - const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport)); - int err = 0; - if (!addr) { - err = -1; - qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s"); - } else { + const struct sockaddr_storage* addr = + pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport)); + if (addr) { char rport[NI_MAXSERV] = ""; int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr), ctx->rhost, sizeof(ctx->rhost), rport, sizeof(rport), NI_NUMERICHOST | NI_NUMERICSERV); if (!err) { snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s", ctx->rhost, rport); - } else { - 
qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s"); } } - return err; } @@ -581,7 +581,6 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { // // Proton pushes out its trace to transport_tracer() which in turn writes a trace // message to the qdrouter log If trace level logging is enabled on the router set - // PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport // if (qd_log_enabled(ctx->server->log_source, QD_LOG_TRACE)) { pn_transport_trace(tport, PN_TRACE_FRM); @@ -593,9 +592,9 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { config = &ctx->listener->config; const char *name = config->host_port; pn_transport_set_server(tport); + set_rhost_port(ctx); - if (set_remote_host_port(ctx) == 0 && - qd_policy_socket_accept(server->qd->policy, ctx->rhost)) + if (qd_policy_socket_accept(server->qd->policy, ctx->rhost)) { ctx->policy_counted = true; } else { @@ -667,10 +666,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event); static void handle_listener(pn_event_t *e, qd_server_t *qd_server) { qd_log_source_t *log = qd_server->log_source; - - /* FIXME aconway 2017-02-20: HTTP support */ qd_listener_t *li = (qd_listener_t*) pn_listener_get_context(pn_event_listener(e)); const char *host_port = li->config.host_port; + switch (pn_event_type(e)) { case PN_LISTENER_OPEN: @@ -705,7 +703,7 @@ static void handle_listener(pn_event_t *e, qd_server_t *qd_server) { } -static void qd_connection_free(qd_connection_t *ctx) +void qd_connection_free(qd_connection_t *ctx) { qd_server_t *qd_server = ctx->server; @@ -751,7 +749,7 @@ static void qd_connection_free(qd_connection_t *ctx) /* Events involving a connection or listener are serialized by the proactor so * only one event per connection / listener will be processed at a time. 
*/ -static bool handle(pn_event_t *e, qd_server_t *qd_server) { +static bool handle(qd_server_t *qd_server, pn_event_t *e) { pn_connection_t *pn_conn = pn_event_connection(e); qd_connection_t *ctx = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL; @@ -803,7 +801,7 @@ static bool handle(pn_event_t *e, qd_server_t *qd_server) { /* TODO aconway 2017-04-18: fold the container handler into the server */ qd_container_handle_event(qd_server->container, e); - /* Free the connection after all other processing */ + /* Free the connection after all other processing is complete */ if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) { pn_connection_set_context(pn_conn, NULL); qd_connection_free(ctx); @@ -819,7 +817,7 @@ static void *thread_run(void *arg) pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor); pn_event_t * e; while (running && (e = pn_event_batch_next(events))) { - running = handle(e, qd_server); + running = handle(qd_server, e); } pn_proactor_done(qd_server->proactor, events); } @@ -836,7 +834,7 @@ static void try_open_lh(qd_connector_t *ct) return; } - qd_connection_t *ctx = qd_connection(ct->server, &ct->config); + qd_connection_t *ctx = qd_server_connection(ct->server, &ct->config); if (!ctx) { /* Try again later */ qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure connecting to %s", ct->config.host_port); @@ -987,7 +985,7 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe qd_server->next_connection_id = 1; qd_server->py_displayname_obj = 0; - /* FIXME aconway 2017-01-20: restore HTTP support */ + qd_server->http = qd_http_server(qd_server, qd_server->log_source); qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name); @@ -1052,11 +1050,10 @@ void qd_server_stop(qd_dispatch_t *qd) void qd_server_activate(qd_connection_t *ctx) { - if (!ctx || !ctx->pn_conn) - return; - pn_connection_wake(ctx->pn_conn); + if (ctx) ctx->wake(ctx); } + void 
qd_connection_set_context(qd_connection_t *conn, void *context) { conn->user_context = context; @@ -1133,28 +1130,44 @@ qd_listener_t *qd_server_listener(qd_server_t *server) qd_listener_t *li = new_qd_listener_t(); if (!li) return 0; ZERO(li); - sys_atomic_init(&li->ref_count, 1); li->server = server; li->http = NULL; return li; } - -bool qd_listener_listen(qd_listener_t *li) { - if (!li->pn_listener) { /* Not already listening */ - li->pn_listener = pn_listener(); - if (!li->pn_listener) { - qd_log(li->server->log_source, QD_LOG_ERROR, "No memory listening on %s", - li->config.host_port); - return false; - } +static bool qd_listener_listen_pn(qd_listener_t *li) { + li->pn_listener = pn_listener(); + if (li->pn_listener) { pn_listener_set_context(li->pn_listener, li); - /* Listen is asynchronous, log listening on PN_LISTENER_OPEN */ - sys_atomic_inc(&li->ref_count); - pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, BACKLOG); + pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, + BACKLOG); + sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */ + /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */ + } else { + qd_log(li->server->log_source, QD_LOG_CRITICAL, "No memory listening on %s", + li->config.host_port); + } + return li->pn_listener; +} + +static bool qd_listener_listen_http(qd_listener_t *li) { + if (li->server->http) { + /* qd_http_listener holds a reference to li, will decref when closed */ + qd_http_server_listen(li->server->http, li); + return li->http; + } else { + qd_log(li->server->log_source, QD_LOG_ERROR, "No HTTP support to listen on %s", + li->config.host_port); + return false; } - return true; +} + + +bool qd_listener_listen(qd_listener_t *li) { + if (li->pn_listener || li->http) /* Already listening */ + return true; + return li->config.http ? 
qd_listener_listen_http(li) : qd_listener_listen_pn(li); } @@ -1162,7 +1175,6 @@ void qd_listener_decref(qd_listener_t* li) { if (li && sys_atomic_dec(&li->ref_count) == 1) { qd_server_config_free(&li->config); - if (li->http) qd_http_listener_free(li->http); free_qd_listener_t(li); } } @@ -1238,6 +1250,11 @@ qd_http_listener_t *qd_listener_http(qd_listener_t *li) { return li->http; } -const char* qd_connection_hostip(const qd_connection_t *c) { +const char* qd_connection_remote_ip(const qd_connection_t *c) { return c->rhost; } + +/* Expose event handling for HTTP connections */ +void qd_connection_handle(qd_connection_t *c, pn_event_t *e) { + handle(c->server, e); +} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/src/server_private.h ---------------------------------------------------------------------- diff --git a/src/server_private.h b/src/server_private.h index a6543fa..6e6f1c3 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -39,16 +39,19 @@ qd_dispatch_t* qd_server_dispatch(qd_server_t *server); void qd_server_timeout(qd_server_t *server, qd_duration_t delay); -const char* qd_connection_name(const qd_connection_t *c); +qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config); +void qd_connection_free(qd_connection_t* conn); + qd_connector_t* qd_connection_connector(const qd_connection_t *c); -const char* qd_connection_hostip(const qd_connection_t *c); -const qd_server_config_t *qd_connector_config(const qd_connector_t *c); +void qd_connection_handle(qd_connection_t *c, pn_event_t *e); -qd_http_listener_t *qd_listener_http(qd_listener_t *l); + +const qd_server_config_t *qd_connector_config(const qd_connector_t *c); qd_listener_t *qd_server_listener(qd_server_t *server); qd_connector_t *qd_server_connector(qd_server_t *server); + void qd_connector_decref(qd_connector_t* ct); void qd_listener_decref(qd_listener_t* ct); void qd_server_config_free(qd_server_config_t *cf); @@ -136,7 +139,7 @@ struct 
qd_connection_t { pn_ssl_t *ssl; qd_listener_t *listener; qd_connector_t *connector; - void *context; // Copy of context from listener or connector + void *context; // context from listener or connector void *user_context; void *link_context; // Context shared by this connection's links uint64_t connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky. @@ -152,6 +155,7 @@ struct qd_connection_t { bool policy_counted; char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc. qd_pn_free_link_session_list_t free_link_session_list; + void (*wake)(qd_connection_t*); /* Wake method, different for HTTP vs. proactor */ char rhost[NI_MAXHOST]; /* Remote host numeric IP for incoming connections */ char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */ }; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/tests/system_tests_http.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py index 5627277..3d13c32 100644 --- a/tests/system_tests_http.py +++ b/tests/system_tests_http.py @@ -50,6 +50,14 @@ class RouterTestHttp(TestCase): def assert_get_cert(self, url): self.assertEqual("HTTP test\n", self.get_cert("%s/system_tests_http.txt" % url)) + def test_listen_error(self): + """Make sure a router exits if an initial HTTP listener fails, doesn't hang""" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'bad'}), + ('listener', {'port': 80, 'http':True})]) + r = Qdrouterd(name="expect_fail", config=config, wait=False); + self.assertEqual(1, r.wait()) + def test_http_get(self): config = Qdrouterd.Config([ ('router', {'id': 'QDR.HTTP'}), http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/16980f67/tests/system_tests_one_router.py ---------------------------------------------------------------------- 
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 0a982c7..483d5e2 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -57,6 +57,14 @@ class RouterTest(TestCase): cls.router.wait_ready() cls.address = cls.router.addresses[0] + def test_listen_error(self): + """Make sure a router exits if an initial listener fails, doesn't hang""" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'bad'}), + ('listener', {'port': 80})]) + r = Qdrouterd(name="expect_fail", config=config, wait=False); + self.assertEqual(1, r.wait()) + def test_01_pre_settled(self): addr = self.address+"/pre_settled/1" M1 = self.messenger() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
