Author: mturk
Date: Sat Aug  6 15:05:02 2011
New Revision: 1154526

URL: http://svn.apache.org/viewvc?rev=1154526&view=rev
Log:
Implement windows IPC socket references

Modified:
    commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
    commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
    commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c

Modified: commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
URL: 
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h 
(original)
+++ commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h Sat 
Aug  6 15:05:02 2011
@@ -42,6 +42,7 @@
 #define ACR_DT_SOCKET           0x0003
 #define ACR_DT_LOCALSOCK        0x0004
 #define ACR_DT_SSLSOCK          0x0005
+#define ACR_DT_IPCSOCK          0x0006
 
 typedef struct acr_fd_t acr_fd_t;
 struct acr_fd_t {
@@ -53,6 +54,7 @@ struct acr_fd_t {
     union {
         HANDLE              h;
         SOCKET              s;
+        LPVOID              p;
     };
 #else
     int                     f;
@@ -69,6 +71,7 @@ struct acr_sd_t {
     union {
         HANDLE              h;
         SOCKET              s;
+        LPVOID              p;
     };
 #else
     int                     s;

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
URL: 
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h 
(original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h Sat Aug  
6 15:05:02 2011
@@ -284,7 +284,6 @@ struct IPCSOCK
     IPCSOCK_CLIENT     *c;
     DWORD               dwPageSize;    /* Allocation Page size */
     DWORD               dwTimeout;     /* Socket timeout */
-    DWORD               flags;         /* Socket flags */
     void               *pAttachment;   /* Custom user object */
     HANDLE              hClientMeta;   /* Metadata mapping   */
     HANDLE              hBufferMap[2]; /* IO Buffer mappings
@@ -321,7 +320,9 @@ struct IPCSERVER
     HANDLE              hAcceptSync;
     HANDLE              hAcceptLock;
     volatile long       nConnections;
+    volatile long       nReferences;
     DWORD               dwFlags;
+    BOOL                bClosed;
 };
 
 /**
@@ -382,6 +383,8 @@ struct IPCSERVER
                          * evet so writter can continue with writting.
                          */
 
+/* Yield member p of the acr_sd_t behind handle D, cast to T; 0 when D is 0. */
+#define ACR_DESC_PTR(D, T) (((D) == 0) ? 0 : (T)((J2P(D, acr_sd_t *))->p))
 #if defined(__cplusplus)
 extern "C" {
 #endif
@@ -442,6 +445,28 @@ int AcrIpcSend(LPIPCSOCK pSocket, const 
  */
 int AcrIpcRecv(LPIPCSOCK pSocket, void *pData, int nSize);
 
+/**
+ * Check how much data can be read or written
+ * without blocking.
+ * @param pSocket the socket to check.
+ * @param bForRead if TRUE check the read side, otherwise the write side.
+ * @return number of bytes that can be read or written
+ *         without blocking, or -1 on error.
+ */
+int
+AcrIpcAvail(LPIPCSOCK pSocket, BOOL bForRead);
+
+/**
+ * Flush the output buffer and block until the peer
+ * reads the data.
+ * @param pSocket the socket to use.
+ * @return zero on success or error code.
+ * @note   The function will block while the buffer is not
+ *         empty, until the peer reads all of the data or until
+ *         the socket times out.
+ */
+int AcrIpcFlush(LPIPCSOCK pSocket);
+
 #if defined(__cplusplus)
 }
 #endif

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
URL: 
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c Sat Aug  6 
15:05:02 2011
@@ -304,18 +304,64 @@ AcrIpcRemoteUnref(LPIPCREMOTE pRemote)
     LeaveCriticalSection(&gSynchronized);
 }
 
+static int /* returns 0, or WSAENOTSOCK when sp is NULL */
+AcrIpcServerClose(LPIPCSERVER sp)
+{
+    LPIPCSOCK   cp;
+    LPIPCSOCK   np;
+
+    if (sp == 0)
+        return WSAENOTSOCK;
+    ACR_RING_FOREACH_SAFE(cp, np, &sp->rConnections, IPCSOCK, rLink) {
+        /* Unlink from the server */
+        ACR_RING_REMOVE(cp, rLink);
+        /* Mark the connection as aborted.
+         * The user must still close each individual
+         * connection to free the memory and resources.
+         */
+        cp->nStatus = _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
+        if (cp->nStatus == 0) {
+            /* Inform our listeners that we are going to close.
+             */
+            SetEvent(cp->hSync[IPCSOCK_RDR]);
+            SetEvent(cp->hSync[IPCSOCK_RTT]);
+        }
+        sp->nConnections--;
+    }
+    if (sp->nConnections != 0) {
+        /* Should never happen [tm]; nConnections is a long, hence %ld.
+         */
+        printf("[server] Found %ld active. Should be zero\n", sp->nConnections);
+    }
+    SAFE_CLOSE_HANDLE(sp->hAcceptSema);
+    SAFE_CLOSE_HANDLE(sp->hAcceptSync);
+    SAFE_CLOSE_HANDLE(sp->hAcceptLock);
+    SAFE_CLOSE_HANDLE(sp->hServerMap);
+
+    AcrFree(sp);  /* sp is invalid from here on */
+    return 0;
+}
+
 ACR_NET_EXPORT(jlong, IpcServerEndpoint, create0)(JNI_STDARGS, jint flags)
 {
+    acr_sd_t  *sd;   /* descriptor wrapper handed back to Java */
     IPCSERVER *sp;
 
-    if ((sp = (LPIPCSERVER)calloc(1, sizeof(IPCSERVER))) == 0) {
+    if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
         /* Allocation failed */
-        ACR_THROW_NET_ERROR(ACR_ENOMEM);
+        return 0;
+    }
+    if ((sp = ACR_TALLOC(IPCSERVER)) == 0) {
+        /* Allocation failed */
+        AcrFree(sd);  /* do not leak the wrapper */
         return 0;
     }
     ACR_RING_INIT(&sp->rConnections, IPCSOCK, rLink);
-    sp->dwFlags = flags;
-    return P2J(sp);
+    sd->p           = sp;
+    sd->flags       = flags;
+    sd->refs        = 1;  /* NOTE(review): sd->type is not set here, unlike IpcEndpoint create0 - confirm */
+    sp->nReferences = 1;  /* dropped by close0 */
+    return P2J(sd);
 }
 
 ACR_NET_EXPORT(jint, IpcServerEndpoint, bind0)(JNI_STDARGS, jlong fp,
@@ -325,13 +371,13 @@ ACR_NET_EXPORT(jint, IpcServerEndpoint, 
     DWORD dwShareLen;
     DWORD dwShareSiz = backlog * sizeof(IPCSOCK_ACCEPT) + 
sizeof(IPCSOCK_SERVER);
     acr_sockaddr_t *ca = SOCKADDR_CAST(cb);
-    LPIPCSERVER     sp = J2P(fp, LPIPCSERVER);
-    dwShareLen = ACR_ALIGN(dwShareSiz, PAGESIZE);
+    LPIPCSERVER     sp = ACR_DESC_PTR(fp, LPIPCSERVER);
 
     if (sp->hServerMap != 0) {
         SOCKADDR_RELEASE(cb, ca);
         return WSAEISCONN;
     }
+    dwShareLen     = ACR_ALIGN(dwShareSiz, PAGESIZE);
     sp->hServerMap = CreateFileMappingA(INVALID_HANDLE_VALUE,
                                         IPCSECURITY_TOKEN,
                                         PAGE_READWRITE,
@@ -368,6 +414,26 @@ failed:
     return rc;
 }
 
+ACR_NET_EXPORT(jint, IpcServerEndpoint, close0)(JNI_STDARGS, jlong fp)
+{
+    int rc = 0;
+    acr_sd_t   *sd = J2P(fp, acr_sd_t *);
+    LPIPCSERVER sp;
+
+    if (sd == 0)
+        return ACR_EBADF;
+    sp = (LPIPCSERVER)InterlockedExchangePointer(&sd->p, 0); /* detach the server from the descriptor */
+    if (InterlockedDecrement(&sd->refs) == 0)
+        AcrFree(sd);  /* last descriptor reference */
+    if (sp == 0)
+        return ACR_EBADF;  /* sd->p was already cleared */
+    sp->bClosed = TRUE;  /* checked by AcrIpcAccept after its wait returns */
+    SetEvent(sp->hAcceptSync);  /* wake a thread waiting in AcrIpcAccept */
+    if (InterlockedDecrement(&sp->nReferences) == 0)
+        rc = AcrIpcServerClose(sp);  /* no accept in flight: tear down now */
+    return rc;
+}
+
 static LPBYTE
 AcrIoBufMap(HANDLE hMap, DWORD dwSize)
 {
@@ -431,22 +497,30 @@ ApcIpcUnref(LPIPCSOCK cp)
     }
 }
 
-ACR_NET_EXPORT(jlong, IpcServerEndpoint, accept0)(JNI_STDARGS, jlong fp,
-                                                  jint timeout)
+static __inline void  /* Drop one server reference; close the server on the last one. */
+ApcIpcServerUnref(LPIPCSERVER sp)
+{
+    if (InterlockedDecrement(&sp->nReferences) == 0) {
+        DWORD rc = GetLastError();  /* save the caller's last-error value */
+        AcrIpcServerClose(sp);
+        SetLastError(rc);           /* restore it after the close */
+    }
+}
 
+LPIPCSOCK AcrIpcAccept(LPIPCSERVER sp, int nTimeout)
 {
     int   i, rc = 0;
     DWORD ws;
     HANDLE hProcessLock = 0;
     HANDLE hClientMeta;
     IPCSOCK_ACCEPT *a;
-    IPCSOCK   *cp  = 0;
-    INT64 nTimeup  = 0;
-    LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
+    IPCSOCK   *cp = 0;
+    INT64 nTimeup = 0;
 
-    if (timeout != -1 && timeout != 0) {
+    InterlockedIncrement(&sp->nReferences);
+    if (nTimeout != -1 && nTimeout != 0) {
         /* Get "future" timeout */
-        nTimeup = GetCurrentMilliseconds() + timeout;
+        nTimeup = GetCurrentMilliseconds() + nTimeout;
     }
 retry:
     AcquireMutex(sp->hAcceptLock);
@@ -454,6 +528,7 @@ retry:
         /* Reached the backlog limit.
          */
         ReleaseMutex(sp->hAcceptLock);
+        ApcIpcServerUnref(sp);
         SetLastError(WSAEMFILE);
         return 0;
     }
@@ -462,32 +537,41 @@ retry:
      * We do it once for each accept call.
      */
     if (!ReleaseSemaphore(sp->hAcceptSema, 1, 0)) {
+        ApcIpcServerUnref(sp);
         return 0;
     }
 again:
     printf("[server] Waiting on accept ...\n");
     /* Wait for a client connect */
-    ws = WaitForSingleObject(sp->hAcceptSync, timeout);
+    ws = WaitForSingleObject(sp->hAcceptSync, nTimeout);
     printf("[server] Waiting on accept : %d\n", ws);
     switch (ws) {
         case WAIT_OBJECT_0:
             /* Client signaled there is a new
              * valid connect record
              */
+            if (sp->bClosed) {
+                ApcIpcServerUnref(sp);
+                SetLastError(WSAEINTR);
+                return 0;
+            }
         break;
         case WAIT_TIMEOUT:
+            ApcIpcServerUnref(sp);
             /* Timeout */
             printf("[server] accept timeout\n");
-            if (timeout == 0)
+            if (nTimeout == 0)
                 SetLastError(WSAEWOULDBLOCK);
             else
                 SetLastError(WSAETIMEDOUT);
             return 0;
         break;
         case WAIT_FAILED:
+            ApcIpcServerUnref(sp);
             return 0;
         break;
         default:
+            ApcIpcServerUnref(sp);
             /* Error!  */
             printf("[server] illegal accept wait result\n");
             SetLastError(rc);
@@ -499,11 +583,12 @@ again:
         /* Some other thread accepted the signaled connection
          * Can happen if multiple threads are accepting.
          */
-        if ((timeout == -1) || ((nTimeup > 0) && (nTimeup > 
GetCurrentMilliseconds()))) {
+        if ((nTimeout == -1) || ((nTimeup > 0) && (nTimeup > 
GetCurrentMilliseconds()))) {
             ReleaseMutex(sp->hAcceptLock);
             printf("[server] Retrying ...\n");
             goto again;
         }
+        ApcIpcServerUnref(sp);
         /* Timeout reached */
         SetLastError(WSAETIMEDOUT);
         return 0;
@@ -524,7 +609,7 @@ again:
         /* XXX: Should we return faulty connect attempt
          * to the caller?
          */
-        if ((timeout == -1) || ((nTimeup > 0) && (nTimeup > 
GetCurrentMilliseconds()))) {
+        if ((nTimeout == -1) || ((nTimeup > 0) && (nTimeup > 
GetCurrentMilliseconds()))) {
             printf("[server] Restarting ...\n");
             goto retry;
         }
@@ -560,7 +645,7 @@ again:
     }
     cp->pStatus      = &cp->c->nStatus;
     cp->dwPageSize   = cp->c->dwPageSize;
-    cp->dwTimeout    = timeout;
+    cp->dwTimeout    = INFINITE;
     cp->nReferences  = 1;
     cp->hClientMeta  = hClientMeta;
     for (i = 0; i < 4; i++) {
@@ -611,7 +696,8 @@ again:
     ACR_RING_INSERT_TAIL(&sp->rConnections, cp, IPCSOCK, rLink);
     InterlockedIncrement(&sp->nConnections);
     LeaveCriticalSection(&gSynchronized);
-    return P2J(cp);
+    ApcIpcServerUnref(sp);
+    return cp;
 
 failed:
     InterlockedExchange(&a->nStatus, WSAECONNREFUSED);
@@ -634,30 +720,46 @@ failed:
 finally:
     CloseHandle(hProcessLock);
     CloseHandle(hClientMeta);
+    ApcIpcServerUnref(sp);
     SetLastError(rc);
     return 0;
 }
 
-ACR_NET_EXPORT(jlong, IpcEndpoint, create0)(JNI_STDARGS, jboolean msg)
+ACR_NET_EXPORT(jlong, IpcEndpoint, create0)(JNI_STDARGS, jint stype, jboolean 
block)
 {
-    IPCSOCK *cp;
+    acr_sd_t *sd;
+    IPCSOCK  *cp;
 
-    if ((cp = calloc(1, sizeof(IPCSOCK))) == 0) {
-        ACR_THROW_NET_ERROR(ACR_ENOMEM);
+    if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
+        /* Allocation failed */
+        return 0;
+    }
+    if ((cp = ACR_TALLOC(IPCSOCK)) == 0) {
+        AcrFree(sd);
         /* Allocation failed */
         return 0;
     }
     ACR_RING_ELEM_INIT(cp, rLink);
-    if (msg == JNI_FALSE)
+    if (stype == SOCK_STREAM)  /* as before (msg == JNI_FALSE): only stream mode gets a page size */
         cp->dwPageSize = PAGESIZE;
-    cp->dwTimeout   = INFINITE;
+    if (block != JNI_FALSE) {
+        cp->dwTimeout   = INFINITE;
+        sd->timeout     = -1;
+    }
+    else {
+        ACR_SETFLAG(sd, ACR_SO_NONBLOCK);  /* dwTimeout is not assigned on this path */
+    }
     cp->nStatus     = IPCSOCK_CLOSED;
     cp->pStatus     = &cp->nStatus;
     cp->nReferences = 1;
-    return P2J(cp);
+    sd->p           = cp;
+    sd->refs        = 1;
+    sd->type        = ACR_DT_IPCSOCK;
+    return P2J(sd);
 };
 
-ACR_NET_EXPORT(jint, IpcEndpoint, connect0)(JNI_STDARGS, jlong fp, jbyteArray 
cb, jint timeout)
+static int
+AcrIpcConnect(LPIPCSOCK cp, LPCSTR szAddress, int nTimeout)
 {
     int i, rc = 0;
     INT64  nTimeup     = 0;
@@ -665,29 +767,24 @@ ACR_NET_EXPORT(jint, IpcEndpoint, connec
     HANDLE wh[2];
     HANDLE hDuplicate;
     BOOL   bRemoteInit = FALSE;
-    LPIPCSOCK        cp = J2P(fp, LPIPCSOCK);
-    acr_sockaddr_t  *ca = 0;
 
     if (cp == 0)
         return WSAENOTSOCK;
     if (cp->c != 0)
         return WSAEISCONN;
-    if (timeout != -1 && timeout != 0) {
+    if (szAddress == 0 || *szAddress == 0)
+        return WSAEINVAL;
+    if (nTimeout != -1 && nTimeout != 0) {
         /* Get "future" timeout */
-        nTimeup = GetCurrentMilliseconds() + timeout;
+        nTimeup = GetCurrentMilliseconds() + nTimeout;
     }
-    ca = SOCKADDR_CAST(cb);
     InterlockedIncrement(&cp->nReferences);
     _InterlockedOr(&cp->nStatus, IPCSOCK_CONNECTING);
-    if ((cp->rp = AcrIpcRemoteOpen(ca->hostname)) == 0) {
+    if ((cp->rp = AcrIpcRemoteOpen(szAddress)) == 0) {
         _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
         ApcIpcUnref(cp);
-        SOCKADDR_RELEASE(cb, ca);
         return GetLastError();
     }
-    SOCKADDR_RELEASE(cb, ca);
-    printf("[client] Connecting to %d ...\n", cp->rp->s->dwProcessId);
-
     /* Create our metadata page */
     if ((cp->hClientMeta = CreateFileMapping(INVALID_HANDLE_VALUE,
                                              IPCSECURITY_TOKEN,
@@ -702,7 +799,7 @@ ACR_NET_EXPORT(jint, IpcEndpoint, connec
     wh[0] = cp->rp->hAcceptSema;
     wh[1] = cp->rp->hProcessLock;
 again:
-    rc = WaitForMultipleObjects(2, wh, FALSE, timeout);
+    rc = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
     switch (rc) {
         case WAIT_OBJECT_0:
             /* Got a semaphore */
@@ -715,7 +812,7 @@ again:
         break;
         case WAIT_TIMEOUT:
             printf("[client] Timeout!\n");
-            if (timeout == 0)
+            if (nTimeout == 0)
                 rc = WSAEWOULDBLOCK;
             else
                 rc = WSAETIMEDOUT;
@@ -742,7 +839,7 @@ again:
         /* No strorage for our accept data
          */
         ReleaseMutex(cp->rp->hAcceptLock);
-        if ((timeout == -1) || ((nTimeup > 0) && (nTimeup > 
GetCurrentMilliseconds()))) {
+        if ((nTimeout == -1) || ((nTimeup > 0) && (nTimeup > 
GetCurrentMilliseconds()))) {
             printf("[client] No free connection slots. retrying\n");
             goto again;
         }
@@ -808,12 +905,12 @@ again:
     SetEvent(cp->rp->hAcceptSync);
     /* Not needed any more */
     printf("[client] Waiting for ack\n");
-    if (timeout != -1 && timeout != 0) {
+    if (nTimeout != -1 && nTimeout != 0) {
         /* Update timeout with the time we spend inside processing so far
          */
-        timeout = (int)(nTimeup - GetCurrentMilliseconds());
-        if (timeout < 0)
-            timeout = 1;
+        nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+        if (nTimeout < 0)
+            nTimeout = 1;
     }
     /* Now wait for the server ack */
     wh[0] = cp->hSync[IPCSOCK_RDR];
@@ -824,10 +921,10 @@ again:
      * There is no point to wait infinitely
      * if the server gets too busy.
      */
-     if (timeout < 0 || timeout > IPCSOCK_ACK_TIMEOUT)
-        timeout = IPCSOCK_ACK_TIMEOUT;
+     if (nTimeout < 0 || nTimeout > IPCSOCK_ACK_TIMEOUT)
+        nTimeout = IPCSOCK_ACK_TIMEOUT;
 #endif
-    rc = WaitForMultipleObjects(2, wh, FALSE, timeout);
+    rc = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
     switch (rc) {
         case WAIT_OBJECT_0:
             /* Got ack event */
@@ -915,7 +1012,7 @@ cleanup:
     cp->rp = 0;
     ApcIpcUnref(cp);
     return rc;
-};
+}
 
 ACR_NET_EXPORT(jint, IpcEndpoint, tmset0)(JNI_STDARGS, jlong fp, jint timeout)
 {
@@ -952,11 +1049,10 @@ ACR_NET_EXPORT(jint, IpcEndpoint, shutdo
     return 0;
 }
 
-ACR_NET_EXPORT(jint, IpcEndpoint, close0)(JNI_STDARGS, jlong fp)
+static int
+AcrIpcClose(LPIPCSOCK cp)
 {
     int i;
-    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
-
     if (cp == 0)
         return WSAENOTSOCK;
     if (cp->c == 0) {
@@ -1028,99 +1124,12 @@ ACR_NET_EXPORT(jint, IpcEndpoint, close0
     return 0;
 }
 
-ACR_NET_EXPORT(jint, IpcServerEndpoint, close0)(JNI_STDARGS, jlong fp)
-{
-    LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
-    LPIPCSOCK   cp;
-    LPIPCSOCK   np;
-
-    if (sp == 0)
-        return WSAENOTSOCK;
-    ACR_RING_FOREACH_SAFE(cp, np, &sp->rConnections, IPCSOCK, rLink) {
-        /* Unlink for the server */
-        ACR_RING_REMOVE(cp, rLink);
-        /* Mark the connection as aborted.
-         * The user must still close each individual
-         * connection to free the memory and resources.
-         */
-        cp->nStatus = _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
-        if (cp->nStatus == 0) {
-            /* Inform our listeners that we are going to close.
-             */
-            SetEvent(cp->hSync[IPCSOCK_RDR]);
-            SetEvent(cp->hSync[IPCSOCK_RTT]);
-        }
-        sp->nConnections--;
-    }
-    if (sp->nConnections != 0) {
-        /* Should never happen [tm]
-         */
-        printf("[server] Found %d active. Should be zero\n", sp->nConnections);
-    }
-    SAFE_CLOSE_HANDLE(sp->hAcceptSema);
-    SAFE_CLOSE_HANDLE(sp->hAcceptSync);
-    SAFE_CLOSE_HANDLE(sp->hAcceptLock);
-    SAFE_CLOSE_HANDLE(sp->hServerMap);
-
-    AcrFree(sp);
-    return 0;
-}
-
-ACR_NET_EXPORT(jint, IpcEndpoint, avail0)(JNI_STDARGS, jlong fp, jboolean 
readside)
-{
-    LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
-    long nAvail;
-
-    if (cp == 0) {
-        ACR_THROW_NET(WSAENOTSOCK);
-        return -1;
-    }
-    if (cp->c == 0) {
-        ACR_THROW_NET(WSAENOTCONN);
-        return -1;
-    }
-    cp->nStatus = _InterlockedOr(cp->pStatus, 0);
-    if (cp->nStatus > IPCSOCK_SHUTDOWN) {
-        /* Any attempt to read on closed connection is error.
-         */
-        if (cp->c != 0)
-            ACR_THROW_NET(cp->c->dwError);
-        else
-            ACR_THROW_NET(WSAENOTCONN);
-        return -1;
-    }
-    if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {
-        ReleaseMutex(cp->rp->hProcessLock);
-        cp->c->dwError = WSAECONNRESET;
-        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
-        ACR_THROW_NET(cp->c->dwError);
-        return -1;
-    }
-    MemoryBarrier();
-    if (cp->dwPageSize == 0) {
-        if (readside)
-            nAvail = cp->Mr->Length - cp->Mr->Readed;
-        else
-            nAvail = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;
-    }
-    else {
-        nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;
-        if (readside == JNI_FALSE)
-            nAvail = cp->dwPageSize - nAvail;
-    }
-    if (nAvail > 0)
-        return nAvail;
-    else
-        return 0;
-}
-
 int AcrIpcRead(LPIPCSOCK cp, void *pData, int nSize)
 {
     long   nAvail;
     long   nRead     = 0;
     long   nCapacity = cp->dwPageSize;
     LPBYTE pStart    = cp->pRdbData;
-    DWORD  dwTimeout = cp->dwTimeout;
 
     if (cp == 0) {
         SetLastError(WSAENOTSOCK);
@@ -1205,14 +1214,10 @@ int AcrIpcRead(LPIPCSOCK cp, void *pData
              * XXX: INFINITE timeouts should really be some sane
              *      value which will abort the connection.
              */
-            ws = WaitForMultipleObjects(2, wh, FALSE, dwTimeout);
+            ws = WaitForMultipleObjects(2, wh, FALSE, cp->dwTimeout);
             cp->Rd->RecvWait = FALSE;
             switch (ws) {
                 case WAIT_OBJECT_0:
-                    /* Make sure we don't end up in the
-                     * wait call twice in a row
-                     */
-                    dwTimeout = 0;
                 break;
                 case WAIT_OBJECT_1:
                 case WAIT_ABANDONED_1:
@@ -1248,7 +1253,7 @@ int AcrIpcWrite(LPIPCSOCK cp, const void
     long   nSend    = 0;
     long   nChunk   = 0;
     INT64  nTimeup  = 0;
-    int    nTimeout = cp->dwTimeout;
+    int    nTimeout;
 
     if (cp == 0) {
         SetLastError(WSAENOTSOCK);
@@ -1273,7 +1278,8 @@ int AcrIpcWrite(LPIPCSOCK cp, const void
         return -1;
     }
     InterlockedIncrement(&cp->nReferences);
-    if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+    nTimeout = cp->dwTimeout;
+    if (nTimeout <= 0) {
         /* Get "future" timeout */
         nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
     }
@@ -1377,7 +1383,7 @@ int AcrIpcSend(LPIPCSOCK cp, const void 
     int    rc       = 0;
     long   nSend    = 0;
     INT64  nTimeup  = 0;
-    int    nTimeout = cp->dwTimeout;
+    int    nTimeout;
 
     if (cp == 0)
         return WSAENOTSOCK;
@@ -1390,7 +1396,8 @@ int AcrIpcSend(LPIPCSOCK cp, const void 
     if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
         return WSAEALREADY;
     InterlockedIncrement(&cp->nReferences);
-    if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+    nTimeout = cp->dwTimeout;
+    if (nTimeout <= 0) {
         /* Get "future" timeout */
         nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
     }
@@ -1489,7 +1496,6 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
 {
     long   nAvail;
     long   nRead     = 0;
-    DWORD  dwTimeout = cp->dwTimeout;
 
     if (cp == 0) {
         SetLastError(WSAENOTSOCK);
@@ -1572,7 +1578,7 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
              * XXX: INFINITE timeouts should really be some sane
              *      value which will abort the connection.
              */
-            ws = WaitForMultipleObjects(2, wh, FALSE, dwTimeout);
+            ws = WaitForMultipleObjects(2, wh, FALSE, cp->dwTimeout);
             cp->Mr->RecvWait = FALSE;
             switch (ws) {
                 case WAIT_OBJECT_0:
@@ -1603,13 +1609,62 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
     return nRead;
 }
 
-ACR_NET_EXPORT(jint, IpcEndpoint, flush0)(JNI_STDARGS, jlong fp)
+int AcrIpcAvail(LPIPCSOCK cp, BOOL bForRead)  /* bytes available, 0 if none, -1 with last-error set */
+{
+    long nAvail;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    cp->nStatus = _InterlockedOr(cp->pStatus, 0);  /* OR with 0: fetch the status without changing it */
+    if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+        /* Any query on a connection past the shutdown state is an error.
+         */
+        if (cp->c != 0)  /* NOTE(review): cp->c was already checked above, so the else looks unreachable */
+            SetLastError(cp->c->dwError);
+        else
+            SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    InterlockedIncrement(&cp->nReferences);  /* balanced by ApcIpcUnref on every exit below */
+    if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {  /* presumably the peer holds this lock while alive - confirm */
+        ReleaseMutex(cp->rp->hProcessLock);
+        cp->c->dwError = WSAECONNRESET;
+        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+        SetLastError(cp->c->dwError);
+        ApcIpcUnref(cp);
+        return -1;
+    }
+    MemoryBarrier();
+    if (cp->dwPageSize == 0) {  /* no page size: use the Mr/Mw message records */
+        if (bForRead)
+            nAvail = cp->Mr->Length - cp->Mr->Readed;  /* unread remainder of the current message */
+        else
+            nAvail = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;  /* writable only when the slot is empty */
+    }
+    else {
+        nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;  /* sent but not yet received */
+        if (!bForRead)
+            nAvail = cp->dwPageSize - nAvail;  /* remaining room in the page */
+    }
+    ApcIpcUnref(cp);
+    if (nAvail > 0)
+        return nAvail;
+    else
+        return 0;  /* negative differences are reported as 0 */
+}
+
+int AcrIpcFlush(LPIPCSOCK cp)
 {
     int    rc       = 0;
     long   nUsed    = 0;
     INT64  nTimeup  = 0;
     int    nTimeout;
-    LPIPCSOCK cp    = J2P(fp, LPIPCSOCK);
     volatile long *pSendWait;
 
     if (cp == 0)
@@ -1618,9 +1673,9 @@ ACR_NET_EXPORT(jint, IpcEndpoint, flush0
         return WSAENOTCONN;
     if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
         return WSAEALREADY;
-    nTimeout = cp->dwTimeout;
     InterlockedIncrement(&cp->nReferences);
-    if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+    nTimeout = cp->dwTimeout;
+    if (nTimeout <= 0) {
         /* Get "future" timeout */
         nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
     }
@@ -1713,3 +1768,69 @@ ACR_NET_EXPORT(jint, IpcEndpoint, flush0
     return rc;
 }
 
+ACR_NET_EXPORT(jlong, IpcServerEndpoint, accept0)(JNI_STDARGS, jlong fp,
+                                                  jint timeout, jboolean block)
+{
+    acr_sd_t   *sd = J2P(fp, acr_sd_t *);  /* server descriptor */
+    acr_sd_t   *cd;                        /* descriptor for the accepted connection */
+    LPIPCSERVER sp;
+    LPIPCSOCK   cp;
+
+    if (sd == 0 || sd->p == 0) {
+        ACR_THROW_NET_ERROR(ACR_EBADF);
+        return 0;
+    }
+    if ((cd = ACR_TALLOC(acr_sd_t)) == 0)  /* allocate before the accept wait */
+        return 0;
+    sp = (LPIPCSERVER)sd->p;
+    cp = AcrIpcAccept(sp, timeout);
+    if (cp == 0) {
+        ACR_THROW_NET_ERRNO();  /* raised before AcrFree */
+        AcrFree(cd);
+        return 0;
+    }
+    cd->p    = cp;
+    cd->refs = 1;
+    cd->type = ACR_DT_IPCSOCK;
+    if (block) {
+        cd->timeout = cp->dwTimeout;  /* AcrIpcAccept sets dwTimeout to INFINITE */
+    }
+    else {
+        cd->timeout   = 0;
+        cp->dwTimeout = 0;
+        ACR_SETFLAG(cd, ACR_SO_NONBLOCK);
+    }
+    return P2J(cd);
+}
+
+ACR_NET_EXPORT(jint, IpcEndpoint, connect0)(JNI_STDARGS, jlong fp, jbyteArray 
cb, jint timeout)
+{
+    int rc;                              /* result of AcrIpcConnect, returned as-is */
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+    LPIPCSOCK cp;
+    acr_sockaddr_t *ca;
+
+    if (sd == 0 || (cp = (LPIPCSOCK)sd->p) == 0)
+        return ACR_EBADF;
+    InterlockedIncrement(&sd->refs);     /* hold the descriptor while connecting */
+    ca = SOCKADDR_CAST(cb);
+    rc = AcrIpcConnect(cp, ca->hostname, timeout);
+    SOCKADDR_RELEASE(cb, ca);
+    if (InterlockedDecrement(&sd->refs) == 0) AcrFree(sd); /* balance the hold above */
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, IpcEndpoint, close0)(JNI_STDARGS, jlong fp)
+{
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+    LPIPCSOCK sp;
+
+    if (sd == 0)
+        return ACR_EBADF;
+    /* Detach the socket before dropping our reference, so that sd is
+     * never written after another holder may already have freed it. */
+    sp = (LPIPCSOCK)InterlockedExchangePointer(&sd->p, 0);
+    if (InterlockedDecrement(&sd->refs) == 0)
+        AcrFree(sd);
+    return AcrIpcClose(sp);  /* AcrIpcClose returns WSAENOTSOCK when sp is 0 */
+}

Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
URL: 
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c 
(original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c Sat Aug 
 6 15:05:02 2011
@@ -28,22 +28,85 @@
 #include "arch_sync.h"
 #include "arch_ipcs.h"
 
+typedef struct acr_ss_t {  /* stream handle handed to Java by IpcStream alloc0 */
+    acr_sd_t    *sd;       /* socket descriptor the stream operates on */
+} acr_ss_t;
+
+ACR_INLINE(LPIPCSOCK) sdretain(acr_ss_t *ss)  /* take a descriptor reference; returns its socket or 0 */
+{
+    if (ss != 0 && ss->sd != 0) {             /* tolerate a zero stream handle */
+        InterlockedIncrement(&ss->sd->refs);
+        return (LPIPCSOCK)ss->sd->p;          /* may be 0 once the socket is closed */
+    }
+    else
+        return 0;
+}
+
+ACR_INLINE(int) sdrelease(acr_ss_t *ss)       /* drop a reference; returns 1 while sd is still alive */
+{
+    if (ss == 0 || ss->sd == 0)               /* nothing was retained */
+        return 0;
+    if (InterlockedDecrement(&ss->sd->refs) == 0) {
+        AcrFree(ss->sd);                      /* last reference: free the descriptor */
+        ss->sd = 0;
+        return 0;
+    }
+    else
+        return 1;
+}
+
+ACR_NET_EXPORT(jlong, IpcStream, alloc0)(JNI_STDARGS, jlong fp)
+{
+    acr_ss_t *ss;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+
+    if (sd == 0 || sd->p == 0) {
+        ACR_THROW_NET_ERROR(ACR_EBADF);
+        return 0;
+    }
+
+    if ((ss = ACR_TALLOC(acr_ss_t)) == 0)
+        return 0;
+    ss->sd = sd;  /* NOTE(review): stored without incrementing sd->refs - confirm the stream cannot outlive sd */
+    return P2J(ss);
+}
+
+ACR_NET_EXPORT(jint, IpcStream, close0)(JNI_STDARGS, jlong sp)
+{
+    acr_ss_t *ss = J2P(sp, acr_ss_t *);
+
+    AcrFree(ss);  /* frees only the stream wrapper; ss->sd is left untouched */
+    return 0;
+}
+
 ACR_NET_EXPORT(jint, IpcStream, read0)(JNI_STDARGS, jlong sp)
 {
+    int  rc = 0;  /* deferred error code, thrown after sdrelease */
     int  rv = -1;
     int  rd;
     BYTE ch;
-    LPIPCSOCK cp = J2P(sp, LPIPCSOCK);
+    acr_ss_t *ss = J2P(sp, acr_ss_t *);
+    LPIPCSOCK cp;
 
-    if (ACR_HASFLAG(cp, ACR_SO_RDEOF))
-        return -1;
+    if ((cp = sdretain(ss)) == 0) {  /* no socket behind the stream */
+        rc = ACR_EBADF;
+        goto finally;
+    }
+    if (ACR_HASFLAG(ss->sd, ACR_SO_RDEOF))
+        goto finally;  /* EOF already seen: return -1 without throwing */
     rd = AcrIpcRead(cp, &ch, 1);
     if (rd == -1)
-        ACR_THROW_NET_ERRNO();
+        rc = ACR_GET_OS_ERROR();  /* capture the error now, throw after release */
     else if (rd == 1)
         rv = ch;
     else
-        cp->flags |= ACR_SO_RDEOF;
+        ss->sd->flags |= ACR_SO_RDEOF;  /* the EOF flag now lives on the descriptor */
+finally:
+    sdrelease(ss);  /* pairs with sdretain above */
+    if (rc != 0) {
+        ACR_THROW_NET_ERROR(rc);
+        return -1;
+    }
     return rv;
 }
 


Reply via email to