http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/windows/iocp.c ---------------------------------------------------------------------- diff --git a/c/src/reactor/io/windows/iocp.c b/c/src/reactor/io/windows/iocp.c new file mode 100644 index 0000000..8a1a64a --- /dev/null +++ b/c/src/reactor/io/windows/iocp.c @@ -0,0 +1,1179 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <mswsock.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selector.h" + +#include "iocp.h" +#include "platform/platform.h" +#include "core/util.h" + +#include <proton/object.h> +#include <proton/error.h> +#include <proton/transport.h> + +#include <assert.h> + +/* + * Windows IO Completion Port support for Proton. + * + * Overlapped writes are used to avoid lengthy stalls between write + * completion and starting a new write. 
Non-overlapped reads are used + * since Windows accumulates inbound traffic without stalling and + * managing read buffers would not avoid a memory copy at the pn_read + * boundary. + * + * A socket must not get a Windows closesocket() unless the + * application has called pn_close on the socket or a global + * pn_io_finalize(). On error, the internal accounting for + * write_closed or read_closed may be updated along with the external + * event notification. A socket may be closed if it is never added to + * the iocpdesc_map or is on its way out of the map. + */ + +// Max number of overlapped accepts per listener +#define IOCP_MAX_ACCEPTS 10 + +// AcceptEx squishes the local and remote addresses and optional data +// all together when accepting the connection. Reserve enough for +// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding +// per address is required by AcceptEx. +#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16) +#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN) + +static void iocp_log(const char *fmt, ...) 
+{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status) +{ + char buf[512]; + if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM, + 0, status, 0, buf, sizeof(buf), 0)) + pn_error_set(error, code, buf); + else { + fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError()); + } +} + +static void reap_check(iocpdesc_t *); +static void bind_to_completion_port(iocpdesc_t *iocpd); +static void iocp_shutdown(iocpdesc_t *iocpd); +static void start_reading(iocpdesc_t *iocpd); +static bool is_listener(iocpdesc_t *iocpd); +static void release_sys_sendbuf(SOCKET s); + +static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text) +{ + pni_win32_error(iocpd->error, text, status); + if (iocpd->iocp->iocp_trace) { + iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error)); + } + iocpd->write_closed = true; + iocpd->read_closed = true; + iocpd->poll_error = true; + pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE)); +} + +// Helper functions to use specialized IOCP AcceptEx() and ConnectEx() +static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s) +{ + GUID guid = WSAID_ACCEPTEX; + DWORD bytes = 0; + LPFN_ACCEPTEX fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +static LPFN_CONNECTEX lookup_connect_ex(SOCKET s) +{ + GUID guid = WSAID_CONNECTEX; + DWORD bytes = 0; + LPFN_CONNECTEX fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), &bytes, NULL, NULL); + assert(fn); + return fn; +} + +static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s) +{ + GUID guid = WSAID_GETACCEPTEXSOCKADDRS; + DWORD bytes = 0; + LPFN_GETACCEPTEXSOCKADDRS fn; + WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &fn, sizeof(fn), 
&bytes, NULL, NULL); + assert(fn); + return fn; +} + +// match accept socket to listener socket +static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd) +{ + sockaddr_storage sa; + socklen_t salen = sizeof(sa); + if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1) + return NULL; + SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM + if (s == INVALID_SOCKET) + return NULL; + return pni_iocpdesc_create(iocpd->iocp, s, false); +} + +static bool is_listener(iocpdesc_t *iocpd) +{ + return iocpd && iocpd->acceptor; +} + +// === Async accept processing + +typedef struct { + iocp_result_t base; + iocpdesc_t *new_sock; + char address_buffer[IOCP_SOCKADDRBUFLEN]; + DWORD unused; +} accept_result_t; + +static accept_result_t *accept_result(iocpdesc_t *listen_sock) { + accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t)); + if (result) { + result->base.type = IOCP_ACCEPT; + result->base.iocpd = listen_sock; + } + return result; +} + +static void reset_accept_result(accept_result_t *result) { + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN); +} + +struct pni_acceptor_t { + int accept_queue_size; + pn_list_t *accepts; + iocpdesc_t *listen_sock; + bool signalled; + LPFN_ACCEPTEX fn_accept_ex; + LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs; +}; + +#define pni_acceptor_compare NULL +#define pni_acceptor_inspect NULL +#define pni_acceptor_hashcode NULL + +static void pni_acceptor_initialize(void *object) +{ + pni_acceptor_t *acceptor = (pni_acceptor_t *) object; + acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS); +} + +static void pni_acceptor_finalize(void *object) +{ + pni_acceptor_t *acceptor = (pni_acceptor_t *) object; + size_t len = pn_list_size(acceptor->accepts); + for (size_t i = 0; i < len; i++) + free(pn_list_get(acceptor->accepts, i)); + pn_free(acceptor->accepts); +} + +static pni_acceptor_t 
*pni_acceptor(iocpdesc_t *iocpd) +{ + static const pn_cid_t CID_pni_acceptor = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(pni_acceptor); + pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t)); + acceptor->listen_sock = iocpd; + acceptor->accept_queue_size = 0; + acceptor->signalled = false; + pn_socket_t sock = acceptor->listen_sock->socket; + acceptor->fn_accept_ex = lookup_accept_ex(sock); + acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock); + return acceptor; +} + +static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result) +{ + if (acceptor->listen_sock->closing) { + if (result) { + free(result); + acceptor->accept_queue_size--; + } + if (acceptor->accept_queue_size == 0) + acceptor->signalled = true; + return; + } + + if (result) { + reset_accept_result(result); + } else { + if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && + pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) { + result = accept_result(acceptor->listen_sock); + acceptor->accept_queue_size++; + } else { + // an async accept is still pending or max concurrent accepts already hit + return; + } + } + + result->new_sock = create_same_type_socket(acceptor->listen_sock); + if (result->new_sock) { + // Not yet connected. + result->new_sock->read_closed = true; + result->new_sock->write_closed = true; + + bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket, + result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, + &result->unused, (LPOVERLAPPED) result); + if (!success && WSAGetLastError() != ERROR_IO_PENDING) { + result->base.status = WSAGetLastError(); + pn_list_add(acceptor->accepts, result); + pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE); + } else { + acceptor->listen_sock->ops_in_progress++; + // This socket is equally involved in the async operation. 
+ result->new_sock->ops_in_progress++; + } + } else { + iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket"); + } +} + +static void complete_accept(accept_result_t *result, HRESULT status) +{ + result->new_sock->ops_in_progress--; + iocpdesc_t *ld = result->base.iocpd; + if (ld->read_closed) { + if (!result->new_sock->closing) + pni_iocp_begin_close(result->new_sock); + free(result); // discard + reap_check(ld); + } else { + result->base.status = status; + pn_list_add(ld->acceptor->accepts, result); + pni_events_update(ld, ld->events | PN_READABLE); + } +} + +pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error) +{ + if (!is_listener(ld)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return INVALID_SOCKET; + } + if (ld->read_closed) { + set_iocp_error_status(error, PN_ERR, WSAENOTSOCK); + return INVALID_SOCKET; + } + if (pn_list_size(ld->acceptor->accepts) == 0) { + if (ld->events & PN_READABLE && ld->iocp->iocp_trace) + iocp_log("listen socket readable with no available accept completions\n"); + *would_block = true; + return INVALID_SOCKET; + } + + accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0); + pn_list_del(ld->acceptor->accepts, 0, 1); + if (!pn_list_size(ld->acceptor->accepts)) + pni_events_update(ld, ld->events & ~PN_READABLE); // No pending accepts + + pn_socket_t accept_sock; + if (result->base.status) { + accept_sock = INVALID_SOCKET; + pni_win32_error(ld->error, "accept failure", result->base.status); + if (ld->iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(ld->error)); + // App never sees this socket so close it here. 
+ pni_iocp_begin_close(result->new_sock); + } else { + accept_sock = result->new_sock->socket; + // AcceptEx special setsockopt: + setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket, + sizeof (SOCKET)); + if (addr && addrlen && *addrlen > 0) { + sockaddr_storage *local_addr = NULL; + sockaddr_storage *remote_addr = NULL; + int local_addrlen, remote_addrlen; + LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs; + fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN, + (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr, + &remote_addrlen); + *addrlen = pn_min(*addrlen, remote_addrlen); + memmove(addr, remote_addr, *addrlen); + } + } + + if (accept_sock != INVALID_SOCKET) { + // Connected. + result->new_sock->read_closed = false; + result->new_sock->write_closed = false; + } + + // Done with the completion result, so reuse it + result->new_sock = NULL; + begin_accept(ld->acceptor, result); + return accept_sock; +} + + +// === Async connect processing + +typedef struct { + iocp_result_t base; + char address_buffer[IOCP_SOCKADDRBUFLEN]; + struct addrinfo *addrinfo; +} connect_result_t; + +#define connect_result_initialize NULL +#define connect_result_compare NULL +#define connect_result_inspect NULL +#define connect_result_hashcode NULL + +static void connect_result_finalize(void *object) +{ + connect_result_t *result = (connect_result_t *) object; + // Do not release addrinfo until ConnectEx completes + if (result->addrinfo) + freeaddrinfo(result->addrinfo); +} + +static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) { + static const pn_cid_t CID_connect_result = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(connect_result); + connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t)); + if (result) { + memset(result, 0, sizeof(connect_result_t)); + result->base.type = IOCP_CONNECT; + result->base.iocpd = 
iocpd; + result->addrinfo = addr; + } + return result; +} + +pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error) +{ + // addr lives for the duration of the async connect. Caller has passed ownership here. + // See connect_result_finalize(). + // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound: + sockaddr_storage sa; + memset(&sa, 0, sizeof(sa)); + sa.ss_family = addr->ai_family; + if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) { + pni_win32_error(error, "begin async connection", WSAGetLastError()); + if (iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(error)); + closesocket(sock); + freeaddrinfo(addr); + return INVALID_SOCKET; + } + + iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false); + bind_to_completion_port(iocpd); + LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket); + connect_result_t *result = connect_result(iocpd, addr); + DWORD unused; + bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen, + NULL, 0, &unused, (LPOVERLAPPED) result); + if (!success && WSAGetLastError() != ERROR_IO_PENDING) { + pni_win32_error(error, "ConnectEx failure", WSAGetLastError()); + pn_free(result); + iocpd->write_closed = true; + iocpd->read_closed = true; + if (iocp->iocp_trace) + iocp_log("%s\n", pn_error_text(error)); + } else { + iocpd->ops_in_progress++; + } + return sock; +} + +static void complete_connect(connect_result_t *result, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + if (iocpd->closing) { + pn_free(result); + reap_check(iocpd); + return; + } + + if (status) { + iocpdesc_fail(iocpd, status, "Connect failure"); + // Posix sets selectable events as follows: + pni_events_update(iocpd, PN_READABLE | PN_EXPIRED); + } else { + release_sys_sendbuf(iocpd->socket); + if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) { + iocpdesc_fail(iocpd, WSAGetLastError(), 
"Internal connect failure (update context)"); + } else { + pni_events_update(iocpd, PN_WRITABLE); + start_reading(iocpd); + } + } + pn_free(result); + return; +} + + +// === Async writes + +static bool write_in_progress(iocpdesc_t *iocpd) +{ + return pni_write_pipeline_size(iocpd->pipeline) != 0; +} + +write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen) +{ + write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1); + if (result) { + result->base.type = IOCP_WRITE; + result->base.iocpd = iocpd; + result->buffer.start = buf; + result->buffer.size = buflen; + } + return result; +} + +static int submit_write(write_result_t *result, const void *buf, size_t len) +{ + WSABUF wsabuf; + wsabuf.buf = (char *) buf; + wsabuf.len = len; + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0, + (LPOVERLAPPED) result, 0); +} + +ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error) +{ + if (len == 0) return 0; + *would_block = false; + if (is_listener(iocpd)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return INVALID_SOCKET; + } + if (iocpd->closing) { + set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); + return SOCKET_ERROR; + } + if (iocpd->write_closed) { + assert(pn_error_code(iocpd->error)); + pn_error_copy(error, iocpd->error); + if (iocpd->iocp->iocp_trace) + iocp_log("write error: %s\n", pn_error_text(error)); + return SOCKET_ERROR; + } + if (len == 0) return 0; + if (!(iocpd->events & PN_WRITABLE)) { + *would_block = true; + return SOCKET_ERROR; + } + + size_t written = 0; + size_t requested = len; + const char *outgoing = (const char *) buf; + size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len); + if (!available) { + *would_block = true; + return SOCKET_ERROR; + } + + for (size_t wr_count = 0; wr_count < available; wr_count++) { + write_result_t *result = 
pni_write_pipeline_next(iocpd->pipeline); + assert(result); + result->base.iocpd = iocpd; + ssize_t actual_len = pn_min(len, result->buffer.size); + result->requested = actual_len; + memmove((void *)result->buffer.start, outgoing, actual_len); + outgoing += actual_len; + written += actual_len; + len -= actual_len; + + int werror = submit_write(result, result->buffer.start, actual_len); + if (werror && WSAGetLastError() != ERROR_IO_PENDING) { + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send"); + return SOCKET_ERROR; + } + iocpd->ops_in_progress++; + } + + if (!pni_write_pipeline_writable(iocpd->pipeline)) + pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); + return written; +} + +/* + * Note: iocp write completion is not "bytes on the wire", it is "peer + * acked the sent bytes". Completion can be seconds on a slow + * consuming peer. + */ +static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + if (iocpd->closing) { + pni_write_pipeline_return(iocpd->pipeline, result); + if (!iocpd->write_closed && !write_in_progress(iocpd)) + iocp_shutdown(iocpd); + reap_check(iocpd); + return; + } + if (status == 0 && xfer_count > 0) { + if (xfer_count != result->requested) { + // Is this recoverable? How to preserve order if multiple overlapped writes? + pni_write_pipeline_return(iocpd->pipeline, result); + iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket"); + return; + } else { + // Success. 
+ pni_write_pipeline_return(iocpd->pipeline, result); + if (pni_write_pipeline_writable(iocpd->pipeline)) + pni_events_update(iocpd, iocpd->events | PN_WRITABLE); + return; + } + } + // Other error + pni_write_pipeline_return(iocpd->pipeline, result); + if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN + || status == ERROR_NETNAME_DELETED) { + iocpd->write_closed = true; + iocpd->poll_error = true; + pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE); + pni_win32_error(iocpd->error, "Remote close or timeout", status); + } else { + iocpdesc_fail(iocpd, status, "IOCP async write error"); + } +} + + +// === Async reads + +struct read_result_t { + iocp_result_t base; + size_t drain_count; + char unused_buf[1]; +}; + +static read_result_t *read_result(iocpdesc_t *iocpd) +{ + read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1); + if (result) { + result->base.type = IOCP_READ; + result->base.iocpd = iocpd; + } + return result; +} + +static void begin_zero_byte_read(iocpdesc_t *iocpd) +{ + if (iocpd->read_in_progress) return; + if (iocpd->read_closed) { + pni_events_update(iocpd, iocpd->events | PN_READABLE); + return; + } + + read_result_t *result = iocpd->read_result; + memset(&result->base.overlapped, 0, sizeof (OVERLAPPED)); + DWORD flags = 0; + WSABUF wsabuf; + wsabuf.buf = result->unused_buf; + wsabuf.len = 0; + int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags, + &result->base.overlapped, 0); + if (rc && WSAGetLastError() != ERROR_IO_PENDING) { + iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error"); + return; + } + iocpd->ops_in_progress++; + iocpd->read_in_progress = true; +} + +static void drain_until_closed(iocpdesc_t *iocpd) { + size_t max_drain = 16 * 1024; + char buf[512]; + read_result_t *result = iocpd->read_result; + while (result->drain_count < max_drain) { + int rv = recv(iocpd->socket, buf, 512, 0); + if (rv > 0) + result->drain_count += rv; + else if (rv == 0) { + iocpd->read_closed 
= true; + return; + } else if (WSAGetLastError() == WSAEWOULDBLOCK) { + // wait a little longer + start_reading(iocpd); + return; + } + else + break; + } + // Graceful close indication unlikely, force the issue + if (iocpd->iocp->iocp_trace) + if (result->drain_count >= max_drain) + iocp_log("graceful close on reader abandoned (too many chars)\n"); + else + iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError()); + iocpd->read_closed = true; +} + + +static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status) +{ + iocpdesc_t *iocpd = result->base.iocpd; + iocpd->read_in_progress = false; + + if (iocpd->closing) { + // Application no longer reading, but we are looking for a zero length read + if (!iocpd->read_closed) + drain_until_closed(iocpd); + reap_check(iocpd); + return; + } + + if (status == 0 && xfer_count == 0) { + // Success. + pni_events_update(iocpd, iocpd->events | PN_READABLE); + } else { + iocpdesc_fail(iocpd, status, "IOCP read complete error"); + } +} + +ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error) +{ + if (size == 0) return 0; + *would_block = false; + if (is_listener(iocpd)) { + set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP); + return SOCKET_ERROR; + } + if (iocpd->closing) { + // Previous call to pn_close() + set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN); + return SOCKET_ERROR; + } + if (iocpd->read_closed) { + if (pn_error_code(iocpd->error)) + pn_error_copy(error, iocpd->error); + else + set_iocp_error_status(error, PN_ERR, WSAENOTCONN); + return SOCKET_ERROR; + } + + int count = recv(iocpd->socket, (char *) buf, size, 0); + if (count > 0) { + pni_events_update(iocpd, iocpd->events & ~PN_READABLE); + begin_zero_byte_read(iocpd); + return (ssize_t) count; + } else if (count == 0) { + iocpd->read_closed = true; + return 0; + } + if (WSAGetLastError() == WSAEWOULDBLOCK) + *would_block = true; + else { + set_iocp_error_status(error, PN_ERR, 
WSAGetLastError()); + iocpd->read_closed = true; + } + return SOCKET_ERROR; +} + +static void start_reading(iocpdesc_t *iocpd) +{ + begin_zero_byte_read(iocpd); +} + + +// === The iocp descriptor + +static void pni_iocpdesc_initialize(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + memset(iocpd, 0, sizeof(iocpdesc_t)); + iocpd->socket = INVALID_SOCKET; +} + +static void pni_iocpdesc_finalize(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + pn_free(iocpd->acceptor); + pn_error_free(iocpd->error); + if (iocpd->pipeline) + if (write_in_progress(iocpd)) + iocp_log("iocp descriptor write leak\n"); + else + pn_free(iocpd->pipeline); + if (iocpd->read_in_progress) + iocp_log("iocp descriptor read leak\n"); + else + free(iocpd->read_result); +} + +static uintptr_t pni_iocpdesc_hashcode(void *object) +{ + iocpdesc_t *iocpd = (iocpdesc_t *) object; + return iocpd->socket; +} + +#define pni_iocpdesc_compare NULL +#define pni_iocpdesc_inspect NULL + +// Reference counted in the iocpdesc map, zombie_list, selector. 
+static iocpdesc_t *pni_iocpdesc(pn_socket_t s) +{ + static const pn_cid_t CID_pni_iocpdesc = CID_pn_void; + static pn_class_t clazz = PN_CLASS(pni_iocpdesc); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t)); + assert(iocpd); + iocpd->socket = s; + return iocpd; +} + +static bool is_listener_socket(pn_socket_t s) +{ + BOOL tval = false; + int tvalsz = sizeof(tval); + int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz); + return code == 0 && tval; +} + +iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) { + assert (s != INVALID_SOCKET); + assert(!pni_iocpdesc_map_get(iocp, s)); + bool listening = is_listener_socket(s); + iocpdesc_t *iocpd = pni_iocpdesc(s); + iocpd->iocp = iocp; + if (iocpd) { + iocpd->external = external; + iocpd->error = pn_error(); + if (listening) { + iocpd->acceptor = pni_acceptor(iocpd); + } else { + iocpd->pipeline = pni_write_pipeline(iocpd); + iocpd->read_result = read_result(iocpd); + } + pni_iocpdesc_map_push(iocpd); + } + return iocpd; +} + +iocpdesc_t *pni_deadline_desc(iocp_t *iocp) { + // Non IO descriptor for selector deadlines. Do not add to iocpdesc map or + // zombie list. Selector responsible to free/decref object. 
+ iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET); + iocpd->iocp = iocp; + iocpd->deadline_desc = true; + return iocpd; +} + +// === Fast lookup of a socket's iocpdesc_t + +iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s); + return iocpd; +} + +void pni_iocpdesc_map_push(iocpdesc_t *iocpd) { + pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd); + pn_decref(iocpd); + assert(pn_refcount(iocpd) == 1); +} + +void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) { + pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s); +} + +static void bind_to_completion_port(iocpdesc_t *iocpd) +{ + if (iocpd->bound) return; + if (!iocpd->iocp->completion_port) { + iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port."); + return; + } + + if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) + iocpd->bound = true; + else { + iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup."); + } +} + +static void release_sys_sendbuf(SOCKET s) +{ + // Set the socket's send buffer size to zero. + int sz = 0; + int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int)); + assert(status == 0); +} + +void pni_iocpdesc_start(iocpdesc_t *iocpd) +{ + if (iocpd->bound) return; + bind_to_completion_port(iocpd); + if (is_listener(iocpd)) { + begin_accept(iocpd->acceptor, NULL); + } + else { + release_sys_sendbuf(iocpd->socket); + pni_events_update(iocpd, PN_WRITABLE); + start_reading(iocpd); + } +} + +static void complete(iocp_result_t *result, bool success, DWORD num_transferred) { + result->iocpd->ops_in_progress--; + DWORD status = success ? 
0 : GetLastError(); + + switch (result->type) { + case IOCP_ACCEPT: + complete_accept((accept_result_t *) result, status); + break; + case IOCP_CONNECT: + complete_connect((connect_result_t *) result, status); + break; + case IOCP_WRITE: + complete_write((write_result_t *) result, num_transferred, status); + break; + case IOCP_READ: + complete_read((read_result_t *) result, num_transferred, status); + break; + default: + assert(false); + } +} + +void pni_iocp_drain_completions(iocp_t *iocp) +{ + while (true) { + DWORD timeout_ms = 0; + DWORD num_transferred = 0; + ULONG_PTR completion_key = 0; + OVERLAPPED *overlapped = 0; + + bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, + &completion_key, &overlapped, timeout_ms); + if (!overlapped) + return; // timed out + iocp_result_t *result = (iocp_result_t *) overlapped; + complete(result, good_op, num_transferred); + } +} + +// returns: -1 on error, 0 on timeout, 1 successful completion +int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) { + DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout; + DWORD num_transferred = 0; + ULONG_PTR completion_key = 0; + OVERLAPPED *overlapped = 0; + + bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred, + &completion_key, &overlapped, win_timeout); + if (!overlapped) + if (GetLastError() == WAIT_TIMEOUT) + return 0; + else { + if (error) + pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError()); + return -1; + } + + iocp_result_t *result = (iocp_result_t *) overlapped; + complete(result, good_op, num_transferred); + return 1; +} + +// === Close (graceful and otherwise) + +// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress +// and fully closed. + +static void zombie_list_add(iocpdesc_t *iocpd) +{ + assert(iocpd->closing); + if (!iocpd->ops_in_progress) { + // No need to make a zombie. 
+ if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + } + return; + } + // Allow 2 seconds for graceful shutdown before releasing socket resource. + iocpd->reap_time = pn_i_now() + 2000; + pn_list_add(iocpd->iocp->zombie_list, iocpd); +} + +static void reap_check(iocpdesc_t *iocpd) +{ + if (iocpd->closing && !iocpd->ops_in_progress) { + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + } + pn_list_remove(iocpd->iocp->zombie_list, iocpd); + // iocpd is decref'ed and possibly released + } +} + +pn_timestamp_t pni_zombie_deadline(iocp_t *iocp) +{ + if (pn_list_size(iocp->zombie_list)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0); + return iocpd->reap_time; + } + return 0; +} + +void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now) +{ + pn_list_t *zl = iocp->zombie_list; + // Look for stale zombies that should have been reaped by "now" + for (size_t idx = 0; idx < pn_list_size(zl); idx++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx); + if (iocpd->reap_time > now) + return; + if (iocpd->socket == INVALID_SOCKET) + continue; + assert(iocpd->ops_in_progress > 0); + if (iocp->iocp_trace) + iocp_log("async close: graceful close timeout exceeded\n"); + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + // outstanding ops should complete immediately now + } +} + +static void drain_zombie_completions(iocp_t *iocp) +{ + // No more pn_selector_select() from App, but zombies still need care and feeding + // until their outstanding async actions complete. 
+ pni_iocp_drain_completions(iocp); + + // Discard any that have no pending async IO + size_t sz = pn_list_size(iocp->zombie_list); + for (size_t idx = 0; idx < sz;) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx); + if (!iocpd->ops_in_progress) { + pn_list_del(iocp->zombie_list, idx, 1); + sz--; + } else { + idx++; + } + } + + unsigned shutdown_grace = 2000; + char *override = getenv("PN_SHUTDOWN_GRACE"); + if (override) { + int grace = atoi(override); + if (grace > 0 && grace < 60000) + shutdown_grace = (unsigned) grace; + } + pn_timestamp_t now = pn_i_now(); + pn_timestamp_t deadline = now + shutdown_grace; + + while (pn_list_size(iocp->zombie_list)) { + if (now >= deadline) + break; + int rv = pni_iocp_wait_one(iocp, deadline - now, NULL); + if (rv < 0) { + iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError()); + break; + } + now = pn_i_now(); + } + if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace) + // Should only happen if really slow TCP handshakes, i.e. total network failure + iocp_log("network failure on Proton shutdown\n"); +} + +static pn_list_t *iocp_map_close_all(iocp_t *iocp) +{ + // Zombify stragglers, i.e. no pn_close() from the application. + pn_list_t *externals = pn_list(PN_OBJECT, 0); + for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; + entry = pn_hash_next(iocp->iocpdesc_map, entry)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); + // Just listeners first. + if (is_listener(iocpd)) { + if (iocpd->external) { + // Owned by application, just keep a temporary reference to it. + // iocp_result_t structs must not be free'd until completed or + // the completion port is closed. + if (iocpd->ops_in_progress) + pn_list_add(externals, iocpd); + pni_iocpdesc_map_del(iocp, iocpd->socket); + } else { + // Make it a zombie. 
+ pni_iocp_begin_close(iocpd); + } + } + } + pni_iocp_drain_completions(iocp); + + for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry; + entry = pn_hash_next(iocp->iocpdesc_map, entry)) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry); + if (iocpd->external) { + iocpd->read_closed = true; // Do not consume from read side + iocpd->write_closed = true; // Do not shutdown write side + if (iocpd->ops_in_progress) + pn_list_add(externals, iocpd); + pni_iocpdesc_map_del(iocp, iocpd->socket); + } else { + // Make it a zombie. + pni_iocp_begin_close(iocpd); + } + } + return externals; +} + +static void zombie_list_hard_close_all(iocp_t *iocp) +{ + pni_iocp_drain_completions(iocp); + size_t zs = pn_list_size(iocp->zombie_list); + for (size_t i = 0; i < zs; i++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); + if (iocpd->socket != INVALID_SOCKET) { + closesocket(iocpd->socket); + iocpd->socket = INVALID_SOCKET; + iocpd->read_closed = true; + iocpd->write_closed = true; + } + } + pni_iocp_drain_completions(iocp); + + // Zombies should be all gone. Do a sanity check. 
+ zs = pn_list_size(iocp->zombie_list); + int remaining = 0; + int ops = 0; + for (size_t i = 0; i < zs; i++) { + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i); + remaining++; + ops += iocpd->ops_in_progress; + } + if (remaining) + iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops); +} + +static void iocp_shutdown(iocpdesc_t *iocpd) +{ + if (iocpd->socket == PN_INVALID_SOCKET) + return; // Hard close in progress + if (shutdown(iocpd->socket, SD_SEND)) { + int err = WSAGetLastError(); + if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN) + if (iocpd->iocp->iocp_trace) + iocp_log("socket shutdown failed %d\n", err); + } + iocpd->write_closed = true; +} + +void pni_iocp_begin_close(iocpdesc_t *iocpd) +{ + assert (!iocpd->closing); + if (is_listener(iocpd)) { + // Listening socket is easy. Close the socket which will cancel async ops. + pn_socket_t old_sock = iocpd->socket; + iocpd->socket = INVALID_SOCKET; + iocpd->closing = true; + iocpd->read_closed = true; + iocpd->write_closed = true; + closesocket(old_sock); + // Pending accepts will now complete. Zombie can die when all consumed. + zombie_list_add(iocpd); + pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd + } else { + // Continue async operation looking for graceful close confirmation or timeout. 
+ pn_socket_t old_sock = iocpd->socket; + iocpd->closing = true; + if (!iocpd->write_closed && !write_in_progress(iocpd)) + iocp_shutdown(iocpd); + zombie_list_add(iocpd); + pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd + } +} + + +// === iocp_t + +#define pni_iocp_hashcode NULL +#define pni_iocp_compare NULL +#define pni_iocp_inspect NULL + +void pni_iocp_initialize(void *obj) +{ + iocp_t *iocp = (iocp_t *) obj; + memset(iocp, 0, sizeof(iocp_t)); + pni_shared_pool_create(iocp); + iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + assert(iocp->completion_port != NULL); + iocp->iocpdesc_map = pn_hash(PN_OBJECT, 0, 0.75); + iocp->zombie_list = pn_list(PN_OBJECT, 0); + iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV"); + iocp->selector = NULL; +} + +void pni_iocp_finalize(void *obj) +{ + iocp_t *iocp = (iocp_t *) obj; + // Move sockets to closed state, except external sockets. + pn_list_t *externals = iocp_map_close_all(iocp); + // Now everything with ops_in_progress is in the zombie_list or the externals list. + assert(!pn_hash_head(iocp->iocpdesc_map)); + pn_free(iocp->iocpdesc_map); + + drain_zombie_completions(iocp); // Last chance for graceful close + zombie_list_hard_close_all(iocp); + CloseHandle(iocp->completion_port); // This cancels all our async ops + iocp->completion_port = NULL; + + if (pn_list_size(externals) && iocp->iocp_trace) + iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals)); + + // Now safe to free everything that might be touched by a former async operation. + pn_free(externals); + pn_free(iocp->zombie_list); + pni_shared_pool_free(iocp); +} + +iocp_t *pni_iocp() +{ + static const pn_cid_t CID_pni_iocp = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(pni_iocp); + iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t)); + return iocp; +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/windows/iocp.h ---------------------------------------------------------------------- diff --git a/c/src/reactor/io/windows/iocp.h b/c/src/reactor/io/windows/iocp.h new file mode 100644 index 0000000..6cf0bc0 --- /dev/null +++ b/c/src/reactor/io/windows/iocp.h @@ -0,0 +1,136 @@ +#ifndef PROTON_SRC_IOCP_H +#define PROTON_SRC_IOCP_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/import_export.h> +#include <proton/selectable.h> +#include <proton/type_compat.h> + +typedef struct pni_acceptor_t pni_acceptor_t; +typedef struct write_result_t write_result_t; +typedef struct read_result_t read_result_t; +typedef struct write_pipeline_t write_pipeline_t; +typedef struct iocpdesc_t iocpdesc_t; + + +// One per pn_io_t. + +struct iocp_t { + HANDLE completion_port; + pn_hash_t *iocpdesc_map; + pn_list_t *zombie_list; + unsigned shared_pool_size; + char *shared_pool_memory; + write_result_t **shared_results; + write_result_t **available_results; + size_t shared_available_count; + size_t writer_count; + int loopback_bufsize; + bool iocp_trace; + pn_selector_t *selector; +}; + + +// One for each socket. 
+// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list,
+// selector->iocp_descriptors list. It should remain ref counted in the
+// zombie_list until ops_in_progress == 0 or the completion port is closed.
+
+struct iocpdesc_t {
+  pn_socket_t socket;
+  iocp_t *iocp;
+  pni_acceptor_t *acceptor;
+  pn_error_t *error;
+  int ops_in_progress;
+  bool read_in_progress;
+  write_pipeline_t *pipeline;
+  read_result_t *read_result;
+  bool external; // true if socket set up outside Proton
+  bool bound; // associated with the completion port
+  bool closing; // pn_close called by application
+  bool read_closed; // EOF or read error
+  bool write_closed; // shutdown sent or write error
+  bool poll_error; // flag posix-like POLLERR/POLLHUP/POLLNVAL
+  bool deadline_desc; // Socket-less deadline descriptor for selectors
+  pn_selector_t *selector;
+  pn_selectable_t *selectable;
+  int events;
+  int interests;
+  pn_timestamp_t deadline;
+  iocpdesc_t *triggered_list_next;
+  iocpdesc_t *triggered_list_prev;
+  iocpdesc_t *deadlines_next;
+  iocpdesc_t *deadlines_prev;
+  pn_timestamp_t reap_time;
+};
+
+typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
+
+typedef struct {
+  OVERLAPPED overlapped;
+  iocp_type_t type;
+  iocpdesc_t *iocpd;
+  HRESULT status;
+} iocp_result_t;
+
+struct write_result_t {
+  iocp_result_t base;
+  size_t requested;
+  bool in_use;
+  pn_bytes_t buffer;
+};
+
+iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
+iocpdesc_t *pni_deadline_desc(iocp_t *);
+void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
+void pni_iocpdesc_start(iocpdesc_t *iocpd);
+void pni_iocp_drain_completions(iocp_t *);
+int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
+void pni_iocp_start_accepting(iocpdesc_t *iocpd);
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool 
*would_block, pn_error_t *error); +pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error); +ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *); +ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error); +void pni_iocp_begin_close(iocpdesc_t *iocpd); +iocp_t *pni_iocp(); + +void pni_events_update(iocpdesc_t *iocpd, int events); +write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen); +write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd); +size_t pni_write_pipeline_size(write_pipeline_t *); +bool pni_write_pipeline_writable(write_pipeline_t *); +void pni_write_pipeline_return(write_pipeline_t *, write_result_t *); +size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t); +write_result_t *pni_write_pipeline_next(write_pipeline_t *); +void pni_shared_pool_create(iocp_t *); +void pni_shared_pool_free(iocp_t *); +void pni_zombie_check(iocp_t *, pn_timestamp_t); +pn_timestamp_t pni_zombie_deadline(iocp_t *); + +pn_selector_t *pni_selector_create(iocp_t *iocp); + +int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code); + +#endif /* iocp.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/windows/selector.c ---------------------------------------------------------------------- diff --git a/c/src/reactor/io/windows/selector.c b/c/src/reactor/io/windows/selector.c new file mode 100644 index 0000000..15da73b --- /dev/null +++ b/c/src/reactor/io/windows/selector.c @@ -0,0 +1,384 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selectable.h" +#include "reactor/selector.h" + +#include "iocp.h" +#include "platform/platform.h" +#include "core/util.h" + +#include <proton/object.h> +#include <proton/error.h> +#include <assert.h> + +static void interests_update(iocpdesc_t *iocpd, int interests); +static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t); + +struct pn_selector_t { + iocp_t *iocp; + pn_list_t *selectables; + pn_list_t *iocp_descriptors; + size_t current; + iocpdesc_t *current_triggered; + pn_timestamp_t awoken; + pn_error_t *error; + iocpdesc_t *triggered_list_head; + iocpdesc_t *triggered_list_tail; + iocpdesc_t *deadlines_head; + iocpdesc_t *deadlines_tail; +}; + +void pn_selector_initialize(void *obj) +{ + pn_selector_t *selector = (pn_selector_t *) obj; + selector->iocp = NULL; + selector->selectables = pn_list(PN_WEAKREF, 0); + selector->iocp_descriptors = pn_list(PN_OBJECT, 0); + selector->current = 0; + selector->current_triggered = NULL; + selector->awoken = 0; + selector->error = pn_error(); + selector->triggered_list_head = NULL; + selector->triggered_list_tail = NULL; + selector->deadlines_head = NULL; + 
selector->deadlines_tail = NULL;
+}
+
+void pn_selector_finalize(void *obj)
+{
+  pn_selector_t *selector = (pn_selector_t *) obj;
+  pn_free(selector->selectables);
+  pn_free(selector->iocp_descriptors);
+  pn_error_free(selector->error);
+  selector->iocp->selector = NULL;
+}
+
+#define pn_selector_hashcode NULL
+#define pn_selector_compare NULL
+#define pn_selector_inspect NULL
+
+pn_selector_t *pni_selector()
+{
+  static const pn_class_t clazz = PN_CLASS(pn_selector);
+  pn_selector_t *selector = (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t));
+  return selector;
+}
+
+pn_selector_t *pni_selector_create(iocp_t *iocp)
+{
+  pn_selector_t *selector = pni_selector();
+  selector->iocp = iocp;
+  return selector;
+}
+
+void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  assert(selector);
+  assert(selectable);
+  assert(pni_selectable_get_index(selectable) < 0);
+  pn_socket_t sock = pn_selectable_get_fd(selectable);
+  iocpdesc_t *iocpd = NULL;
+
+  if (pni_selectable_get_index(selectable) < 0) {
+    pn_list_add(selector->selectables, selectable);
+    pn_list_add(selector->iocp_descriptors, NULL);
+    size_t size = pn_list_size(selector->selectables);
+    pni_selectable_set_index(selectable, size - 1);
+  }
+
+  pn_selector_update(selector, selectable);
+}
+
+void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  // A selectable's fd may switch from PN_INVALID_SOCKET to a working socket between
+  // update calls. If a selectable without a valid socket has a deadline, we need
+  // a dummy iocpdesc_t to participate in the deadlines list. 
+ int idx = pni_selectable_get_index(selectable); + assert(idx >= 0); + pn_timestamp_t deadline = pn_selectable_get_deadline(selectable); + pn_socket_t sock = pn_selectable_get_fd(selectable); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); + + if (!iocpd && deadline && sock == PN_INVALID_SOCKET) { + iocpd = pni_deadline_desc(selector->iocp); + assert(iocpd); + pn_list_set(selector->iocp_descriptors, idx, iocpd); + pn_decref(iocpd); // life is solely tied to iocp_descriptors list + iocpd->selector = selector; + iocpd->selectable = selectable; + } + else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) { + // Switching to a real socket. Stop using a deadline descriptor. + deadlines_update(iocpd, 0); + // decref descriptor in list and pick up a real iocpd below + pn_list_set(selector->iocp_descriptors, idx, NULL); + iocpd = NULL; + } + + // The selectables socket may be set long after it has been added + if (!iocpd && sock != PN_INVALID_SOCKET) { + iocpd = pni_iocpdesc_map_get(selector->iocp, sock); + if (!iocpd) { + // Socket created outside proton. Hook it up to iocp. 
+ iocpd = pni_iocpdesc_create(selector->iocp, sock, true); + assert(iocpd); + if (iocpd) + pni_iocpdesc_start(iocpd); + } + if (iocpd) { + pn_list_set(selector->iocp_descriptors, idx, iocpd); + iocpd->selector = selector; + iocpd->selectable = selectable; + } + } + + if (iocpd) { + assert(sock == iocpd->socket || iocpd->closing); + int interests = PN_ERROR; // Always + if (pn_selectable_is_reading(selectable)) { + interests |= PN_READABLE; + } + if (pn_selectable_is_writing(selectable)) { + interests |= PN_WRITABLE; + } + if (deadline) { + interests |= PN_EXPIRED; + } + interests_update(iocpd, interests); + deadlines_update(iocpd, deadline); + } +} + +void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable) +{ + assert(selector); + assert(selectable); + + int idx = pni_selectable_get_index(selectable); + assert(idx >= 0); + iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); + if (iocpd) { + if (selector->current_triggered == iocpd) + selector->current_triggered = iocpd->triggered_list_next; + interests_update(iocpd, 0); + deadlines_update(iocpd, 0); + assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev); + assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev); + iocpd->selector = NULL; + iocpd->selectable = NULL; + } + pn_list_del(selector->selectables, idx, 1); + pn_list_del(selector->iocp_descriptors, idx, 1); + size_t size = pn_list_size(selector->selectables); + for (size_t i = idx; i < size; i++) { + pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i); + pni_selectable_set_index(sel, i); + } + + pni_selectable_set_index(selectable, -1); + + if (selector->current >= (size_t) idx) { + selector->current--; + } +} + +size_t pn_selector_size(pn_selector_t *selector) { + assert(selector); + return pn_list_size(selector->selectables); +} + +int pn_selector_select(pn_selector_t *selector, int timeout) +{ + assert(selector); + 
pn_error_clear(selector->error); + pn_timestamp_t deadline = 0; + pn_timestamp_t now = pn_i_now(); + + if (timeout) { + if (selector->deadlines_head) + deadline = selector->deadlines_head->deadline; + } + if (deadline) { + int64_t delta = deadline - now; + if (delta < 0) { + delta = 0; + } + if (timeout < 0) + timeout = delta; + else if (timeout > delta) + timeout = delta; + } + deadline = (timeout >= 0) ? now + timeout : 0; + + // Process all currently available completions, even if matched events available + pni_iocp_drain_completions(selector->iocp); + pni_zombie_check(selector->iocp, now); + // Loop until an interested event is matched, or until deadline + while (true) { + if (selector->triggered_list_head) + break; + if (deadline && deadline <= now) + break; + pn_timestamp_t completion_deadline = deadline; + pn_timestamp_t zd = pni_zombie_deadline(selector->iocp); + if (zd) + completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd; + + int completion_timeout = (!completion_deadline) ? 
-1 : completion_deadline - now; + int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error); + if (rv < 0) + return pn_error_code(selector->error); + + now = pn_i_now(); + if (zd && zd <= now) { + pni_zombie_check(selector->iocp, now); + } + } + + selector->current = 0; + selector->awoken = now; + for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) { + if (iocpd->deadline <= now) + pni_events_update(iocpd, iocpd->events | PN_EXPIRED); + else + break; + } + selector->current_triggered = selector->triggered_list_head; + return pn_error_code(selector->error); +} + +pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events) +{ + if (selector->current_triggered) { + iocpdesc_t *iocpd = selector->current_triggered; + *events = iocpd->interests & iocpd->events; + selector->current_triggered = iocpd->triggered_list_next; + return iocpd->selectable; + } + return NULL; +} + +void pn_selector_free(pn_selector_t *selector) +{ + assert(selector); + pn_free(selector); +} + + +static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd) + return; // already in list + LL_ADD(selector, triggered_list, iocpd); +} + +static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd) + return; // not in list + LL_REMOVE(selector, triggered_list, iocpd); + iocpd->triggered_list_prev = NULL; + iocpd->triggered_list_next = NULL; +} + + +void pni_events_update(iocpdesc_t *iocpd, int events) +{ + // If set, a poll error is permanent + if (iocpd->poll_error) + events |= PN_ERROR; + if (iocpd->events == events) + return; + iocpd->events = events; + if (iocpd->selector) { + if (iocpd->events & iocpd->interests) + triggered_list_add(iocpd->selector, iocpd); + else + triggered_list_remove(iocpd->selector, iocpd); + } +} + +static void 
interests_update(iocpdesc_t *iocpd, int interests) +{ + int old_interests = iocpd->interests; + if (old_interests == interests) + return; + iocpd->interests = interests; + if (iocpd->selector) { + if (iocpd->events & iocpd->interests) + triggered_list_add(iocpd->selector, iocpd); + else + triggered_list_remove(iocpd->selector, iocpd); + } +} + +static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd) +{ + if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd) + return; // not in list + LL_REMOVE(selector, deadlines, iocpd); + iocpd->deadlines_prev = NULL; + iocpd->deadlines_next = NULL; +} + + +static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline) +{ + if (deadline == iocpd->deadline) + return; + + iocpd->deadline = deadline; + pn_selector_t *selector = iocpd->selector; + if (!deadline) { + deadlines_remove(selector, iocpd); + pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); + } else { + if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) { + deadlines_remove(selector, iocpd); + pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); + } + iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines); + while (dl_iocpd && dl_iocpd->deadline <= deadline) + dl_iocpd = dl_iocpd->deadlines_next; + if (dl_iocpd) { + // insert + iocpd->deadlines_prev = dl_iocpd->deadlines_prev; + iocpd->deadlines_next = dl_iocpd; + dl_iocpd->deadlines_prev = iocpd; + if (selector->deadlines_head == dl_iocpd) + selector->deadlines_head = iocpd; + } else { + LL_ADD(selector, deadlines, iocpd); // append + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/windows/write_pipeline.c ---------------------------------------------------------------------- diff --git a/c/src/reactor/io/windows/write_pipeline.c b/c/src/reactor/io/windows/write_pipeline.c new file mode 100644 index 0000000..238303c --- /dev/null +++ b/c/src/reactor/io/windows/write_pipeline.c @@ -0,0 +1,314 @@ +/* + * + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* + * A simple write buffer pool. Each socket has a dedicated "primary" + * buffer and can borrow from a shared pool with limited size tuning. + * Could enhance e.g. with separate pools per network interface and fancier + * memory tuning based on interface speed, system resources, and + * number of connections, etc. + */ + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0501 +#endif +#if _WIN32_WINNT < 0x0501 +#error "Proton requires Windows API support for XP or later." +#endif +#include <winsock2.h> +#include <Ws2tcpip.h> + +#include "reactor/io.h" +#include "reactor/selector.h" +#include "reactor/selectable.h" + +#include "iocp.h" +#include "core/util.h" + +#include <proton/error.h> +#include <proton/object.h> + +#include <assert.h> + +// Max overlapped writes per socket +#define IOCP_MAX_OWRITES 16 +// Write buffer size +#define IOCP_WBUFSIZE 16384 + +static void pipeline_log(const char *fmt, ...) 
+{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fflush(stderr); +} + +void pni_shared_pool_create(iocp_t *iocp) +{ + // TODO: more pools (or larger one) when using multiple non-loopback interfaces + iocp->shared_pool_size = 16; + char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging + if (env) { + int sz = atoi(env); + if (sz >= 0 && sz < 256) { + iocp->shared_pool_size = sz; + } + } + iocp->loopback_bufsize = 0; + env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging + if (env) { + int sz = atoi(env); + if (sz >= 0 && sz <= 128 * 1024) { + iocp->loopback_bufsize = sz; + } + } + + if (iocp->shared_pool_size) { + iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); + HRESULT status = GetLastError(); + if (!iocp->shared_pool_memory) { + perror("Proton write buffer pool allocation failure\n"); + iocp->shared_pool_size = 0; + iocp->shared_available_count = 0; + return; + } + + iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); + iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); + iocp->shared_available_count = iocp->shared_pool_size; + char *mem = iocp->shared_pool_memory; + for (unsigned i = 0; i < iocp->shared_pool_size; i++) { + iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE); + mem += IOCP_WBUFSIZE; + } + } +} + +void pni_shared_pool_free(iocp_t *iocp) +{ + for (unsigned i = 0; i < iocp->shared_pool_size; i++) { + write_result_t *result = iocp->shared_results[i]; + if (result->in_use) + pipeline_log("Proton buffer pool leak\n"); + else + free(result); + } + if (iocp->shared_pool_size) { + free(iocp->shared_results); + free(iocp->available_results); + if (iocp->shared_pool_memory) { + if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) { + perror("write buffers release 
failed"); + } + iocp->shared_pool_memory = NULL; + } + } +} + +static void shared_pool_push(write_result_t *result) +{ + iocp_t *iocp = result->base.iocpd->iocp; + assert(iocp->shared_available_count < iocp->shared_pool_size); + iocp->available_results[iocp->shared_available_count++] = result; +} + +static write_result_t *shared_pool_pop(iocp_t *iocp) +{ + return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL; +} + +struct write_pipeline_t { + iocpdesc_t *iocpd; + size_t pending_count; + write_result_t *primary; + size_t reserved_count; + size_t next_primary_index; + size_t depth; + bool is_writer; +}; + +#define write_pipeline_compare NULL +#define write_pipeline_inspect NULL +#define write_pipeline_hashcode NULL + +static void write_pipeline_initialize(void *object) +{ + write_pipeline_t *pl = (write_pipeline_t *) object; + pl->pending_count = 0; + const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE); + pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE); + pl->depth = 0; + pl->is_writer = false; +} + +static void write_pipeline_finalize(void *object) +{ + write_pipeline_t *pl = (write_pipeline_t *) object; + free((void *)pl->primary->buffer.start); + free(pl->primary); +} + +write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd) +{ + static const pn_cid_t CID_write_pipeline = CID_pn_void; + static const pn_class_t clazz = PN_CLASS(write_pipeline); + write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t)); + pipeline->iocpd = iocpd; + pipeline->primary->base.iocpd = iocpd; + return pipeline; +} + +static void confirm_as_writer(write_pipeline_t *pl) +{ + if (!pl->is_writer) { + iocp_t *iocp = pl->iocpd->iocp; + iocp->writer_count++; + pl->is_writer = true; + } +} + +static void remove_as_writer(write_pipeline_t *pl) +{ + if (!pl->is_writer) + return; + iocp_t *iocp = pl->iocpd->iocp; + assert(iocp->writer_count); + pl->is_writer = false; + iocp->writer_count--; +} 
+ +/* + * Optimal depth will depend on properties of the NIC, server, and driver. For now, + * just distinguish between loopback interfaces and the rest. Optimizations in the + * loopback stack allow decent performance with depth 1 and actually cause major + * performance hiccups if set to large values. + */ +static void set_depth(write_pipeline_t *pl) +{ + pl->depth = 1; + sockaddr_storage sa; + socklen_t salen = sizeof(sa); + char buf[INET6_ADDRSTRLEN]; + DWORD buflen = sizeof(buf); + + if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 && + getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) { + if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) || + (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) { + // not loopback + pl->depth = IOCP_MAX_OWRITES; + } else { + iocp_t *iocp = pl->iocpd->iocp; + if (iocp->loopback_bufsize) { + const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize); + if (p) { + pl->primary->buffer.start = p; + pl->primary->buffer.size = iocp->loopback_bufsize; + } + } + } + } +} + +// Reserve as many buffers as possible for count bytes. +size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count) +{ + if (pl->primary->in_use) + return 0; // I.e. io->wouldblock + if (!pl->depth) + set_depth(pl); + if (pl->depth == 1) { + // always use the primary + pl->reserved_count = 1; + pl->next_primary_index = 0; + return 1; + } + + iocp_t *iocp = pl->iocpd->iocp; + confirm_as_writer(pl); + size_t wanted = (count / IOCP_WBUFSIZE); + if (count % IOCP_WBUFSIZE) + wanted++; + size_t pending = pl->pending_count; + assert(pending < pl->depth); + size_t bufs = pn_min(wanted, pl->depth - pending); + // Can draw from shared pool or the primary... but share with others. 
+ size_t writers = iocp->writer_count; + size_t shared_count = (iocp->shared_available_count + writers - 1) / writers; + bufs = pn_min(bufs, shared_count + 1); + pl->reserved_count = pending + bufs; + + if (bufs == wanted && + pl->reserved_count < (pl->depth / 2) && + iocp->shared_available_count > (2 * writers + bufs)) { + // No shortage: keep the primary as spare for future use + pl->next_primary_index = pl->reserved_count; + } else if (bufs == 1) { + pl->next_primary_index = pending; + } else { + // let approx 1/3 drain before replenishing + pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1; + if (pl->next_primary_index < pending) + pl->next_primary_index = pending; + } + return bufs; +} + +write_result_t *pni_write_pipeline_next(write_pipeline_t *pl) +{ + size_t sz = pl->pending_count; + if (sz >= pl->reserved_count) + return NULL; + write_result_t *result; + if (sz == pl->next_primary_index) { + result = pl->primary; + } else { + assert(pl->iocpd->iocp->shared_available_count > 0); + result = shared_pool_pop(pl->iocpd->iocp); + } + + result->in_use = true; + pl->pending_count++; + return result; +} + +void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result) +{ + result->in_use = false; + pl->pending_count--; + pl->reserved_count = 0; + if (result != pl->primary) + shared_pool_push(result); + if (pl->pending_count == 0) + remove_as_writer(pl); +} + +bool pni_write_pipeline_writable(write_pipeline_t *pl) +{ + // Only writable if not full and we can guarantee a buffer: + return pl->pending_count < pl->depth && !pl->primary->in_use; +} + +size_t pni_write_pipeline_size(write_pipeline_t *pl) +{ + return pl->pending_count; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/c/src/reactor/reactor.c b/c/src/reactor/reactor.c new file mode 100644 index 0000000..bbf94f1 --- /dev/null +++ b/c/src/reactor/reactor.c @@ 
-0,0 +1,501 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "io.h" +#include "reactor.h" +#include "selectable.h" +#include "platform/platform.h" // pn_i_now + +#include <proton/object.h> +#include <proton/handlers.h> +#include <proton/event.h> +#include <proton/transport.h> +#include <proton/connection.h> +#include <proton/session.h> +#include <proton/link.h> +#include <proton/delivery.h> + +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> + +struct pn_reactor_t { + pn_record_t *attachments; + pn_io_t *io; + pn_collector_t *collector; + pn_handler_t *global; + pn_handler_t *handler; + pn_list_t *children; + pn_timer_t *timer; + pn_socket_t wakeup[2]; + pn_selectable_t *selectable; + pn_event_type_t previous; + pn_timestamp_t now; + int selectables; + int timeout; + bool yield; + bool stop; +}; + +pn_timestamp_t pn_reactor_mark(pn_reactor_t *reactor) { + assert(reactor); + reactor->now = pn_i_now(); + return reactor->now; +} + +pn_timestamp_t pn_reactor_now(pn_reactor_t *reactor) { + assert(reactor); + return reactor->now; +} + +static void pn_reactor_initialize(pn_reactor_t *reactor) { + reactor->attachments = pn_record(); + reactor->io = pn_io(); + reactor->collector = 
pn_collector(); + reactor->global = pn_iohandler(); + reactor->handler = pn_handler(NULL); + reactor->children = pn_list(PN_OBJECT, 0); + reactor->timer = pn_timer(reactor->collector); + reactor->wakeup[0] = PN_INVALID_SOCKET; + reactor->wakeup[1] = PN_INVALID_SOCKET; + reactor->selectable = NULL; + reactor->previous = PN_EVENT_NONE; + reactor->selectables = 0; + reactor->timeout = 0; + reactor->yield = false; + reactor->stop = false; + pn_reactor_mark(reactor); +} + +static void pn_reactor_finalize(pn_reactor_t *reactor) { + for (int i = 0; i < 2; i++) { + if (reactor->wakeup[i] != PN_INVALID_SOCKET) { + pn_close(reactor->io, reactor->wakeup[i]); + } + } + pn_decref(reactor->attachments); + pn_decref(reactor->collector); + pn_decref(reactor->global); + pn_decref(reactor->handler); + pn_decref(reactor->children); + pn_decref(reactor->timer); + pn_decref(reactor->io); +} + +#define pn_reactor_hashcode NULL +#define pn_reactor_compare NULL +#define pn_reactor_inspect NULL + +PN_CLASSDEF(pn_reactor) + +pn_reactor_t *pn_reactor() { + pn_reactor_t *reactor = pn_reactor_new(); + int err = pn_pipe(reactor->io, reactor->wakeup); + if (err) { + pn_free(reactor); + return NULL; + } else { + return reactor; + } +} + +pn_record_t *pn_reactor_attachments(pn_reactor_t *reactor) { + assert(reactor); + return reactor->attachments; +} + +pn_millis_t pn_reactor_get_timeout(pn_reactor_t *reactor) { + assert(reactor); + return reactor->timeout; +} + +void pn_reactor_set_timeout(pn_reactor_t *reactor, pn_millis_t timeout) { + assert(reactor); + reactor->timeout = timeout; +} + +void pn_reactor_free(pn_reactor_t *reactor) { + if (reactor) { + pn_collector_release(reactor->collector); + pn_handler_free(reactor->handler); + reactor->handler = NULL; + pn_decref(reactor); + } +} + +pn_handler_t *pn_reactor_get_global_handler(pn_reactor_t *reactor) { + assert(reactor); + return reactor->global; +} + +void pn_reactor_set_global_handler(pn_reactor_t *reactor, pn_handler_t *handler) { + 
assert(reactor); + pn_decref(reactor->global); + reactor->global = handler; + pn_incref(reactor->global); +} + +pn_handler_t *pn_reactor_get_handler(pn_reactor_t *reactor) { + assert(reactor); + return reactor->handler; +} + +void pn_reactor_set_handler(pn_reactor_t *reactor, pn_handler_t *handler) { + assert(reactor); + pn_decref(reactor->handler); + reactor->handler = handler; + pn_incref(reactor->handler); +} + +pn_io_t *pni_reactor_io(pn_reactor_t *reactor) { + assert(reactor); + return reactor->io; +} + +pn_error_t *pn_reactor_error(pn_reactor_t *reactor) { + assert(reactor); + return pn_io_error(reactor->io); +} + +pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor) { + assert(reactor); + return reactor->collector; +} + +pn_list_t *pn_reactor_children(pn_reactor_t *reactor) { + assert(reactor); + return reactor->children; +} + +static void pni_selectable_release(pn_selectable_t *selectable) { + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(selectable); + pn_incref(selectable); + if (pn_list_remove(reactor->children, selectable)) { + reactor->selectables--; + } + pn_decref(selectable); +} + +pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) { + assert(reactor); + pn_selectable_t *sel = pn_selectable(); + pn_selectable_collect(sel, reactor->collector); + pn_collector_put(reactor->collector, PN_OBJECT, sel, PN_SELECTABLE_INIT); + pni_selectable_set_context(sel, reactor); + pn_list_add(reactor->children, sel); + pn_selectable_on_release(sel, pni_selectable_release); + pn_decref(sel); + reactor->selectables++; + return sel; +} + +PN_HANDLE(PNI_TERMINATED) + +void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) { + assert(reactor); + pn_record_t *record = pn_selectable_attachments(selectable); + if (!pn_record_has(record, PNI_TERMINATED)) { + if (pn_selectable_is_terminal(selectable)) { + pn_record_def(record, PNI_TERMINATED, PN_VOID); + pn_collector_put(reactor->collector, PN_OBJECT, selectable, 
PN_SELECTABLE_FINAL); + } else { + pn_collector_put(reactor->collector, PN_OBJECT, selectable, PN_SELECTABLE_UPDATED); + } + } +} + +void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event); + +static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) { + assert(reactor); + assert(event); + switch (pn_event_type(event)) { + case PN_CONNECTION_FINAL: + pni_handle_final(reactor, event); + break; + default: + break; + } +} + +PN_HANDLE(PN_HANDLER) + +pn_handler_t *pn_record_get_handler(pn_record_t *record) { + assert(record); + return (pn_handler_t *) pn_record_get(record, PN_HANDLER); +} + +void pn_record_set_handler(pn_record_t *record, pn_handler_t *handler) { + assert(record); + pn_record_def(record, PN_HANDLER, PN_OBJECT); + pn_record_set(record, PN_HANDLER, handler); +} + +PN_HANDLE(PN_REACTOR) + +pn_reactor_t *pni_record_get_reactor(pn_record_t *record) { + return (pn_reactor_t *) pn_record_get(record, PN_REACTOR); +} + +void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor) { + pn_record_def(record, PN_REACTOR, PN_WEAKREF); + pn_record_set(record, PN_REACTOR, reactor); +} + +static pn_connection_t *pni_object_connection(const pn_class_t *clazz, void *object) { + switch (pn_class_id(clazz)) { + case CID_pn_delivery: + return pn_session_connection(pn_link_session(pn_delivery_link((pn_delivery_t *) object))); + case CID_pn_link: + return pn_session_connection(pn_link_session((pn_link_t *) object)); + case CID_pn_session: + return pn_session_connection((pn_session_t *) object); + case CID_pn_connection: + return (pn_connection_t *) object; + case CID_pn_transport: + return pn_transport_connection((pn_transport_t *) object); + default: + return NULL; + } +} + +static pn_reactor_t *pni_reactor(pn_selectable_t *sel) { + return (pn_reactor_t *) pni_selectable_get_context(sel); +} + +pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void *object) { + switch (pn_class_id(clazz)) { + case CID_pn_reactor: + return 
(pn_reactor_t *) object; + case CID_pn_task: + return pni_record_get_reactor(pn_task_attachments((pn_task_t *) object)); + case CID_pn_transport: + return pni_record_get_reactor(pn_transport_attachments((pn_transport_t *) object)); + case CID_pn_delivery: + case CID_pn_link: + case CID_pn_session: + case CID_pn_connection: + { + pn_connection_t *conn = pni_object_connection(clazz, object); + pn_record_t *record = pn_connection_attachments(conn); + return pni_record_get_reactor(record); + } + case CID_pn_selectable: + { + pn_selectable_t *sel = (pn_selectable_t *) object; + return pni_reactor(sel); + } + default: + return NULL; + } +} + +pn_reactor_t *pn_object_reactor(void *object) { + return pn_class_reactor(pn_object_reify(object), object); +} + +pn_reactor_t *pn_event_reactor(pn_event_t *event) { + const pn_class_t *clazz = pn_event_class(event); + void *context = pn_event_context(event); + return pn_class_reactor(clazz, context); +} + +pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) { + pn_handler_t *handler = NULL; + pn_link_t *link = pn_event_link(event); + if (link) { + handler = pn_record_get_handler(pn_link_attachments(link)); + if (handler) { return handler; } + } + pn_session_t *session = pn_event_session(event); + if (session) { + handler = pn_record_get_handler(pn_session_attachments(session)); + if (handler) { return handler; } + } + pn_connection_t *connection = pn_event_connection(event); + if (connection) { + handler = pn_record_get_handler(pn_connection_attachments(connection)); + if (handler) { return handler; } + } + switch (pn_class_id(pn_event_class(event))) { + case CID_pn_task: + handler = pn_record_get_handler(pn_task_attachments((pn_task_t *) pn_event_context(event))); + if (handler) { return handler; } + break; + case CID_pn_selectable: + handler = pn_record_get_handler(pn_selectable_attachments((pn_selectable_t *) pn_event_context(event))); + if (handler) { return handler; } + break; + default: + break; + 
} + return default_handler; +} + +pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler) { + pn_task_t *task = pn_timer_schedule(reactor->timer, reactor->now + delay); + pn_record_t *record = pn_task_attachments(task); + pni_record_init_reactor(record, reactor); + pn_record_set_handler(record, handler); + if (reactor->selectable) { + pn_selectable_set_deadline(reactor->selectable, pn_timer_deadline(reactor->timer)); + pn_reactor_update(reactor, reactor->selectable); + } + return task; +} + +void pni_event_print(pn_event_t *event) { + pn_string_t *str = pn_string(NULL); + pn_inspect(event, str); + printf("%s\n", pn_string_get(str)); + pn_free(str); +} + +bool pni_reactor_more(pn_reactor_t *reactor) { + assert(reactor); + return pn_timer_tasks(reactor->timer) || reactor->selectables > 1; +} + +void pn_reactor_yield(pn_reactor_t *reactor) { + assert(reactor); + reactor->yield = true; +} + +bool pn_reactor_quiesced(pn_reactor_t *reactor) { + assert(reactor); + pn_event_t *event = pn_collector_peek(reactor->collector); + // no events + if (!event) { return true; } + // more than one event + if (pn_collector_more(reactor->collector)) { return false; } + // if we have just one event then we are quiesced if the quiesced event + return pn_event_type(event) == PN_REACTOR_QUIESCED; +} + +pn_handler_t *pn_event_root(pn_event_t *event) +{ + pn_handler_t *h = pn_record_get_handler(pn_event_attachments(event)); + return h; +} + +static void pni_event_set_root(pn_event_t *event, pn_handler_t *handler) { + pn_record_set_handler(pn_event_attachments(event), handler); +} + +bool pn_reactor_process(pn_reactor_t *reactor) { + assert(reactor); + pn_reactor_mark(reactor); + pn_event_type_t previous = PN_EVENT_NONE; + while (true) { + pn_event_t *event = pn_collector_peek(reactor->collector); + //pni_event_print(event); + if (event) { + if (reactor->yield) { + reactor->yield = false; + return true; + } + reactor->yield = false; + pn_incref(event); + 
pn_handler_t *handler = pn_event_handler(event, reactor->handler); + pn_event_type_t type = pn_event_type(event); + pni_event_set_root(event, handler); + pn_handler_dispatch(handler, event, type); + pni_event_set_root(event, reactor->global); + pn_handler_dispatch(reactor->global, event, type); + pni_reactor_dispatch_post(reactor, event); + previous = reactor->previous = type; + pn_decref(event); + pn_collector_pop(reactor->collector); + } else if (!reactor->stop && pni_reactor_more(reactor)) { + if (previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL) { + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED); + } else { + return true; + } + } else { + if (reactor->selectable) { + pn_selectable_terminate(reactor->selectable); + pn_reactor_update(reactor, reactor->selectable); + reactor->selectable = NULL; + } else { + if (reactor->previous != PN_REACTOR_FINAL) + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL); + return false; + } + } + } +} + +static void pni_timer_expired(pn_selectable_t *sel) { + pn_reactor_t *reactor = pni_reactor(sel); + pn_timer_tick(reactor->timer, reactor->now); + pn_selectable_set_deadline(sel, pn_timer_deadline(reactor->timer)); + pn_reactor_update(reactor, sel); +} + +static void pni_timer_readable(pn_selectable_t *sel) { + char buf[64]; + pn_reactor_t *reactor = pni_reactor(sel); + pn_socket_t fd = pn_selectable_get_fd(sel); + pn_read(reactor->io, fd, buf, 64); + pni_timer_expired(sel); +} + +pn_selectable_t *pni_timer_selectable(pn_reactor_t *reactor) { + pn_selectable_t *sel = pn_reactor_selectable(reactor); + pn_selectable_set_fd(sel, reactor->wakeup[0]); + pn_selectable_on_readable(sel, pni_timer_readable); + pn_selectable_on_expired(sel, pni_timer_expired); + pn_selectable_set_reading(sel, true); + pn_selectable_set_deadline(sel, pn_timer_deadline(reactor->timer)); + pn_reactor_update(reactor, sel); + return sel; +} + +int pn_reactor_wakeup(pn_reactor_t 
*reactor) { + assert(reactor); + ssize_t n = pn_write(reactor->io, reactor->wakeup[1], "x", 1); + if (n < 0) { + return (int) n; + } else { + return 0; + } +} + +void pn_reactor_start(pn_reactor_t *reactor) { + assert(reactor); + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_INIT); + reactor->selectable = pni_timer_selectable(reactor); +} + +void pn_reactor_stop(pn_reactor_t *reactor) { + assert(reactor); + reactor->stop = true; +} + +void pn_reactor_run(pn_reactor_t *reactor) { + assert(reactor); + pn_reactor_set_timeout(reactor, 3141); + pn_reactor_start(reactor); + while (pn_reactor_process(reactor)) {} + pn_reactor_process(reactor); + pn_collector_release(reactor->collector); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/reactor.h ---------------------------------------------------------------------- diff --git a/c/src/reactor/reactor.h b/c/src/reactor/reactor.h new file mode 100644 index 0000000..bfb397c --- /dev/null +++ b/c/src/reactor/reactor.h @@ -0,0 +1,34 @@ +#ifndef _PROTON_SRC_REACTOR_H +#define _PROTON_SRC_REACTOR_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include <proton/reactor.h> +#include <proton/url.h> + +void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor); +void pni_reactor_set_connection_peer_address(pn_connection_t *connection, + const char *host, + const char *port); +pn_io_t *pni_reactor_io(pn_reactor_t *reactor); + +#endif /* src/reactor.h */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org