http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/io.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/io/windows/io.c b/proton-c/src/reactor/io/windows/io.c new file mode 100644 index 0000000..3ae6722 --- /dev/null +++ b/proton-c/src/reactor/io/windows/io.c @@ -0,0 +1,459 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#define FD_SETSIZE 2048 +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <mswsock.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selector.h" + +#include "platform/platform.h" +#include "iocp.h" +#include "core/util.h" + +#include <proton/object.h> + +#include <ctype.h> +#include <errno.h> +#include <stdio.h> +#include <assert.h> + +int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code) +{ + // Error code can be from GetLastError or WSAGetLastError, + char err[1024] = {0}; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | + FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL); + return pn_error_format(error, PN_ERR, "%s: %s", msg, err); +} + +static void io_log(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +struct pn_io_t { + char host[NI_MAXHOST]; + char serv[NI_MAXSERV]; + pn_error_t *error; + bool trace; + bool wouldblock; + iocp_t *iocp; +}; + +void pn_io_initialize(void *obj) +{ + pn_io_t *io = (pn_io_t *) obj; + io->error = pn_error(); + io->wouldblock = false; + io->trace = pn_env_bool("PN_TRACE_DRV"); + + /* Request WinSock 2.2 */ + WORD wsa_ver = MAKEWORD(2, 2); + WSADATA unused; + int err = WSAStartup(wsa_ver, &unused); + if (err) { + pni_win32_error(io->error, "WSAStartup", WSAGetLastError()); + fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error)); + } + io->iocp = pni_iocp(); +} + +void pn_io_finalize(void *obj) +{ + pn_io_t *io = (pn_io_t *) obj; + pn_error_free(io->error); + pn_free(io->iocp); + WSACleanup(); +} + +#define pn_io_hashcode NULL +#define pn_io_compare NULL +#define pn_io_inspect + +pn_io_t *pn_io(void) +{ + static const pn_class_t clazz = PN_CLASS(pn_io); + pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t)); + return io; +} + +void pn_io_free(pn_io_t *io) +{ + pn_free(io); +} + +pn_error_t *pn_io_error(pn_io_t *io) +{ + assert(io); + return io->error; +} + +static void ensure_unique(pn_io_t *io, pn_socket_t new_socket) +{ + // A brand new socket can have the same HANDLE value as a previous + // one after a socketclose. If the application closes one itself + // (i.e. not using pn_close), we don't find out about it until here. + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket); + if (iocpd) { + if (io->trace) + io_log("Stale external socket reference discarded\n"); + // Re-use means former socket instance was closed + assert(iocpd->ops_in_progress == 0); + assert(iocpd->external); + // Clean up the straggler as best we can + pn_socket_t sock = iocpd->socket; + iocpd->socket = INVALID_SOCKET; + pni_iocpdesc_map_del(io->iocp, sock); // may free the iocpdesc_t depending on refcount + } +} + + +/* + * This heavyweight surrogate pipe could be replaced with a normal Windows pipe + * now that select() is no longer used. If interrupt semantics are all that is + * needed, a simple user space counter and reserved completion status would + * probably suffice. + */ +static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]); + +int pn_pipe(pn_io_t *io, pn_socket_t *dest) +{ + int n = pni_socket_pair(io, dest); + if (n) { + pni_win32_error(io->error, "pipe", WSAGetLastError()); + } + return n; +} + +static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) { + // + // Disable the Nagle algorithm on TCP connections. + // + int flag = 1; + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) { + perror("setsockopt"); + } + + u_long nonblock = 1; + if (ioctlsocket(sock, FIONBIO, &nonblock)) { + perror("ioctlsocket"); + } +} + +static inline pn_socket_t pni_create_socket(int domain, int protocol); + +static const char *amqp_service(const char *port) { + // Help older Windows to know about amqp[s] ports + if (port) { + if (!strcmp("amqp", port)) return "5672"; + if (!strcmp("amqps", port)) return "5671"; + } + return port; +} + +pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port) +{ + struct addrinfo *addr; + int code = getaddrinfo(host, amqp_service(port), NULL, &addr); + if (code) { + pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code)); + return INVALID_SOCKET; + } + + pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol); + if (sock == INVALID_SOCKET) { + pni_win32_error(io->error, "pni_create_socket", WSAGetLastError()); + return INVALID_SOCKET; + } + ensure_unique(io, sock); + + bool optval = 1; + if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval, + sizeof(optval)) == -1) { + pni_win32_error(io->error, "setsockopt", WSAGetLastError()); + closesocket(sock); + return INVALID_SOCKET; + } + + if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) { + pni_win32_error(io->error, "bind", WSAGetLastError()); + freeaddrinfo(addr); + closesocket(sock); + return INVALID_SOCKET; + } + freeaddrinfo(addr); + + if (listen(sock, 50) == -1) { + pni_win32_error(io->error, "listen", WSAGetLastError()); + closesocket(sock); + return INVALID_SOCKET; + } + + if (io->iocp->selector) { + iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false); + if (!iocpd) { + pn_i_error_from_errno(io->error, "register"); + closesocket(sock); + return INVALID_SOCKET; + } + pni_iocpdesc_start(iocpd); + } + + return sock; +} + +pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port) +{ + // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets + const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1"; + + struct addrinfo *addr; + int code = getaddrinfo(host, amqp_service(port), NULL, &addr); + if (code) { + pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code)); + return INVALID_SOCKET; + } + + pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol); + if (sock == INVALID_SOCKET) { + pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError()); + freeaddrinfo(addr); + return INVALID_SOCKET; + } + + ensure_unique(io, sock); + pn_configure_sock(io, sock); + + if (io->iocp->selector) { + return pni_iocp_begin_connect(io->iocp, sock, addr, io->error); + } else { + if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) { + if (WSAGetLastError() != WSAEWOULDBLOCK) { + pni_win32_error(io->error, "connect", WSAGetLastError()); + freeaddrinfo(addr); + closesocket(sock); + return INVALID_SOCKET; + } + } + + freeaddrinfo(addr); + return sock; + } +} + +pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size) +{ + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock); + pn_socket_t accept_sock; + + *name = '\0'; + if (listend) + accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error); + else { + // User supplied socket + accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen); + if (accept_sock == INVALID_SOCKET) + pni_win32_error(io->error, "sync accept", WSAGetLastError()); + } + + if (accept_sock == INVALID_SOCKET) + return accept_sock; + + int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST, + io->serv, NI_MAXSERV, 0); + if (code) + code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST, + io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV); + if (code) { + pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code)); + pn_close(io, accept_sock); + return INVALID_SOCKET; + } else { + pn_configure_sock(io, accept_sock); + pni_snprintf(name, size, "%s:%s", io->host, io->serv); + if (listend) { + pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock)); + } + return accept_sock; + } +} + +static inline pn_socket_t pni_create_socket(int domain, int protocol) { + return socket(domain, SOCK_STREAM, protocol); +} + +ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) { + ssize_t count; + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd); + if (iocpd) { + count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error); + } else { + count = send(sockfd, (const char *) buf, len, 0); + io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; + } + return count; +} + +ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size) +{ + ssize_t count; + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket); + if (iocpd) { + count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error); + } else { + count = recv(socket, (char *) buf, size, 0); + io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK; + } + return count; +} + +ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) +{ + // non-socket io is mapped to socket io for now. See pn_pipe() + return pn_send(io, socket, buf, size); +} + +ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size) +{ + return pn_recv(io, socket, buf, size); +} + +void pn_close(pn_io_t *io, pn_socket_t socket) +{ + iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket); + if (iocpd) + pni_iocp_begin_close(iocpd); + else { + closesocket(socket); + } +} + +bool pn_wouldblock(pn_io_t *io) +{ + return io->wouldblock; +} + +pn_selector_t *pn_io_selector(pn_io_t *io) +{ + if (io->iocp->selector == NULL) + io->iocp->selector = pni_selector_create(io->iocp); + return io->iocp->selector; +} + +static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock) +{ + u_long v = 1; + ioctlsocket (sock, FIONBIO, &v); + ensure_unique(io, sock); + iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false); + pni_iocpdesc_start(iocpd); +} + + +static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) { + // no socketpair on windows. provide pipe() semantics using sockets + struct protoent * pe_tcp = getprotobyname("tcp"); + if (pe_tcp == NULL) { + perror("getprotobyname"); + return -1; + } + + SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto); + if (sock == INVALID_SOCKET) { + perror("socket"); + return -1; + } + + BOOL b = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) { + perror("setsockopt"); + closesocket(sock); + return -1; + } + else { + struct sockaddr_in addr = {0}; + addr.sin_family = AF_INET; + addr.sin_port = 0; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + perror("bind"); + closesocket(sock); + return -1; + } + } + + if (listen(sock, 50) == -1) { + perror("listen"); + closesocket(sock); + return -1; + } + + if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) { + perror("sock1"); + closesocket(sock); + return -1; + } + else { + struct sockaddr addr = {0}; + int l = sizeof(addr); + if (getsockname(sock, &addr, &l) == -1) { + perror("getsockname"); + closesocket(sock); + return -1; + } + + if (connect(sv[1], &addr, sizeof(addr)) == -1) { + int err = WSAGetLastError(); + fprintf(stderr, "connect wsaerrr %d\n", err); + closesocket(sock); + closesocket(sv[1]); + return -1; + } + + if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) { + perror("accept"); + closesocket(sock); + closesocket(sv[1]); + return -1; + } + } + + configure_pipe_socket(io, sv[0]); + configure_pipe_socket(io, sv[1]); + closesocket(sock); + return 0; +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/io/windows/iocp.c b/proton-c/src/reactor/io/windows/iocp.c new file mode 100644 index 0000000..8a1a64a --- /dev/null +++ b/proton-c/src/reactor/io/windows/iocp.c @@ -0,0 +1,1179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <mswsock.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selector.h" + +#include "iocp.h" +#include "platform/platform.h" +#include "core/util.h" + +#include <proton/object.h> +#include <proton/error.h> +#include <proton/transport.h> + +#include <assert.h> + +/* + * Windows IO Completion Port support for Proton. + * + * Overlapped writes are used to avoid lengthy stalls between write + * completion and starting a new write. Non-overlapped reads are used + * since Windows accumulates inbound traffic without stalling and + * managing read buffers would not avoid a memory copy at the pn_read + * boundary. + * + * A socket must not get a Windows closesocket() unless the + * application has called pn_close on the socket or a global + * pn_io_finalize(). On error, the internal accounting for + * write_closed or read_closed may be updated along with the external + * event notification. A socket may be closed if it is never added to + * the iocpdesc_map or is on its way out of the map. + */ + +// Max number of overlapped accepts per listener +#define IOCP_MAX_ACCEPTS 10 + +// AcceptEx squishes the local and remote addresses and optional data +// all together when accepting the connection. Reserve enough for +// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding +// per address is required by AcceptEx. +#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16) +#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN) + +static void iocp_log(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status) +{ + char buf[512]; + if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM, + 0, status, 0, buf, sizeof(buf), 0)) + pn_error_set(error, code, buf); + else { + fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError()); + } +} + +static void reap_check(iocpdesc_t *); +static void bind_to_completion_port(iocpdesc_t *iocpd); +static void iocp_shutdown(iocpdesc_t *iocpd); +static void start_reading(iocpdesc_t *iocpd); +static bool is_listener(iocpdesc_t *iocpd); +static void release_sys_sendbuf(SOCKET s); + +static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text) +{ + pni_win32_error(iocpd->error, text, status); + if (iocpd->iocp->iocp_trace) { + iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error)); + } + iocpd->write_closed = true; + iocpd->read_closed = true; + iocpd->poll_error = true; + pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE)); +} + +// Helper functions to use specialized IOCP AcceptEx() and ConnectEx() +static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s) +{ + GUID guid = WSAID_ACCEPTEX; + DWORD bytes = 0; + LPFN_ACCEPTEX fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +static LPFN_CONNECTEX lookup_connect_ex(SOCKET s) +{ + GUID guid = WSAID_CONNECTEX; + DWORD bytes = 0; + LPFN_CONNECTEX fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s) +{ + GUID guid = WSAID_GETACCEPTEXSOCKADDRS; + DWORD bytes = 0; + LPFN_GETACCEPTEXSOCKADDRS fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +// match accept socket to listener socket +static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd) +{ + sockaddr_storage sa; + socklen_t salen = sizeof(sa); + if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1) + return NULL; + SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) + return NULL; + return pni_iocpdesc_create(iocpd->iocp, s, false); +} + +static bool is_listener(iocpdesc_t *iocpd) +{ + return iocpd && iocpd->acceptor; +} + +// === Async accept processing + +typedef struct { + iocp_result_t base; + iocpdesc_t *new_sock; + char address_buffer[IOCP_SOCKADDRBUFLEN]; + DWORD unused; +} accept_result_t; + +static accept_result_t *accept_result(iocpdesc_t *listen_sock) { + accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t)); + if (result) { + result->base.type = IOCP_ACCEPT; + result->base.iocpd = listen_sock; + } + return result; +} + +static void reset_accept_result(accept_result_t *result) { + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN); +} + +struct pni_acceptor_t { + int accept_queue_size; + pn_list_t *accepts; + iocpdesc_t *listen_sock; + bool signalled; + LPFN_ACCEPTEX fn_accept_ex; + LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs; +}; + +#define pni_acceptor_compare NULL +#define pni_acceptor_inspect NULL +#define pni_acceptor_hashcode NULL + +static void pni_acceptor_initialize(void *object) +{ + pni_acceptor_t *acceptor = (pni_acceptor_t *) object; + acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS); +} + +static void pni_acceptor_finalize(void *object) +{ + pni_acceptor_t *acceptor = (pni_acceptor_t *) object; + size_t len = pn_list_size(acceptor->accepts); + for (size_t i = 0; i < len; i++) + free(pn_list_get(acceptor->accepts, i)); + pn_free(acceptor->accepts); +} + +static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd) +{ + static const pn_cid_t CID_pni_acceptor = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(pni_acceptor); + pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t)); + acceptor->listen_sock = iocpd; + acceptor->accept_queue_size = 0; + acceptor->signalled = false; + pn_socket_t sock = acceptor->listen_sock->socket; + acceptor->fn_accept_ex = lookup_accept_ex(sock); + acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock); + return acceptor; +} + +static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result) +{ + if (acceptor->listen_sock->closing) { + if (result) { + free(result); + acceptor->accept_queue_size--; + } + if (acceptor->accept_queue_size == 0) + acceptor->signalled = true; + return; + } + + if (result) { + reset_accept_result(result); + } else { + if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && + pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) { + result = accept_result(acceptor->listen_sock); + acceptor->accept_queue_size++; + } else { + // an async accept is still pending or max concurrent accepts already hit + return; + } + } + + result->new_sock = create_same_type_socket(acceptor->listen_sock); + if (result->new_sock) { + // Not yet connected. + result->new_sock->read_closed = true; + result->new_sock->write_closed = true; + + bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket, + result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, + &result->unused, (LPOVERLAPPED) result); + if (!success && WSAGetLastError() != ERROR_IO_PENDING) { + result->base.status = WSAGetLastError(); + pn_list_add(acceptor->accepts, result); + pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE); + } else { + acceptor->listen_sock->ops_in_progress++; + // This socket is equally involved in the async operation. + result->new_sock->ops_in_progress++; + } + } else { + iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket"); + } +} + +static void complete_accept(accept_result_t *result, HRESULT status) +{ + result->new_sock->ops_in_progress--; + iocpdesc_t *ld = result->base.iocpd; + if (ld->read_closed) { + if (!result->new_sock->closing) + pni_iocp_begin_close(result->new_sock); + free(result); // discard + reap_check(ld); + } else { + result->base.status = status; + pn_list_add(ld->acceptor->accepts, result); + pni_events_update(ld, ld->events | PN_READABLE); + } +} + +pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error) +{ + if (!is_listener(ld)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return INVALID_SOCKET; + } + if (ld->read_closed) { + set_iocp_error_status(error, PN_ERR, WSAENOTSOCK); + return INVALID_SOCKET; + } + if (pn_list_size(ld->acceptor->accepts) == 0) { + if (ld->events & PN_READABLE && ld->iocp->iocp_trace) + iocp_log("listen socket readable with no available accept completions\n"); + *would_block = true; + return INVALID_SOCKET; + } + + accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0); + pn_list_del(ld->acceptor->accepts, 0, 1); + if (!pn_list_size(ld->acceptor->accepts)) + pni_events_update(ld, ld->events & ~PN_READABLE); // No pending accepts + + pn_socket_t accept_sock; + if (result->base.status) { + accept_sock = INVALID_SOCKET; + pni_win32_error(ld->error, "accept failure", result->base.status); + if (ld->iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(ld->error)); + // App never sees this socket so close it here. + pni_iocp_begin_close(result->new_sock); + } else { + accept_sock = result->new_sock->socket; + // AcceptEx special setsockopt: + setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket, + sizeof (SOCKET)); + if (addr && addrlen && *addrlen > 0) { + sockaddr_storage *local_addr = NULL; + sockaddr_storage *remote_addr = NULL; + int local_addrlen, remote_addrlen; + LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs; + fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, + (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr, + &remote_addrlen); + *addrlen = pn_min(*addrlen, remote_addrlen); + memmove(addr, remote_addr, *addrlen); + } + } + + if (accept_sock != INVALID_SOCKET) { + // Connected. + result->new_sock->read_closed = false; + result->new_sock->write_closed = false; + } + + // Done with the completion result, so reuse it + result->new_sock = NULL; + begin_accept(ld->acceptor, result); + return accept_sock; +} + + +// === Async connect processing + +typedef struct { + iocp_result_t base; + char address_buffer[IOCP_SOCKADDRBUFLEN]; + struct addrinfo *addrinfo; +} connect_result_t; + +#define connect_result_initialize NULL +#define connect_result_compare NULL +#define connect_result_inspect NULL +#define connect_result_hashcode NULL + +static void connect_result_finalize(void *object) +{ + connect_result_t *result = (connect_result_t *) object; + // Do not release addrinfo until ConnectEx completes + if (result->addrinfo) + freeaddrinfo(result->addrinfo); +} + +static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) { + static const pn_cid_t CID_connect_result = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(connect_result); + connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t)); + if (result) { + memset(result, 0, sizeof(connect_result_t)); + result->base.type = IOCP_CONNECT; + result->base.iocpd = iocpd; + result->addrinfo = addr; + } + return result; +} + +pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error) +{ + // addr lives for the duration of the async connect. Caller has passed ownership here. + // See connect_result_finalize(). + // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound: + sockaddr_storage sa; + memset(&sa, 0, sizeof(sa)); + sa.ss_family = addr->ai_family; + if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) { + pni_win32_error(error, "begin async connection", WSAGetLastError()); + if (iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(error)); + closesocket(sock); + freeaddrinfo(addr); + return INVALID_SOCKET; + } + + iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false); + bind_to_completion_port(iocpd); + LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket); + connect_result_t *result = connect_result(iocpd, addr); + DWORD unused; + bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen, + NULL, 0, &unused, (LPOVERLAPPED) result); + if (!success && WSAGetLastError() != ERROR_IO_PENDING) { + pni_win32_error(error, "ConnectEx failure", WSAGetLastError()); + pn_free(result); + iocpd->write_closed = true; + iocpd->read_closed = true; + if (iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(error)); + } else { + iocpd->ops_in_progress++; + } + return sock; +} + +static void complete_connect(connect_result_t *result, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + if (iocpd->closing) { + pn_free(result); + reap_check(iocpd); + return; + } + + if (status) { + iocpdesc_fail(iocpd, status, "Connect failure"); + // Posix sets selectable events as follows: + pni_events_update(iocpd, PN_READABLE | PN_EXPIRED); + } else { + release_sys_sendbuf(iocpd->socket); + if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) { + iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)"); + } else { + pni_events_update(iocpd, PN_WRITABLE); + start_reading(iocpd); + } + } + pn_free(result); + return; +} + + +// === Async writes + +static bool write_in_progress(iocpdesc_t *iocpd) +{ + return pni_write_pipeline_size(iocpd->pipeline) != 0; +} + +write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen) +{ + write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1); + if (result) { + result->base.type = IOCP_WRITE; + result->base.iocpd = iocpd; + result->buffer.start = buf; + result->buffer.size = buflen; + } + return result; +} + +static int submit_write(write_result_t *result, const void *buf, size_t len) +{ + WSABUF wsabuf; + wsabuf.buf = (char *) buf; + wsabuf.len = len; + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0, + (LPOVERLAPPED) result, 0); +} + +ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error) +{ + if (len == 0) return 0; + *would_block = false; + if (is_listener(iocpd)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return INVALID_SOCKET; + } + if (iocpd->closing) { + set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); + return SOCKET_ERROR; + } + if (iocpd->write_closed) { + assert(pn_error_code(iocpd->error)); + pn_error_copy(error, iocpd->error); + if (iocpd->iocp->iocp_trace) + iocp_log("write error: %s\n", pn_error_text(error)); + return SOCKET_ERROR; + } + if (len == 0) return 0; + if (!(iocpd->events & PN_WRITABLE)) { + *would_block = true; + return SOCKET_ERROR; + } + + size_t written = 0; + size_t requested = len; + const char *outgoing = (const char *) buf; + size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len); + if (!available) { + *would_block = true; + return SOCKET_ERROR; + } + + for (size_t wr_count = 0; wr_count < available; wr_count++) { + write_result_t *result = pni_write_pipeline_next(iocpd->pipeline); + assert(result); + result->base.iocpd = iocpd; + ssize_t actual_len = pn_min(len, result->buffer.size); + result->requested = actual_len; + memmove((void *)result->buffer.start, outgoing, actual_len); + outgoing += actual_len; + written += actual_len; + len -= actual_len; + + int werror = submit_write(result, result->buffer.start, actual_len); + if (werror && WSAGetLastError() != ERROR_IO_PENDING) { + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send"); + return SOCKET_ERROR; + } + iocpd->ops_in_progress++; + } + + if (!pni_write_pipeline_writable(iocpd->pipeline)) + pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); + return written; +} + +/* + * Note: iocp write completion is not "bytes on the wire", it is "peer + * acked the sent bytes". Completion can be seconds on a slow + * consuming peer. + */ +static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + if (iocpd->closing) { + pni_write_pipeline_return(iocpd->pipeline, result); + if (!iocpd->write_closed && !write_in_progress(iocpd)) + iocp_shutdown(iocpd); + reap_check(iocpd); + return; + } + if (status == 0 && xfer_count > 0) { + if (xfer_count != result->requested) { + // Is this recoverable? How to preserve order if multiple overlapped writes? + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket"); + return; + } else { + // Success. + pni_write_pipeline_return(iocpd->pipeline, result); + if (pni_write_pipeline_writable(iocpd->pipeline)) + pni_events_update(iocpd, iocpd->events | PN_WRITABLE); + return; + } + } + // Other error + pni_write_pipeline_return(iocpd->pipeline, result); + if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN + || status == ERROR_NETNAME_DELETED) { + iocpd->write_closed = true; + iocpd->poll_error = true; + pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); + pni_win32_error(iocpd->error, "Remote close or timeout", status); + } else { + iocpdesc_fail(iocpd, status, "IOCP async write error"); + } +} + + +// === Async reads + +struct read_result_t { + iocp_result_t base; + size_t drain_count; + char unused_buf[1]; +}; + +static read_result_t *read_result(iocpdesc_t *iocpd) +{ + read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1); + if (result) { + result->base.type = IOCP_READ; + result->base.iocpd = iocpd; + } + return result; +} + +static void begin_zero_byte_read(iocpdesc_t *iocpd) +{ + if (iocpd->read_in_progress) return; + if (iocpd->read_closed) { + pni_events_update(iocpd, iocpd->events | PN_READABLE); + return; + } + + read_result_t *result = iocpd->read_result; + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + DWORD flags = 0; + WSABUF wsabuf; + wsabuf.buf = result->unused_buf; + wsabuf.len = 0; + int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags, + &result->base.overlapped, 0); + if (rc && WSAGetLastError() != ERROR_IO_PENDING) { + iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error"); + return; + } + iocpd->ops_in_progress++; + iocpd->read_in_progress = true; +} + +static void drain_until_closed(iocpdesc_t *iocpd) { + size_t max_drain = 16 * 1024; + char buf[512]; + read_result_t *result = iocpd->read_result; + while (result->drain_count < max_drain) { + int rv = recv(iocpd->socket, buf, 512, 0); + if (rv > 0) + result->drain_count += rv; + else if (rv == 0) { + iocpd->read_closed = true; + return; + } else if (WSAGetLastError() == WSAEWOULDBLOCK) { + // wait a little longer + start_reading(iocpd); + return; + } + else + break; + } + // Graceful close indication unlikely, force the issue + if (iocpd->iocp->iocp_trace) + if (result->drain_count >= max_drain) + iocp_log("graceful close on reader abandoned (too many chars)\n"); + else + iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError()); + iocpd->read_closed = true; +} + + +static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + iocpd->read_in_progress = false; + + if (iocpd->closing) { + // Application no longer reading, but we are looking for a zero length read + if (!iocpd->read_closed) + drain_until_closed(iocpd); + reap_check(iocpd); + return; + } + + if (status == 0 && xfer_count == 0) { + // Success. + pni_events_update(iocpd, iocpd->events | PN_READABLE); + } else { + iocpdesc_fail(iocpd, status, "IOCP read complete error"); + } +} + +ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error) +{ + if (size == 0) return 0; + *would_block = false; + if (is_listener(iocpd)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return SOCKET_ERROR; + } + if (iocpd->closing) { + // Previous call to pn_close() + set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); + return SOCKET_ERROR; + } + if (iocpd->read_closed) { + if (pn_error_code(iocpd->error)) + pn_error_copy(error, iocpd->error); + else + set_iocp_error_status(error, PN_ERR, WSAENOTCONN); + return SOCKET_ERROR; + } + + int count = recv(iocpd->socket, (char *) buf, size, 0); + if (count > 0) { + pni_events_update(iocpd, iocpd->events & ~PN_READABLE); + begin_zero_byte_read(iocpd); + return (ssize_t) count; + } else if (count == 0) { + iocpd->read_closed = true; + return 0; + } + if (WSAGetLastError() == WSAEWOULDBLOCK) + *would_block = true; + else { + set_iocp_error_status(error, PN_ERR, WSAGetLastError()); + iocpd->read_closed = true; + } + return SOCKET_ERROR; +} + +static void start_reading(iocpdesc_t *iocpd) +{ + begin_zero_byte_read(iocpd); +} + + +// === The iocp descriptor + +static void pni_iocpdesc_initialize(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + memset(iocpd, 0, sizeof(iocpdesc_t)); + iocpd->socket = INVALID_SOCKET; +} + +static void pni_iocpdesc_finalize(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + pn_free(iocpd->acceptor); + pn_error_free(iocpd->error); + if (iocpd->pipeline) + if (write_in_progress(iocpd)) + iocp_log("iocp descriptor write leak\n"); + else + pn_free(iocpd->pipeline); + if (iocpd->read_in_progress) + iocp_log("iocp descriptor read leak\n"); + else + free(iocpd->read_result); +} + +static uintptr_t pni_iocpdesc_hashcode(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + return iocpd->socket; +} + +#define pni_iocpdesc_compare NULL +#define pni_iocpdesc_inspect NULL + +// Reference counted in the iocpdesc map, zombie_list, selector. +static iocpdesc_t *pni_iocpdesc(pn_socket_t s) +{ + static const pn_cid_t CID_pni_iocpdesc = CID_pn_void; + static pn_class_t clazz = PN_CLASS(pni_iocpdesc); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t)); + assert(iocpd); + iocpd->socket = s; + return iocpd; +} + +static bool is_listener_socket(pn_socket_t s) +{ + BOOL tval = false; + int tvalsz = sizeof(tval); + int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz); + return code == 0 && tval; +} + +iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) { + assert (s != INVALID_SOCKET); + assert(!pni_iocpdesc_map_get(iocp, s)); + bool listening = is_listener_socket(s); + iocpdesc_t *iocpd = pni_iocpdesc(s); + iocpd->iocp = iocp; + if (iocpd) { + iocpd->external = external; + iocpd->error = pn_error(); + if (listening) { + iocpd->acceptor = pni_acceptor(iocpd); + } else { + iocpd->pipeline = pni_write_pipeline(iocpd); + iocpd->read_result = read_result(iocpd); + } + pni_iocpdesc_map_push(iocpd); + } + return iocpd; +} + +iocpdesc_t *pni_deadline_desc(iocp_t *iocp) { + // Non IO descriptor for selector deadlines. Do not add to iocpdesc map or + // zombie list. Selector responsible to free/decref object. + iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET); + iocpd->iocp = iocp; + iocpd->deadline_desc = true; + return iocpd; +} + +// === Fast lookup of a socket's iocpdesc_t + +iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s); + return iocpd; +} + +void pni_iocpdesc_map_push(iocpdesc_t *iocpd) { + pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd); + pn_decref(iocpd); + assert(pn_refcount(iocpd) == 1); +} + +void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) { + pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s); +} + +static void bind_to_completion_port(iocpdesc_t *iocpd) +{ + if (iocpd->bound) return; + if (!iocpd->iocp->completion_port) { + iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port."); + return; + } + + if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) + iocpd->bound = true; + else { + iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup."); + } +} + +static void release_sys_sendbuf(SOCKET s) +{ + // Set the socket's send buffer size to zero. + int sz = 0; + int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int)); + assert(status == 0); +} + +void pni_iocpdesc_start(iocpdesc_t *iocpd) +{ + if (iocpd->bound) return; + bind_to_completion_port(iocpd); + if (is_listener(iocpd)) { + begin_accept(iocpd->acceptor, NULL); + } + else { + release_sys_sendbuf(iocpd->socket); + pni_events_update(iocpd, PN_WRITABLE); + start_reading(iocpd); + } +} + +static void complete(iocp_result_t *result, bool success, DWORD num_transferred) { + result->iocpd->ops_in_progress--; + DWORD status = success ? 0 : GetLastError(); + + switch (result->type) { + case IOCP_ACCEPT: + complete_accept((accept_result_t *) result, status); + break; + case IOCP_CONNECT: + complete_connect((connect_result_t *) result, status); + break; + case IOCP_WRITE: + complete_write((write_result_t *) result, num_transferred, status); + break; + case IOCP_READ: + complete_read((read_result_t *) result, num_transferred, status); + break; + default: + assert(false); + } +} + +void pni_iocp_drain_completions(iocp_t *iocp) +{ + while (true) { + DWORD timeout_ms = 0; + DWORD num_transferred = 0; + ULONG_PTR completion_key = 0; + OVERLAPPED *overlapped = 0; + + bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, + &completion_key, &overlapped, timeout_ms); + if (!overlapped) + return; // timed out + iocp_result_t *result = (iocp_result_t *) overlapped; + complete(result, good_op, num_transferred); + } +} + +// returns: -1 on error, 0 on timeout, 1 successful completion +int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) { + DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout; + DWORD num_transferred = 0; + ULONG_PTR completion_key = 0; + OVERLAPPED *overlapped = 0; + + bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, + &completion_key, &overlapped, win_timeout); + if (!overlapped) + if (GetLastError() == WAIT_TIMEOUT) + return 0; + else { + if (error) + pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError()); + return -1; + } + + iocp_result_t *result = (iocp_result_t *) overlapped; + complete(result, good_op, num_transferred); + return 1; +} + +// === Close (graceful and otherwise) + +// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress +// and fully closed. + +static void zombie_list_add(iocpdesc_t *iocpd) +{ + assert(iocpd->closing); + if (!iocpd->ops_in_progress) { + // No need to make a zombie. + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + } + return; + } + // Allow 2 seconds for graceful shutdown before releasing socket resource. + iocpd->reap_time = pn_i_now() + 2000; + pn_list_add(iocpd->iocp->zombie_list, iocpd); +} + +static void reap_check(iocpdesc_t *iocpd) +{ + if (iocpd->closing && !iocpd->ops_in_progress) { + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + } + pn_list_remove(iocpd->iocp->zombie_list, iocpd); + // iocpd is decref'ed and possibly released + } +} + +pn_timestamp_t pni_zombie_deadline(iocp_t *iocp) +{ + if (pn_list_size(iocp->zombie_list)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0); + return iocpd->reap_time; + } + return 0; +} + +void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now) +{ + pn_list_t *zl = iocp->zombie_list; + // Look for stale zombies that should have been reaped by "now" + for (size_t idx = 0; idx < pn_list_size(zl); idx++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx); + if (iocpd->reap_time > now) + return; + if (iocpd->socket == INVALID_SOCKET) + continue; + assert(iocpd->ops_in_progress > 0); + if (iocp->iocp_trace) + iocp_log("async close: graceful close timeout exceeded\n"); + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + // outstanding ops should complete immediately now + } +} + +static void drain_zombie_completions(iocp_t *iocp) +{ + // No more pn_selector_select() from App, but zombies still need care and feeding + // until their outstanding async actions complete. + pni_iocp_drain_completions(iocp); + + // Discard any that have no pending async IO + size_t sz = pn_list_size(iocp->zombie_list); + for (size_t idx = 0; idx < sz;) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx); + if (!iocpd->ops_in_progress) { + pn_list_del(iocp->zombie_list, idx, 1); + sz--; + } else { + idx++; + } + } + + unsigned shutdown_grace = 2000; + char *override = getenv("PN_SHUTDOWN_GRACE"); + if (override) { + int grace = atoi(override); + if (grace > 0 && grace < 60000) + shutdown_grace = (unsigned) grace; + } + pn_timestamp_t now = pn_i_now(); + pn_timestamp_t deadline = now + shutdown_grace; + + while (pn_list_size(iocp->zombie_list)) { + if (now >= deadline) + break; + int rv = pni_iocp_wait_one(iocp, deadline - now, NULL); + if (rv < 0) { + iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError()); + break; + } + now = pn_i_now(); + } + if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace) + // Should only happen if really slow TCP handshakes, i.e. total network failure + iocp_log("network failure on Proton shutdown\n"); +} + +static pn_list_t *iocp_map_close_all(iocp_t *iocp) +{ + // Zombify stragglers, i.e. no pn_close() from the application. + pn_list_t *externals = pn_list(PN_OBJECT, 0); + for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; + entry = pn_hash_next(iocp->iocpdesc_map, entry)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); + // Just listeners first. + if (is_listener(iocpd)) { + if (iocpd->external) { + // Owned by application, just keep a temporary reference to it. + // iocp_result_t structs must not be free'd until completed or + // the completion port is closed. + if (iocpd->ops_in_progress) + pn_list_add(externals, iocpd); + pni_iocpdesc_map_del(iocp, iocpd->socket); + } else { + // Make it a zombie. + pni_iocp_begin_close(iocpd); + } + } + } + pni_iocp_drain_completions(iocp); + + for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; + entry = pn_hash_next(iocp->iocpdesc_map, entry)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); + if (iocpd->external) { + iocpd->read_closed = true; // Do not consume from read side + iocpd->write_closed = true; // Do not shutdown write side + if (iocpd->ops_in_progress) + pn_list_add(externals, iocpd); + pni_iocpdesc_map_del(iocp, iocpd->socket); + } else { + // Make it a zombie. + pni_iocp_begin_close(iocpd); + } + } + return externals; +} + +static void zombie_list_hard_close_all(iocp_t *iocp) +{ + pni_iocp_drain_completions(iocp); + size_t zs = pn_list_size(iocp->zombie_list); + for (size_t i = 0; i < zs; i++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + iocpd->write_closed = true; + } + } + pni_iocp_drain_completions(iocp); + + // Zombies should be all gone. Do a sanity check. + zs = pn_list_size(iocp->zombie_list); + int remaining = 0; + int ops = 0; + for (size_t i = 0; i < zs; i++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); + remaining++; + ops += iocpd->ops_in_progress; + } + if (remaining) + iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops); +} + +static void iocp_shutdown(iocpdesc_t *iocpd) +{ + if (iocpd->socket == PN_INVALID_SOCKET) + return; // Hard close in progress + if (shutdown(iocpd->socket, SD_SEND)) { + int err = WSAGetLastError(); + if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN) + if (iocpd->iocp->iocp_trace) + iocp_log("socket shutdown failed %d\n", err); + } + iocpd->write_closed = true; +} + +void pni_iocp_begin_close(iocpdesc_t *iocpd) +{ + assert (!iocpd->closing); + if (is_listener(iocpd)) { + // Listening socket is easy. Close the socket which will cancel async ops. + pn_socket_t old_sock = iocpd->socket; + iocpd->socket = INVALID_SOCKET; + iocpd->closing = true; + iocpd->read_closed = true; + iocpd->write_closed = true; + closesocket(old_sock); + // Pending accepts will now complete. Zombie can die when all consumed. + zombie_list_add(iocpd); + pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd + } else { + // Continue async operation looking for graceful close confirmation or timeout. + pn_socket_t old_sock = iocpd->socket; + iocpd->closing = true; + if (!iocpd->write_closed && !write_in_progress(iocpd)) + iocp_shutdown(iocpd); + zombie_list_add(iocpd); + pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd + } +} + + +// === iocp_t + +#define pni_iocp_hashcode NULL +#define pni_iocp_compare NULL +#define pni_iocp_inspect NULL + +void pni_iocp_initialize(void *obj) +{ + iocp_t *iocp = (iocp_t *) obj; + memset(iocp, 0, sizeof(iocp_t)); + pni_shared_pool_create(iocp); + iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + assert(iocp->completion_port != NULL); + iocp->iocpdesc_map = pn_hash(PN_OBJECT, 0, 0.75); + iocp->zombie_list = pn_list(PN_OBJECT, 0); + iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV"); + iocp->selector = NULL; +} + +void pni_iocp_finalize(void *obj) +{ + iocp_t *iocp = (iocp_t *) obj; + // Move sockets to closed state, except external sockets. + pn_list_t *externals = iocp_map_close_all(iocp); + // Now everything with ops_in_progress is in the zombie_list or the externals list. + assert(!pn_hash_head(iocp->iocpdesc_map)); + pn_free(iocp->iocpdesc_map); + + drain_zombie_completions(iocp); // Last chance for graceful close + zombie_list_hard_close_all(iocp); + CloseHandle(iocp->completion_port); // This cancels all our async ops + iocp->completion_port = NULL; + + if (pn_list_size(externals) && iocp->iocp_trace) + iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals)); + + // Now safe to free everything that might be touched by a former async operation. + pn_free(externals); + pn_free(iocp->zombie_list); + pni_shared_pool_free(iocp); +} + +iocp_t *pni_iocp() +{ + static const pn_cid_t CID_pni_iocp = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(pni_iocp); + iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t)); + return iocp; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/iocp.h ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/io/windows/iocp.h b/proton-c/src/reactor/io/windows/iocp.h new file mode 100644 index 0000000..07f47be --- /dev/null +++ b/proton-c/src/reactor/io/windows/iocp.h @@ -0,0 +1,136 @@ +#ifndef PROTON_SRC_IOCP_H +#define PROTON_SRC_IOCP_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/import_export.h> +#include <proton/selectable.h> +#include <proton/type_compat.h> + +typedef struct pni_acceptor_t pni_acceptor_t; +typedef struct write_result_t write_result_t; +typedef struct read_result_t read_result_t; +typedef struct write_pipeline_t write_pipeline_t; +typedef struct iocpdesc_t iocpdesc_t; + + +// One per pn_io_t. + +struct iocp_t { + HANDLE completion_port; + pn_hash_t *iocpdesc_map; + pn_list_t *zombie_list; + int shared_pool_size; + char *shared_pool_memory; + write_result_t **shared_results; + write_result_t **available_results; + size_t shared_available_count; + size_t writer_count; + int loopback_bufsize; + bool iocp_trace; + pn_selector_t *selector; +}; + + +// One for each socket. +// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list, +// selector->iocp_descriptors list. It should remain ref counted in the +// zombie_list until ops_in_progress == 0 or the completion port is closed. + +struct iocpdesc_t { + pn_socket_t socket; + iocp_t *iocp; + pni_acceptor_t *acceptor; + pn_error_t *error; + int ops_in_progress; + bool read_in_progress; + write_pipeline_t *pipeline; + read_result_t *read_result; + bool external; // true if socket set up outside Proton + bool bound; // associted with the completion port + bool closing; // pn_close called by application + bool read_closed; // EOF or read error + bool write_closed; // shutdown sent or write error + bool poll_error; // flag posix-like POLLERR/POLLHUP/POLLNVAL + bool deadline_desc; // Socket-less deadline descriptor for selectors + pn_selector_t *selector; + pn_selectable_t *selectable; + int events; + int interests; + pn_timestamp_t deadline; + iocpdesc_t *triggered_list_next; + iocpdesc_t *triggered_list_prev; + iocpdesc_t *deadlines_next; + iocpdesc_t *deadlines_prev; + pn_timestamp_t reap_time;; +}; + +typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t; + +typedef struct { + OVERLAPPED overlapped; + iocp_type_t type; + iocpdesc_t *iocpd; + HRESULT status; +} iocp_result_t; + +struct write_result_t { + iocp_result_t base; + size_t requested; + bool in_use; + pn_bytes_t buffer; +}; + +iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external); +iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s); +iocpdesc_t *pni_deadline_desc(iocp_t *); +void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s); +void pni_iocpdesc_map_push(iocpdesc_t *iocpd); +void pni_iocpdesc_start(iocpdesc_t *iocpd); +void pni_iocp_drain_completions(iocp_t *); +int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *); +void pni_iocp_start_accepting(iocpdesc_t *iocpd); +pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error); +pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error); +ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *); +ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error); +void pni_iocp_begin_close(iocpdesc_t *iocpd); +iocp_t *pni_iocp(); + +void pni_events_update(iocpdesc_t *iocpd, int events); +write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen); +write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd); +size_t pni_write_pipeline_size(write_pipeline_t *); +bool pni_write_pipeline_writable(write_pipeline_t *); +void pni_write_pipeline_return(write_pipeline_t *, write_result_t *); +size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t); +write_result_t *pni_write_pipeline_next(write_pipeline_t *); +void pni_shared_pool_create(iocp_t *); +void pni_shared_pool_free(iocp_t *); +void pni_zombie_check(iocp_t *, pn_timestamp_t); +pn_timestamp_t pni_zombie_deadline(iocp_t *); + +pn_selector_t *pni_selector_create(iocp_t *iocp); + +int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code); + +#endif /* iocp.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/selector.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/io/windows/selector.c b/proton-c/src/reactor/io/windows/selector.c new file mode 100644 index 0000000..15da73b --- /dev/null +++ b/proton-c/src/reactor/io/windows/selector.c @@ -0,0 +1,384 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selectable.h" +#include "reactor/selector.h" + +#include "iocp.h" +#include "platform/platform.h" +#include "core/util.h" + +#include <proton/object.h> +#include <proton/error.h> +#include <assert.h> + +static void interests_update(iocpdesc_t *iocpd, int interests); +static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t); + +struct pn_selector_t { + iocp_t *iocp; + pn_list_t *selectables; + pn_list_t *iocp_descriptors; + size_t current; + iocpdesc_t *current_triggered; + pn_timestamp_t awoken; + pn_error_t *error; + iocpdesc_t *triggered_list_head; + iocpdesc_t *triggered_list_tail; + iocpdesc_t *deadlines_head; + iocpdesc_t *deadlines_tail; +}; + +void pn_selector_initialize(void *obj) +{ + pn_selector_t *selector = (pn_selector_t *) obj; + selector->iocp = NULL; + selector->selectables = pn_list(PN_WEAKREF, 0); + selector->iocp_descriptors = pn_list(PN_OBJECT, 0); + selector->current = 0; + selector->current_triggered = NULL; + selector->awoken = 0; + selector->error = pn_error(); + selector->triggered_list_head = NULL; + selector->triggered_list_tail = NULL; + selector->deadlines_head = NULL; + selector->deadlines_tail = NULL; +} + +void pn_selector_finalize(void *obj) +{ + pn_selector_t *selector = (pn_selector_t *) obj; + pn_free(selector->selectables); + pn_free(selector->iocp_descriptors); + pn_error_free(selector->error); + selector->iocp->selector = NULL; +} + +#define pn_selector_hashcode NULL +#define pn_selector_compare NULL +#define pn_selector_inspect NULL + +pn_selector_t *pni_selector() +{ + static const pn_class_t clazz = PN_CLASS(pn_selector); + pn_selector_t *selector = (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t)); + return selector; +} + +pn_selector_t *pni_selector_create(iocp_t *iocp) +{ + pn_selector_t *selector = pni_selector(); + selector->iocp = iocp; + return selector; +} + +void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable) +{ + assert(selector); + assert(selectable); + assert(pni_selectable_get_index(selectable) < 0); + pn_socket_t sock = pn_selectable_get_fd(selectable); + iocpdesc_t *iocpd = NULL; + + if (pni_selectable_get_index(selectable) < 0) { + pn_list_add(selector->selectables, selectable); + pn_list_add(selector->iocp_descriptors, NULL); + size_t size = pn_list_size(selector->selectables); + pni_selectable_set_index(selectable, size - 1); + } + + pn_selector_update(selector, selectable); +} + +void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable) +{ + // A selectable's fd may switch from PN_INVALID_SCOKET to a working socket between + // update calls. If a selectable without a valid socket has a deadline, we need + // a dummy iocpdesc_t to participate in the deadlines list. + int idx = pni_selectable_get_index(selectable); + assert(idx >= 0); + pn_timestamp_t deadline = pn_selectable_get_deadline(selectable); + pn_socket_t sock = pn_selectable_get_fd(selectable); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); + + if (!iocpd && deadline && sock == PN_INVALID_SOCKET) { + iocpd = pni_deadline_desc(selector->iocp); + assert(iocpd); + pn_list_set(selector->iocp_descriptors, idx, iocpd); + pn_decref(iocpd); // life is solely tied to iocp_descriptors list + iocpd->selector = selector; + iocpd->selectable = selectable; + } + else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) { + // Switching to a real socket. Stop using a deadline descriptor. + deadlines_update(iocpd, 0); + // decref descriptor in list and pick up a real iocpd below + pn_list_set(selector->iocp_descriptors, idx, NULL); + iocpd = NULL; + } + + // The selectables socket may be set long after it has been added + if (!iocpd && sock != PN_INVALID_SOCKET) { + iocpd = pni_iocpdesc_map_get(selector->iocp, sock); + if (!iocpd) { + // Socket created outside proton. Hook it up to iocp. + iocpd = pni_iocpdesc_create(selector->iocp, sock, true); + assert(iocpd); + if (iocpd) + pni_iocpdesc_start(iocpd); + } + if (iocpd) { + pn_list_set(selector->iocp_descriptors, idx, iocpd); + iocpd->selector = selector; + iocpd->selectable = selectable; + } + } + + if (iocpd) { + assert(sock == iocpd->socket || iocpd->closing); + int interests = PN_ERROR; // Always + if (pn_selectable_is_reading(selectable)) { + interests |= PN_READABLE; + } + if (pn_selectable_is_writing(selectable)) { + interests |= PN_WRITABLE; + } + if (deadline) { + interests |= PN_EXPIRED; + } + interests_update(iocpd, interests); + deadlines_update(iocpd, deadline); + } +} + +void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable) +{ + assert(selector); + assert(selectable); + + int idx = pni_selectable_get_index(selectable); + assert(idx >= 0); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); + if (iocpd) { + if (selector->current_triggered == iocpd) + selector->current_triggered = iocpd->triggered_list_next; + interests_update(iocpd, 0); + deadlines_update(iocpd, 0); + assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev); + assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev); + iocpd->selector = NULL; + iocpd->selectable = NULL; + } + pn_list_del(selector->selectables, idx, 1); + pn_list_del(selector->iocp_descriptors, idx, 1); + size_t size = pn_list_size(selector->selectables); + for (size_t i = idx; i < size; i++) { + pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i); + pni_selectable_set_index(sel, i); + } + + pni_selectable_set_index(selectable, -1); + + if (selector->current >= (size_t) idx) { + selector->current--; + } +} + +size_t pn_selector_size(pn_selector_t *selector) { + assert(selector); + return pn_list_size(selector->selectables); +} + +int pn_selector_select(pn_selector_t *selector, int timeout) +{ + assert(selector); + pn_error_clear(selector->error); + pn_timestamp_t deadline = 0; + pn_timestamp_t now = pn_i_now(); + + if (timeout) { + if (selector->deadlines_head) + deadline = selector->deadlines_head->deadline; + } + if (deadline) { + int64_t delta = deadline - now; + if (delta < 0) { + delta = 0; + } + if (timeout < 0) + timeout = delta; + else if (timeout > delta) + timeout = delta; + } + deadline = (timeout >= 0) ? now + timeout : 0; + + // Process all currently available completions, even if matched events available + pni_iocp_drain_completions(selector->iocp); + pni_zombie_check(selector->iocp, now); + // Loop until an interested event is matched, or until deadline + while (true) { + if (selector->triggered_list_head) + break; + if (deadline && deadline <= now) + break; + pn_timestamp_t completion_deadline = deadline; + pn_timestamp_t zd = pni_zombie_deadline(selector->iocp); + if (zd) + completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd; + + int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now; + int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error); + if (rv < 0) + return pn_error_code(selector->error); + + now = pn_i_now(); + if (zd && zd <= now) { + pni_zombie_check(selector->iocp, now); + } + } + + selector->current = 0; + selector->awoken = now; + for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) { + if (iocpd->deadline <= now) + pni_events_update(iocpd, iocpd->events | PN_EXPIRED); + else + break; + } + selector->current_triggered = selector->triggered_list_head; + return pn_error_code(selector->error); +} + +pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events) +{ + if (selector->current_triggered) { + iocpdesc_t *iocpd = selector->current_triggered; + *events = iocpd->interests & iocpd->events; + selector->current_triggered = iocpd->triggered_list_next; + return iocpd->selectable; + } + return NULL; +} + +void pn_selector_free(pn_selector_t *selector) +{ + assert(selector); + pn_free(selector); +} + + +static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd) + return; // already in list + LL_ADD(selector, triggered_list, iocpd); +} + +static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd) + return; // not in list + LL_REMOVE(selector, triggered_list, iocpd); + iocpd->triggered_list_prev = NULL; + iocpd->triggered_list_next = NULL; +} + + +void pni_events_update(iocpdesc_t *iocpd, int events) +{ + // If set, a poll error is permanent + if (iocpd->poll_error) + events |= PN_ERROR; + if (iocpd->events == events) + return; + iocpd->events = events; + if (iocpd->selector) { + if (iocpd->events & iocpd->interests) + triggered_list_add(iocpd->selector, iocpd); + else + triggered_list_remove(iocpd->selector, iocpd); + } +} + +static void interests_update(iocpdesc_t *iocpd, int interests) +{ + int old_interests = iocpd->interests; + if (old_interests == interests) + return; + iocpd->interests = interests; + if (iocpd->selector) { + if (iocpd->events & iocpd->interests) + triggered_list_add(iocpd->selector, iocpd); + else + triggered_list_remove(iocpd->selector, iocpd); + } +} + +static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd) + return; // not in list + LL_REMOVE(selector, deadlines, iocpd); + iocpd->deadlines_prev = NULL; + iocpd->deadlines_next = NULL; +} + + +static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline) +{ + if (deadline == iocpd->deadline) + return; + + iocpd->deadline = deadline; + pn_selector_t *selector = iocpd->selector; + if (!deadline) { + deadlines_remove(selector, iocpd); + pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); + } else { + if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) { + deadlines_remove(selector, iocpd); + pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); + } + iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines); + while (dl_iocpd && dl_iocpd->deadline <= deadline) + dl_iocpd = dl_iocpd->deadlines_next; + if (dl_iocpd) { + // insert + iocpd->deadlines_prev = dl_iocpd->deadlines_prev; + iocpd->deadlines_next = dl_iocpd; + dl_iocpd->deadlines_prev = iocpd; + if (selector->deadlines_head == dl_iocpd) + selector->deadlines_head = iocpd; + } else { + LL_ADD(selector, deadlines, iocpd); // append + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/write_pipeline.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/io/windows/write_pipeline.c b/proton-c/src/reactor/io/windows/write_pipeline.c new file mode 100644 index 0000000..905c7f6 --- /dev/null +++ b/proton-c/src/reactor/io/windows/write_pipeline.c @@ -0,0 +1,314 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * A simple write buffer pool. Each socket has a dedicated "primary" + * buffer and can borrow from a shared pool with limited size tuning. + * Could enhance e.g. with separate pools per network interface and fancier + * memory tuning based on interface speed, system resources, and + * number of connections, etc. + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selector.h" +#include "reactor/selectable.h" + +#include "iocp.h" +#include "core/util.h" + +#include <proton/error.h> +#include <proton/object.h> + +#include <assert.h> + +// Max overlapped writes per socket +#define IOCP_MAX_OWRITES 16 +// Write buffer size +#define IOCP_WBUFSIZE 16384 + +static void pipeline_log(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +void pni_shared_pool_create(iocp_t *iocp) +{ + // TODO: more pools (or larger one) when using multiple non-loopback interfaces + iocp->shared_pool_size = 16; + char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging + if (env) { + int sz = atoi(env); + if (sz >= 0 && sz < 256) { + iocp->shared_pool_size = sz; + } + } + iocp->loopback_bufsize = 0; + env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging + if (env) { + int sz = atoi(env); + if (sz >= 0 && sz <= 128 * 1024) { + iocp->loopback_bufsize = sz; + } + } + + if (iocp->shared_pool_size) { + iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); + HRESULT status = GetLastError(); + if (!iocp->shared_pool_memory) { + perror("Proton write buffer pool allocation failure\n"); + iocp->shared_pool_size = 0; + iocp->shared_available_count = 0; + return; + } + + iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); + iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); + iocp->shared_available_count = iocp->shared_pool_size; + char *mem = iocp->shared_pool_memory; + for (int i = 0; i < iocp->shared_pool_size; i++) { + iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE); + mem += IOCP_WBUFSIZE; + } + } +} + +void pni_shared_pool_free(iocp_t *iocp) +{ + for (int i = 0; i < iocp->shared_pool_size; i++) { + write_result_t *result = iocp->shared_results[i]; + if (result->in_use) + pipeline_log("Proton buffer pool leak\n"); + else + free(result); + } + if (iocp->shared_pool_size) { + free(iocp->shared_results); + free(iocp->available_results); + if (iocp->shared_pool_memory) { + if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) { + perror("write buffers release failed"); + } + iocp->shared_pool_memory = NULL; + } + } +} + +static void shared_pool_push(write_result_t *result) +{ + iocp_t *iocp = result->base.iocpd->iocp; + assert(iocp->shared_available_count < iocp->shared_pool_size); + iocp->available_results[iocp->shared_available_count++] = result; +} + +static write_result_t *shared_pool_pop(iocp_t *iocp) +{ + return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL; +} + +struct write_pipeline_t { + iocpdesc_t *iocpd; + size_t pending_count; + write_result_t *primary; + size_t reserved_count; + size_t next_primary_index; + size_t depth; + bool is_writer; +}; + +#define write_pipeline_compare NULL +#define write_pipeline_inspect NULL +#define write_pipeline_hashcode NULL + +static void write_pipeline_initialize(void *object) +{ + write_pipeline_t *pl = (write_pipeline_t *) object; + pl->pending_count = 0; + const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE); + pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE); + pl->depth = 0; + pl->is_writer = false; +} + +static void write_pipeline_finalize(void *object) +{ + write_pipeline_t *pl = (write_pipeline_t *) object; + free((void *)pl->primary->buffer.start); + free(pl->primary); +} + +write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd) +{ + static const pn_cid_t CID_write_pipeline = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(write_pipeline); + write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t)); + pipeline->iocpd = iocpd; + pipeline->primary->base.iocpd = iocpd; + return pipeline; +} + +static void confirm_as_writer(write_pipeline_t *pl) +{ + if (!pl->is_writer) { + iocp_t *iocp = pl->iocpd->iocp; + iocp->writer_count++; + pl->is_writer = true; + } +} + +static void remove_as_writer(write_pipeline_t *pl) +{ + if (!pl->is_writer) + return; + iocp_t *iocp = pl->iocpd->iocp; + assert(iocp->writer_count); + pl->is_writer = false; + iocp->writer_count--; +} + +/* + * Optimal depth will depend on properties of the NIC, server, and driver. For now, + * just distinguish between loopback interfaces and the rest. Optimizations in the + * loopback stack allow decent performance with depth 1 and actually cause major + * performance hiccups if set to large values. + */ +static void set_depth(write_pipeline_t *pl) +{ + pl->depth = 1; + sockaddr_storage sa; + socklen_t salen = sizeof(sa); + char buf[INET6_ADDRSTRLEN]; + DWORD buflen = sizeof(buf); + + if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 && + getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) { + if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) || + (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) { + // not loopback + pl->depth = IOCP_MAX_OWRITES; + } else { + iocp_t *iocp = pl->iocpd->iocp; + if (iocp->loopback_bufsize) { + const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize); + if (p) { + pl->primary->buffer.start = p; + pl->primary->buffer.size = iocp->loopback_bufsize; + } + } + } + } +} + +// Reserve as many buffers as possible for count bytes. +size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count) +{ + if (pl->primary->in_use) + return 0; // I.e. io->wouldblock + if (!pl->depth) + set_depth(pl); + if (pl->depth == 1) { + // always use the primary + pl->reserved_count = 1; + pl->next_primary_index = 0; + return 1; + } + + iocp_t *iocp = pl->iocpd->iocp; + confirm_as_writer(pl); + size_t wanted = (count / IOCP_WBUFSIZE); + if (count % IOCP_WBUFSIZE) + wanted++; + size_t pending = pl->pending_count; + assert(pending < pl->depth); + size_t bufs = pn_min(wanted, pl->depth - pending); + // Can draw from shared pool or the primary... but share with others. + size_t writers = iocp->writer_count; + size_t shared_count = (iocp->shared_available_count + writers - 1) / writers; + bufs = pn_min(bufs, shared_count + 1); + pl->reserved_count = pending + bufs; + + if (bufs == wanted && + pl->reserved_count < (pl->depth / 2) && + iocp->shared_available_count > (2 * writers + bufs)) { + // No shortage: keep the primary as spare for future use + pl->next_primary_index = pl->reserved_count; + } else if (bufs == 1) { + pl->next_primary_index = pending; + } else { + // let approx 1/3 drain before replenishing + pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1; + if (pl->next_primary_index < pending) + pl->next_primary_index = pending; + } + return bufs; +} + +write_result_t *pni_write_pipeline_next(write_pipeline_t *pl) +{ + size_t sz = pl->pending_count; + if (sz >= pl->reserved_count) + return NULL; + write_result_t *result; + if (sz == pl->next_primary_index) { + result = pl->primary; + } else { + assert(pl->iocpd->iocp->shared_available_count > 0); + result = shared_pool_pop(pl->iocpd->iocp); + } + + result->in_use = true; + pl->pending_count++; + return result; +} + +void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result) +{ + result->in_use = false; + pl->pending_count--; + pl->reserved_count = 0; + if (result != pl->primary) + shared_pool_push(result); + if (pl->pending_count == 0) + remove_as_writer(pl); +} + +bool pni_write_pipeline_writable(write_pipeline_t *pl) +{ + // Only writable if not full and we can guarantee a buffer: + return pl->pending_count < pl->depth && !pl->primary->in_use; +} + +size_t pni_write_pipeline_size(write_pipeline_t *pl) +{ + return pl->pending_count; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index a83a881..abf5d1e 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -19,23 +19,24 @@ * */ +#include "io.h" +#include "reactor.h" +#include "selectable.h" +#include "platform/platform.h" // pn_i_now + #include <proton/object.h> #include <proton/handlers.h> -#include <proton/io.h> #include <proton/event.h> #include <proton/transport.h> #include <proton/connection.h> #include <proton/session.h> #include <proton/link.h> #include <proton/delivery.h> + #include <stdio.h> #include <stdlib.h> #include <assert.h> -#include "reactor.h" -#include "selectable.h" -#include "platform.h" - struct pn_reactor_t { pn_record_t *attachments; pn_io_t *io; @@ -164,7 +165,7 @@ void pn_reactor_set_handler(pn_reactor_t *reactor, pn_handler_t *handler) { pn_incref(reactor->handler); } -pn_io_t *pn_reactor_io(pn_reactor_t *reactor) { +pn_io_t *pni_reactor_io(pn_reactor_t *reactor) { assert(reactor); return reactor->io; } @@ -389,6 +390,16 @@ bool pn_reactor_quiesced(pn_reactor_t *reactor) { return pn_event_type(event) == PN_REACTOR_QUIESCED; } +pn_handler_t *pn_event_root(pn_event_t *event) +{ + pn_handler_t *h = pn_record_get_handler(pn_event_attachments(event)); + return h; +} + +static void pni_event_set_root(pn_event_t *event, pn_handler_t *handler) { + pn_record_set_handler(pn_event_attachments(event), handler); +} + bool pn_reactor_process(pn_reactor_t *reactor) { assert(reactor); pn_reactor_mark(reactor); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.h b/proton-c/src/reactor/reactor.h index 461e8b3..bfb397c 100644 --- a/proton-c/src/reactor/reactor.h +++ b/proton-c/src/reactor/reactor.h @@ -26,9 +26,9 @@ #include <proton/url.h> void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor); -void pni_event_set_root(pn_event_t *event, pn_handler_t *handler); void pni_reactor_set_connection_peer_address(pn_connection_t *connection, const char *host, const char *port); +pn_io_t *pni_reactor_io(pn_reactor_t *reactor); #endif /* src/reactor.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/selectable.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/selectable.c b/proton-c/src/reactor/selectable.c new file mode 100644 index 0000000..b42ad1f --- /dev/null +++ b/proton-c/src/reactor/selectable.c @@ -0,0 +1,300 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "selectable.h" + +#include <proton/error.h> + +#include "io.h" + +#include <assert.h> +#include <stdlib.h> + +pn_selectables_t *pn_selectables(void) +{ + return pn_iterator(); +} + +pn_selectable_t *pn_selectables_next(pn_selectables_t *selectables) +{ + return (pn_selectable_t *) pn_iterator_next(selectables); +} + +void pn_selectables_free(pn_selectables_t *selectables) +{ + pn_free(selectables); +} + +struct pn_selectable_t { + pn_socket_t fd; + int index; + pn_record_t *attachments; + void (*readable)(pn_selectable_t *); + void (*writable)(pn_selectable_t *); + void (*error)(pn_selectable_t *); + void (*expired)(pn_selectable_t *); + void (*release) (pn_selectable_t *); + void (*finalize)(pn_selectable_t *); + pn_collector_t *collector; + pn_timestamp_t deadline; + bool reading; + bool writing; + bool registered; + bool terminal; +}; + +void pn_selectable_initialize(pn_selectable_t *sel) +{ + sel->fd = PN_INVALID_SOCKET; + sel->index = -1; + sel->attachments = pn_record(); + sel->readable = NULL; + sel->writable = NULL; + sel->error = NULL; + sel->expired = NULL; + sel->release = NULL; + sel->finalize = NULL; + sel->collector = NULL; + sel->deadline = 0; + sel->reading = false; + sel->writing = false; + sel->registered = false; + sel->terminal = false; +} + +void pn_selectable_finalize(pn_selectable_t *sel) +{ + if (sel->finalize) { + sel->finalize(sel); + } + pn_decref(sel->attachments); + pn_decref(sel->collector); +} + +#define pn_selectable_hashcode NULL +#define pn_selectable_inspect NULL +#define pn_selectable_compare NULL + +PN_CLASSDEF(pn_selectable) + +pn_selectable_t *pn_selectable(void) +{ + return pn_selectable_new(); +} + +bool pn_selectable_is_reading(pn_selectable_t *sel) { + assert(sel); + return sel->reading; +} + +void pn_selectable_set_reading(pn_selectable_t *sel, bool reading) { + assert(sel); + sel->reading = reading; +} + +bool pn_selectable_is_writing(pn_selectable_t *sel) { + assert(sel); + return sel->writing; +} + +void pn_selectable_set_writing(pn_selectable_t *sel, bool writing) { + assert(sel); + sel->writing = writing; +} + +pn_timestamp_t pn_selectable_get_deadline(pn_selectable_t *sel) { + assert(sel); + return sel->deadline; +} + +void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t deadline) { + assert(sel); + sel->deadline = deadline; +} + +void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *)) { + assert(sel); + sel->readable = readable; +} + +void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *)) { + assert(sel); + sel->writable = writable; +} + +void pn_selectable_on_error(pn_selectable_t *sel, void (*error)(pn_selectable_t *)) { + assert(sel); + sel->error = error; +} + +void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *)) { + assert(sel); + sel->expired = expired; +} + +void pn_selectable_on_release(pn_selectable_t *sel, void (*release)(pn_selectable_t *)) { + assert(sel); + sel->release = release; +} + +void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) { + assert(sel); + sel->finalize = finalize; +} + +pn_record_t *pn_selectable_attachments(pn_selectable_t *sel) { + return sel->attachments; +} + +void *pni_selectable_get_context(pn_selectable_t *selectable) +{ + assert(selectable); + return pn_record_get(selectable->attachments, PN_LEGCTX); +} + +void pni_selectable_set_context(pn_selectable_t *selectable, void *context) +{ + assert(selectable); + pn_record_set(selectable->attachments, PN_LEGCTX, context); +} + +int pni_selectable_get_index(pn_selectable_t *selectable) +{ + assert(selectable); + return selectable->index; +} + +void pni_selectable_set_index(pn_selectable_t *selectable, int index) +{ + assert(selectable); + selectable->index = index; +} + +pn_socket_t pn_selectable_get_fd(pn_selectable_t *selectable) +{ + assert(selectable); + return selectable->fd; +} + +void pn_selectable_set_fd(pn_selectable_t *selectable, pn_socket_t fd) +{ + assert(selectable); + selectable->fd = fd; +} + +void pn_selectable_readable(pn_selectable_t *selectable) +{ + assert(selectable); + if (selectable->readable) { + selectable->readable(selectable); + } +} + +void pn_selectable_writable(pn_selectable_t *selectable) +{ + assert(selectable); + if (selectable->writable) { + selectable->writable(selectable); + } +} + +void pn_selectable_error(pn_selectable_t *selectable) +{ + assert(selectable); + if (selectable->error) { + selectable->error(selectable); + } +} + +void pn_selectable_expired(pn_selectable_t *selectable) +{ + assert(selectable); + if (selectable->expired) { + selectable->expired(selectable); + } +} + +bool pn_selectable_is_registered(pn_selectable_t *selectable) +{ + assert(selectable); + return selectable->registered; +} + +void pn_selectable_set_registered(pn_selectable_t *selectable, bool registered) +{ + assert(selectable); + selectable->registered = registered; +} + +bool pn_selectable_is_terminal(pn_selectable_t *selectable) +{ + assert(selectable); + return selectable->terminal; +} + +void pn_selectable_terminate(pn_selectable_t *selectable) +{ + assert(selectable); + selectable->terminal = true; +} + +void pn_selectable_release(pn_selectable_t *selectable) +{ + assert(selectable); + if (selectable->release) { + selectable->release(selectable); + } +} + +void pn_selectable_free(pn_selectable_t *selectable) +{ + pn_decref(selectable); +} + +static void pni_readable(pn_selectable_t *selectable) { + pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_READABLE); +} + +static void pni_writable(pn_selectable_t *selectable) { + pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_WRITABLE); +} + +static void pni_error(pn_selectable_t *selectable) { + pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_ERROR); +} + +static void pni_expired(pn_selectable_t *selectable) { + pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_EXPIRED); +} + +void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector) { + assert(selectable); + pn_decref(selectable->collector); + selectable->collector = collector; + pn_incref(selectable->collector); + + if (collector) { + pn_selectable_on_readable(selectable, pni_readable); + pn_selectable_on_writable(selectable, pni_writable); + pn_selectable_on_error(selectable, pni_error); + pn_selectable_on_expired(selectable, pni_expired); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
