PROTON-1438: C proactor listening behavior Improved listening behavior for pn_proactor_listen to allow selective listening by protocol (ipv4/v6) or portable "listen to everything".
Host can be a host name, IPV4 or IPV6 literal, or the empty string/NULL (treated the same). The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4. - pn_proactor_listen may listen on more than one socket for ipv6/v4 or for DNS names with multiple address records. - the 'backlog' applies to *each* socket - an error on any socket will close all the sockets of the listener, PN_LISTERN_CLOSE event indicates all sockets are closed and provides the error that triggered the close. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ce1b3d1f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ce1b3d1f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ce1b3d1f Branch: refs/heads/master Commit: ce1b3d1f84f40963f5ea1ec391a5f589ffb62ac1 Parents: d48bf9b Author: Alan Conway <[email protected]> Authored: Sun Mar 19 14:57:13 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Wed Mar 22 10:59:04 2017 -0400 ---------------------------------------------------------------------- proton-c/include/proton/cid.h | 4 +- proton-c/include/proton/listener.h | 8 + proton-c/include/proton/proactor.h | 25 +- proton-c/src/proactor/libuv.c | 791 +++++++++++++++++++------------- proton-c/src/tests/proactor.c | 249 +++++++--- proton-c/src/tests/test_tools.h | 86 ++-- 6 files changed, 746 insertions(+), 417 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/include/proton/cid.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h index 2d68896..e0766a0 100644 --- a/proton-c/include/proton/cid.h +++ b/proton-c/include/proton/cid.h @@ -64,7 +64,9 @@ typedef enum { CID_pn_url, CID_pn_listener, - CID_pn_proactor + CID_pn_proactor, + + CID_pn_listener_socket } pn_cid_t; /** http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/include/proton/listener.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h index 2038c06..6646dd1 100644 --- a/proton-c/include/proton/listener.h +++ b/proton-c/include/proton/listener.h @@ -22,6 +22,7 @@ #include <proton/import_export.h> #include <proton/types.h> +#include <proton/event.h> #ifdef __cplusplus extern "C" { @@ -89,6 +90,9 @@ PNP_EXTERN pn_record_t *pn_listener_attachments(pn_listener_t *listener); /** * Close the listener (thread safe). + * + * The PN_LISTENER_CLOSE event is generated when the listener has stopped listening. + * */ PNP_EXTERN void pn_listener_close(pn_listener_t *l); @@ -97,6 +101,10 @@ PNP_EXTERN void pn_listener_close(pn_listener_t *l); */ PNP_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c); +/** + * Return the listener associated with an event or NULL. + */ +PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event); /** *@} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 43b8ccb..185a1af 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -79,15 +79,23 @@ PNP_EXTERN int pn_proactor_connect( pn_proactor_t *proactor, pn_connection_t *connection, const char *addr); /** - * Start listening with listener. pn_proactor_wait() will return a - * PN_LISTENER_ACCEPT event when a connection can be accepted. + * Start listening with listener. + * + * pn_proactor_wait() will return a PN_LISTENER_ACCEPT event when a connection can be + * accepted. + * * * @param[in] proactor the proactor object * @param[in] listener proactor takes ownership of listener, do not free - * @param[in] addr the network address (not AMQP address) to connect to. May - * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of - * the URL is ignored. - * @param[in] backlog number of connection requests to queue + * @param[in] addr the network address (not AMQP address) to connect to in "host:port" + * + * The host can be a host name, IPV4 or IPV6 literal, or the empty string. The empty + * string listens on all local addresses. A host name listens on all addresses associated + * with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An + * IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4." + * + * @param[in] backlog number of connection requests to queue. If the host resolves + * to multiple addresses, this backlog applies to each address. * * @return error on immediate error, e.g. an allocation failure. * Other errors are indicated by pn_listener_condition() on the @@ -187,11 +195,6 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection); PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event); /** - * Return the listener associated with an event or NULL. - */ -PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event); - -/** * @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index 1d16972..a077a5f 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -77,76 +77,126 @@ PN_HANDLE(PN_PROACTOR) PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) -/* common to connection and listener */ -typedef struct psocket_t { + +/* ================ Queues ================ */ +static int unqueued; /* Provide invalid address for _unqueued pointers */ + +#define QUEUE_DECL(T) \ + typedef struct T##_queue_t { T##_t *front, *back; } T##_queue_t; \ + \ + static T##_t *T##_unqueued = (T##_t*)&unqueued; \ + \ + static void T##_push(T##_queue_t *q, T##_t *x) { \ + assert(x->next == T##_unqueued); \ + x->next = NULL; \ + if (!q->front) { \ + q->front = q->back = x; \ + } else { \ + q->back->next = x; \ + q->back = x; \ + } \ + } \ + \ + static T##_t* T##_pop(T##_queue_t *q) { \ + T##_t *x = q->front; \ + if (x) { \ + q->front = x->next; \ + x->next = T##_unqueued; \ + } \ + return x; \ + } + + +/* All work structs and UV callback data structs start with a struct_type member */ +typedef enum { T_CONNECTION, T_LISTENER, T_LSOCKET } struct_type; + +/* A stream of serialized work for the proactor */ +typedef struct work_t { /* Immutable */ + struct_type type; pn_proactor_t *proactor; - char host[NI_MAXHOST]; - char port[NI_MAXSERV]; - bool is_conn; /* Protected by proactor.lock */ - struct psocket_t* next; + struct work_t* next; bool working; /* Owned by a worker thread */ +} work_t; - /* Only used by leader thread when it owns the psocket */ - uv_tcp_t tcp; -} psocket_t; +QUEUE_DECL(work) -typedef struct queue { psocket_t *front, *back; } queue; +static void work_init(work_t* w, pn_proactor_t* p, struct_type type) { + w->proactor = p; + w->next = work_unqueued; + w->type = type; + w->working = true; +} -/* Special value for psocket.next pointer when socket is not on any any list. */ -psocket_t UNLISTED; +/* ================ IO ================ */ -static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) { - ps->proactor = p; - ps->next = &UNLISTED; - ps->is_conn = is_conn; - ps->tcp.data = NULL; /* Set in leader_init */ - ps->working = true; +#define MAXADDR (NI_MAXHOST+NI_MAXSERV) - /* For platforms that don't know about "amqp" and "amqps" service names. */ - if (port && strcmp(port, AMQP_PORT_NAME) == 0) - port = AMQP_PORT; - else if (port && strcmp(port, AMQPS_PORT_NAME) == 0) - port = AMQPS_PORT; - /* Set to "\001" to indicate a NULL as opposed to an empty string "" */ - strncpy(ps->host, host ? host : "\001", sizeof(ps->host)); - strncpy(ps->port, port ? port : "\001", sizeof(ps->port)); -} +/* A resolvable address */ +typedef struct addr_t { + char addr[MAXADDR]; + char *host, *port; /* Point into addr after destructive pni_url_parse */ + uv_getaddrinfo_t getaddrinfo; /* UV getaddrinfo request, contains list of addrinfo */ + struct addrinfo* addrinfo; /* The current addrinfo being tried */ +} addr_t; -/* Turn "\001" back to NULL */ -static inline const char* fixstr(const char* str) { - return str[0] == '\001' ? NULL : str; -} +/* A single listening socket, a listener can have more than one */ +typedef struct lsocket_t { + struct_type type; /* Always T_LSOCKET */ + pn_listener_t *parent; + uv_tcp_t tcp; +} lsocket_t; + +PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket) typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state; /* An incoming or outgoing connection. */ typedef struct pconnection_t { - psocket_t psocket; + work_t work; /* Must be first to allow casting */ + struct pconnection_t *next; /* For listener list */ /* Only used by owner thread */ pn_connection_driver_t driver; /* Only used by leader */ + uv_tcp_t tcp; + addr_t addr; + + uv_connect_t connect; /* Outgoing connection only */ + int connected; /* 0: not connected, <0: connecting after error, 1 = connected ok */ + + lsocket_t *lsocket; /* Incoming connection only */ + uv_timer_t timer; uv_write_t write; - uv_shutdown_t shutdown; size_t writing; /* size of pending write request, 0 if none pending */ - - /* Outgoing connection only */ - uv_connect_t connect; + uv_shutdown_t shutdown; /* Locked for thread-safe access */ uv_mutex_t lock; wake_state wake; } pconnection_t; +QUEUE_DECL(pconnection) + +typedef enum { + L_UNINIT, /**<< Not yet listening */ + L_LISTENING, /**<< Listening */ + L_CLOSE, /**<< Close requested */ + L_CLOSING, /**<< Socket close initiated, wait for close */ + L_CLOSED /**<< User saw PN_LISTENER_CLOSED, all done */ +} listener_state; -/* a listener socket */ +/* A listener */ struct pn_listener_t { - psocket_t psocket; + work_t work; /* Must be first to allow casting */ + + size_t nsockets; + lsocket_t *sockets; + lsocket_t prealloc[1]; /* Pre-allocated socket array, allocate larger if needed */ /* Only used by owner thread */ pn_event_batch_t batch; @@ -154,14 +204,17 @@ struct pn_listener_t { void *context; size_t backlog; + /* Only used by leader */ + addr_t addr; + /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't * detach a listener from the UV loop to prevent concurrent access. */ uv_mutex_t lock; pn_condition_t *condition; pn_collector_t *collector; - queue accept; /* pconnection_t for uv_accept() */ - bool closed; + pconnection_queue_t accept; /* pconnection_t list for accepting */ + listener_state state; }; struct pn_proactor_t { @@ -177,11 +230,11 @@ struct pn_proactor_t { /* Protected by lock */ uv_mutex_t lock; - queue worker_q; /* psockets ready for work, to be returned via pn_proactor_wait() */ - queue leader_q; /* psockets waiting for attention by the leader thread */ + work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */ + work_queue_t leader_q; /* waiting for attention by the leader thread */ size_t interrupt; /* pending interrupts */ pn_millis_t timeout; - size_t count; /* psocket count */ + size_t count; /* connection/listener count for INACTIVE events */ bool inactive; bool timeout_request; bool timeout_elapsed; @@ -189,62 +242,40 @@ struct pn_proactor_t { bool batch_working; /* batch is being processed in a worker thread */ }; -static void push_lh(queue *q, psocket_t *ps) { - assert(ps->next == &UNLISTED); - ps->next = NULL; - if (!q->front) { - q->front = q->back = ps; - } else { - q->back->next = ps; - q->back = ps; - } -} - -/* Pop returns front of q or NULL if empty */ -static psocket_t* pop_lh(queue *q) { - psocket_t *ps = q->front; - if (ps) { - q->front = ps->next; - ps->next = &UNLISTED; - } - return ps; -} /* Notify the leader thread that there is something to do outside of uv_run() */ static inline void notify(pn_proactor_t* p) { uv_async_send(&p->async); } -/* Notify that this socket needs attention from the leader at the next opportunity */ -static void psocket_notify(psocket_t *ps) { - uv_mutex_lock(&ps->proactor->lock); +/* Notify that this work item needs attention from the leader at the next opportunity */ +static void work_notify(work_t *w) { + uv_mutex_lock(&w->proactor->lock); /* If the socket is in use by a worker or is already queued then leave it where it is. It will be processed in pn_proactor_done() or when the queue it is on is processed. */ - if (!ps->working && ps->next == &UNLISTED) { - push_lh(&ps->proactor->leader_q, ps); - notify(ps->proactor); + if (!w->working && w->next == work_unqueued) { + work_push(&w->proactor->leader_q, w); + notify(w->proactor); } - uv_mutex_unlock(&ps->proactor->lock); + uv_mutex_unlock(&w->proactor->lock); } -/* Notify the leader of a newly-created socket */ -static void psocket_start(psocket_t *ps) { - uv_mutex_lock(&ps->proactor->lock); - if (ps->next == &UNLISTED) { /* No-op if already queued */ - ps->working = false; - push_lh(&ps->proactor->leader_q, ps); - notify(ps->proactor); - uv_mutex_unlock(&ps->proactor->lock); +/* Notify the leader of a newly-created work item */ +static void work_start(work_t *w) { + uv_mutex_lock(&w->proactor->lock); + if (w->next == work_unqueued) { /* No-op if already queued */ + w->working = false; + work_push(&w->proactor->leader_q, w); + notify(w->proactor); + uv_mutex_unlock(&w->proactor->lock); } } -static inline pconnection_t *as_pconnection(psocket_t* ps) { - return ps->is_conn ? (pconnection_t*)ps : NULL; -} - -static inline pn_listener_t *as_listener(psocket_t* ps) { - return ps->is_conn ? NULL: (pn_listener_t*)ps; +static void parse_addr(addr_t *addr, const char *str) { + strncpy(addr->addr, str, sizeof(addr->addr)); + char *scheme, *user, *pass, *path; + pni_parse_url(addr->addr, &scheme, &user, &pass, &addr->host, &addr->port, &path); } /* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */ @@ -262,7 +293,7 @@ static void pconnection_finalize(void *vp_pconnection) { static const pn_class_t pconnection_class = PN_CLASS(pconnection); -static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { +static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server) { /* pconnection_t is a pn_class instance so we can attach it to the pn_connection_t and it will be finalized when the pn_connection_t is freed. */ @@ -272,15 +303,18 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) { return NULL; } - psocket_init(&pc->psocket, p, true, host, port); - pc->write.data = &pc->psocket; + work_init(&pc->work, p, T_CONNECTION); + pc->next = pconnection_unqueued; + pc->write.data = &pc->work; if (server) { pn_transport_set_server(pc->driver.transport); } + pc->addr.host = pc->addr.port = pc->addr.addr; /* Set host/port to "" by default */ pn_record_t *r = pn_connection_attachments(pc->driver.connection); pn_record_def(r, PN_PROACTOR, &pconnection_class); pn_record_set(r, PN_PROACTOR, pc); pn_decref(pc); /* Will be deleted when the connection is */ + pc->addr.host = pc->addr.port = pc->addr.addr; /* Set host/port to "" by default */ return pc; } @@ -302,38 +336,51 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL; } -static inline psocket_t *batch_psocket(pn_event_batch_t *batch) { +static inline work_t *batch_work(pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); - if (pc) return &pc->psocket; + if (pc) return &pc->work; pn_listener_t *l = batch_listener(batch); - if (l) return &l->psocket; + if (l) return &l->work; return NULL; } -static void leader_count(pn_proactor_t *p, int change) { +/* Total count of listener and connections for PN_PROACTOR_INACTIVE */ +static void leader_inc(pn_proactor_t *p) { uv_mutex_lock(&p->lock); - p->count += change; - if (p->count == 0) { + ++p->count; + uv_mutex_unlock(&p->lock); +} + +static void leader_dec(pn_proactor_t *p) { + uv_mutex_lock(&p->lock); + assert(p->count > 0); + if (--p->count == 0) { p->inactive = true; notify(p); } uv_mutex_unlock(&p->lock); } -static void pn_listener_free(pn_listener_t *l); +static void pconnection_free(pconnection_t *pc) { + if (pc->addr.getaddrinfo.addrinfo) { + uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Interrupted after resolve */ + } + pn_incref(pc); /* Make sure we don't do a circular free */ + pn_connection_driver_destroy(&pc->driver); + pn_decref(pc); + /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */ +} -/* Final close event for for a pconnection_t, closes the timer */ +/* Final close event for for a pconnection_t, disconnects from proactor */ static void on_close_pconnection_final(uv_handle_t *h) { - /* If the life of the pn_connection_t has been extended with reference counts - we want the pconnection_t to have the same lifespan so calls to pn_connection_wake - will be valid (but no-ops) + /* Free resources associated with a pconnection_t. + If the life of the pn_connection_t has been extended with reference counts + we want the pconnection_t to have the same lifespan so calls to pn_connection_wake() + will be valid, but no-ops. */ pconnection_t *pc = (pconnection_t*)h->data; - /* Break circular references */ - pn_incref(pc); /* Don't let driver_destroy free pc */ - pn_connection_driver_destroy(&pc->driver); - pn_decref(pc); - /* Now pc is freed iff the connection is, otherwise remains till the is freed. */ + leader_dec(pc->work.proactor); + pconnection_free(pc); } static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) { @@ -342,18 +389,27 @@ static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) { } } -/* Close event for uv_tcp_t of a psocket_t */ -static void on_close_psocket(uv_handle_t *h) { - psocket_t *ps = (psocket_t*)h->data; - leader_count(ps->proactor, -1); - if (ps->is_conn) { - pconnection_t *pc = as_pconnection(ps); - uv_timer_stop(&pc->timer); - /* Delay the free till the timer handle is also closed */ - uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final); - } else { - pn_listener_free(as_listener(ps)); +static void on_close_pconnection(uv_handle_t *h) { + pconnection_t *pc = (pconnection_t*)h->data; + /* Delay the free till the timer handle is also closed */ + uv_timer_stop(&pc->timer); + uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final); +} + +static void listener_close_lh(pn_listener_t* l) { + if (l->state < L_CLOSE) { + l->state = L_CLOSE; } + work_notify(&l->work); +} + +static void on_close_lsocket(uv_handle_t *h) { + lsocket_t* ls = (lsocket_t*)h->data; + pn_listener_t *l = ls->parent; + uv_mutex_lock(&l->lock); + --l->nsockets; + listener_close_lh(l); + uv_mutex_unlock(&l->lock); } static pconnection_t *get_pconnection(pn_connection_t* c) { @@ -364,174 +420,280 @@ static pconnection_t *get_pconnection(pn_connection_t* c) { return (pconnection_t*) pn_record_get(r, PN_PROACTOR); } +static inline void pconnection_bad_connect(pconnection_t *pc, int err) { + if (!pc->connected) { + pc->connected = err; /* Remember first connect error in case they all fail */ + } +} + static void pconnection_error(pconnection_t *pc, int err, const char* what) { assert(err); + pconnection_bad_connect(pc, err); pn_connection_driver_t *driver = &pc->driver; pn_connection_driver_bind(driver); /* Bind so errors will be reported */ if (!pn_condition_is_set(pn_transport_condition(driver->transport))) { pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s", - what, fixstr(pc->psocket.host), fixstr(pc->psocket.port), + what, pc->addr.host, pc->addr.port, uv_strerror(err)); } pn_connection_driver_close(driver); } -static void listener_error(pn_listener_t *l, int err, const char* what) { +static void listener_error_lh(pn_listener_t *l, int err, const char* what) { assert(err); - uv_mutex_lock(&l->lock); if (!pn_condition_is_set(l->condition)) { pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s", - what, fixstr(l->psocket.host), fixstr(l->psocket.port), + what, l->addr.host, l->addr.port, uv_strerror(err)); } + listener_close_lh(l); +} + +static void listener_error(pn_listener_t *l, int err, const char* what) { + uv_mutex_lock(&l->lock); + listener_error_lh(l, err, what); uv_mutex_unlock(&l->lock); - pn_listener_close(l); } -static void psocket_error(psocket_t *ps, int err, const char* what) { - if (ps->is_conn) { - pconnection_error(as_pconnection(ps), err, what); +static int pconnection_init(pconnection_t *pc) { + int err = 0; + err = uv_tcp_init(&pc->work.proactor->loop, &pc->tcp); + if (!err) { + pc->tcp.data = pc; + pc->connect.data = pc; + err = uv_timer_init(&pc->work.proactor->loop, &pc->timer); + if (!err) { + pc->timer.data = pc; + } else { + uv_close((uv_handle_t*)&pc->tcp, NULL); + } + } + if (!err) { + leader_inc(pc->work.proactor); } else { - listener_error(as_listener(ps), err, what); + pconnection_error(pc, err, "initialization"); } + return err; } -/* psocket uv-initialization */ -static int leader_init(psocket_t *ps) { - ps->working = false; - ps->tcp.data = ps; - leader_count(ps->proactor, +1); - int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp); +static void try_connect(pconnection_t *pc); + +static void on_connect_fail(uv_handle_t *handle) { + pconnection_t *pc = (pconnection_t*)handle->data; + /* Create a new TCP socket, the current one is closed */ + int err = uv_tcp_init(&pc->work.proactor->loop, &pc->tcp); if (err) { - psocket_error(ps, err, "initialization"); + pc->connected = err; + pc->addr.addrinfo = NULL; /* No point in trying anymore, we can't create a socket */ } else { - pconnection_t *pc = as_pconnection(ps); - if (pc) { - pc->connect.data = ps; - int err = uv_timer_init(&ps->proactor->loop, &pc->timer); - if (!err) { - pc->timer.data = ps; - } - } + try_connect(pc); } - return err; } /* Outgoing connection */ static void on_connect(uv_connect_t *connect, int err) { pconnection_t *pc = (pconnection_t*)connect->data; - if (err) pconnection_error(pc, err, "on connect to"); - psocket_notify(&pc->psocket); + if (!err) { + pc->connected = 1; + pn_connection_open(pc->driver.connection); + work_notify(&pc->work); + uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Done with address info */ + pc->addr.getaddrinfo.addrinfo = NULL; + } else { + pconnection_bad_connect(pc, err); + uv_safe_close((uv_handle_t*)&pc->tcp, on_connect_fail); /* Try the next addr if there is one */ + } } /* Incoming connection ready to be accepted */ static void on_connection(uv_stream_t* server, int err) { /* Unlike most on_* functions, this can be called by the leader thread when the listener - * is ON_WORKER or ON_LEADER, because there's no way to stop libuv from calling - * on_connection(). Update the state of the listener and queue it for leader attention. + * is ON_WORKER or ON_LEADER, because + * + * 1. There's no way to stop libuv from calling on_connection(). + * 2. There can be multiple lsockets per listener. + * + * Update the state of the listener and queue it for leader attention. */ - pn_listener_t *l = (pn_listener_t*) server->data; + lsocket_t *ls = (lsocket_t*)server->data; + pn_listener_t *l = ls->parent; if (err) { listener_error(l, err, "on incoming connection"); } else { uv_mutex_lock(&l->lock); - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + pn_collector_put(l->collector, lsocket__class(), ls, PN_LISTENER_ACCEPT); uv_mutex_unlock(&l->lock); - psocket_notify(&l->psocket); } + work_notify(&l->work); } /* Common address resolution for leader_listen and leader_connect */ -static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { - int err = leader_init(ps); +static int leader_resolve(pn_proactor_t *p, addr_t *addr, bool listen) { struct addrinfo hints = { 0 }; - if (server) hints.ai_flags = AI_PASSIVE; - if (!err) { - err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + /* Note this looks contradictory since we disable V4 mapping in bind() but it is + correct - read the getaddrinfo man page carefully! + */ + hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG; + if (listen) { + hints.ai_flags |= AI_PASSIVE | AI_ALL; } + int err = uv_getaddrinfo(&p->loop, &addr->getaddrinfo, NULL, + *addr->host ? addr->host : NULL, addr->port, &hints); + addr->addrinfo = addr->getaddrinfo.addrinfo; /* Start with the first addrinfo */ return err; } -static void leader_connect(pconnection_t *pc) { - uv_getaddrinfo_t info; - int err = leader_resolve(&pc->psocket, &info, false); - if (!err) { - err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect); - uv_freeaddrinfo(info.addrinfo); +/* Try to connect to the current addrinfo. Called by leader and via callbacks for retry.*/ +static void try_connect(pconnection_t *pc) { + struct addrinfo *ai = pc->addr.addrinfo; + if (!ai) { /* End of list, connect fails */ + uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); + pc->addr.getaddrinfo.addrinfo = NULL; + pconnection_bad_connect(pc, UV_EAI_NODATA); + pconnection_error(pc, pc->connected, "connecting to"); + work_notify(&pc->work); + } else { + pc->addr.addrinfo = ai->ai_next; /* Advance for next attempt */ + int err = uv_tcp_connect(&pc->connect, &pc->tcp, ai->ai_addr, on_connect); + if (err) { + pconnection_bad_connect(pc, err); + uv_close((uv_handle_t*)&pc->tcp, on_connect_fail); /* Queue up next attempt */ + } } +} + +static bool leader_connect(pconnection_t *pc) { + int err = pconnection_init(pc); + if (!err) err = leader_resolve(pc->work.proactor, &pc->addr, false); if (err) { - pconnection_error(pc, err, "connecting to"); + pconnection_error(pc, err, "connect resolving"); + return true; } else { - pn_connection_open(pc->driver.connection); + try_connect(pc); + return false; } } -static void leader_listen(pn_listener_t *l) { - uv_getaddrinfo_t info; - int err = leader_resolve(&l->psocket, &info, true); +static int lsocket_init(lsocket_t *ls, pn_listener_t *l, struct addrinfo *ai) { + ls->type = T_LSOCKET; + ls->parent = l; + ls->tcp.data = ls; + int err = uv_tcp_init(&l->work.proactor->loop, &ls->tcp); if (!err) { - err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0); - uv_freeaddrinfo(info.addrinfo); + int flags = (ai->ai_family == AF_INET6) ? UV_TCP_IPV6ONLY : 0; + err = uv_tcp_bind(&ls->tcp, ai->ai_addr, flags); + if (!err) err = uv_listen((uv_stream_t*)&ls->tcp, l->backlog, on_connection); + if (err) uv_close((uv_handle_t*)&ls->tcp, NULL); } + return err; +} + +#define ARRAY_LEN(A) (sizeof(A)/sizeof(*(A))) + +/* Listen on all available addresses */ +static void leader_listen_lh(pn_listener_t *l) { + leader_inc(l->work.proactor); + int err = leader_resolve(l->work.proactor, &l->addr, true); if (!err) { - err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection); + /* Count addresses, allocate enough space */ + size_t len = 0; + for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) { + ++len; + } + assert(len > 0); /* Guaranteed by getaddrinfo() */ + l->sockets = (len > ARRAY_LEN(l->prealloc)) ? (lsocket_t*)calloc(len, sizeof(lsocket_t)) : l->prealloc; + /* Find the working addresses */ + l->nsockets = 0; + int first_err = 0; + for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) { + lsocket_t *ls = &l->sockets[l->nsockets]; + int err2 = lsocket_init(ls, l, ai); + if (!err2) { + ++l->nsockets; /* Next socket */ + } else if (!first_err) { + first_err = err2; + } + } + uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo); + l->addr.getaddrinfo.addrinfo = NULL; + if (l->nsockets == 0) err = first_err; } - uv_mutex_lock(&l->lock); /* Always put an OPEN event for symmetry, even if we immediately close with err */ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN); - uv_mutex_unlock(&l->lock); if (err) { - listener_error(l, err, "listening on"); + listener_error_lh(l, err, "listening on"); } } -static bool listener_has_work(pn_listener_t *l) { - uv_mutex_lock(&l->lock); - bool has_work = pn_collector_peek(l->collector); - uv_mutex_unlock(&l->lock); - return has_work; -} - -static pconnection_t *listener_pop(pn_listener_t *l) { - uv_mutex_lock(&l->lock); - pconnection_t *pc = (pconnection_t*)pop_lh(&l->accept); - uv_mutex_unlock(&l->lock); - return pc; -} - -static bool listener_finished(pn_listener_t *l) { - uv_mutex_lock(&l->lock); - bool finished = l->closed && !pn_collector_peek(l->collector) && !l->accept.front; - uv_mutex_unlock(&l->lock); - return finished; +static void pn_listener_free(pn_listener_t *l) { + if (l) { + if (l->addr.getaddrinfo.addrinfo) { /* Interrupted after resolve */ + uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo); + } + if (l->collector) pn_collector_free(l->collector); + if (l->condition) pn_condition_free(l->condition); + if (l->attachments) pn_free(l->attachments); + if (l->sockets && l->sockets != l->prealloc) free(l->sockets); + assert(!l->accept.front); + free(l); + } } /* Process a listener, return true if it has events for a worker thread */ static bool leader_process_listener(pn_listener_t *l) { /* NOTE: l may be concurrently accessed by on_connection() */ + bool closed = false; + uv_mutex_lock(&l->lock); + switch (l->state) { - if (l->psocket.tcp.data == NULL) { - /* Start listening if not already listening */ - leader_listen(l); - } else if (listener_finished(l)) { - /* Close if listener is finished. */ - uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket); - return false; - } else { - /* Process accepted connections if any */ - pconnection_t *pc; - while ((pc = listener_pop(l))) { - int err = leader_init(&pc->psocket); - if (!err) { - err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); - } else { - listener_error(l, err, "accepting from"); - pconnection_error(pc, err, "accepting from"); - } - psocket_start(&pc->psocket); + case L_UNINIT: + l->state = L_LISTENING; + leader_listen_lh(l); + break; + + case L_LISTENING: + break; + + case L_CLOSE: /* Close requested, start closing lsockets */ + l->state = L_CLOSING; + for (size_t i = 0; i < l->nsockets; ++i) { + uv_safe_close((uv_handle_t*)&l->sockets[i].tcp, on_close_lsocket); + } + /* NOTE: Fall through in case we have 0 sockets - e.g. resolver error */ + + case L_CLOSING: /* Closing - can we send PN_LISTENER_CLOSE? */ + if (l->nsockets == 0) { + l->state = L_CLOSED; + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); + } + break; + + case L_CLOSED: /* Closed, has LISTENER_CLOSE has been processed? */ + if (!pn_collector_peek(l->collector)) { + leader_dec(l->work.proactor); + closed = true; + } + } + /* Process accepted connections - if we are closed they will get an error */ + for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) { + int err = pconnection_init(pc); + if (!err) { + err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp); + } else { + listener_error(l, err, "accepting from"); + pconnection_error(pc, err, "accepting from"); } + work_start(&pc->work); /* Process events for the accepted/failed connection */ } - return listener_has_work(l); + bool has_work = !closed && pn_collector_peek(l->collector); + uv_mutex_unlock(&l->lock); + + if (closed) { + pn_listener_free(l); + } + return has_work; } /* Generate tick events and return millis till next tick or 0 if no tick is required */ @@ -544,7 +706,7 @@ static pn_millis_t leader_tick(pconnection_t *pc) { static void on_tick(uv_timer_t *timer) { pconnection_t *pc = (pconnection_t*)timer->data; leader_tick(pc); - psocket_notify(&pc->psocket); + work_notify(&pc->work); } static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { @@ -556,7 +718,7 @@ static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { } else { pconnection_error(pc, nread, "on read from"); } - psocket_notify(&pc->psocket); + work_notify(&pc->work); } static void on_write(uv_write_t* write, int err) { @@ -568,7 +730,7 @@ static void on_write(uv_write_t* write, int err) { } else { pn_connection_driver_write_done(&pc->driver, size); } - psocket_notify(&pc->psocket); + work_notify(&pc->work); } static void on_timeout(uv_timer_t *timer) { @@ -592,13 +754,6 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) return &p->batch; } -static void on_stopping(uv_handle_t* h, void* v) { - /* Mark all sockets with an error, pn_proactor_free will clear the resulting events */ - if (h->type == UV_TCP) { - psocket_error((psocket_t*)h->data, UV_ESHUTDOWN, "proactor free"); - } -} - static pn_event_t *log_event(void* p, pn_event_t *e) { if (e) { pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e))); @@ -620,16 +775,6 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { return log_event(p, pn_collector_next(p->collector)); } -static void pn_listener_free(pn_listener_t *l) { - if (l) { - if (l->collector) pn_collector_free(l->collector); - if (l->condition) pn_condition_free(l->condition); - if (l->attachments) pn_free(l->attachments); - assert(!l->accept.front); - free(l); - } -} - /* Return the next event batch or NULL if no events are available */ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) { if (!p->batch_working) { /* Can generate proactor events */ @@ -646,12 +791,15 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) { return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); } } - for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { - assert(ps->working); - if (ps->is_conn) { - return &as_pconnection(ps)->driver.batch; - } else { /* Listener */ - return &as_listener(ps)->batch; + for (work_t *w = work_pop(&p->worker_q); w; w = work_pop(&p->worker_q)) { + assert(w->working); + switch (w->type) { + case T_CONNECTION: + return &((pconnection_t*)w)->driver.batch; + case T_LISTENER: + return &((pn_listener_t*)w)->batch; + default: + break; } } return NULL; @@ -671,18 +819,18 @@ static void check_wake(pconnection_t *pc) { /* Process a pconnection, return true if it has events for a worker thread */ static bool leader_process_pconnection(pconnection_t *pc) { /* Important to do the following steps in order */ + if (!pc->connected) { + return leader_connect(pc); + } if (pc->writing) { /* We can't do anything while a write request is pending */ return false; } - if (pc->psocket.tcp.data == NULL) { - /* Start the connection if not already connected */ - leader_connect(pc); - } else if (pn_connection_driver_finished(&pc->driver)) { + if (pn_connection_driver_finished(&pc->driver)) { uv_mutex_lock(&pc->lock); - pc->wake = W_CLOSED; /* wake() cannot notify anymore */ + pc->wake = W_CLOSED; /* wake() is a no-op from now on */ uv_mutex_unlock(&pc->lock); - uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket); + uv_safe_close((uv_handle_t*)&pc->tcp, on_close_pconnection); } else { /* Check for events that can be generated without blocking for IO */ check_wake(pc); @@ -702,17 +850,17 @@ static bool leader_process_pconnection(pconnection_t *pc) { what = "write"; if (wbuf.size > 0) { uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); - err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); + err = uv_write(&pc->write, (uv_stream_t*)&pc->tcp, &buf, 1, on_write); if (!err) { pc->writing = wbuf.size; } } else if (pn_connection_driver_write_closed(&pc->driver)) { - uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL); + uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->tcp, NULL); } } if (!err && rbuf.size > 0) { what = "read"; - err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); + err = uv_read_start((uv_stream_t*)&pc->tcp, alloc_read_buffer, on_read); } if (err) { /* Some IO requests failed, generate the error events */ @@ -725,8 +873,8 @@ static bool leader_process_pconnection(pconnection_t *pc) { /* Detach a connection from the UV loop so it can be used safely by a worker */ void pconnection_detach(pconnection_t *pc) { - if (!pc->writing) { /* Can't detach while a write is pending */ - uv_read_stop((uv_stream_t*)&pc->psocket.tcp); + if (pc->connected && !pc->writing) { /* Can't detach while a write is pending */ + uv_read_stop((uv_stream_t*)&pc->tcp); uv_timer_stop(&pc->timer); } } @@ -734,21 +882,29 @@ void pconnection_detach(pconnection_t *pc) { /* Process the leader_q and the UV loop, in the leader thread */ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) { pn_event_batch_t *batch = NULL; - for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { - assert(!ps->working); + for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) { + assert(!w->working); uv_mutex_unlock(&p->lock); /* Unlock to process each item, may add more items to leader_q */ - bool has_work = ps->is_conn ? - leader_process_pconnection(as_pconnection(ps)) : - leader_process_listener(as_listener(ps)); + bool has_work = false; + switch (w->type) { + case T_CONNECTION: + has_work = leader_process_pconnection((pconnection_t*)w); + break; + case T_LISTENER: + has_work = leader_process_listener((pn_listener_t*)w); + break; + default: + break; + } uv_mutex_lock(&p->lock); - if (has_work && !ps->working && ps->next == &UNLISTED) { - if (ps->is_conn) { - pconnection_detach(as_pconnection(ps)); + if (has_work && !w->working && w->next == work_unqueued) { + if (w->type == T_CONNECTION) { + pconnection_detach((pconnection_t*)w); } - ps->working = true; - push_lh(&p->worker_q, ps); + w->working = true; + work_push(&p->worker_q, w); } } batch = get_batch_lh(p); /* Check for work */ @@ -805,13 +961,14 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { } void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { + if (!batch) return; uv_mutex_lock(&p->lock); - psocket_t *ps = batch_psocket(batch); - if (ps) { - assert(ps->working); - assert(ps->next == &UNLISTED); - ps->working = false; - push_lh(&p->leader_q, ps); + work_t *w = batch_work(batch); + if (w) { + assert(w->working); + assert(w->next == work_unqueued); + w->working = false; + work_push(&p->leader_q, w); } pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */ if (bp == p) { @@ -822,7 +979,13 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { } pn_listener_t *pn_event_listener(pn_event_t *e) { - return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL; + if (pn_event_class(e) == pn_listener__class()) { + return (pn_listener_t*)pn_event_context(e); + } else if (pn_event_class(e) == lsocket__class()) { + return ((lsocket_t*)pn_event_context(e))->parent; + } else { + return NULL; + } } pn_proactor_t *pn_event_proactor(pn_event_t *e) { @@ -831,7 +994,7 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) { } pn_listener_t *l = pn_event_listener(e); if (l) { - return l->psocket.proactor; + return l->work.proactor; } pn_connection_t *c = pn_event_connection(e); if (c) { @@ -856,33 +1019,21 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { } int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) { - char *buf = strdup(addr); - if (!buf) { - return PN_OUT_OF_MEMORY; - } - char *scheme, *user, *pass, *host, *port, *path; - pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path); - pconnection_t *pc = pconnection(p, c, false, host, port); - free(buf); - if (!pc) { + pconnection_t *pc = pconnection(p, c, false); + if (pc) { + parse_addr(&pc->addr, addr); + work_start(&pc->work); + } else { return PN_OUT_OF_MEMORY; } - psocket_start(&pc->psocket); return 0; } int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) { - assert(!l->closed); - char *buf = strdup(addr); - if (!buf) { - return PN_OUT_OF_MEMORY; - } - char *scheme, *user, *pass, *host, *port, *path; - pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path); - psocket_init(&l->psocket, p, false, host, port); - free(buf); + work_init(&l->work, p, T_LISTENER); + parse_addr(&l->addr, addr); l->backlog = backlog; - psocket_start(&l->psocket); + work_start(&l->work); return 0; } @@ -900,26 +1051,41 @@ pn_proactor_t *pn_proactor() { return p; } -void pn_proactor_free(pn_proactor_t *p) { - if (p->count > 0) { - uv_walk(&p->loop, on_stopping, NULL); /* Set errors on all sockets */ - /* Drain all events so sockets can close normally */ - pn_event_t *e = NULL; - do { - pn_event_batch_t *eb = pn_proactor_wait(p); - e = pn_event_batch_next(eb); - while (e && pn_event_type(e) != PN_PROACTOR_INACTIVE) { - e = pn_event_batch_next(eb); - } - pn_proactor_done(p, eb); - } while (pn_event_type(e) != PN_PROACTOR_INACTIVE); +static void on_proactor_free(uv_handle_t* h, void* v) { + uv_safe_close(h, NULL); /* Close the handle */ + if (h->type == UV_TCP) { /* Put the corresponding work item on the leader_q for cleanup */ + work_t *w = NULL; + switch (*(struct_type*)h->data) { + case T_CONNECTION: w = (work_t*)h->data; break; + case T_LSOCKET: w = &((lsocket_t*)h->data)->parent->work; break; + default: break; + } + if (w && w->next == work_unqueued) { + work_push(&w->proactor->leader_q, w); /* Save to be freed after all closed */ + } } - /* Close the the proactor handles */ - uv_timer_stop(&p->timer); - uv_safe_close((uv_handle_t*)&p->timer, NULL); - uv_safe_close((uv_handle_t*)&p->async, NULL); +} + +static void work_free(work_t *w) { + switch (w->type) { + case T_CONNECTION: pconnection_free((pconnection_t*)w); break; + case T_LISTENER: pn_listener_free((pn_listener_t*)w); break; + default: break; + } +} + +void pn_proactor_free(pn_proactor_t *p) { + /* Close all open handles */ + uv_walk(&p->loop, on_proactor_free, NULL); while (uv_loop_alive(&p->loop)) { - uv_run(&p->loop, UV_RUN_NOWAIT); /* Run till all handles closed */ + uv_run(&p->loop, UV_RUN_DEFAULT); /* Finish closing the proactor handles */ + } + /* Free all work items */ + for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) { + work_free(w); + } + for (work_t *w = work_pop(&p->worker_q); w; w = work_pop(&p->worker_q)) { + work_free(w); } uv_loop_close(&p->loop); uv_mutex_destroy(&p->lock); @@ -930,7 +1096,7 @@ void pn_proactor_free(pn_proactor_t *p) { pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { pconnection_t *pc = get_pconnection(c); - return pc ? pc->psocket.proactor : NULL; + return pc ? pc->work.proactor : NULL; } void pn_connection_wake(pn_connection_t* c) { @@ -945,7 +1111,7 @@ void pn_connection_wake(pn_connection_t* c) { } uv_mutex_unlock(&pc->lock); if (notify) { - psocket_notify(&pc->psocket); + work_notify(&pc->work); } } } @@ -968,16 +1134,12 @@ pn_listener_t *pn_listener(void) { void pn_listener_close(pn_listener_t* l) { /* May be called from any thread */ uv_mutex_lock(&l->lock); - if (!l->closed) { - l->closed = true; - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); - } + listener_close_lh(l); uv_mutex_unlock(&l->lock); - psocket_notify(&l->psocket); } pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { - return l ? l->psocket.proactor : NULL; + return l ? l->work.proactor : NULL; } pn_condition_t* pn_listener_condition(pn_listener_t* l) { @@ -998,13 +1160,18 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) { int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { uv_mutex_lock(&l->lock); - assert(!l->closed); - pconnection_t *pc = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); + pconnection_t *pc = pconnection(l->work.proactor, c, true); if (!pc) { return PN_OUT_OF_MEMORY; } - push_lh(&l->accept, &pc->psocket); + /* Get the socket from the accept event that we are processing */ + pn_event_t *e = pn_collector_prev(l->collector); + assert(pn_event_type(e) == PN_LISTENER_ACCEPT); + assert(pn_event_listener(e) == l); + pc->lsocket = (lsocket_t*)pn_event_context(e); + pc->connected = 1; /* Don't need to connect() */ + pconnection_push(&l->accept, pc); uv_mutex_unlock(&l->lock); - psocket_notify(&l->psocket); + work_notify(&l->work); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c index 41d889b..38237b0 100644 --- a/proton-c/src/tests/proactor.c +++ b/proton-c/src/tests/proactor.c @@ -32,7 +32,7 @@ static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */ static const char *localhost = "127.0.0.1"; /* host for connect/listen */ -typedef int (*test_handler_fn)(test_t *, pn_event_t*); +typedef pn_event_t *(*test_handler_fn)(test_t *, pn_event_t*); /* Proactor and handler that take part in a test */ typedef struct proactor_test_t { @@ -61,27 +61,50 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) { #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A)) -/* Run an array of proactors till a handler returns non-0 */ -static int proactor_test_run(proactor_test_t *pts, size_t n) { - int ret = 0; - while (!ret) { +/* Process events on a proactor array until a handler returns an event, or + * all proactors return NULL + */ +static pn_event_t *proactor_test_get(proactor_test_t *pts, size_t n) { + while (true) { + bool busy = false; for (proactor_test_t *pt = pts; pt < pts + n; ++pt) { - pn_event_batch_t *events = pn_proactor_get(pt->proactor); - if (events) { - pn_event_t *e = pn_event_batch_next(events); - TEST_CHECKF(pts->t, e, "empty batch"); - while (e && !ret) { - if (!(ret = pt->handler(pt->t, e))) - e = pn_event_batch_next(events); - } - pn_proactor_done(pt->proactor, events); + pn_event_batch_t *eb = pn_proactor_get(pt->proactor); + if (eb) { + busy = true; + pn_event_t *ret = NULL; + for (pn_event_t* e = pn_event_batch_next(eb); e; e = pn_event_batch_next(eb)) { + ret = pt->handler(pt->t, e); + if (ret) break; + } + pn_proactor_done(pt->proactor, eb); + if (ret) return ret; } } + if (!busy) { + return NULL; + } } - return ret; } +/* Run an array of proactors till a handler returns an event. */ +static pn_event_t *proactor_test_run(proactor_test_t *pts, size_t n) { + pn_event_t *e; + while ((e = proactor_test_get(pts, n)) == NULL) + ; + return e; +} + + +/* Drain and discard outstanding events from an array of proactors */ +static void proactor_test_drain(proactor_test_t *pts, size_t n) { + while (proactor_test_get(pts, n)) + ; +} + + +#define PROACTOR_TEST_GET(A) proactor_test_get((A), sizeof(A)/sizeof(*A)) #define PROACTOR_TEST_RUN(A) proactor_test_run((A), sizeof(A)/sizeof(*A)) +#define PROACTOR_TEST_DRAIN(A) proactor_test_drain((A), sizeof(A)/sizeof(*A)) /* Wait for the next single event, return its type */ static pn_event_type_t wait_next(pn_proactor_t *proactor) { @@ -104,31 +127,31 @@ static void test_interrupt_timeout(test_t *t) { } /* Common handler for simple client/server interactions, */ -static int common_handler(test_t *t, pn_event_t *e) { +static pn_event_t *common_handler(test_t *t, pn_event_t *e) { pn_connection_t *c = pn_event_connection(e); pn_listener_t *l = pn_event_listener(e); switch (pn_event_type(e)) { /* Stop on these events */ + case PN_LISTENER_CLOSE: case PN_LISTENER_OPEN: + case PN_PROACTOR_INACTIVE: case PN_PROACTOR_TIMEOUT: case PN_TRANSPORT_CLOSED: - case PN_PROACTOR_INACTIVE: - case PN_LISTENER_CLOSE: - return pn_event_type(e); + return e; case PN_LISTENER_ACCEPT: pn_listener_accept(l, pn_connection()); - return 0; + return NULL; case PN_CONNECTION_REMOTE_OPEN: pn_connection_open(c); /* Return the open (no-op if already open) */ - return 0; + return NULL; case PN_CONNECTION_REMOTE_CLOSE: pn_connection_close(c); /* Return the close */ - return 0; + return NULL; /* Ignored these events */ case PN_CONNECTION_INIT: @@ -139,47 +162,54 @@ static int common_handler(test_t *t, pn_event_t *e) { case PN_TRANSPORT_ERROR: case PN_TRANSPORT_HEAD_CLOSED: case PN_TRANSPORT_TAIL_CLOSED: - return 0; + return NULL; default: TEST_ERRORF(t, "unexpected event %s", pn_event_type_name(pn_event_type(e))); - return 0; /* Fail the test but keep going */ + return NULL; /* Fail the test but keep going */ } } /* close a connection when it is remote open */ -static int open_close_handler(test_t *t, pn_event_t *e) { +static pn_event_t *open_close_handler(test_t *t, pn_event_t *e) { switch (pn_event_type(e)) { case PN_CONNECTION_REMOTE_OPEN: pn_connection_close(pn_event_connection(e)); - return 0; /* common_handler will finish on TRANSPORT_CLOSED */ + return NULL; /* common_handler will finish on TRANSPORT_CLOSED */ default: return common_handler(t, e); } } -/* Simple client/server connection with 2 proactors */ +/* Test several client/server connection with 2 proactors */ static void test_client_server(test_t *t) { proactor_test_t pts[] ={ { open_close_handler }, { common_handler } }; PROACTOR_TEST_INIT(pts, t); pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; test_port_t port = test_port(localhost); pn_proactor_listen(server, pn_listener(), port.host_port, 4); - TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); - pn_proactor_connect(client, pn_connection(), port.host_port); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(port.sock); + /* Connect and wait for close at both ends */ + pn_proactor_connect(client, pn_connection(), port.host_port); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + /* Connect and wait for close at both ends */ + pn_proactor_connect(client, pn_connection(), port.host_port); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + PROACTOR_TEST_FREE(pts); } /* Return on connection open, close and return on wake */ -static int open_wake_handler(test_t *t, pn_event_t *e) { +static pn_event_t *open_wake_handler(test_t *t, pn_event_t *e) { switch (pn_event_type(e)) { case PN_CONNECTION_REMOTE_OPEN: - return pn_event_type(e); + return e; case PN_CONNECTION_WAKE: pn_connection_close(pn_event_connection(e)); - return pn_event_type(e); + return e; default: return common_handler(t, e); } @@ -192,20 +222,21 @@ static void test_connection_wake(test_t *t) { pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; test_port_t port = test_port(localhost); /* Hold a port */ pn_proactor_listen(server, pn_listener(), port.host_port, 4); - TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(port.sock); pn_connection_t *c = pn_connection(); pn_incref(c); /* Keep c alive after proactor frees it */ pn_proactor_connect(client, c, port.host_port); - TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts)); TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */ pn_connection_wake(c); - TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - PROACTOR_TEST_FREE(pts); + TEST_EVENT_TYPE(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + /* The pn_connection_t is still valid so wake is legal but a no-op */ + pn_connection_wake(c); + PROACTOR_TEST_FREE(pts); /* The pn_connection_t is still valid so wake is legal but a no-op */ pn_connection_wake(c); pn_decref(c); @@ -220,34 +251,26 @@ static void test_inactive(test_t *t) { pn_listener_t *l = pn_listener(); pn_proactor_listen(server, l, port.host_port, 4); - TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); pn_connection_t *c = pn_connection(); pn_proactor_connect(client, c, port.host_port); - TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts)); pn_connection_wake(c); - TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts)); /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */ - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); /* server won't be INACTIVE until listener is closed */ TEST_CHECK(t, pn_proactor_get(server) == NULL); pn_listener_close(l); - TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); sock_close(port.sock); PROACTOR_TEST_FREE(pts); } -#define TEST_CHECK_ERROR(T, WANT, COND) do { \ - TEST_CHECKF((T), pn_condition_is_set(COND), "expecting error"); \ - const char* description = pn_condition_get_description(COND); \ - if (!strstr(description, (WANT))) { \ - TEST_ERRORF((T), "bad error, expected '%s' in '%s'", (WANT), description); \ - } \ - } while(0) - /* Tests for error handling */ static void test_errors(test_t *t) { proactor_test_t pts[] = { { open_wake_handler }, { common_handler } }; @@ -258,29 +281,121 @@ static void test_errors(test_t *t) { /* Invalid connect/listen parameters */ pn_connection_t *c = pn_connection(); pn_proactor_connect(client, c, "127.0.0.1:xxx"); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c))); - TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); + TEST_CHECK_COND(t, "xxx", pn_transport_condition(pn_connection_transport(c))); + TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); pn_listener_t *l = pn_listener(); pn_proactor_listen(server, l, "127.0.0.1:xxx", 1); - TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); - TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l)); - TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); + TEST_EVENT_TYPE(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); + TEST_CHECK_COND(t, "xxx", pn_listener_condition(l)); + TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); /* Connect with no listener */ c = pn_connection(); pn_proactor_connect(client, c, port.host_port); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c)))); - TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c))); - TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + if (TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) { + TEST_CHECK_COND(t, "connection refused", pn_transport_condition(pn_connection_transport(c))); + TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + sock_close(port.sock); + PROACTOR_TEST_FREE(pts); + } +} +static inline const char *event_listener_desc(pn_event_t *e) { + return pn_condition_get_description(pn_listener_condition(pn_event_listener(e))); +} + +/* Test that we can control listen/select on ipv6/v4 and listen on both by default */ +static void test_ipv4_ipv6(test_t *t) { + proactor_test_t pts[] ={ { open_close_handler }, { common_handler } }; + PROACTOR_TEST_INIT(pts, t); + pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; + + /* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */ + test_port_t port6 = test_port("[::]"); + pn_proactor_listen(server, pn_listener(), port6.host_port, 4); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); + sock_close(port6.sock); + pn_event_t *e = PROACTOR_TEST_GET(pts); + bool has_ipv6 = (pn_event_type(e) != PN_LISTENER_CLOSE); + if (!has_ipv6) { + TEST_LOGF(t, "skip IPv6 tests: %s", event_listener_desc(e)); + } + PROACTOR_TEST_DRAIN(pts); + + /* Listen on all interfaces for IPv4 only. */ + test_port_t port4 = test_port("0.0.0.0"); + pn_proactor_listen(server, pn_listener(), port4.host_port, 4); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); + sock_close(port4.sock); + e = PROACTOR_TEST_GET(pts); + if (pn_event_type(e) == PN_LISTENER_CLOSE) { + TEST_ERRORF(t, "listener error: %s", event_listener_desc(e)); + } + PROACTOR_TEST_DRAIN(pts); + + /* Empty address listens on both IPv4 and IPv6 on all interfaces */ + test_port_t port = test_port(""); + pn_proactor_listen(server, pn_listener(), port.host_port, 4); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(port.sock); + e = PROACTOR_TEST_GET(pts); + if (pn_event_type(e) == PN_LISTENER_CLOSE) { + TEST_ERRORF(t, "listener error: %s", event_listener_desc(e)); + } + PROACTOR_TEST_DRAIN(pts); + +#define EXPECT_CONNECT(TP, HOST) do { \ + pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \ + pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \ + if (e) TEST_CHECK_NO_COND(t, pn_transport_condition(pn_event_transport(e))); \ + PROACTOR_TEST_DRAIN(pts); \ + } while(0) + +#define EXPECT_FAIL(TP, HOST) do { \ + pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \ + pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \ + if (e) TEST_CHECK_COND(t, "refused", pn_transport_condition(pn_event_transport(e))); \ + PROACTOR_TEST_DRAIN(pts); \ + } while(0) + + EXPECT_CONNECT(port4, "127.0.0.1"); /* v4->v4 */ + EXPECT_CONNECT(port4, ""); /* local->v4*/ + + EXPECT_CONNECT(port, "127.0.0.1"); /* v4->all */ + EXPECT_CONNECT(port, ""); /* local->all */ + + if (has_ipv6) { + EXPECT_CONNECT(port6, "[::]"); /* v6->v6 */ + EXPECT_CONNECT(port6, ""); /* local->v6 */ + EXPECT_CONNECT(port, "[::1]"); /* v6->all */ + + EXPECT_FAIL(port6, "127.0.0.1"); /* fail v4->v6 */ + EXPECT_FAIL(port4, "[::1]"); /* fail v6->v4 */ + } + + PROACTOR_TEST_FREE(pts); +} + +/* Make sure pn_proactor_free cleans up open sockets */ +static void test_free_cleanup(test_t *t) { + proactor_test_t pts[] = { { open_wake_handler }, { common_handler } }; + PROACTOR_TEST_INIT(pts, t); + pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; + test_port_t ports[3] = { test_port(localhost), test_port(localhost), test_port(localhost) }; + for (int i = 0; i < 3; ++i) { + pn_proactor_listen(server, pn_listener(), ports[i].host_port, 2); + TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); + sock_close(ports[i].sock); + pn_proactor_connect(client, pn_connection(), ports[i].host_port); + pn_proactor_connect(client, pn_connection(), ports[i].host_port); + } PROACTOR_TEST_FREE(pts); } + int main(int argc, char **argv) { int failed = 0; RUN_ARGV_TEST(failed, t, test_inactive(&t)); @@ -288,5 +403,7 @@ int main(int argc, char **argv) { RUN_ARGV_TEST(failed, t, test_errors(&t)); RUN_ARGV_TEST(failed, t, test_client_server(&t)); RUN_ARGV_TEST(failed, t, test_connection_wake(&t)); + RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t)); + RUN_ARGV_TEST(failed, t, test_free_cleanup(&t)); return failed; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/src/tests/test_tools.h ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h index f70a327..26f3b35 100644 --- a/proton-c/src/tests/test_tools.h +++ b/proton-c/src/tests/test_tools.h @@ -38,28 +38,49 @@ typedef struct test_t { } test_t; /* Internal, use macros. Print error message and increase the t->errors count. - All output from test marcros goes to stdout not stderr, error messages are normal for a test. + All output from test marcros goes to stderr so it interleaves with PN_TRACE logs. */ + static void test_vlogf_(test_t *t, const char *prefix, const char* expr, const char* file, int line, const char *fmt, va_list ap) { - printf("%s:%d", file, line); - if (prefix && *prefix) printf(": %s", prefix); - if (expr && *expr) printf(": %s", expr); + fprintf(stderr, "%s:%d", file, line); + if (prefix && *prefix) fprintf(stderr, ": %s", prefix); + if (expr && *expr) fprintf(stderr, ": %s", expr); if (fmt && *fmt) { - printf(": "); - vprintf(fmt, ap); + fprintf(stderr, ": "); + vfprintf(stderr, fmt, ap); } - if (t) printf(" [%s]", t->name); - printf("\n"); + if (t) fprintf(stderr, " [%s]", t->name); + fprintf(stderr, "\n"); fflush(stdout); } -static void test_errorf_(test_t *t, const char *prefix, const char* expr, +static void test_errorf_(test_t *t, const char* expr, const char* file, int line, const char *fmt, ...) { + ++t->errors; + va_list ap; + va_start(ap, fmt); + test_vlogf_(t, "error", expr, file, line, fmt, ap); + va_end(ap); +} + +static bool test_check_(test_t *t, bool expr, const char *sexpr, + const char *file, int line, const char* fmt, ...) { + if (!expr) { + ++t->errors; + va_list ap; + va_start(ap, fmt); + test_vlogf_(t, "error: check failed", sexpr, file, line, fmt, ap); + va_end(ap); + } + return expr; +} + +static void test_logf_(test_t *t, const char *prefix, const char* expr, + const char* file, int line, const char *fmt, ...) { va_list ap; va_start(ap, fmt); - ++t->errors; test_vlogf_(t, prefix, expr, file, line, fmt, ap); va_end(ap); } @@ -87,25 +108,13 @@ static void assert_fail_(const char* expr, const char* file, int line, const cha TEST_ASSERTF((expr), "%s", strerror(err)) -/* Internal, use macros */ -static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) { - if (!expr) { - ++t->errors; - va_list ap; - va_start(ap, fmt); - test_vlogf_(t, "check failed", sexpr, file, line, fmt, ap); - va_end(ap); - } - return expr; -} - /* Print a message but don't mark the test as having an error */ #define TEST_LOGF(TEST, ...) \ test_logf_((TEST), "info", NULL, __FILE__, __LINE__, __VA_ARGS__) /* Print an error with printf-style message, increment TEST->errors */ #define TEST_ERRORF(TEST, ...) \ - test_errorf_((TEST), "error", NULL, __FILE__, __LINE__, __VA_ARGS__) + test_errorf_((TEST), NULL, __FILE__, __LINE__, __VA_ARGS__) /* If EXPR is false, print and record an error for t */ #define TEST_CHECKF(TEST, EXPR, ...) \ @@ -121,6 +130,23 @@ static inline bool test_etype_equal_(test_t *t, int want, int got, const char *f pn_event_type_name((pn_event_type_t)got)); } +#define TEST_CHECK_COND(T, WANT, COND) do { \ + pn_condition_t *cond = (COND); \ + if (TEST_CHECKF((T), pn_condition_is_set(cond), "expecting error")) { \ + const char* description = pn_condition_get_description(cond); \ + if (!strstr(description, (WANT))) { \ + TEST_ERRORF((T), "expected '%s' in '%s'", (WANT), description); \ + } \ + } \ + } while(0) + +#define TEST_CHECK_NO_COND(T, COND) do { \ + pn_condition_t *cond = (COND); \ + if (cond && pn_condition_is_set(cond)) { \ + TEST_ERRORF((T), "unexpected condition: %s", pn_condition_get_description(cond)); \ + } \ + } while(0) + #define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \ test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__) @@ -131,7 +157,7 @@ static inline pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_e if (want != pn_event_type(got)) { pn_condition_t *cond = pn_event_condition(got); if (cond && pn_condition_is_set(cond)) { - test_errorf_(t, NULL, NULL, file, line, "condition: %s:%s", + test_errorf_(t, NULL, file, line, "condition: %s:%s", pn_condition_get_name(cond), pn_condition_get_description(cond)); } return NULL; @@ -146,12 +172,12 @@ static inline pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_e FAILED is incremented if the test has errors */ #define RUN_TEST(FAILED, T, EXPR) do { \ - printf("TEST: %s\n", #EXPR); \ + fprintf(stderr, "TEST: %s\n", #EXPR); \ fflush(stdout); \ test_t T = { #EXPR, 0 }; \ (EXPR); \ if (T.errors) { \ - printf("FAIL: %s (%d errors)\n", #EXPR, T.errors); \ + fprintf(stderr, "FAIL: %s (%d errors)\n", #EXPR, T.errors); \ ++(FAILED); \ } \ } while(0) @@ -226,13 +252,19 @@ typedef struct test_port_t { char host_port[256]; /* host:port string */ } test_port_t; +/* Modifies tp->host_port to use host, returns the new tp->host_port */ +static const char *test_port_use_host(test_port_t *tp, const char *host) { + snprintf(tp->host_port, sizeof(tp->host_port), "%s:%d", host, tp->port); + return tp->host_port; +} + /* Create a test_port_t */ static inline test_port_t test_port(const char* host) { test_port_t tp = {0}; tp.sock = sock_bind0(); tp.port = sock_port(tp.sock); snprintf(tp.str, sizeof(tp.str), "%d", tp.port); - snprintf(tp.host_port, sizeof(tp.host_port), "%s:%d", host, tp.port); + test_port_use_host(&tp, host); return tp; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
