http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/posix/driver.c ---------------------------------------------------------------------- diff --git a/src/posix/driver.c b/src/posix/driver.c deleted file mode 100644 index aa80fd9..0000000 --- a/src/posix/driver.c +++ /dev/null @@ -1,1093 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <assert.h> -#include <poll.h> -#include <stdio.h> -#include <string.h> - -#include <ctype.h> -#include <errno.h> -#include <stdio.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <netdb.h> -#include <unistd.h> -#include <fcntl.h> -#include <assert.h> -#include <time.h> - -#ifndef __sun -#include <sys/eventfd.h> -#endif - -#ifdef __sun -#include <signal.h> -#endif - -#include <qpid/dispatch/driver.h> -#include <qpid/dispatch/error.h> -#include <qpid/dispatch/threading.h> -#include <proton/error.h> -#include <proton/ssl.h> -#include <proton/object.h> -#include <qpid/dispatch/ctools.h> -#include "alloc.h" -#include "aprintf.h" -#include "log_private.h" - -/* Decls */ - -#define MAX_HOST 1024 -#define MAX_SERV 256 -#define ERROR_MAX 128 - -#define PN_SEL_RD (0x0001) -#define PN_SEL_WR (0x0002) - -DEQ_DECLARE(qdpn_listener_t, qdpn_listener_list_t); -DEQ_DECLARE(qdpn_connector_t, qdpn_connector_list_t); - -const char *protocol_family_ipv4 = "IPv4"; -const char *protocol_family_ipv6 = "IPv6"; - -const char *AF_INET6_STR = "AF_INET6"; -const char *AF_INET_STR = "AF_INET"; - -static inline void ignore_result(int unused_result) { - (void) unused_result; -} - -struct qdpn_driver_t { - qd_log_source_t *log; - sys_mutex_t *lock; - - // - // The following values need to be protected by lock from multi-threaded access. - // - qdpn_listener_list_t listeners; - qdpn_connector_list_t connectors; - qdpn_listener_t *listener_next; - qdpn_connector_t *connector_next; - size_t closed_count; - - // - // The following values will only be accessed by one thread at a time. - // - size_t capacity; - struct pollfd *fds; - size_t nfds; -#ifdef __sun - int ctrl[2]; -#else - int efd; // Event-FD for signaling the poll (driver-wakeup) -#endif - pn_timestamp_t wakeup; -}; - -struct qdpn_listener_t { - DEQ_LINKS(qdpn_listener_t); - qdpn_driver_t *driver; - void *context; - int idx; - int fd; - bool pending:1; - bool closed:1; -}; - -#define PN_NAME_MAX (256) - -struct qdpn_connector_t { - DEQ_LINKS(qdpn_connector_t); - qdpn_driver_t *driver; - char name[PN_NAME_MAX]; - char hostip[PN_NAME_MAX]; - pn_timestamp_t wakeup; - pn_connection_t *connection; - pn_transport_t *transport; - qdpn_listener_t *listener; - void *context; - qdpn_connector_methods_t *methods; - int idx; - int fd; - int status; - bool pending_tick:1; - bool pending_read:1; - bool pending_write:1; - bool socket_error:1; - bool hangup:1; - bool closed:1; -}; - -ALLOC_DECLARE(qdpn_listener_t); -ALLOC_DEFINE(qdpn_listener_t); - -ALLOC_DECLARE(qdpn_connector_t); -ALLOC_DEFINE(qdpn_connector_t); - -/* Impls */ - -static void qdpn_log_errno(qdpn_driver_t *d, const char *fmt, ...) -{ - char msg[QD_LOG_TEXT_MAX]; - char *begin = msg, *end = msg+sizeof(msg); - va_list ap; - va_start(ap, fmt); - vaprintf(&begin, end, fmt, ap); - va_end(ap); - aprintf(&begin, end, ": "); - strerror_r(errno, begin, end - begin); - qd_log(d->log, QD_LOG_ERROR, "%s", msg); -} - - -pn_timestamp_t pn_i_now(void) -{ - struct timespec now; -#ifdef CLOCK_MONOTONIC_COARSE - int cid = CLOCK_MONOTONIC_COARSE; -#else - int cid = CLOCK_MONOTONIC; -#endif - if (clock_gettime(cid, &now)) { - qd_error_errno(errno, "clock_gettime"); - exit(1); - } - return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000); -} - -pn_timestamp_t qdpn_now() { return pn_i_now(); } - -#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X)) -#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X)) - -static pn_timestamp_t pn_timestamp_min( pn_timestamp_t a, pn_timestamp_t b ) -{ - if (a && b) return pn_min(a, b); - if (a) return a; - return b; -} - -// listener - -static void qdpn_driver_add_listener(qdpn_driver_t *d, qdpn_listener_t *l) -{ - if (!l->driver) return; - sys_mutex_lock(d->lock); - DEQ_INSERT_TAIL(d->listeners, l); - sys_mutex_unlock(d->lock); - l->driver = d; -} - -static void qdpn_driver_remove_listener(qdpn_driver_t *d, qdpn_listener_t *l) -{ - if (!l->driver) return; - - sys_mutex_lock(d->lock); - if (l == d->listener_next) - d->listener_next = DEQ_NEXT(l); - DEQ_REMOVE(d->listeners, l); - sys_mutex_unlock(d->lock); - - l->driver = NULL; -} - - -static int qdpn_create_socket(int af) -{ - struct protoent *pe_tcp = getprotobyname("tcp"); - if (pe_tcp == NULL) - return -1; - return socket(af, SOCK_STREAM, pe_tcp->p_proto); -} - - -static void qdpn_configure_sock(qdpn_driver_t *driver, int sock, bool tcp) -{ - // - // Set the socket to be non-blocking for asynchronous operation. - // - int flags = fcntl(sock, F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(sock, F_SETFL, flags) < 0) - qdpn_log_errno(driver, "fcntl"); - - // - // Disable the Nagle algorithm on TCP connections. - // - // Note: It would be more correct for the "level" argument to be SOL_TCP. However, there - // are portability issues with this macro so we use IPPROTO_TCP instead. - // - if (tcp) { - int tcp_nodelay = 1; - if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0) - qdpn_log_errno(driver, "setsockopt"); - } -} - - -/** - * Sets the ai_family field on the addrinfo struct based on the passed in NON-NULL protocol_family. - * If the passed in protocol family does not match IPv6, IPv4, the function does not set the ai_family field - */ -static void qd_set_addr_ai_family(qdpn_driver_t *driver, struct addrinfo *addr, const char* protocol_family) -{ - if (protocol_family) { - if(strcmp(protocol_family, protocol_family_ipv6) == 0) - addr->ai_family = AF_INET6; - else if(strcmp(protocol_family, protocol_family_ipv4) == 0) - addr->ai_family = AF_INET; - } -} - - -qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, - const char *host, - const char *port, - const char *protocol_family, - void* context) -{ - if (!driver) return NULL; - - struct addrinfo hints = {0}, *addr; - hints.ai_socktype = SOCK_STREAM; - int code = getaddrinfo(host, port, &hints, &addr); - if (code) { - qd_log(driver->log, QD_LOG_ERROR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code)); - return 0; - } - - // Set the protocol family before creating the socket. - qd_set_addr_ai_family(driver, addr, protocol_family); - - int sock = qdpn_create_socket(addr->ai_family); - if (sock < 0) { - qdpn_log_errno(driver, "pn_create_socket"); - freeaddrinfo(addr); - return 0; - } - - int optval = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) { - qdpn_log_errno(driver, "setsockopt"); - close(sock); - freeaddrinfo(addr); - return 0; - } - - if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) { - qdpn_log_errno(driver, "bind"); - freeaddrinfo(addr); - close(sock); - return 0; - } - - freeaddrinfo(addr); - - if (listen(sock, 50) == -1) { - qdpn_log_errno(driver, "listen"); - close(sock); - return 0; - } - - qdpn_listener_t *l = qdpn_listener_fd(driver, sock, context); - return l; -} - -qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, int fd, void *context) -{ - if (!driver) return NULL; - - qdpn_listener_t *l = new_qdpn_listener_t(); - if (!l) return NULL; - DEQ_ITEM_INIT(l); - l->driver = driver; - l->idx = 0; - l->pending = false; - l->fd = fd; - l->closed = false; - l->context = context; - - qdpn_driver_add_listener(driver, l); - return l; -} - -int qdpn_listener_get_fd(qdpn_listener_t *listener) -{ - assert(listener); - return listener->fd; -} - -qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver) -{ - if (!driver) - return 0; - - qdpn_listener_t *head; - sys_mutex_lock(driver->lock); - head = DEQ_HEAD(driver->listeners); - sys_mutex_unlock(driver->lock); - return head; -} - -qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener) -{ - if (!listener || !listener->driver) - return 0; - - qdpn_listener_t *next; - sys_mutex_lock(listener->driver->lock); - next = DEQ_NEXT(listener); - sys_mutex_unlock(listener->driver->lock); - return next; -} - -void *qdpn_listener_context(qdpn_listener_t *l) -{ - return l ? l->context : NULL; -} - -void qdpn_listener_set_context(qdpn_listener_t *listener, void *context) -{ - assert(listener); - listener->context = context; -} - -qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l, - void *policy, - bool (*policy_fn)(void *, const char *name), - bool *counted) -{ - if (!l || !l->pending) return NULL; - char name[PN_NAME_MAX]; - char serv[MAX_SERV]; - char hostip[MAX_HOST]; - - struct sockaddr_in addr = {0}; - addr.sin_family = AF_UNSPEC; - socklen_t addrlen = sizeof(addr); - - int sock = accept(l->fd, (struct sockaddr *) &addr, &addrlen); - if (sock < 0) { - qdpn_log_errno(l->driver, "accept"); - return 0; - } else { - int code = getnameinfo((struct sockaddr *) &addr, addrlen, hostip, MAX_HOST, serv, MAX_SERV, NI_NUMERICHOST | NI_NUMERICSERV); - if (code != 0) { - qd_log(l->driver->log, QD_LOG_ERROR, "getnameinfo: %s", gai_strerror(code)); - close(sock); - return 0; - } else { - qdpn_configure_sock(l->driver, sock, true); - snprintf(name, PN_NAME_MAX-1, "%s:%s", hostip, serv); - } - } - - if (policy_fn) { - if (!(*policy_fn)(policy, name)) { - close(sock); - return 0; - } else { - *counted = true; - } - } - qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL); - snprintf(c->name, PN_NAME_MAX, "%s", name); - snprintf(c->hostip, PN_NAME_MAX, "%s", hostip); - c->listener = l; - return c; -} - -void qdpn_listener_close(qdpn_listener_t *l) -{ - if (!l) return; - if (l->closed) return; - - if (close(l->fd) == -1) - qdpn_log_errno(l->driver, "close"); - l->closed = true; -} - -void qdpn_listener_free(qdpn_listener_t *l) -{ - if (!l) return; - if (l->driver) qdpn_driver_remove_listener(l->driver, l); - free_qdpn_listener_t(l); -} - -// connector - -static void qdpn_driver_add_connector(qdpn_driver_t *d, qdpn_connector_t *c) -{ - if (!c->driver) return; - sys_mutex_lock(d->lock); - DEQ_INSERT_TAIL(d->connectors, c); - sys_mutex_unlock(d->lock); - c->driver = d; -} - -static void qdpn_driver_remove_connector(qdpn_driver_t *d, qdpn_connector_t *c) -{ - if (!c->driver) return; - - sys_mutex_lock(d->lock); - if (c == d->connector_next) { - d->connector_next = DEQ_NEXT(c); - } - - DEQ_REMOVE(d->connectors, c); - c->driver = NULL; - if (c->closed) { - d->closed_count--; - } - sys_mutex_unlock(d->lock); -} - -qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver, - const char *host, - const char *port, - const char *protocol_family, - void *context) -{ - if (!driver) return NULL; - - struct addrinfo hints = {0}, *addr; - hints.ai_socktype = SOCK_STREAM; - int code = getaddrinfo(host, port, &hints, &addr); - if (code) { - qd_log(driver->log, QD_LOG_ERROR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code)); - return 0; - } - - // Set the protocol family before creating the socket. - qd_set_addr_ai_family(driver, addr, protocol_family); - - int sock = qdpn_create_socket(addr->ai_family); - if (sock == PN_INVALID_SOCKET) { - freeaddrinfo(addr); - qdpn_log_errno(driver, "pn_create_socket"); - return 0; - } - - qdpn_configure_sock(driver, sock, true); - - if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) { - if (errno != EINPROGRESS) { - qdpn_log_errno(driver, "connect"); - freeaddrinfo(addr); - close(sock); - return 0; - } - } - - freeaddrinfo(addr); - - qdpn_connector_t *c = qdpn_connector_fd(driver, sock, context); - snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port); - return c; -} - - -static void connector_process(qdpn_connector_t *c); -static void connector_close(qdpn_connector_t *c); - -static qdpn_connector_methods_t connector_methods = { - connector_process, - connector_close -}; - -qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context) -{ - if (!driver) return NULL; - - qdpn_connector_t *c = new_qdpn_connector_t(); - if (!c) return NULL; - DEQ_ITEM_INIT(c); - c->driver = driver; - c->pending_tick = false; - c->pending_read = false; - c->pending_write = false; - c->socket_error = false; - c->hangup = false; - c->name[0] = '\0'; - c->idx = 0; - c->fd = fd; - c->status = PN_SEL_RD | PN_SEL_WR; - c->closed = false; - c->wakeup = 0; - c->connection = NULL; - c->transport = pn_transport(); - c->context = context; - c->listener = NULL; - c->methods = &connector_methods; - qdpn_driver_add_connector(driver, c); - return c; -} - -int qdpn_connector_get_fd(qdpn_connector_t *connector) -{ - assert(connector); - return connector->fd; -} - -qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver) -{ - if (!driver) - return 0; - - sys_mutex_lock(driver->lock); - qdpn_connector_t *head = DEQ_HEAD(driver->connectors); - sys_mutex_unlock(driver->lock); - return head; -} - -qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector) -{ - if (!connector || !connector->driver) - return 0; - sys_mutex_lock(connector->driver->lock); - qdpn_connector_t *next = DEQ_NEXT(connector); - sys_mutex_unlock(connector->driver->lock); - return next; -} - -pn_transport_t *qdpn_connector_transport(qdpn_connector_t *ctor) -{ - return ctor ? ctor->transport : NULL; -} - -void qdpn_connector_set_connection(qdpn_connector_t *ctor, pn_connection_t *connection) -{ - if (!ctor) return; - if (ctor->connection) { - pn_class_decref(PN_OBJECT, ctor->connection); - pn_transport_unbind(ctor->transport); - } - ctor->connection = connection; - if (ctor->connection) { - pn_class_incref(PN_OBJECT, ctor->connection); - pn_transport_bind(ctor->transport, connection); - } -} - -pn_connection_t *qdpn_connector_connection(qdpn_connector_t *ctor) -{ - return ctor ? ctor->connection : NULL; -} - -void *qdpn_connector_context(qdpn_connector_t *ctor) -{ - return ctor ? ctor->context : NULL; -} - -void qdpn_connector_set_context(qdpn_connector_t *ctor, void *context) -{ - if (!ctor) return; - ctor->context = context; -} - -const char *qdpn_connector_name(const qdpn_connector_t *ctor) -{ - if (!ctor) return 0; - return ctor->name; -} - -const char *qdpn_connector_hostip(const qdpn_connector_t *ctor) -{ - if (!ctor) return 0; - return ctor->hostip; -} - -qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor) -{ - return ctor ? ctor->listener : NULL; -} - -/* Mark the connector as closed, but don't close the FD (already closed or - * will be closed elsewhere) - */ -void qdpn_connector_mark_closed(qdpn_connector_t *ctor) -{ - if (!ctor || !ctor->driver) return; - sys_mutex_lock(ctor->driver->lock); - ctor->status = 0; - if (!ctor->closed) { - qd_log(ctor->driver->log, QD_LOG_TRACE, "closed %s", ctor->name); - ctor->closed = true; - ctor->driver->closed_count++; - } - sys_mutex_unlock(ctor->driver->lock); -} - -static void connector_close(qdpn_connector_t *ctor) -{ - if (ctor && !ctor->closed) { - qdpn_connector_mark_closed(ctor); - if (close(ctor->fd) == -1) - qdpn_log_errno(ctor->driver, "close %s", ctor->name); - } -} - -void qdpn_connector_close(qdpn_connector_t *c) -{ - if (c && !c->closed) c->methods->close(c); -} - -bool qdpn_connector_closed(qdpn_connector_t *ctor) -{ - return ctor ? ctor->closed : true; -} - -bool qdpn_connector_failed(qdpn_connector_t *ctor) -{ - return ctor ? ctor->socket_error : true; -} - -void qdpn_connector_free(qdpn_connector_t *ctor) -{ - if (!ctor) return; - - if (ctor->driver) qdpn_driver_remove_connector(ctor->driver, ctor); - pn_transport_unbind(ctor->transport); - pn_transport_free(ctor->transport); - ctor->transport = NULL; - if (ctor->connection) pn_class_decref(PN_OBJECT, ctor->connection); - ctor->connection = NULL; - free_qdpn_connector_t(ctor); -} - -void qdpn_connector_activate(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit) -{ - switch (crit) { - case QDPN_CONNECTOR_WRITABLE : - ctor->status |= PN_SEL_WR; - break; - - case QDPN_CONNECTOR_READABLE : - ctor->status |= PN_SEL_RD; - break; - } -} - - -void qdpn_activate_all(qdpn_driver_t *d) -{ - sys_mutex_lock(d->lock); - qdpn_connector_t *c = DEQ_HEAD(d->connectors); - while (c) { - c->status |= PN_SEL_WR; - c = DEQ_NEXT(c); - } - sys_mutex_unlock(d->lock); -} - -bool qdpn_connector_hangup(qdpn_connector_t *ctor) { - return ctor->hangup; -} - -bool qdpn_connector_activated(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit) -{ - bool result = false; - - switch (crit) { - case QDPN_CONNECTOR_WRITABLE : - result = ctor->pending_write; - ctor->pending_write = false; - ctor->status &= ~PN_SEL_WR; - break; - - case QDPN_CONNECTOR_READABLE : - result = ctor->pending_read; - ctor->pending_read = false; - ctor->status &= ~PN_SEL_RD; - break; - } - - return result; -} - -static pn_timestamp_t qdpn_connector_tick(qdpn_connector_t *ctor, pn_timestamp_t now) -{ - if (!ctor->transport) return 0; - return pn_transport_tick(ctor->transport, now); -} - -void qdpn_connector_process(qdpn_connector_t *c) -{ - if (c && !c->closed) c->methods->process(c); -} - -static void connector_process(qdpn_connector_t *c) -{ - if(c->closed) return; - - pn_transport_t *transport = c->transport; - c->status = 0; - - /// - /// Socket read - /// - ssize_t capacity = pn_transport_capacity(transport); - if (capacity > 0) { - c->status |= PN_SEL_RD; - if (c->pending_read) { - c->pending_read = false; - ssize_t n = recv(c->fd, pn_transport_tail(transport), capacity, 0); - if (n < 0) { - if (errno != EAGAIN) { - qdpn_log_errno(c->driver, "recv %s", c->name); - pn_transport_close_tail( transport ); - } - } else if (n == 0) { /* HUP */ - pn_transport_close_tail( transport ); - } else { - pn_transport_process(transport, (size_t) n); - } - } - } - - /// - /// Event wakeup - /// - c->wakeup = qdpn_connector_tick(c, pn_i_now()); - - /// - /// Socket write - /// - ssize_t pending = pn_transport_pending(transport); - if (pending > 0) { - c->status |= PN_SEL_WR; - if (c->pending_write) { - c->pending_write = false; -#ifdef MSG_NOSIGNAL - ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL); -#else - ssize_t n = send(c->fd, pn_transport_head(transport), pending, 0); -#endif - if (n < 0) { - if (errno != EAGAIN) { - qdpn_log_errno(c->driver, "send %s", c->name); - pn_transport_close_head( transport ); - } - } else if (n) { - pn_transport_pop(transport, (size_t) n); - } - } - } - - if (pn_transport_closed(c->transport)) { - qdpn_connector_close(c); - } -} - -// driver - -qdpn_driver_t *qdpn_driver(qd_log_source_t *log) -{ - qdpn_driver_t *d = (qdpn_driver_t *) malloc(sizeof(qdpn_driver_t)); - if (!d) return NULL; - ZERO(d); - DEQ_INIT(d->listeners); - DEQ_INIT(d->connectors); - d->log = log; - d->lock = sys_mutex(); - -#ifdef __sun - if (pipe(d->ctrl)) - perror("Can't create control pipe"); - - qdpn_configure_sock(d, d->ctrl[0], false); - qdpn_configure_sock(d, d->ctrl[1], false); - - struct sigaction act; - act.sa_handler = SIG_IGN; - sigaction(SIGPIPE, &act, NULL); -#else - d->efd = eventfd(0, EFD_NONBLOCK); - if (d->efd < 0) { - qdpn_log_errno(d, "Can't create eventfd"); - exit(1); - } -#endif - - return d; -} - -void qdpn_driver_free(qdpn_driver_t *d) -{ - if (!d) return; - -#ifdef __sun - close(d->ctrl[0]); - close(d->ctrl[1]); -#else - close(d->efd); -#endif - - qdpn_connector_t *conn = DEQ_HEAD(d->connectors); - - while (conn) { - qdpn_connector_free(conn); - conn = DEQ_HEAD(d->connectors); - } - - qdpn_listener_t *listener = DEQ_HEAD(d->listeners); - - while (listener) { - qdpn_listener_free(listener); - listener = DEQ_HEAD(d->listeners); - } - - free(d->fds); - sys_mutex_free(d->lock); - free(d); -} - -int qdpn_driver_wakeup(qdpn_driver_t *d) -{ -#ifdef __sun - if (d) { - ssize_t count = write(d->ctrl[1], "x", 1); - if (count <= 0) { - return count; - } else { - return 0; - } - } else { - return PN_ARG_ERR; - } -#else - static uint64_t efd_delta = 1; - - if (d) - ignore_result(write(d->efd, &efd_delta, sizeof(uint64_t))); - return 0; -#endif -} - -static void qdpn_driver_rebuild(qdpn_driver_t *d) -{ - sys_mutex_lock(d->lock); - size_t size = DEQ_SIZE(d->listeners) + DEQ_SIZE(d->connectors); - if (d->capacity < size + 1) { - d->capacity = d->capacity > 16 ? d->capacity : 16; - while (d->capacity < size + 1) { - d->capacity *= 2; - } - d->fds = (struct pollfd *) realloc(d->fds, d->capacity*sizeof(struct pollfd)); - } - - - d->wakeup = 0; - d->nfds = 0; - -#ifdef __sun - d->fds[d->nfds].fd = d->ctrl[0]; -#else - d->fds[d->nfds].fd = d->efd; -#endif - d->fds[d->nfds].events = POLLIN; - d->fds[d->nfds].revents = 0; - d->nfds++; - - qdpn_listener_t *l = DEQ_HEAD(d->listeners); - while (l) { - d->fds[d->nfds].fd = l->fd; - d->fds[d->nfds].events = POLLIN; - d->fds[d->nfds].revents = 0; - l->idx = d->nfds; - d->nfds++; - l = DEQ_NEXT(l); - } - - qdpn_connector_t *c = DEQ_HEAD(d->connectors); - while (c) { - if (!c->closed && !c->socket_error && !c->hangup) { - d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup); - d->fds[d->nfds].fd = c->fd; - d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0); - d->fds[d->nfds].revents = 0; - c->idx = d->nfds; - d->nfds++; - } - c = DEQ_NEXT(c); - } - - sys_mutex_unlock(d->lock); -} - -void qdpn_driver_wait_1(qdpn_driver_t *d) -{ - qdpn_driver_rebuild(d); -} - -int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout) -{ - if (d->wakeup) { - pn_timestamp_t now = pn_i_now(); - if (now >= d->wakeup) - timeout = 0; - else - timeout = (timeout < 0) ? d->wakeup-now : pn_min(timeout, d->wakeup - now); - } - int result = poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout); - if (result == -1 && errno != EINTR) - qdpn_log_errno(d, "poll"); - return result; -} - -int qdpn_driver_wait_3(qdpn_driver_t *d) -{ - bool woken = false; - if (d->fds[0].revents & POLLIN) { - woken = true; -#ifdef __sun - //clear the pipe - char buffer[512]; - while (read(d->ctrl[0], buffer, 512) == 512); -#else - char buffer[sizeof(uint64_t)]; - ignore_result(read(d->efd, buffer, sizeof(uint64_t))); -#endif - } - - sys_mutex_lock(d->lock); - qdpn_listener_t *l = DEQ_HEAD(d->listeners); - while (l) { - l->pending = (l->idx && d->fds[l->idx].revents & POLLIN); - l = DEQ_NEXT(l); - } - - pn_timestamp_t now = pn_i_now(); - qdpn_connector_t *c = DEQ_HEAD(d->connectors); - while (c) { - if (c->closed) { - c->pending_read = false; - c->pending_write = false; - c->pending_tick = false; - } else if (c->idx) { - short revents = d->fds[c->idx].revents; - c->pending_read = (revents & POLLIN); - c->pending_write = (revents & POLLOUT); - c->socket_error = (revents & POLLERR); - c->pending_tick = (c->wakeup && c->wakeup <= now); - if (revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP)) { - qd_log(c->driver->log, QD_LOG_ERROR, "unexpected poll events %04x on %s", - revents, c->name); - c->socket_error = true; - } - if (revents & POLLHUP) { - c->hangup = true; - /* poll() is signalling POLLHUP. To see what happened we need - * to do an actual recv() to get the error code. But we might - * be in a state where we're not interested in input, in that - * case try to get the error code via send() */ - short events = d->fds[c->idx].events; - if (events & POLLIN) c->pending_read = true; - else if (events & POLLOUT) c->pending_write = true; - } - } - c = DEQ_NEXT(c); - } - - // - // Rotate the head connector to the tail. This improves the fairness of polling on - // open FDs. - // - c = DEQ_HEAD(d->connectors); - if (c) { - DEQ_REMOVE_HEAD(d->connectors); - DEQ_INSERT_TAIL(d->connectors, c); - } - - d->listener_next = DEQ_HEAD(d->listeners); - d->connector_next = DEQ_HEAD(d->connectors); - sys_mutex_unlock(d->lock); - - return woken ? PN_INTR : 0; -} - -// -// XXX - pn_driver_wait has been divided into three internal functions as a -// temporary workaround for a multi-threading problem. A multi-threaded -// application must hold a lock on parts 1 and 3, but not on part 2. -// This temporary change, which is not reflected in the driver's API, allows -// a multi-threaded application to use the three parts separately. -// -// This workaround will eventually be replaced by a more elegant solution -// to the problem. -// -int qdpn_driver_wait(qdpn_driver_t *d, int timeout) -{ - qdpn_driver_wait_1(d); - int result = qdpn_driver_wait_2(d, timeout); - if (result == -1) - return errno; - return qdpn_driver_wait_3(d); -} - -qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *d) -{ - if (!d) return NULL; - - sys_mutex_lock(d->lock); - while (d->listener_next) { - qdpn_listener_t *l = d->listener_next; - d->listener_next = DEQ_NEXT(l); - - if (l->pending) { - sys_mutex_unlock(d->lock); - return l; - } - } - - sys_mutex_unlock(d->lock); - return NULL; -} - -qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *d) -{ - if (!d) return NULL; - - sys_mutex_lock(d->lock); - while (d->connector_next) { - qdpn_connector_t *c = d->connector_next; - d->connector_next = DEQ_NEXT(c); - - if (c->closed || c->pending_read || c->pending_write || c->pending_tick || c->socket_error) { - sys_mutex_unlock(d->lock); - return c; - } - } - - sys_mutex_unlock(d->lock); - return NULL; -} - -void qdpn_connector_wakeup(qdpn_connector_t *c, pn_timestamp_t t) { - c->wakeup = t; -} - -void qdpn_connector_set_methods(qdpn_connector_t *c, qdpn_connector_methods_t *m) { - c->methods = m; -}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 20bcf56..277fd37 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -472,7 +472,6 @@ void qdr_connection_handlers(qdr_core_t *core, qdr_delivery_update_t delivery_update) { core->user_context = context; - core->activate_handler = activate; core->first_attach_handler = first_attach; core->second_attach_handler = second_attach; core->detach_handler = detach; @@ -492,10 +491,7 @@ void qdr_connection_handlers(qdr_core_t *core, void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn) { - if (!conn->in_activate_list) { - DEQ_INSERT_TAIL_N(ACTIVATE, core->connections_to_activate, conn); - conn->in_activate_list = true; - } + qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn)); } @@ -1236,14 +1232,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo work = DEQ_HEAD(conn->work_list); } - // - // If this connection is on the activation list, remove it from the list - // - if (conn->in_activate_list) { - conn->in_activate_list = false; - DEQ_REMOVE_N(ACTIVATE, core->connections_to_activate, conn); - } - DEQ_REMOVE(core->open_connections, conn); sys_mutex_free(conn->work_lock); qdr_connection_free(conn); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 5e92326..ecd4807 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -518,7 +518,6 @@ struct qdr_connection_t { uint64_t identity; qdr_core_t *core; bool incoming; - bool in_activate_list; qdr_connection_role_t role; int inter_router_cost; qdr_conn_identifier_t *conn_id; @@ -611,7 +610,6 @@ struct qdr_core_t { qd_timer_t *work_timer; qdr_connection_list_t open_connections; - qdr_connection_list_t connections_to_activate; qdr_link_list_t open_links; // @@ -636,7 +634,6 @@ struct qdr_core_t { // Connection section // void *user_context; - qdr_connection_activate_t activate_handler; qdr_link_first_attach_t first_attach_handler; qdr_link_second_attach_t second_attach_handler; qdr_link_detach_t detach_handler; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/router_core/router_core_thread.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index 5cffeb9..926063c 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -29,19 +29,6 @@ ALLOC_DEFINE(qdr_action_t); - -static void qdr_activate_connections_CT(qdr_core_t *core) -{ - qdr_connection_t *conn = DEQ_HEAD(core->connections_to_activate); - while (conn) { - DEQ_REMOVE_HEAD_N(ACTIVATE, core->connections_to_activate); - conn->in_activate_list = false; - core->activate_handler(core->user_context, conn, DEQ_IS_EMPTY(core->connections_to_activate)); - conn = DEQ_HEAD(core->connections_to_activate); - } -} - - void *router_core_thread(void *arg) { qdr_core_t *core = (qdr_core_t*) arg; @@ -84,11 +71,6 @@ void *router_core_thread(void *arg) free_qdr_action_t(action); action = DEQ_HEAD(action_list); } - - // - // Activate all connections that were flagged for activation during the above processing - // - qdr_activate_connections_CT(core); } qd_log(core->log, QD_LOG_INFO, "Router Core thread exited"); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 12b6887..ca7d746 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -27,6 +27,7 @@ #include "entity_cache.h" #include "router_private.h" #include <qpid/dispatch/router_core.h> +#include <proton/sasl.h> const char *QD_ROUTER_NODE_TYPE = "router.node"; const char *QD_ROUTER_ADDRESS_TYPE = "router.address"; @@ -58,7 +59,7 @@ static void qd_router_connection_get_config(const qd_connection_t *conn, *strip_annotations_out = cf ? cf->strip_outbound_annotations : false; *link_capacity = cf ? cf->link_capacity : 1; - if (cf && strcmp(cf->role, router_role) == 0) { + if (cf && strcmp(cf->role, router_role) == 0) { *strip_annotations_in = false; *strip_annotations_out = false; *role = QDR_ROLE_INTER_ROUTER; @@ -296,7 +297,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); int tenant_space_len; const char *tenant_space = qdr_connection_get_tenant_space(qdr_conn, &tenant_space_len); - if (conn->policy_settings) + if (conn->policy_settings) check_user = !conn->policy_settings->allowUserIdProxy; // @@ -650,7 +651,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool const qd_server_config_t *config; if (qd_connection_connector(conn)) { config = qd_connector_config(qd_connection_connector(conn)); - snprintf(host_local, 254, "%s:%s", config->host, config->port); + snprintf(host_local, 254, "%s", config->host_port); host = &host_local[0]; } else @@ -851,14 +852,13 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are } -static void CORE_connection_activate(void *context, qdr_connection_t *conn, bool awaken) +static void CORE_connection_activate(void *context, qdr_connection_t *conn) { // // IMPORTANT: This is the only core callback that is invoked on the core - // thread itself. It is imperative that this function do nothing - // apart from setting the activation in the server for the connection. + // thread itself. It must not take locks that could deadlock the core. // - qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn), awaken); + qd_server_activate((qd_connection_t*) qdr_connection_get_context(conn)); } @@ -952,7 +952,7 @@ static void CORE_link_flow(void *context, qdr_link_t *link, int credit) qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); if (!qlink) return; - + pn_link_t *plink = qd_link_pn(qlink); if (plink) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
