Repository: qpid-proton Updated Branches: refs/heads/master da8b50a65 -> 934b40e35
PROTON-1223: defer socketclose as long as possible, shorten heartbeat test on Windows, mimic POSIX connection failure behavior Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/934b40e3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/934b40e3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/934b40e3 Branch: refs/heads/master Commit: 934b40e35d8bc5c12457e2aeaf354b4187cbd0ef Parents: 088737a Author: Clifford Jansen <[email protected]> Authored: Thu Jun 2 00:49:39 2016 -0700 Committer: Clifford Jansen <[email protected]> Committed: Thu Jun 2 00:53:22 2016 -0700 ---------------------------------------------------------------------- proton-c/src/windows/iocp.c | 44 ++++++++++++++++++-------------- tests/python/proton_tests/engine.py | 6 ++++- 2 files changed, 30 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/934b40e3/proton-c/src/windows/iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c index 88ae973..d1abc9a 100644 --- a/proton-c/src/windows/iocp.c +++ b/proton-c/src/windows/iocp.c @@ -47,6 +47,13 @@ * since Windows accumulates inbound traffic without stalling and * managing read buffers would not avoid a memory copy at the pn_read * boundary. + * + * A socket must not get a Windows closesocket() unless the + * application has called pn_close on the socket or a global + * pn_io_finalize(). On error, the internal accounting for + * write_closed or read_closed may be updated along with the external + * event notification. A socket may be closed if it is never added to + * the iocpdesc_map or is on its way out of the map. */ // Max number of overlapped accepts per listener @@ -92,8 +99,6 @@ static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text) if (iocpd->iocp->iocp_trace) { iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error)); } - if (!is_listener(iocpd) && !iocpd->write_closed && !pni_write_pipeline_size(iocpd->pipeline)) - iocp_shutdown(iocpd); iocpd->write_closed = true; iocpd->read_closed = true; iocpd->poll_error = true; @@ -405,8 +410,6 @@ pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrin pn_free(result); iocpd->write_closed = true; iocpd->read_closed = true; - pni_iocp_begin_close(iocpd); - sock = INVALID_SOCKET; if (iocp->iocp_trace) iocp_log("%s\n", pn_error_text(error)); } else { @@ -426,10 +429,12 @@ static void complete_connect(connect_result_t *result, HRESULT status) if (status) { iocpdesc_fail(iocpd, status, "Connect failure"); + // Posix sets selectable events as follows: + pni_events_update(iocpd, PN_READABLE | PN_EXPIRED); } else { release_sys_sendbuf(iocpd->socket); if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) { - iocpdesc_fail(iocpd, WSAGetLastError(), "Connect failure (update context)"); + iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)"); } else { pni_events_update(iocpd, PN_WRITABLE); start_reading(iocpd); @@ -528,6 +533,11 @@ ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, boo return written; } +/* + * Note: iocp write completion is not "bytes on the wire", it is "peer + * acked the sent bytes". Completion can be seconds on a slow + * consuming peer. + */ static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status) { iocpdesc_t *iocpd = result->base.iocpd; @@ -634,8 +644,6 @@ static void drain_until_closed(iocpdesc_t *iocpd) { else iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError()); iocpd->read_closed = true; - closesocket(iocpd->socket); - iocpd->socket = INVALID_SOCKET; } @@ -681,19 +689,21 @@ ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_blo return SOCKET_ERROR; } - size_t count = recv(iocpd->socket, (char *) buf, size, 0); + int count = recv(iocpd->socket, (char *) buf, size, 0); if (count > 0) { pni_events_update(iocpd, iocpd->events & ~PN_READABLE); begin_zero_byte_read(iocpd); - return count; + return (ssize_t) count; } else if (count == 0) { iocpd->read_closed = true; return 0; } if (WSAGetLastError() == WSAEWOULDBLOCK) *would_block = true; - else + else { set_iocp_error_status(error, PN_ERR, WSAGetLastError()); + iocpd->read_closed = true; + } return SOCKET_ERROR; } @@ -1078,19 +1088,15 @@ static void zombie_list_hard_close_all(iocp_t *iocp) static void iocp_shutdown(iocpdesc_t *iocpd) { - bool disconnected = false; + if (iocpd->socket == PN_INVALID_SOCKET) + return; // Hard close in progress if (shutdown(iocpd->socket, SD_SEND)) { int err = WSAGetLastError(); - if (err == WSAECONNABORTED || err == WSAECONNRESET || err == WSAENOTCONN) - disconnected = true; - else if (iocpd->iocp->iocp_trace) - iocp_log("socket shutdown failed %d\n", err); + if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN) + if (iocpd->iocp->iocp_trace) + iocp_log("socket shutdown failed %d\n", err); } iocpd->write_closed = true; - if (iocpd->read_closed || disconnected) { - closesocket(iocpd->socket); - iocpd->socket = INVALID_SOCKET; - } } void pni_iocp_begin_close(iocpdesc_t *iocpd) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/934b40e3/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index e7708da..d011210 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -2086,7 +2086,11 @@ class ServerTest(Test): assert self.conn.transport.frames_output > self.old_count, "No idle frames sent" # now wait to explicitly cause the other side to expire: - sleep(3 * idle_timeout) + suspend_time = 3 * idle_timeout + if os.name=="nt": + # On windows, the full delay gets too close to the graceful/hard close tipping point + suspend_time = 2.5 * idle_timeout + sleep(suspend_time) p = Program() Reactor(p).run() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
