Repository: qpid-proton Updated Branches: refs/heads/master 223e6d012 -> 4dfe29692
PROTON-1706: pn_listener_t provide access to actual listening port pn_netaddr_listening() provides list of pn_netaddr_t addresses for a listener. Implemented in all proactors: epoll, libuv, iocp Updated C tests to listen on port 0 for safe, portable dynamic port allocation. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7bc25c61 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7bc25c61 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7bc25c61 Branch: refs/heads/master Commit: 7bc25c6117350f55572c0b628c9f9fd29d68c7d9 Parents: 223e6d0 Author: Alan Conway <[email protected]> Authored: Thu Jan 4 10:28:36 2018 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Jan 4 12:51:24 2018 -0500 ---------------------------------------------------------------------- examples/c/broker.c | 13 +- examples/c/direct.c | 13 +- examples/c/example_test.py | 59 ++++----- proton-c/CMakeLists.txt | 1 + proton-c/include/proton/netaddr.h | 33 ++++- proton-c/src/proactor/epoll.c | 49 +++---- proton-c/src/proactor/libuv.c | 51 ++++---- proton-c/src/proactor/netaddr-internal.h | 96 ++++++++++++++ proton-c/src/proactor/proactor-internal.c | 15 ++- proton-c/src/proactor/proactor-internal.h | 6 +- proton-c/src/proactor/win_iocp.c | 92 +++----------- proton-c/src/tests/fdlimit.py | 12 +- proton-c/src/tests/proactor.c | 169 +++++++++++++------------ proton-c/src/tests/test_port.h | 142 --------------------- proton-c/src/tests/test_tools.h | 2 +- 15 files changed, 341 insertions(+), 412 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/examples/c/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/broker.c b/examples/c/broker.c index c2efed8..dee8526 100644 --- a/examples/c/broker.c +++ b/examples/c/broker.c @@ -21,6 +21,7 @@ 
#include <proton/engine.h> #include <proton/listener.h> +#include <proton/netaddr.h> #include <proton/proactor.h> #include <proton/sasl.h> #include <proton/ssl.h> @@ -285,11 +286,13 @@ static void handle(broker_t* b, pn_event_t* e) { switch (pn_event_type(e)) { - case PN_LISTENER_OPEN: - printf("listening\n"); - fflush(stdout); - break; - + case PN_LISTENER_OPEN: { + char port[256]; /* Get the listening port */ + pn_netaddr_host_port(pn_netaddr_listening(pn_event_listener(e)), NULL, 0, port, sizeof(port)); + printf("listening on %s\n", port); + fflush(stdout); + break; + } case PN_LISTENER_ACCEPT: { /* Configure a transport to allow SSL and SASL connections. See ssl_domain setup in main() */ pn_transport_t *t = pn_transport(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/examples/c/direct.c ---------------------------------------------------------------------- diff --git a/examples/c/direct.c b/examples/c/direct.c index 3313ab2..1943dcc 100644 --- a/examples/c/direct.c +++ b/examples/c/direct.c @@ -24,6 +24,7 @@ #include <proton/delivery.h> #include <proton/link.h> #include <proton/listener.h> +#include <proton/netaddr.h> #include <proton/message.h> #include <proton/proactor.h> #include <proton/sasl.h> @@ -228,11 +229,13 @@ static void handle_send(app_data_t* app, pn_event_t* event) { static bool handle(app_data_t* app, pn_event_t* event) { switch (pn_event_type(event)) { - case PN_LISTENER_OPEN: - printf("listening\n"); - fflush(stdout); - break; - + case PN_LISTENER_OPEN: { + char port[256]; /* Get the listening port */ + pn_netaddr_host_port(pn_netaddr_listening(pn_event_listener(event)), NULL, 0, port, sizeof(port)); + printf("listening on %s\n", port); + fflush(stdout); + break; + } case PN_LISTENER_ACCEPT: pn_listener_accept2(pn_event_listener(event), NULL, NULL); break; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/examples/c/example_test.py ---------------------------------------------------------------------- diff 
--git a/examples/c/example_test.py b/examples/c/example_test.py index 1e25c76..b6a5a4a 100644 --- a/examples/c/example_test.py +++ b/examples/c/example_test.py @@ -35,18 +35,18 @@ def receive_expect(n=MESSAGES): return receive_expect_messages(n)+receive_expect def send_expect(n=MESSAGES): return "%s messages sent and acknowledged\n" % n def send_abort_expect(n=MESSAGES): return "%s messages started and aborted\n" % n +def wait_listening(proc): + m = proc.wait_re("listening on ([0-9]+)$") + return m.group(1), m.group(0)+"\n" # Return (port, line) + class Broker(object): def __init__(self, test): self.test = test def __enter__(self): - with TestPort() as tp: - self.port = tp.port - self.host = tp.host - self.addr = tp.addr - self.proc = self.test.proc(["broker", "", self.port]) - self.proc.wait_re("listening") - return self + self.proc = self.test.proc(["broker", "", "0"]) + self.port, _ = wait_listening(self.proc) + return self def __exit__(self, *args): b = getattr(self, "proc") @@ -75,19 +75,17 @@ class CExampleTest(ProcTestCase): def test_send_direct(self): """Send to direct server""" - with TestPort() as tp: - d = self.proc(["direct", "", tp.port]) - d.wait_re("listening") - self.assertEqual(send_expect(), self.runex("send", tp.port)) - self.assertMultiLineEqual("listening\n"+receive_expect(), d.wait_exit()) + d = self.proc(["direct", "", "0"]) + port, line = wait_listening(d) + self.assertEqual(send_expect(), self.runex("send", port)) + self.assertMultiLineEqual(line+receive_expect(), d.wait_exit()) def test_receive_direct(self): """Receive from direct server""" - with TestPort() as tp: - d = self.proc(["direct", "", tp.port]) - d.wait_re("listening") - self.assertMultiLineEqual(receive_expect(), self.runex("receive", tp.port)) - self.assertEqual("listening\n10 messages sent and acknowledged\n", d.wait_exit()) + d = self.proc(["direct", "", "0"]) + port, line = wait_listening(d) + self.assertMultiLineEqual(receive_expect(), self.runex("receive", port)) + 
self.assertEqual(line+"10 messages sent and acknowledged\n", d.wait_exit()) def test_send_abort_broker(self): """Sending aborted messages to a broker""" @@ -101,20 +99,19 @@ class CExampleTest(ProcTestCase): def test_send_abort_direct(self): """Send aborted messages to the direct server""" - with TestPort() as tp: - d = self.proc(["direct", "", tp.port, "examples", "20"]) - expect = "listening\n" - d.wait_re(expect) - self.assertEqual(send_expect(), self.runex("send", tp.port)) - expect += receive_expect_messages() - d.wait_re(expect) - self.assertEqual(send_abort_expect(), self.runex("send-abort", tp.port)) - expect += "Message aborted\n"*MESSAGES - d.wait_re(expect) - self.assertEqual(send_expect(), self.runex("send", tp.port)) - expect += receive_expect_messages()+receive_expect_total(20) - self.maxDiff = None - self.assertMultiLineEqual(expect, d.wait_exit()) + d = self.proc(["direct", "", "0", "examples", "20"]) + port, line = wait_listening(d) + expect = line + self.assertEqual(send_expect(), self.runex("send", port)) + expect += receive_expect_messages() + d.wait_re(expect) + self.assertEqual(send_abort_expect(), self.runex("send-abort", port)) + expect += "Message aborted\n"*MESSAGES + d.wait_re(expect) + self.assertEqual(send_expect(), self.runex("send", port)) + expect += receive_expect_messages()+receive_expect_total(20) + self.maxDiff = None + self.assertMultiLineEqual(expect, d.wait_exit()) def test_send_ssl_receive(self): """Send first then receive""" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 590b6a6..0fd5864 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -440,6 +440,7 @@ set (qpid-proton-private-includes src/reactor/selectable.h src/platform/platform.h src/platform/platform_fmt.h + src/proactor/netaddr-internal.h 
src/proactor/proactor-internal.h ) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/include/proton/netaddr.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/netaddr.h b/proton-c/include/proton/netaddr.h index 7f21d93..47f1ac0 100644 --- a/proton-c/include/proton/netaddr.h +++ b/proton-c/include/proton/netaddr.h @@ -51,14 +51,39 @@ PNP_EXTERN int pn_netaddr_str(const pn_netaddr_t *addr, char *buf, size_t size); /** * Get the local address of a transport. Return `NULL` if not available. + * Pointer is invalid after the transport closes (PN_TRANSPORT_CLOSED event is handled) */ PNP_EXTERN const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t); /** - * Get the remote address of a transport. Return NULL if not available. + * Get the remote address of a transport. Return `NULL` if not available. + * Pointer is invalid after the transport closes (PN_TRANSPORT_CLOSED event is handled) */ PNP_EXTERN const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t); +/** + * Get the listening addresses of a listener. + * + * A listener can have more than one address for several reasons: + * - DNS host records may indicate more than one address + * - On a multi-homed host, listening on the default host "" will listen on all local addresses. + * - Some IPv4/IPv6 configurations may expand a single address into a v4/v6 pair. + * + * pn_netaddr_next() will iterate over all the addresses in the list. + * + * @param l points to the listener + * @return The first listening address or NULL if no addresses are available. + * Use pn_netaddr_next() to iterate over the list. + * Pointer is invalid after the listener closes (PN_LISTENER_CLOSED event is handled) + */ +PNP_EXTERN const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l); + +/** + * @return Pointer to the next address in a list of addresses, NULL if at the end of the list or + * if this address is not part of a list.
+ */ +PNP_EXTERN const pn_netaddr_t *pn_netaddr_next(const pn_netaddr_t *na); + struct sockaddr; /** @@ -74,6 +99,12 @@ PNP_EXTERN const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na); PNP_EXTERN size_t pn_netaddr_socklen(const pn_netaddr_t *na); /** + * Get the host and port name from na as separate strings. + * Returns 0 if successful, non-0 on error. + */ +PNP_EXTERN int pn_netaddr_host_port(const pn_netaddr_t* na, char *host, size_t hlen, char *port, size_t plen); + +/** * @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 8f6dacb..3d507ff 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -32,7 +32,6 @@ #include <proton/condition.h> #include <proton/connection_driver.h> #include <proton/engine.h> -#include <proton/netaddr.h> #include <proton/object.h> #include <proton/proactor.h> #include <proton/transport.h> @@ -57,6 +56,8 @@ #include <limits.h> #include <time.h> +#include "./netaddr-internal.h" /* Include after socket/inet headers */ + // TODO: replace timerfd per connection with global lightweight timer mechanism. // logging in general // SIGPIPE? 
@@ -507,10 +508,6 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listene pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port); } -struct pn_netaddr_t { - struct sockaddr_storage ss; -}; - typedef struct pconnection_t { psocket_t psocket; pcontext_t context; @@ -553,6 +550,7 @@ struct acceptor_t{ bool armed; bool overflowed; acceptor_t *next; /* next listener list member */ + struct pn_netaddr_t addr; /* listening address */ }; struct pn_listener_t { @@ -1435,8 +1433,10 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t)); assert(l->acceptors); /* TODO aconway 2017-05-05: memory safety */ l->acceptors_size = 0; + uint16_t dynamic_port = 0; /* Record dynamic port from first bind(0) */ /* Find working listen addresses */ for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) { + if (dynamic_port) set_port(ai->ai_addr, dynamic_port); int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol); static int on = 1; if (fd >= 0) { @@ -1448,6 +1448,15 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in !listen(fd, backlog)) { acceptor_t *acceptor = &l->acceptors[l->acceptors_size++]; + /* Get actual address */ + socklen_t len = pn_netaddr_socklen(&acceptor->addr); + (void)getsockname(fd, (struct sockaddr*)(&acceptor->addr.ss), &len); + if (acceptor == l->acceptors) { /* First acceptor, check for dynamic port */ + dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&acceptor->addr)); + } else { /* Link addr to previous addr */ + (acceptor-1)->addr.next = &acceptor->addr; + } + acceptor->accepted_fd = -1; psocket_t *ps = &acceptor->psocket; psocket_init(ps, p, l, addr); @@ -2195,14 +2204,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { wake_notify(&p->context); } -const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) { - return (struct 
sockaddr*)na; -} - -size_t pn_netaddr_socklen(const pn_netaddr_t *na) { - return sizeof(struct sockaddr_storage); -} - const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) { pconnection_t *pc = get_pconnection(pn_transport_connection(t)); return pc? &pc->local : NULL; @@ -2213,26 +2214,8 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) { return pc ? &pc->remote : NULL; } -#ifndef NI_MAXHOST -# define NI_MAXHOST 1025 -#endif - -#ifndef NI_MAXSERV -# define NI_MAXSERV 32 -#endif - -int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) { - char host[NI_MAXHOST]; - char port[NI_MAXSERV]; - int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss), - host, sizeof(host), port, sizeof(port), - NI_NUMERICHOST | NI_NUMERICSERV); - if (!err) { - return snprintf(buf, len, "%s:%s", host, port); - } else { - if (buf) *buf = '\0'; - return 0; - } +const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l) { + return l->acceptors_size > 0 ? &l->acceptors[0].addr : NULL; } pn_millis_t pn_proactor_now(void) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index 7be83fe..edbe214 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -32,12 +32,12 @@ #include <proton/engine.h> #include <proton/listener.h> #include <proton/message.h> -#include <proton/netaddr.h> #include <proton/object.h> #include <proton/proactor.h> #include <proton/transport.h> #include <uv.h> +#include "netaddr-internal.h" /* Include after socket headers via uv.h */ /* All asserts are cheap and should remain in a release build for debuggability */ #undef NDEBUG @@ -157,10 +157,6 @@ PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket) typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state; -struct pn_netaddr_t { - struct sockaddr_storage ss; -}; - /* An incoming or 
outgoing connection. */ typedef struct pconnection_t { work_t work; /* Must be first to allow casting */ @@ -212,6 +208,11 @@ struct pn_listener_t { /* Only used by leader */ addr_t addr; lsocket_t *lsockets; + int dynamic_port; /* Record dynamic port from first bind(0) */ + + /* Invariant listening addresses allocated during leader_listen_lh() */ + struct pn_netaddr_t *addrs; + int addrs_len; /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't * detach a listener from the UV loop to prevent concurrent access. @@ -602,10 +603,20 @@ static int lsocket(pn_listener_t *l, struct addrinfo *ai) { if (err) { free(ls); /* Will never be closed */ } else { + if (l->dynamic_port) set_port(ai->ai_addr, l->dynamic_port); int flags = (ai->ai_family == AF_INET6) ? UV_TCP_IPV6ONLY : 0; err = uv_tcp_bind(&ls->tcp, ai->ai_addr, flags); if (!err) err = uv_listen((uv_stream_t*)&ls->tcp, l->backlog, on_connection); if (!err) { + /* Get actual listening address */ + pn_netaddr_t *na = &l->addrs[l->addrs_len++]; + int len = sizeof(na->ss); + uv_tcp_getsockname(&ls->tcp, (struct sockaddr*)(&na->ss), &len); + if (na == l->addrs) { /* First socket, check for dynamic port bind */ + l->dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(na)); + } else { + (na-1)->next = na; /* Link into list */ + } /* Add to l->lsockets list */ ls->parent = l; ls->next = l->lsockets; @@ -624,6 +635,13 @@ static void leader_listen_lh(pn_listener_t *l) { add_active(l->work.proactor); int err = leader_resolve(l->work.proactor, &l->addr, true); if (!err) { + /* Allocate enough space for the pn_netaddr_t addresses */ + size_t len = 0; + for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) { + ++len; + } + l->addrs = (pn_netaddr_t*)calloc(len, sizeof(lsocket_t)); + /* Find the working addresses */ for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) { int err2 = lsocket(l, ai); @@ -646,6 +664,7 @@ static void 
leader_listen_lh(pn_listener_t *l) { void pn_listener_free(pn_listener_t *l) { if (l) { + if (l->addrs) free(l->addrs); if (l->addr.getaddrinfo.addrinfo) { /* Interrupted after resolve */ uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo); } @@ -1285,14 +1304,6 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t work_notify(&l->work); } -const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) { - return (struct sockaddr*)na; -} - -size_t pn_netaddr_socklen(const pn_netaddr_t *na) { - return sizeof(struct sockaddr_storage); -} - const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) { pconnection_t *pc = get_pconnection(pn_transport_connection(t)); return pc? &pc->local : NULL; @@ -1303,18 +1314,8 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) { return pc ? &pc->remote : NULL; } -int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) { - char host[NI_MAXHOST]; - char port[NI_MAXSERV]; - int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss), - host, sizeof(host), port, sizeof(port), - NI_NUMERICHOST | NI_NUMERICSERV); - if (!err) { - return snprintf(buf, len, "%s:%s", host, port); - } else { - if (buf) *buf = '\0'; - return 0; - } +const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l) { + return l->addrs ? &l->addrs[0] : NULL; } pn_millis_t pn_proactor_now(void) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/netaddr-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/netaddr-internal.h b/proton-c/src/proactor/netaddr-internal.h new file mode 100644 index 0000000..16e406c --- /dev/null +++ b/proton-c/src/proactor/netaddr-internal.h @@ -0,0 +1,96 @@ +#ifndef PROACTOR_NETADDR_INTERNAL_H +#define PROACTOR_NETADDR_INTERNAL_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/netaddr.h> + +/* Common code for proactors that use the POSIX/Winsock sockaddr library for socket addresses. */ + +struct pn_netaddr_t { + struct sockaddr_storage ss; + pn_netaddr_t *next; +}; + +const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) { + return na ? (struct sockaddr*)&na->ss : NULL; +} + +size_t pn_netaddr_socklen(const pn_netaddr_t *na) { + return sizeof(na->ss); +} + +const pn_netaddr_t *pn_netaddr_next(const pn_netaddr_t *na) { + return na ? 
na->next : NULL; +} + +#ifndef NI_MAXHOST +# define NI_MAXHOST 1025 +#endif + +#ifndef NI_MAXSERV +# define NI_MAXSERV 32 +#endif + +int pn_netaddr_host_port(const pn_netaddr_t* na, char *host, size_t hlen, char *port, size_t plen) { + return getnameinfo(pn_netaddr_sockaddr(na), pn_netaddr_socklen(na), + host, hlen, port, plen, NI_NUMERICHOST | NI_NUMERICSERV); +} + +int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) { + char host[NI_MAXHOST]; + char port[NI_MAXSERV]; + int err = pn_netaddr_host_port(na, host, sizeof(host), port, sizeof(port)); + if (!err) { + return pn_proactor_addr(buf, len, host, port); + } else { + if (buf) *buf = '\0'; + return 0; + } +} + +/* Return port or -1 if sa is not a known address type */ +static int get_port(const struct sockaddr *sa) { + switch (sa->sa_family) { + case AF_INET: return ((struct sockaddr_in*)sa)->sin_port; + case AF_INET6: return ((struct sockaddr_in6*)sa)->sin6_port; + default: return -1; + } +} + +/* Set the port in sa or do nothing if it is not a known address type */ +static void set_port(struct sockaddr *sa, uint16_t port) { + switch (sa->sa_family) { + case AF_INET: ((struct sockaddr_in*)sa)->sin_port = port; break; + case AF_INET6: ((struct sockaddr_in6*)sa)->sin6_port = port; break; + default: break; + } +} + +/* If want has port=0 and got has port > 0 then return port of got, else return 0 */ +static uint16_t check_dynamic_port(const struct sockaddr *want, const struct sockaddr *got) { + if (get_port(want) == 0) { + int port = get_port(got); + if (port > 0) return (uint16_t)port; + } + return 0; +} + +#endif /*!PROACTOR_NETADDR_INTERNAL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/proactor-internal.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/proactor-internal.c b/proton-c/src/proactor/proactor-internal.c index f3f834a..6d96fee 100644 --- a/proton-c/src/proactor/proactor-internal.c +++ 
b/proton-c/src/proactor/proactor-internal.c @@ -37,15 +37,16 @@ static const char *AMQPS_PORT_NAME = "amqps"; const char *PNI_IO_CONDITION = "proton:io"; -#ifndef _WIN32 -/* - * Common implementation for C99-friendly compilers. Windows is - * not and implements its own. - */ int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) { - return snprintf(buf, len, "%s:%s", host ? host : "", port ? port : ""); + /* Don't use snprintf, Windows is not C99 compliant and snprintf is broken. */ + if (buf && len > 0) { + buf[0] = '\0'; + if (host) strncat(buf, host, len); + strncat(buf, ":", len); + if (port) strncat(buf, port, len); + } + return (host ? strlen(host) : 0) + (port ? strlen(port) : 0) + 1; } -#endif int pni_parse_addr(const char *addr, char *buf, size_t len, const char **host, const char **port) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/proactor-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/proactor-internal.h b/proton-c/src/proactor/proactor-internal.h index 7cc7363..67c0bf6 100644 --- a/proton-c/src/proactor/proactor-internal.h +++ b/proton-c/src/proactor/proactor-internal.h @@ -1,5 +1,5 @@ -#ifndef PROACTOR_NETADDR_INTERNAL_H -#define PROACTOR_NETADDR_INTERNAL_H +#ifndef PROACTOR_PROACTOR_INTERNAL_H +#define PROACTOR_PROACTOR_INTERNAL_H /* * Licensed to the Apache Software Foundation (ASF) under one @@ -48,4 +48,4 @@ extern const char *PNI_IO_CONDITION; void pni_proactor_set_cond( pn_condition_t *cond, const char *what, const char *host, const char *port, const char *msg); -#endif // PROACTOR_NETADDR_INTERNAL_H +#endif /*!PROACTOR_PROACTOR_INTERNAL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/proactor/win_iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c index 0be4b51..71f9b0d 
100644 --- a/proton-c/src/proactor/win_iocp.c +++ b/proton-c/src/proactor/win_iocp.c @@ -21,7 +21,6 @@ #include <proton/condition.h> #include <proton/connection_driver.h> -#include <proton/netaddr.h> #include <proton/engine.h> #include <proton/message.h> #include <proton/object.h> @@ -45,6 +44,8 @@ #include <iostream> #include <sstream> +#include "./netaddr-internal.h" /* Include after socket/inet headers */ + /* * Proactor for Windows using IO completion ports. * @@ -1657,8 +1658,9 @@ static void pcontext_finalize(pcontext_t* ctx) { } typedef struct psocket_t { - iocpdesc_t *iocpd; // NULL if reaper, or socket open failure. + iocpdesc_t *iocpd; /* NULL if reaper, or socket open failure. */ pn_listener_t *listener; /* NULL for a connection socket */ + pn_netaddr_t listen_addr; /* Not filled in for connection sockets */ char addr_buf[PN_MAX_ADDR]; const char *host, *port; bool is_reaper; @@ -1695,10 +1697,6 @@ struct pn_proactor_t { bool shutting_down; }; -struct pn_netaddr_t { - struct sockaddr_storage ss; -}; - typedef struct pconnection_t { psocket_t psocket; pcontext_t context; @@ -2786,8 +2784,10 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t)); assert(l->psockets); /* TODO: memory safety */ l->psockets_size = 0; + uint16_t dynamic_port = 0; /* Record dynamic port from first bind(0) */ /* Find working listen addresses */ for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) { + if (dynamic_port) set_port(ai->ai_addr, dynamic_port); // Note fd destructor can clear WSAGetLastError() unique_socket fd(::socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol)); if (fd != INVALID_SOCKET) { @@ -2796,13 +2796,21 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in if (!::listen(fd, backlog)) { iocpdesc_t *iocpd = pni_iocpdesc_create(p->iocp, fd); if (iocpd) { - fd.release(); + pn_socket_t sock = fd.release(); psocket_t *ps = 
&l->psockets[l->psockets_size++]; psocket_init(ps, l, false, addr); ps->iocpd = iocpd; iocpd->is_mp = true; iocpd->active_completer = ps; pni_iocpdesc_start(ps->iocpd); + /* Get actual address */ + socklen_t len = sizeof(ps->listen_addr.ss); + (void)getsockname(sock, (struct sockaddr*)&ps->listen_addr.ss, &len); + if (ps == l->psockets) { /* First socket, check for dynamic port */ + dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&ps->listen_addr)); + } else { + (ps-1)->listen_addr.next = &ps->listen_addr; /* Link into list */ + } } } } @@ -3393,17 +3401,6 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } - -static int pni2_snprintf(char *buf, size_t count, const char *fmt, ...); - -const struct sockaddr *pn_netaddr_sockaddr(const pn_netaddr_t *na) { - return (struct sockaddr*)na; -} - -size_t pn_netaddr_socklen(const pn_netaddr_t *na) { - return sizeof(struct sockaddr_storage); -} - const pn_netaddr_t *pn_netaddr_local(pn_transport_t *t) { pconnection_t *pc = get_pconnection(pn_transport_connection(t)); return pc? &pc->local : NULL; @@ -3414,26 +3411,8 @@ const pn_netaddr_t *pn_netaddr_remote(pn_transport_t *t) { return pc ? &pc->remote : NULL; } -#ifndef NI_MAXHOST -# define NI_MAXHOST 1025 -#endif - -#ifndef NI_MAXSERV -# define NI_MAXSERV 32 -#endif - -int pn_netaddr_str(const pn_netaddr_t* na, char *buf, size_t len) { - char host[NI_MAXHOST]; - char port[NI_MAXSERV]; - int err = getnameinfo((struct sockaddr *)&na->ss, sizeof(na->ss), - host, sizeof(host), port, sizeof(port), - NI_NUMERICHOST | NI_NUMERICSERV); - if (!err) { - return pni2_snprintf(buf, len, "%s:%s", host, port); - } else { - if (buf) *buf = '\0'; - return 0; - } +const pn_netaddr_t *pn_netaddr_listening(pn_listener_t *l) { + return l->psockets ? 
&l->psockets[0].listen_addr : NULL; } pn_millis_t pn_proactor_now(void) { @@ -3445,40 +3424,3 @@ pn_millis_t pn_proactor_now(void) { // Convert to milliseconds and adjust base epoch return t.QuadPart / 10000 - 11644473600000; } - - -// ====================================================================== -// Platform dependent sprintf for pn_proactor_addr() -// ====================================================================== - -#include <stdarg.h> -// [v]snprintf on Windows only matches C99 when no errors or overflow. -static int pni2_vsnprintf(char *buf, size_t count, const char *fmt, va_list ap) { - if (fmt == NULL) - return -1; - if ((buf == NULL) && (count > 0)) - return -1; - if (count > 0) { - int n = vsnprintf_s(buf, count, _TRUNCATE, fmt, ap); - if (n >= 0) // no overflow - return n; // same as C99 - buf[count-1] = '\0'; - } - // separate call to get needed buffer size on overflow - int n = _vscprintf(fmt, ap); - if (n >= (int) count) - return n; - return -1; -} - -static int pni2_snprintf(char *buf, size_t count, const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - int n = pni2_vsnprintf(buf, count, fmt, ap); - va_end(ap); - return n; -} - -int pn_proactor_addr(char *buf, size_t len, const char *host, const char *port) { - return pni2_snprintf(buf, len, "%s:%s", host ? host : "", port ? 
port : ""); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/fdlimit.py ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/fdlimit.py b/proton-c/src/tests/fdlimit.py index 53751cb..57faef8 100644 --- a/proton-c/src/tests/fdlimit.py +++ b/proton-c/src/tests/fdlimit.py @@ -20,17 +20,19 @@ from __future__ import print_function from proctest import * +def wait_listening(proc): + m = proc.wait_re("listening on ([0-9]+)$") + return m.group(1), m.group(0)+"\n" # Return (port, line) + class LimitedBroker(object): def __init__(self, test, fdlimit): self.test = test self.fdlimit = fdlimit def __enter__(self): - with TestPort() as tp: - self.port = str(tp.port) - self.proc = self.test.proc(['prlimit', '-n%d' % self.fdlimit, 'broker', '', self.port]) - self.proc.wait_re("listening") - return self + self.proc = self.test.proc(["broker", "", "0"]) + self.port, _ = wait_listening(self.proc) + return self def __exit__(self, *args): b = getattr(self, "proc") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c index 844cf95..a9eef18 100644 --- a/proton-c/src/tests/proactor.c +++ b/proton-c/src/tests/proactor.c @@ -17,7 +17,6 @@ * under the License. 
*/ -#include "test_port.h" #include "test_tools.h" #include "test_handler.h" #include "test_config.h" @@ -36,8 +35,6 @@ #include <stdlib.h> #include <string.h> -static const char *localhost = ""; /* host for connect/listen */ - #define ARRAYLEN(A) (sizeof(A)/sizeof((A)[0])) /* Proactor and handler that take part in a test */ @@ -136,19 +133,37 @@ static void test_proactors_drain(test_proactor_t *tps, size_t n) { test_proactor_destroy((A)+i); \ } while (0) -/* Combine a test_port with a pn_listener */ -typedef struct test_listener_t { - test_port_t port; - pn_listener_t *listener; -} test_listener_t; -/* Return a listening test_listener_t, raise errors if not successful */ -test_listener_t test_listen(test_proactor_t *tp, const char *host) { - test_listener_t l = { test_port(host), pn_listener() }; - pn_proactor_listen(tp->proactor, l.listener, l.port.host_port, 4); +#define MAX_STR 256 +struct addrinfo { + char host[MAX_STR]; + char port[MAX_STR]; + char connect[MAX_STR]; + char host_port[MAX_STR]; +}; + +struct addrinfo listener_info(pn_listener_t *l) { + struct addrinfo ai = {{0}}; + const pn_netaddr_t *na = pn_netaddr_listening(l); + TEST_ASSERT(0 == pn_netaddr_host_port(na, ai.host, sizeof(ai.host), ai.port, sizeof(ai.port))); + for (na = pn_netaddr_next(na); na; na = pn_netaddr_next(na)) { /* Check that ports are consistent */ + char port[MAX_STR]; + TEST_ASSERT(0 == pn_netaddr_host_port(na, NULL, 0, port, sizeof(port))); + TEST_ASSERTF(0 == strcmp(port, ai.port), "%s != %s", port, ai.port); + } + (void)pn_proactor_addr(ai.connect, sizeof(ai.connect), "", ai.port); /* Address for connecting */ + (void)pn_netaddr_str(na, ai.host_port, sizeof(ai.host_port)); /* host:port listening address */ + return ai; +} + +/* Return a pn_listener_t*, raise errors if not successful */ +pn_listener_t *test_listen(test_proactor_t *tp, const char *host) { + char addr[1024]; + pn_listener_t *l = pn_listener(); + (void)pn_proactor_addr(addr, sizeof(addr), host, "0"); + 
pn_proactor_listen(tp->proactor, l, addr, 4); TEST_ETYPE_EQUAL(tp->handler.t, PN_LISTENER_OPEN, test_proactors_run(tp, 1)); TEST_COND_EMPTY(tp->handler.t, last_condition); - test_port_close(&l.port); return l; } @@ -283,9 +298,9 @@ static pn_event_type_t open_close_handler(test_handler_t *th, pn_event_t *e) { /* Test simple client/server connection with 2 proactors */ static void test_client_server(test_t *t) { test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, common_handler) }; - test_listener_t l = test_listen(&tps[1], localhost); + pn_listener_t *l = test_listen(&tps[1], ""); /* Connect and wait for close at both ends */ - pn_proactor_connect2(tps[0].proactor, NULL, NULL, l.port.host_port); + pn_proactor_connect2(tps[0].proactor, NULL, NULL, listener_info(l).connect); TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED); TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED); TEST_PROACTORS_DESTROY(tps); @@ -308,11 +323,11 @@ static pn_event_type_t open_wake_handler(test_handler_t *th, pn_event_t *e) { static void test_connection_wake(test_t *t) { test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) }; pn_proactor_t *client = tps[0].proactor; - test_listener_t l = test_listen(&tps[1], localhost); + pn_listener_t *l = test_listen(&tps[1], ""); pn_connection_t *c = pn_connection(); pn_incref(c); /* Keep a reference for wake() after free */ - pn_proactor_connect2(client, c, NULL, l.port.host_port); + pn_proactor_connect2(client, c, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */ pn_connection_wake(c); @@ -325,7 +340,7 @@ static void test_connection_wake(test_t *t) { /* Verify we don't get a wake after close even if they happen together */ pn_connection_t *c2 = pn_connection(); - pn_proactor_connect2(client, c2, NULL, l.port.host_port); + pn_proactor_connect2(client, c2, 
NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_connection_wake(c2); pn_proactor_disconnect(client, NULL); @@ -360,8 +375,8 @@ static pn_event_type_t listen_abort_handler(test_handler_t *th, pn_event_t *e) { static void test_abort(test_t *t) { test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_abort_handler) }; pn_proactor_t *client = tps[0].proactor; - test_listener_t l = test_listen(&tps[1], localhost); - pn_proactor_connect2(client, NULL, NULL, l.port.host_port); + pn_listener_t *l = test_listen(&tps[1], ""); + pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect); /* server transport closes */ if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) { @@ -374,7 +389,7 @@ static void test_abort(test_t *t) { TEST_COND_DESC(t, "abort", last_condition); } - pn_listener_close(l.listener); + pn_listener_close(l); while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {} while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {} @@ -419,14 +434,14 @@ static pn_event_type_t listen_refuse_handler(test_handler_t *th, pn_event_t *e) static void test_refuse(test_t *t) { test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_refuse_handler) }; pn_proactor_t *client = tps[0].proactor; - test_listener_t l = test_listen(&tps[1], localhost); - pn_proactor_connect2(client, NULL, NULL, l.port.host_port); + pn_listener_t *l = test_listen(&tps[1], ""); + pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect); /* client transport closes */ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* client */ TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition); - pn_listener_close(l.listener); + pn_listener_close(l); while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {} while (TEST_PROACTORS_RUN(tps) != PN_PROACTOR_INACTIVE) {} @@ -456,9 +471,9 @@ static void test_inactive(test_t *t) { 
pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor; /* Listen, connect, disconnect */ - test_listener_t l = test_listen(&tps[1], localhost); + pn_listener_t *l = test_listen(&tps[1], ""); pn_connection_t *c = pn_connection(); - pn_proactor_connect2(client, c, NULL, l.port.host_port); + pn_proactor_connect2(client, c, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_connection_wake(c); TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps)); @@ -475,7 +490,7 @@ static void test_inactive(test_t *t) { /* Connect, set-timer, disconnect */ pn_proactor_set_timeout(client, 1000000); c = pn_connection(); - pn_proactor_connect2(client, c, NULL, l.port.host_port); + pn_proactor_connect2(client, c, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_connection_wake(c); TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps)); @@ -489,7 +504,7 @@ static void test_inactive(test_t *t) { /* Server won't be INACTIVE until listener is closed */ TEST_CHECK(t, pn_proactor_get(server) == NULL); - pn_listener_close(l.listener); + pn_listener_close(l); TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, TEST_PROACTORS_RUN(tps)); TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps)); @@ -529,12 +544,12 @@ static void test_errors(test_t *t) { TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps)); /* Listen on a port already in use */ - test_port_t port = test_port(localhost); pn_listener_t *l = pn_listener(); - pn_proactor_listen(server, l, port.host_port, 1); + pn_proactor_listen(server, l, ":0", 1); TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, TEST_PROACTORS_RUN(tps)); test_handler_clear(&tps[1].handler, 0); - pn_proactor_listen(server, pn_listener(), port.host_port, 1); /* Busy */ + struct addrinfo laddr = listener_info(l); + pn_proactor_listen(server, pn_listener(), laddr.connect, 1); /* Busy */ TEST_PROACTORS_RUN(tps); 
TEST_HANDLER_EXPECT(&tps[1].handler, PN_LISTENER_CLOSE, 0); /* CLOSE only, no OPEN */ TEST_COND_NAME(t, "proton:io", last_condition); @@ -544,13 +559,12 @@ static void test_errors(test_t *t) { /* Connect with no listener */ c = pn_connection(); - pn_proactor_connect2(client, c, NULL, port.host_port); + pn_proactor_connect2(client, c, NULL, laddr.connect); if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) { TEST_COND_DESC(t, "refused", last_condition); TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps)); } - test_port_close(&port); TEST_PROACTORS_DESTROY(tps); } @@ -590,25 +604,27 @@ static void test_ipv4_ipv6(test_t *t) { pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor; /* Listen on all interfaces for IPv4 only. */ - test_listener_t l4 = test_listen(&tps[1], "0.0.0.0"); + pn_listener_t *l4 = test_listen(&tps[1], "0.0.0.0"); TEST_PROACTORS_DRAIN(tps); /* Empty address listens on both IPv4 and IPv6 on all interfaces */ - test_listener_t l = test_listen(&tps[1], ""); + pn_listener_t *l = test_listen(&tps[1], ""); TEST_PROACTORS_DRAIN(tps); -#define EXPECT_CONNECT(TP, HOST) do { \ - pn_proactor_connect2(client, NULL, NULL, test_port_use_host(&(TP), (HOST))); \ - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); \ +#define EXPECT_CONNECT(LISTENER, HOST) do { \ + char addr[1024]; \ + pn_proactor_addr(addr, sizeof(addr), HOST, listener_info(LISTENER).port); \ + pn_proactor_connect2(client, NULL, NULL, addr); \ + TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); \ TEST_COND_EMPTY(t, last_condition); \ - TEST_PROACTORS_DRAIN(tps); \ + TEST_PROACTORS_DRAIN(tps); \ } while(0) - EXPECT_CONNECT(l4.port, "127.0.0.1"); /* v4->v4 */ - EXPECT_CONNECT(l4.port, ""); /* local->v4*/ + EXPECT_CONNECT(l4, "127.0.0.1"); /* v4->v4 */ + EXPECT_CONNECT(l4, ""); /* local->v4*/ - EXPECT_CONNECT(l.port, "127.0.0.1"); /* v4->all */ - EXPECT_CONNECT(l.port, ""); /* local->all */ + EXPECT_CONNECT(l, 
"127.0.0.1"); /* v4->all */ + EXPECT_CONNECT(l, ""); /* local->all */ /* Listen on ipv6 loopback, if it fails skip ipv6 tests. @@ -617,24 +633,24 @@ static void test_ipv4_ipv6(test_t *t) { local ipv6 loopback configured, so "::1" will force an error. */ TEST_PROACTORS_DRAIN(tps); - test_listener_t l6 = { test_port("::1"), pn_listener() }; - pn_proactor_listen(server, l6.listener, l6.port.host_port, 4); + pn_listener_t *l6 = pn_listener(); + pn_proactor_listen(server, l6, "::1:0", 4); pn_event_type_t e = TEST_PROACTORS_RUN(tps); if (e == PN_LISTENER_OPEN && !pn_condition_is_set(last_condition)) { TEST_PROACTORS_DRAIN(tps); - EXPECT_CONNECT(l6.port, "::1"); /* v6->v6 */ - EXPECT_CONNECT(l6.port, ""); /* local->v6 */ - EXPECT_CONNECT(l.port, "::1"); /* v6->all */ + EXPECT_CONNECT(l6, "::1"); /* v6->v6 */ + EXPECT_CONNECT(l6, ""); /* local->v6 */ + EXPECT_CONNECT(l, "::1"); /* v6->all */ - pn_listener_close(l6.listener); + pn_listener_close(l6); } else { const char *d = pn_condition_get_description(last_condition); TEST_LOGF(t, "skip IPv6 tests: %s %s", pn_event_type_name(e), d ? 
d : "no condition"); } - pn_listener_close(l.listener); - pn_listener_close(l4.listener); + pn_listener_close(l); + pn_listener_close(l4); TEST_PROACTORS_DESTROY(tps); } @@ -642,15 +658,15 @@ static void test_ipv4_ipv6(test_t *t) { static void test_release_free(test_t *t) { test_proactor_t tps[] = { test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) }; pn_proactor_t *client = tps[0].proactor; - test_listener_t l = test_listen(&tps[1], localhost); + pn_listener_t *l = test_listen(&tps[1], ""); /* leave one connection to the proactor */ - pn_proactor_connect2(client, NULL, NULL, l.port.host_port); + pn_proactor_connect2(client, NULL, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); /* release c1 and free immediately */ pn_connection_t *c1 = pn_connection(); - pn_proactor_connect2(client, c1, NULL, l.port.host_port); + pn_proactor_connect2(client, c1, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_proactor_release_connection(c1); /* We free but socket should still be cleaned up */ pn_connection_free(c1); @@ -659,7 +675,7 @@ static void test_release_free(test_t *t) { /* release c2 and but don't free till after proactor free */ pn_connection_t *c2 = pn_connection(); - pn_proactor_connect2(client, c2, NULL, l.port.host_port); + pn_proactor_connect2(client, c2, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_proactor_release_connection(c2); TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */ @@ -747,10 +763,10 @@ static void test_ssl(test_t *t) { pn_ssl_domain_t *cd = client->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); pn_ssl_domain_t *sd = server->handler.ssl_domain = pn_ssl_domain(PN_SSL_MODE_SERVER); TEST_CHECK(t, 0 == SET_CREDENTIALS(sd, "tserver")); - test_listener_t l = test_listen(server, localhost); + pn_listener_t *l = test_listen(server, 
""); /* Basic SSL connection */ - pn_proactor_connect2(client->proactor, NULL, NULL, l.port.host_port); + pn_proactor_connect2(client->proactor, NULL, NULL, listener_info(l).connect); /* Open ok at both ends */ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); TEST_COND_EMPTY(t, last_condition); @@ -764,7 +780,7 @@ static void test_ssl(test_t *t) { TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_peer_authentication(cd, PN_SSL_VERIFY_PEER_NAME, NULL)); pn_connection_t *c = pn_connection(); pn_connection_set_hostname(c, "test_server"); - pn_proactor_connect2(client->proactor, c, NULL, l.port.host_port); + pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); TEST_COND_EMPTY(t, last_condition); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); @@ -775,7 +791,7 @@ static void test_ssl(test_t *t) { /* Verify peer with bad hostname */ c = pn_connection(); pn_connection_set_hostname(c, "wrongname"); - pn_proactor_connect2(client->proactor, c, NULL, l.port.host_port); + pn_proactor_connect2(client->proactor, c, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); TEST_COND_NAME(t, "amqp:connection:framing-error", last_condition); TEST_COND_DESC(t, "SSL", last_condition); @@ -852,9 +868,9 @@ static void test_netaddr(test_t *t) { test_proactor_t tps[] ={ test_proactor(t, open_wake_handler), test_proactor(t, listen_handler) }; pn_proactor_t *client = tps[0].proactor; /* Use IPv4 to get consistent results all platforms */ - test_listener_t l = test_listen(&tps[1], "127.0.0.1"); + pn_listener_t *l = test_listen(&tps[1], "127.0.0.1"); pn_connection_t *c = pn_connection(); - pn_proactor_connect2(client, c, NULL, l.port.host_port); + pn_proactor_connect2(client, c, NULL, listener_info(l).connect); if (!TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps))) { TEST_COND_EMPTY(t, 
last_condition); /* Show the last condition */ return; /* don't continue if connection is closed */ @@ -866,7 +882,7 @@ static void test_netaddr(test_t *t) { pn_transport_t *ct = pn_connection_transport(c); const pn_netaddr_t *na = pn_netaddr_remote(ct); pn_netaddr_str(na, cr, sizeof(cr)); - TEST_STR_IN(t, test_port_use_host(&l.port, ""), cr); /* remote address has listening port */ + TEST_STR_IN(t, listener_info(l).port, cr); /* remote address has listening port */ pn_connection_t *s = last_accepted; /* server side of the connection */ @@ -879,17 +895,12 @@ static void test_netaddr(test_t *t) { pn_netaddr_str(pn_netaddr_remote(st), sr, sizeof(sr)); TEST_STR_EQUAL(t, cl, sr); /* client local == server remote */ - /* Examine as sockaddr */ - const struct sockaddr *sa = pn_netaddr_sockaddr(na); - TEST_CHECK(t, AF_INET == sa->sa_family); - char host[TEST_PORT_MAX_STR] = ""; - char serv[TEST_PORT_MAX_STR] = ""; - int err = getnameinfo(sa, pn_netaddr_socklen(na), - host, sizeof(host), serv, sizeof(serv), - NI_NUMERICHOST | NI_NUMERICSERV); + char host[MAX_STR] = ""; + char serv[MAX_STR] = ""; + int err = pn_netaddr_host_port(na, host, sizeof(host), serv, sizeof(serv)); TEST_CHECK(t, 0 == err); TEST_STR_EQUAL(t, "127.0.0.1", host); - TEST_STR_EQUAL(t, l.port.str, serv); + TEST_STR_EQUAL(t, listener_info(l).port, serv); /* Make sure you can use NULL, 0 to get length of address string without a crash */ size_t len = pn_netaddr_str(pn_netaddr_local(ct), NULL, 0); @@ -905,15 +916,15 @@ static void test_disconnect(test_t *t) { pn_proactor_t *client = tps[0].proactor, *server = tps[1].proactor; /* Start two listeners */ - test_listener_t l = test_listen(&tps[1], localhost); - test_listener_t l2 = test_listen(&tps[1], localhost); + pn_listener_t *l = test_listen(&tps[1], ""); + pn_listener_t *l2 = test_listen(&tps[1], ""); /* Only wait for one connection to remote-open before disconnect */ pn_connection_t *c = pn_connection(); - pn_proactor_connect2(client, c, NULL, 
l.port.host_port); + pn_proactor_connect2(client, c, NULL, listener_info(l).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_connection_t *c2 = pn_connection(); - pn_proactor_connect2(client, c2, NULL, l2.port.host_port); + pn_proactor_connect2(client, c2, NULL, listener_info(l2).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); TEST_PROACTORS_DRAIN(tps); @@ -944,8 +955,8 @@ static void test_disconnect(test_t *t) { TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps)); /* Make sure the proactors are still functional */ - test_listener_t l3 = test_listen(&tps[1], localhost); - pn_proactor_connect2(client, NULL, NULL, l3.port.host_port); + pn_listener_t *l3 = test_listen(&tps[1], ""); + pn_proactor_connect2(client, NULL, NULL, listener_info(l3).connect); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps)); pn_proactor_disconnect(client, NULL); @@ -1027,7 +1038,7 @@ static void test_message_stream(test_t *t) { test_proactor(t, message_stream_handler) }; pn_proactor_t *client = tps[0].proactor; - test_listener_t l = test_listen(&tps[1], localhost); + pn_listener_t *l = test_listen(&tps[1], ""); struct message_stream_context ctx = { 0 }; tps[0].handler.context = &ctx; tps[1].handler.context = &ctx; @@ -1042,7 +1053,7 @@ static void test_message_stream(test_t *t) { pn_message_free(m); pn_connection_t *c = pn_connection(); - pn_proactor_connect2(client, c, NULL, l.port.host_port); + pn_proactor_connect2(client, c, NULL, listener_info(l).connect); pn_session_t *ssn = pn_session(c); pn_session_open(ssn); pn_link_t *snd = pn_sender(ssn, "x"); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/test_port.h ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/test_port.h b/proton-c/src/tests/test_port.h deleted file mode 100644 index b252dd9..0000000 --- a/proton-c/src/tests/test_port.h +++ 
/dev/null @@ -1,142 +0,0 @@ -#ifndef TESTS_TEST_PORT_H -#define TESTS_TEST_PORT_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* Some simple platform-specifics to acquire an unused socket */ - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#if defined(_WIN32) - -# include <winsock2.h> -# include <ws2tcpip.h> - -typedef SOCKET sock_t; - -static void test_snprintf(char *buf, size_t count, const char *fmt, ...) 
{ - va_list ap; - va_start(ap, fmt); - _vsnprintf(buf, count, fmt, ap); - buf[count-1] = '\0'; /* _vsnprintf doesn't null-terminate on overflow */ -} - -void check_err(int ret, const char *what) { - if (ret) { - char buf[512]; - FormatMessage( - FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, WSAGetLastError(), NULL, buf, sizeof(buf), NULL); - fprintf(stderr, "%s: %s\n", what, buf); - abort(); - } -} - -#else /* POSIX */ - -# include <sys/types.h> -# include <sys/socket.h> -# include <netinet/in.h> -# include <unistd.h> -# include <netdb.h> -# define test_snprintf snprintf - -typedef int sock_t; - -void check_err(int ret, const char *what) { - if (ret) { - perror(what); abort(); - } -} - -#endif - -#define TEST_PORT_MAX_STR 1060 - -/* Combines a sock_t with the int and char* versions of the port for convenience */ -typedef struct test_port_t { - sock_t sock; - int port; /* port as integer */ - char str[TEST_PORT_MAX_STR]; /* port as string */ - char host_port[TEST_PORT_MAX_STR]; /* host:port string */ -} test_port_t; - -/* Modifies tp->host_port to use host, returns the new tp->host_port */ -const char *test_port_use_host(test_port_t *tp, const char *host) { - test_snprintf(tp->host_port, sizeof(tp->host_port), "%s:%d", host, tp->port); - return tp->host_port; -} - -/* Create a socket and bind(INADDR_LOOPBACK:0) to get a free port. - Set socket options so the port can be bound and used for listen() within this process, - even though it is bound to the test_port socket. - Use host to create the host_port address string. 
-*/ -test_port_t test_port(const char* host) { -#ifdef _WIN32 - static int wsa_started = 0; - if (!wsa_started) { - WORD wsa_ver = MAKEWORD(2, 2); - WSADATA unused; - check_err(WSAStartup(wsa_ver, &unused), "WSAStartup"); - } -#endif - int err = 0; - test_port_t tp = {0}; - tp.sock = socket(AF_INET, SOCK_STREAM, 0); - check_err(tp.sock < 0, "socket"); -#ifndef _WIN32 - int on = 1; - check_err(setsockopt(tp.sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)), "setsockopt"); -#endif - struct sockaddr_in addr = {0}; - addr.sin_family = AF_INET; /* set the type of connection to TCP/IP */ - addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - addr.sin_port = 0; /* bind to port 0 */ - err = bind(tp.sock, (struct sockaddr*)&addr, sizeof(addr)); - check_err(err, "bind"); - socklen_t len = sizeof(addr); - err = getsockname(tp.sock, (struct sockaddr*)&addr, &len); /* Get the bound port */ - check_err(err, "getsockname"); - tp.port = ntohs(addr.sin_port); - test_snprintf(tp.str, sizeof(tp.str), "%d", tp.port); - test_port_use_host(&tp, host); -#ifdef _WIN32 /* Windows doesn't support the twice-open socket trick */ - closesocket(tp.sock); -#elif defined (__APPLE__) || defined(__FreeBSD__) - close(tp.sock); -#endif - return tp; -} - -void test_port_close(test_port_t *tp) { -#ifdef _WIN32 - WSACleanup(); -#elif defined (__APPLE__) || defined(__FreeBSD__) - // We already closed and have no other cleanup to do -#else - close(tp->sock); -#endif -} - - -#endif // TESTS_TEST_PORT_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7bc25c61/proton-c/src/tests/test_tools.h ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h index 11354ee..d046a43 100644 --- a/proton-c/src/tests/test_tools.h +++ b/proton-c/src/tests/test_tools.h @@ -61,7 +61,7 @@ void test_vlogf_(test_t *t, const char *prefix, const char* expr, } if (t) fprintf(stderr, " [%s]", t->name); fprintf(stderr, "\n"); - 
fflush(stdout); + fflush(stderr); } void test_logf_(test_t *t, const char *prefix, const char* expr, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
