Repository: qpid-proton
Updated Branches:
  refs/heads/master da8b50a65 -> 934b40e35


PROTON-1223: defer socketclose as long as possible, shorten heartbeat test on 
Windows, mimic POSIX connection failure behavior


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/934b40e3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/934b40e3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/934b40e3

Branch: refs/heads/master
Commit: 934b40e35d8bc5c12457e2aeaf354b4187cbd0ef
Parents: 088737a
Author: Clifford Jansen <[email protected]>
Authored: Thu Jun 2 00:49:39 2016 -0700
Committer: Clifford Jansen <[email protected]>
Committed: Thu Jun 2 00:53:22 2016 -0700

----------------------------------------------------------------------
 proton-c/src/windows/iocp.c         | 44 ++++++++++++++++++--------------
 tests/python/proton_tests/engine.py |  6 ++++-
 2 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/934b40e3/proton-c/src/windows/iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c
index 88ae973..d1abc9a 100644
--- a/proton-c/src/windows/iocp.c
+++ b/proton-c/src/windows/iocp.c
@@ -47,6 +47,13 @@
  * since Windows accumulates inbound traffic without stalling and
  * managing read buffers would not avoid a memory copy at the pn_read
  * boundary.
+ *
+ * A socket must not get a Windows closesocket() unless the
+ * application has called pn_close on the socket or a global
+ * pn_io_finalize().  On error, the internal accounting for
+ * write_closed or read_closed may be updated along with the external
+ * event notification.  A socket may be closed if it is never added to
+ * the iocpdesc_map or is on its way out of the map.
  */
 
 // Max number of overlapped accepts per listener
@@ -92,8 +99,6 @@ static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, 
const char* text)
   if (iocpd->iocp->iocp_trace) {
     iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
   }
-  if (!is_listener(iocpd) && !iocpd->write_closed && 
!pni_write_pipeline_size(iocpd->pipeline))
-    iocp_shutdown(iocpd);
   iocpd->write_closed = true;
   iocpd->read_closed = true;
   iocpd->poll_error = true;
@@ -405,8 +410,6 @@ pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, 
pn_socket_t sock, struct addrin
     pn_free(result);
     iocpd->write_closed = true;
     iocpd->read_closed = true;
-    pni_iocp_begin_close(iocpd);
-    sock = INVALID_SOCKET;
     if (iocp->iocp_trace)
       iocp_log("%s\n", pn_error_text(error));
   } else {
@@ -426,10 +429,12 @@ static void complete_connect(connect_result_t *result, 
HRESULT status)
 
   if (status) {
     iocpdesc_fail(iocpd, status, "Connect failure");
+    // Posix sets selectable events as follows:
+    pni_events_update(iocpd, PN_READABLE | PN_EXPIRED);
   } else {
     release_sys_sendbuf(iocpd->socket);
     if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,  
NULL, 0)) {
-      iocpdesc_fail(iocpd, WSAGetLastError(), "Connect failure (update 
context)");
+      iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure 
(update context)");
     } else {
       pni_events_update(iocpd, PN_WRITABLE);
       start_reading(iocpd);
@@ -528,6 +533,11 @@ ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void 
*buf, size_t len, boo
   return written;
 }
 
+/*
+ * Note: iocp write completion is not "bytes on the wire", it is "peer
+ * acked the sent bytes".  Completion can be seconds on a slow
+ * consuming peer.
+ */
 static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT 
status)
 {
   iocpdesc_t *iocpd = result->base.iocpd;
@@ -634,8 +644,6 @@ static void drain_until_closed(iocpdesc_t *iocpd) {
     else
       iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
   iocpd->read_closed = true;
-  closesocket(iocpd->socket);
-  iocpd->socket = INVALID_SOCKET;
 }
 
 
@@ -681,19 +689,21 @@ ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, 
size_t size, bool *would_blo
     return SOCKET_ERROR;
   }
 
-  size_t count = recv(iocpd->socket, (char *) buf, size, 0);
+  int count = recv(iocpd->socket, (char *) buf, size, 0);
   if (count > 0) {
     pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
     begin_zero_byte_read(iocpd);
-    return count;
+    return (ssize_t) count;
   } else if (count == 0) {
     iocpd->read_closed = true;
     return 0;
   }
   if (WSAGetLastError() == WSAEWOULDBLOCK)
     *would_block = true;
-  else
+  else {
     set_iocp_error_status(error, PN_ERR, WSAGetLastError());
+    iocpd->read_closed = true;
+  }
   return SOCKET_ERROR;
 }
 
@@ -1078,19 +1088,15 @@ static void zombie_list_hard_close_all(iocp_t *iocp)
 
 static void iocp_shutdown(iocpdesc_t *iocpd)
 {
-  bool disconnected = false;
+  if (iocpd->socket == PN_INVALID_SOCKET)
+    return;    // Hard close in progress
   if (shutdown(iocpd->socket, SD_SEND)) {
     int err = WSAGetLastError();
-    if (err == WSAECONNABORTED || err == WSAECONNRESET || err == WSAENOTCONN)
-      disconnected = true;
-    else if (iocpd->iocp->iocp_trace)
-      iocp_log("socket shutdown failed %d\n", err);
+    if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN)
+      if (iocpd->iocp->iocp_trace)
+        iocp_log("socket shutdown failed %d\n", err);
   }
   iocpd->write_closed = true;
-  if (iocpd->read_closed || disconnected) {
-    closesocket(iocpd->socket);
-    iocpd->socket = INVALID_SOCKET;
-  }
 }
 
 void pni_iocp_begin_close(iocpdesc_t *iocpd)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/934b40e3/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py 
b/tests/python/proton_tests/engine.py
index e7708da..d011210 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -2086,7 +2086,11 @@ class ServerTest(Test):
         assert self.conn.transport.frames_output > self.old_count, "No idle 
frames sent"
 
         # now wait to explicitly cause the other side to expire:
-        sleep(3 * idle_timeout)
+        suspend_time = 3 * idle_timeout
+        if os.name=="nt":
+          # On windows, the full delay gets too close to the graceful/hard 
close tipping point
+          suspend_time = 2.5 * idle_timeout
+        sleep(suspend_time)
 
     p = Program()
     Reactor(p).run()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to