Author: mturk
Date: Sat Aug 6 15:05:02 2011
New Revision: 1154526
URL: http://svn.apache.org/viewvc?rev=1154526&view=rev
Log:
Implement windows IPC socket references
Modified:
commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
Modified: commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h
(original)
+++ commons/sandbox/runtime/trunk/src/main/native/include/acr/descriptor.h Sat
Aug 6 15:05:02 2011
@@ -42,6 +42,7 @@
#define ACR_DT_SOCKET 0x0003
#define ACR_DT_LOCALSOCK 0x0004
#define ACR_DT_SSLSOCK 0x0005
+#define ACR_DT_IPCSOCK 0x0006
typedef struct acr_fd_t acr_fd_t;
struct acr_fd_t {
@@ -53,6 +54,7 @@ struct acr_fd_t {
union {
HANDLE h;
SOCKET s;
+ LPVOID p;
};
#else
int f;
@@ -69,6 +71,7 @@ struct acr_sd_t {
union {
HANDLE h;
SOCKET s;
+ LPVOID p;
};
#else
int s;
Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h
(original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/arch_ipcs.h Sat Aug
6 15:05:02 2011
@@ -284,7 +284,6 @@ struct IPCSOCK
IPCSOCK_CLIENT *c;
DWORD dwPageSize; /* Allocation Page size */
DWORD dwTimeout; /* Socket timeout */
- DWORD flags; /* Socket flags */
void *pAttachment; /* Custom user object */
HANDLE hClientMeta; /* Metadata mapping */
HANDLE hBufferMap[2]; /* IO Buffer mappings
@@ -321,7 +320,9 @@ struct IPCSERVER
HANDLE hAcceptSync;
HANDLE hAcceptLock;
volatile long nConnections;
+ volatile long nReferences;
DWORD dwFlags;
+ BOOL bClosed;
};
/**
@@ -382,6 +383,8 @@ struct IPCSERVER
* evet so writter can continue with writting.
*/
+/* Resolve a Java descriptor handle D to the native object stored in the
+ * `p` member of its acr_sd_t, cast to T. Yields 0 when D itself is 0.
+ * The whole expansion is parenthesized so the macro can be embedded in a
+ * larger expression without the ?: operator capturing its neighbours.
+ * Note that D is evaluated twice; do not pass expressions with side effects.
+ */
+#define ACR_DESC_PTR(D, T) (((D) == 0) ? 0 : (T)((J2P(D, acr_sd_t *))->p))
+
#if defined(__cplusplus)
extern "C" {
#endif
@@ -442,6 +445,28 @@ int AcrIpcSend(LPIPCSOCK pSocket, const
*/
int AcrIpcRecv(LPIPCSOCK pSocket, void *pData, int nSize);
+/**
+ * Report how many bytes can be read or written
+ * without blocking.
+ * @param pSocket the socket to check.
+ * @param bForRead if TRUE check the read side, otherwise the write side.
+ * @return number of bytes that can be read or written
+ * without blocking, or -1 on error.
+ */
+int
+AcrIpcAvail(LPIPCSOCK pSocket, BOOL bForRead);
+
+/**
+ * Flush the output buffer and block until the peer
+ * reads the data.
+ * @param pSocket the socket to use.
+ * @return zero on success or an error code.
+ * @notice The function will block if the input buffer is not
+ * empty, until the peer reads all of the data or until
+ * the socket times out.
+ */
+int AcrIpcFlush(LPIPCSOCK pSocket);
+
#if defined(__cplusplus)
}
#endif
Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsock.c Sat Aug 6
15:05:02 2011
@@ -304,18 +304,64 @@ AcrIpcRemoteUnref(LPIPCREMOTE pRemote)
LeaveCriticalSection(&gSynchronized);
}
+/**
+ * Destroy a server endpoint.
+ * Every connection still linked to the server is unlinked and flagged
+ * IPCSOCK_ABORTED, the accept handles and the server mapping are closed
+ * and the IPCSERVER structure itself is freed.
+ * Callers in this file invoke it once sp->nReferences has dropped to zero.
+ * @param sp the server to destroy; it must not be touched afterwards.
+ * @return zero on success or WSAENOTSOCK if sp is 0.
+ */
+static int
+AcrIpcServerClose(LPIPCSERVER sp)
+{
+    LPIPCSOCK cp;
+    LPIPCSOCK np;
+
+    if (sp == 0)
+        return WSAENOTSOCK;
+    /* NOTE(review): the ring is walked here without entering gSynchronized,
+     * although accept inserts under that lock -- confirm this cannot race.
+     */
+    ACR_RING_FOREACH_SAFE(cp, np, &sp->rConnections, IPCSOCK, rLink) {
+        /* Unlink from the server */
+        ACR_RING_REMOVE(cp, rLink);
+        /* Mark the connection as aborted.
+         * The user must still close each individual
+         * connection to free the memory and resources.
+         */
+        cp->nStatus = _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
+        if (cp->nStatus == 0) {
+            /* _InterlockedOr returned the status held before the OR, so
+             * zero means nothing had been flagged on the connection yet.
+             * Inform our listeners that we are going to close.
+             */
+            SetEvent(cp->hSync[IPCSOCK_RDR]);
+            SetEvent(cp->hSync[IPCSOCK_RTT]);
+        }
+        /* The counter is raised with InterlockedIncrement on accept,
+         * so lower it the same way instead of a plain decrement.
+         */
+        InterlockedDecrement(&sp->nConnections);
+    }
+    if (sp->nConnections != 0) {
+        /* Should never happen [tm]
+         * nConnections is a long, hence %ld.
+         */
+        printf("[server] Found %ld active. Should be zero\n", sp->nConnections);
+    }
+    SAFE_CLOSE_HANDLE(sp->hAcceptSema);
+    SAFE_CLOSE_HANDLE(sp->hAcceptSync);
+    SAFE_CLOSE_HANDLE(sp->hAcceptLock);
+    SAFE_CLOSE_HANDLE(sp->hServerMap);
+
+    AcrFree(sp);
+    return 0;
+}
+
+
+/**
+ * Create a new, unbound IPC server endpoint.
+ * Allocates an acr_sd_t descriptor and the IPCSERVER it wraps,
+ * initializes the server's connection ring, stores the server in the
+ * descriptor's p member and starts both reference counts at one.
+ * @param flags stored in the descriptor's flags member.
+ * @return the descriptor as a jlong, or 0 if either allocation fails.
+ * NOTE(review): the explicit ACR_ENOMEM throw was removed; presumably
+ * ACR_TALLOC reports the failure itself -- confirm. IPCSERVER.dwFlags
+ * is no longer assigned here.
+ */
ACR_NET_EXPORT(jlong, IpcServerEndpoint, create0)(JNI_STDARGS, jint flags)
{
+ acr_sd_t *sd;
IPCSERVER *sp;
- if ((sp = (LPIPCSERVER)calloc(1, sizeof(IPCSERVER))) == 0) {
+ if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
/* Allocation failed */
- ACR_THROW_NET_ERROR(ACR_ENOMEM);
+ return 0;
+ }
+ if ((sp = ACR_TALLOC(IPCSERVER)) == 0) {
+ /* Allocation failed */
+ AcrFree(sd);
return 0;
}
ACR_RING_INIT(&sp->rConnections, IPCSOCK, rLink);
- sp->dwFlags = flags;
- return P2J(sp);
+ sd->p = sp;
+ sd->flags = flags;
+ sd->refs = 1;
+ sp->nReferences = 1;
+ return P2J(sd);
}
ACR_NET_EXPORT(jint, IpcServerEndpoint, bind0)(JNI_STDARGS, jlong fp,
@@ -325,13 +371,13 @@ ACR_NET_EXPORT(jint, IpcServerEndpoint,
DWORD dwShareLen;
DWORD dwShareSiz = backlog * sizeof(IPCSOCK_ACCEPT) +
sizeof(IPCSOCK_SERVER);
acr_sockaddr_t *ca = SOCKADDR_CAST(cb);
- LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
- dwShareLen = ACR_ALIGN(dwShareSiz, PAGESIZE);
+ LPIPCSERVER sp = ACR_DESC_PTR(fp, LPIPCSERVER);
if (sp->hServerMap != 0) {
SOCKADDR_RELEASE(cb, ca);
return WSAEISCONN;
}
+ dwShareLen = ACR_ALIGN(dwShareSiz, PAGESIZE);
sp->hServerMap = CreateFileMappingA(INVALID_HANDLE_VALUE,
IPCSECURITY_TOKEN,
PAGE_READWRITE,
@@ -368,6 +414,26 @@ failed:
return rc;
}
+/**
+ * Close a server endpoint from Java.
+ * The server pointer is detached from the descriptor, the descriptor's
+ * reference is dropped, the server is flagged closed and its accept
+ * event is signalled. The server is destroyed only when its own
+ * reference count reaches zero.
+ * @param fp descriptor handle returned by create0.
+ * @return zero on success, ACR_EBADF for a null or already detached
+ *         descriptor, or the result of AcrIpcServerClose.
+ */
+ACR_NET_EXPORT(jint, IpcServerEndpoint, close0)(JNI_STDARGS, jlong fp)
+{
+    LPIPCSERVER pServer;
+    acr_sd_t *pDesc = J2P(fp, acr_sd_t *);
+
+    if (pDesc == 0)
+        return ACR_EBADF;
+    /* Take the server out of the descriptor in one interlocked step */
+    pServer = (LPIPCSERVER)InterlockedExchangePointer(&pDesc->p, 0);
+    if (InterlockedDecrement(&pDesc->refs) == 0)
+        AcrFree(pDesc);
+    if (pServer == 0)
+        return ACR_EBADF;
+    /* Flag the shutdown first, then signal the accept event so a
+     * waiter that wakes up sees bClosed set.
+     */
+    pServer->bClosed = TRUE;
+    SetEvent(pServer->hAcceptSync);
+    if (InterlockedDecrement(&pServer->nReferences) != 0)
+        return 0;
+    return AcrIpcServerClose(pServer);
+}
+
+
static LPBYTE
AcrIoBufMap(HANDLE hMap, DWORD dwSize)
{
@@ -431,22 +497,30 @@ ApcIpcUnref(LPIPCSOCK cp)
}
}
-ACR_NET_EXPORT(jlong, IpcServerEndpoint, accept0)(JNI_STDARGS, jlong fp,
- jint timeout)
+/* Drop one reference on the server and destroy it when that was the
+ * last one. The thread's last-error value is saved and restored around
+ * the destruction so callers can still report their own error.
+ */
+static __inline void
+ApcIpcServerUnref(LPIPCSERVER sp)
+{
+    DWORD dwSaved;
+
+    if (InterlockedDecrement(&sp->nReferences) != 0)
+        return;
+    dwSaved = GetLastError();
+    AcrIpcServerClose(sp);
+    SetLastError(dwSaved);
+}
+LPIPCSOCK AcrIpcAccept(LPIPCSERVER sp, int nTimeout)
{
int i, rc = 0;
DWORD ws;
HANDLE hProcessLock = 0;
HANDLE hClientMeta;
IPCSOCK_ACCEPT *a;
- IPCSOCK *cp = 0;
- INT64 nTimeup = 0;
- LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
+ IPCSOCK *cp = 0;
+ INT64 nTimeup = 0;
- if (timeout != -1 && timeout != 0) {
+ InterlockedIncrement(&sp->nReferences);
+ if (nTimeout != -1 && nTimeout != 0) {
/* Get "future" timeout */
- nTimeup = GetCurrentMilliseconds() + timeout;
+ nTimeup = GetCurrentMilliseconds() + nTimeout;
}
retry:
AcquireMutex(sp->hAcceptLock);
@@ -454,6 +528,7 @@ retry:
/* Reached the backlog limit.
*/
ReleaseMutex(sp->hAcceptLock);
+ ApcIpcServerUnref(sp);
SetLastError(WSAEMFILE);
return 0;
}
@@ -462,32 +537,41 @@ retry:
* We do it once for each accept call.
*/
if (!ReleaseSemaphore(sp->hAcceptSema, 1, 0)) {
+ ApcIpcServerUnref(sp);
return 0;
}
again:
printf("[server] Waiting on accept ...\n");
/* Wait for a client connect */
- ws = WaitForSingleObject(sp->hAcceptSync, timeout);
+ ws = WaitForSingleObject(sp->hAcceptSync, nTimeout);
printf("[server] Waiting on accept : %d\n", ws);
switch (ws) {
case WAIT_OBJECT_0:
/* Client signaled there is a new
* valid connect record
*/
+ if (sp->bClosed) {
+ ApcIpcServerUnref(sp);
+ SetLastError(WSAEINTR);
+ return 0;
+ }
break;
case WAIT_TIMEOUT:
+ ApcIpcServerUnref(sp);
/* Timeout */
printf("[server] accept timeout\n");
- if (timeout == 0)
+ if (nTimeout == 0)
SetLastError(WSAEWOULDBLOCK);
else
SetLastError(WSAETIMEDOUT);
return 0;
break;
case WAIT_FAILED:
+ ApcIpcServerUnref(sp);
return 0;
break;
default:
+ ApcIpcServerUnref(sp);
/* Error! */
printf("[server] illegal accept wait result\n");
SetLastError(rc);
@@ -499,11 +583,12 @@ again:
/* Some other thread accepted the signaled connection
* Can happen if multiple threads are accepting.
*/
- if ((timeout == -1) || ((nTimeup > 0) && (nTimeup >
GetCurrentMilliseconds()))) {
+ if ((nTimeout == -1) || ((nTimeup > 0) && (nTimeup >
GetCurrentMilliseconds()))) {
ReleaseMutex(sp->hAcceptLock);
printf("[server] Retrying ...\n");
goto again;
}
+ ApcIpcServerUnref(sp);
/* Timeout reached */
SetLastError(WSAETIMEDOUT);
return 0;
@@ -524,7 +609,7 @@ again:
/* XXX: Should we return faulty connect attempt
* to the caller?
*/
- if ((timeout == -1) || ((nTimeup > 0) && (nTimeup >
GetCurrentMilliseconds()))) {
+ if ((nTimeout == -1) || ((nTimeup > 0) && (nTimeup >
GetCurrentMilliseconds()))) {
printf("[server] Restarting ...\n");
goto retry;
}
@@ -560,7 +645,7 @@ again:
}
cp->pStatus = &cp->c->nStatus;
cp->dwPageSize = cp->c->dwPageSize;
- cp->dwTimeout = timeout;
+ cp->dwTimeout = INFINITE;
cp->nReferences = 1;
cp->hClientMeta = hClientMeta;
for (i = 0; i < 4; i++) {
@@ -611,7 +696,8 @@ again:
ACR_RING_INSERT_TAIL(&sp->rConnections, cp, IPCSOCK, rLink);
InterlockedIncrement(&sp->nConnections);
LeaveCriticalSection(&gSynchronized);
- return P2J(cp);
+ ApcIpcServerUnref(sp);
+ return cp;
failed:
InterlockedExchange(&a->nStatus, WSAECONNREFUSED);
@@ -634,30 +720,46 @@ failed:
finally:
CloseHandle(hProcessLock);
CloseHandle(hClientMeta);
+ ApcIpcServerUnref(sp);
SetLastError(rc);
return 0;
}
-ACR_NET_EXPORT(jlong, IpcEndpoint, create0)(JNI_STDARGS, jboolean msg)
+/**
+ * Create a new, unconnected IPC client endpoint.
+ * Allocates an acr_sd_t descriptor and the IPCSOCK it wraps, marks the
+ * socket IPCSOCK_CLOSED and starts both reference counts at one.
+ * @param stype socket type; SOCK_STREAM selects the paged stream buffer.
+ * @param block JNI_FALSE creates a non-blocking socket (ACR_SO_NONBLOCK),
+ *        anything else a blocking one with an INFINITE timeout.
+ * @return the descriptor as a jlong, or 0 if either allocation fails.
+ */
+ACR_NET_EXPORT(jlong, IpcEndpoint, create0)(JNI_STDARGS, jint stype,
+                                            jboolean block)
{
- IPCSOCK *cp;
+ acr_sd_t *sd;
+ IPCSOCK *cp;
- if ((cp = calloc(1, sizeof(IPCSOCK))) == 0) {
- ACR_THROW_NET_ERROR(ACR_ENOMEM);
+ if ((sd = ACR_TALLOC(acr_sd_t)) == 0) {
+ /* Allocation failed */
+ return 0;
+ }
+ if ((cp = ACR_TALLOC(IPCSOCK)) == 0) {
+ AcrFree(sd);
/* Allocation failed */
return 0;
}
ACR_RING_ELEM_INIT(cp, rLink);
- if (msg == JNI_FALSE)
+ /* A non-zero page size selects the stream ring buffer and zero keeps
+ * the message buffers, so the page size belongs to stream sockets,
+ * exactly as the replaced (msg == JNI_FALSE) test arranged it.
+ */
+ if (stype == SOCK_STREAM)
cp->dwPageSize = PAGESIZE;
- cp->dwTimeout = INFINITE;
+ if (block != JNI_FALSE) {
+ cp->dwTimeout = INFINITE;
+ sd->timeout = -1;
+ }
+ else {
+ /* NOTE(review): dwTimeout and sd->timeout keep whatever
+ * ACR_TALLOC left in them, presumably zero -- confirm.
+ */
+ ACR_SETFLAG(sd, ACR_SO_NONBLOCK);
+ }
cp->nStatus = IPCSOCK_CLOSED;
cp->pStatus = &cp->nStatus;
cp->nReferences = 1;
- return P2J(cp);
+ sd->p = cp;
+ sd->refs = 1;
+ sd->type = ACR_DT_IPCSOCK;
+ return P2J(sd);
};
-ACR_NET_EXPORT(jint, IpcEndpoint, connect0)(JNI_STDARGS, jlong fp, jbyteArray
cb, jint timeout)
+static int
+AcrIpcConnect(LPIPCSOCK cp, LPCSTR szAddress, int nTimeout)
{
int i, rc = 0;
INT64 nTimeup = 0;
@@ -665,29 +767,24 @@ ACR_NET_EXPORT(jint, IpcEndpoint, connec
HANDLE wh[2];
HANDLE hDuplicate;
BOOL bRemoteInit = FALSE;
- LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
- acr_sockaddr_t *ca = 0;
if (cp == 0)
return WSAENOTSOCK;
if (cp->c != 0)
return WSAEISCONN;
- if (timeout != -1 && timeout != 0) {
+ if (szAddress == 0 || *szAddress == 0)
+ return WSAEINVAL;
+ if (nTimeout != -1 && nTimeout != 0) {
/* Get "future" timeout */
- nTimeup = GetCurrentMilliseconds() + timeout;
+ nTimeup = GetCurrentMilliseconds() + nTimeout;
}
- ca = SOCKADDR_CAST(cb);
InterlockedIncrement(&cp->nReferences);
_InterlockedOr(&cp->nStatus, IPCSOCK_CONNECTING);
- if ((cp->rp = AcrIpcRemoteOpen(ca->hostname)) == 0) {
+ if ((cp->rp = AcrIpcRemoteOpen(szAddress)) == 0) {
_InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
ApcIpcUnref(cp);
- SOCKADDR_RELEASE(cb, ca);
return GetLastError();
}
- SOCKADDR_RELEASE(cb, ca);
- printf("[client] Connecting to %d ...\n", cp->rp->s->dwProcessId);
-
/* Create our metadata page */
if ((cp->hClientMeta = CreateFileMapping(INVALID_HANDLE_VALUE,
IPCSECURITY_TOKEN,
@@ -702,7 +799,7 @@ ACR_NET_EXPORT(jint, IpcEndpoint, connec
wh[0] = cp->rp->hAcceptSema;
wh[1] = cp->rp->hProcessLock;
again:
- rc = WaitForMultipleObjects(2, wh, FALSE, timeout);
+ rc = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
switch (rc) {
case WAIT_OBJECT_0:
/* Got a semaphore */
@@ -715,7 +812,7 @@ again:
break;
case WAIT_TIMEOUT:
printf("[client] Timeout!\n");
- if (timeout == 0)
+ if (nTimeout == 0)
rc = WSAEWOULDBLOCK;
else
rc = WSAETIMEDOUT;
@@ -742,7 +839,7 @@ again:
/* No strorage for our accept data
*/
ReleaseMutex(cp->rp->hAcceptLock);
- if ((timeout == -1) || ((nTimeup > 0) && (nTimeup >
GetCurrentMilliseconds()))) {
+ if ((nTimeout == -1) || ((nTimeup > 0) && (nTimeup >
GetCurrentMilliseconds()))) {
printf("[client] No free connection slots. retrying\n");
goto again;
}
@@ -808,12 +905,12 @@ again:
SetEvent(cp->rp->hAcceptSync);
/* Not needed any more */
printf("[client] Waiting for ack\n");
- if (timeout != -1 && timeout != 0) {
+ if (nTimeout != -1 && nTimeout != 0) {
/* Update timeout with the time we spend inside processing so far
*/
- timeout = (int)(nTimeup - GetCurrentMilliseconds());
- if (timeout < 0)
- timeout = 1;
+ nTimeout = (int)(nTimeup - GetCurrentMilliseconds());
+ if (nTimeout < 0)
+ nTimeout = 1;
}
/* Now wait for the server ack */
wh[0] = cp->hSync[IPCSOCK_RDR];
@@ -824,10 +921,10 @@ again:
* There is no point to wait infinitely
* if the server gets too busy.
*/
- if (timeout < 0 || timeout > IPCSOCK_ACK_TIMEOUT)
- timeout = IPCSOCK_ACK_TIMEOUT;
+ if (nTimeout < 0 || nTimeout > IPCSOCK_ACK_TIMEOUT)
+ nTimeout = IPCSOCK_ACK_TIMEOUT;
#endif
- rc = WaitForMultipleObjects(2, wh, FALSE, timeout);
+ rc = WaitForMultipleObjects(2, wh, FALSE, nTimeout);
switch (rc) {
case WAIT_OBJECT_0:
/* Got ack event */
@@ -915,7 +1012,7 @@ cleanup:
cp->rp = 0;
ApcIpcUnref(cp);
return rc;
-};
+}
ACR_NET_EXPORT(jint, IpcEndpoint, tmset0)(JNI_STDARGS, jlong fp, jint timeout)
{
@@ -952,11 +1049,10 @@ ACR_NET_EXPORT(jint, IpcEndpoint, shutdo
return 0;
}
-ACR_NET_EXPORT(jint, IpcEndpoint, close0)(JNI_STDARGS, jlong fp)
+static int
+AcrIpcClose(LPIPCSOCK cp)
{
int i;
- LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
-
if (cp == 0)
return WSAENOTSOCK;
if (cp->c == 0) {
@@ -1028,99 +1124,12 @@ ACR_NET_EXPORT(jint, IpcEndpoint, close0
return 0;
}
-ACR_NET_EXPORT(jint, IpcServerEndpoint, close0)(JNI_STDARGS, jlong fp)
-{
- LPIPCSERVER sp = J2P(fp, LPIPCSERVER);
- LPIPCSOCK cp;
- LPIPCSOCK np;
-
- if (sp == 0)
- return WSAENOTSOCK;
- ACR_RING_FOREACH_SAFE(cp, np, &sp->rConnections, IPCSOCK, rLink) {
- /* Unlink for the server */
- ACR_RING_REMOVE(cp, rLink);
- /* Mark the connection as aborted.
- * The user must still close each individual
- * connection to free the memory and resources.
- */
- cp->nStatus = _InterlockedOr(cp->pStatus, IPCSOCK_ABORTED);
- if (cp->nStatus == 0) {
- /* Inform our listeners that we are going to close.
- */
- SetEvent(cp->hSync[IPCSOCK_RDR]);
- SetEvent(cp->hSync[IPCSOCK_RTT]);
- }
- sp->nConnections--;
- }
- if (sp->nConnections != 0) {
- /* Should never happen [tm]
- */
- printf("[server] Found %d active. Should be zero\n", sp->nConnections);
- }
- SAFE_CLOSE_HANDLE(sp->hAcceptSema);
- SAFE_CLOSE_HANDLE(sp->hAcceptSync);
- SAFE_CLOSE_HANDLE(sp->hAcceptLock);
- SAFE_CLOSE_HANDLE(sp->hServerMap);
-
- AcrFree(sp);
- return 0;
-}
-
-ACR_NET_EXPORT(jint, IpcEndpoint, avail0)(JNI_STDARGS, jlong fp, jboolean
readside)
-{
- LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
- long nAvail;
-
- if (cp == 0) {
- ACR_THROW_NET(WSAENOTSOCK);
- return -1;
- }
- if (cp->c == 0) {
- ACR_THROW_NET(WSAENOTCONN);
- return -1;
- }
- cp->nStatus = _InterlockedOr(cp->pStatus, 0);
- if (cp->nStatus > IPCSOCK_SHUTDOWN) {
- /* Any attempt to read on closed connection is error.
- */
- if (cp->c != 0)
- ACR_THROW_NET(cp->c->dwError);
- else
- ACR_THROW_NET(WSAENOTCONN);
- return -1;
- }
- if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {
- ReleaseMutex(cp->rp->hProcessLock);
- cp->c->dwError = WSAECONNRESET;
- InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
- ACR_THROW_NET(cp->c->dwError);
- return -1;
- }
- MemoryBarrier();
- if (cp->dwPageSize == 0) {
- if (readside)
- nAvail = cp->Mr->Length - cp->Mr->Readed;
- else
- nAvail = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;
- }
- else {
- nAvail = cp->Rd->SendPos - cp->Rd->RecvPos;
- if (readside == JNI_FALSE)
- nAvail = cp->dwPageSize - nAvail;
- }
- if (nAvail > 0)
- return nAvail;
- else
- return 0;
-}
-
int AcrIpcRead(LPIPCSOCK cp, void *pData, int nSize)
{
long nAvail;
long nRead = 0;
long nCapacity = cp->dwPageSize;
LPBYTE pStart = cp->pRdbData;
- DWORD dwTimeout = cp->dwTimeout;
if (cp == 0) {
SetLastError(WSAENOTSOCK);
@@ -1205,14 +1214,10 @@ int AcrIpcRead(LPIPCSOCK cp, void *pData
* XXX: INFINITE timeouts should really be some sane
* value which will abort the connection.
*/
- ws = WaitForMultipleObjects(2, wh, FALSE, dwTimeout);
+ ws = WaitForMultipleObjects(2, wh, FALSE, cp->dwTimeout);
cp->Rd->RecvWait = FALSE;
switch (ws) {
case WAIT_OBJECT_0:
- /* Make sure we don't end up in the
- * wait call twice in a row
- */
- dwTimeout = 0;
break;
case WAIT_OBJECT_1:
case WAIT_ABANDONED_1:
@@ -1248,7 +1253,7 @@ int AcrIpcWrite(LPIPCSOCK cp, const void
long nSend = 0;
long nChunk = 0;
INT64 nTimeup = 0;
- int nTimeout = cp->dwTimeout;
+ int nTimeout;
if (cp == 0) {
SetLastError(WSAENOTSOCK);
@@ -1273,7 +1278,8 @@ int AcrIpcWrite(LPIPCSOCK cp, const void
return -1;
}
InterlockedIncrement(&cp->nReferences);
- if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+ nTimeout = cp->dwTimeout;
+ if (nTimeout <= 0) {
/* Get "future" timeout */
nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
}
@@ -1377,7 +1383,7 @@ int AcrIpcSend(LPIPCSOCK cp, const void
int rc = 0;
long nSend = 0;
INT64 nTimeup = 0;
- int nTimeout = cp->dwTimeout;
+ int nTimeout;
if (cp == 0)
return WSAENOTSOCK;
@@ -1390,7 +1396,8 @@ int AcrIpcSend(LPIPCSOCK cp, const void
if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
return WSAEALREADY;
InterlockedIncrement(&cp->nReferences);
- if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+ nTimeout = cp->dwTimeout;
+ if (nTimeout <= 0) {
/* Get "future" timeout */
nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
}
@@ -1489,7 +1496,6 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
{
long nAvail;
long nRead = 0;
- DWORD dwTimeout = cp->dwTimeout;
if (cp == 0) {
SetLastError(WSAENOTSOCK);
@@ -1572,7 +1578,7 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
* XXX: INFINITE timeouts should really be some sane
* value which will abort the connection.
*/
- ws = WaitForMultipleObjects(2, wh, FALSE, dwTimeout);
+ ws = WaitForMultipleObjects(2, wh, FALSE, cp->dwTimeout);
cp->Mr->RecvWait = FALSE;
switch (ws) {
case WAIT_OBJECT_0:
@@ -1603,13 +1609,62 @@ int AcrIpcRecv(LPIPCSOCK cp, void *pData
return nRead;
}
-ACR_NET_EXPORT(jint, IpcEndpoint, flush0)(JNI_STDARGS, jlong fp)
+/**
+ * Report how many bytes can be transferred on a connected socket
+ * without blocking.
+ * @param cp the socket to query.
+ * @param bForRead non-zero queries the receive side, zero the send side.
+ * @return the byte count (never negative), or -1 with the reason stored
+ *         through SetLastError().
+ */
+int AcrIpcAvail(LPIPCSOCK cp, BOOL bForRead)
+{
+    long nBytes;
+
+    if (cp == 0) {
+        SetLastError(WSAENOTSOCK);
+        return -1;
+    }
+    if (cp->c == 0) {
+        SetLastError(WSAENOTCONN);
+        return -1;
+    }
+    /* OR-ing with zero changes nothing; it serves as an interlocked
+     * read of the shared status word.
+     */
+    cp->nStatus = _InterlockedOr(cp->pStatus, 0);
+    if (cp->nStatus > IPCSOCK_SHUTDOWN) {
+        /* Any query on a closed connection is an error. */
+        SetLastError(cp->c != 0 ? cp->c->dwError : WSAENOTCONN);
+        return -1;
+    }
+    /* Hold a reference while the shared buffers are inspected */
+    InterlockedIncrement(&cp->nReferences);
+    if (WaitForSingleObject(cp->rp->hProcessLock, 0) != WAIT_TIMEOUT) {
+        /* The zero-timeout wait did not time out. The code treats that
+         * as a lost peer (presumably a live peer keeps this mutex
+         * owned -- confirm) and aborts the connection.
+         */
+        ReleaseMutex(cp->rp->hProcessLock);
+        cp->c->dwError = WSAECONNRESET;
+        InterlockedExchange(cp->pStatus, IPCSOCK_ABORTED);
+        SetLastError(cp->c->dwError);
+        ApcIpcUnref(cp);
+        return -1;
+    }
+    MemoryBarrier();
+    if (cp->dwPageSize != 0) {
+        /* Paged buffer: bytes sent but not yet received ... */
+        nBytes = cp->Rd->SendPos - cp->Rd->RecvPos;
+        /* ... or, for the writer, the room left in the page */
+        if (!bForRead)
+            nBytes = cp->dwPageSize - nBytes;
+    }
+    else if (bForRead) {
+        nBytes = cp->Mr->Length - cp->Mr->Readed;
+    }
+    else {
+        nBytes = cp->Mw->Length == 0 ? IPCSOCK_MSGSIZE : 0;
+    }
+    ApcIpcUnref(cp);
+    return nBytes > 0 ? nBytes : 0;
+}
+
+int AcrIpcFlush(LPIPCSOCK cp)
{
int rc = 0;
long nUsed = 0;
INT64 nTimeup = 0;
int nTimeout;
- LPIPCSOCK cp = J2P(fp, LPIPCSOCK);
volatile long *pSendWait;
if (cp == 0)
@@ -1618,9 +1673,9 @@ ACR_NET_EXPORT(jint, IpcEndpoint, flush0
return WSAENOTCONN;
if (_InterlockedOr(&cp->nState, IPCSOCK_SEND) == IPCSOCK_SEND)
return WSAEALREADY;
- nTimeout = cp->dwTimeout;
InterlockedIncrement(&cp->nReferences);
- if (cp->dwTimeout != INFINITE && cp->dwTimeout != 0) {
+ nTimeout = cp->dwTimeout;
+ if (nTimeout <= 0) {
/* Get "future" timeout */
nTimeup = GetCurrentMilliseconds() + cp->dwTimeout;
}
@@ -1713,3 +1768,69 @@ ACR_NET_EXPORT(jint, IpcEndpoint, flush0
return rc;
}
+/**
+ * Accept one connection on a server endpoint and wrap it in a new
+ * socket descriptor.
+ * @param fp server descriptor handle returned by create0.
+ * @param timeout passed unchanged to AcrIpcAccept.
+ * @param block when false the accepted socket is made non-blocking:
+ *        its timeouts are set to zero and ACR_SO_NONBLOCK is flagged.
+ * @return the new descriptor as a jlong, or 0 on failure. A network
+ *         exception is thrown for a bad descriptor or a failed accept.
+ */
+ACR_NET_EXPORT(jlong, IpcServerEndpoint, accept0)(JNI_STDARGS, jlong fp,
+                                                  jint timeout, jboolean block)
+{
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+    acr_sd_t *cd;
+    LPIPCSERVER sp;
+    LPIPCSOCK cp;
+
+    /* Fetch the server pointer exactly once. close0 clears sd->p from
+     * another thread, and a second read after the check could hand a
+     * NULL server to AcrIpcAccept, which dereferences it immediately.
+     * NOTE(review): the server can still be closed between this read
+     * and the reference AcrIpcAccept takes -- confirm callers serialize.
+     */
+    if (sd == 0 || (sp = (LPIPCSERVER)sd->p) == 0) {
+        ACR_THROW_NET_ERROR(ACR_EBADF);
+        return 0;
+    }
+    if ((cd = ACR_TALLOC(acr_sd_t)) == 0)
+        return 0;
+    cp = AcrIpcAccept(sp, timeout);
+    if (cp == 0) {
+        /* Throw before freeing so the last-error value is still intact */
+        ACR_THROW_NET_ERRNO();
+        AcrFree(cd);
+        return 0;
+    }
+    cd->p = cp;
+    cd->refs = 1;
+    cd->type = ACR_DT_IPCSOCK;
+    if (block) {
+        cd->timeout = cp->dwTimeout;
+    }
+    else {
+        cd->timeout = 0;
+        cp->dwTimeout = 0;
+        ACR_SETFLAG(cd, ACR_SO_NONBLOCK);
+    }
+    return P2J(cd);
+}
+
+
+/**
+ * Connect a client endpoint to the server named in a socket address.
+ * The descriptor is retained for the duration of the connect and
+ * released again afterwards, so a close0 issued meanwhile cannot free
+ * it underneath this call.
+ * @param fp descriptor handle returned by IpcEndpoint.create0.
+ * @param cb Java byte array holding the acr_sockaddr_t; its hostname
+ *        member is used as the server address.
+ * @param timeout passed unchanged to AcrIpcConnect.
+ * @return ACR_EBADF for a null or closed descriptor, otherwise the
+ *         result of AcrIpcConnect.
+ */
+ACR_NET_EXPORT(jint, IpcEndpoint, connect0)(JNI_STDARGS, jlong fp,
+                                            jbyteArray cb, jint timeout)
+{
+    int rc;
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+    LPIPCSOCK cp;
+    acr_sockaddr_t *ca;
+
+    if (sd == 0 || sd->p == 0)
+        return ACR_EBADF;
+    InterlockedIncrement(&sd->refs);
+    cp = (LPIPCSOCK)sd->p;
+    ca = SOCKADDR_CAST(cb);
+    rc = AcrIpcConnect(cp, ca->hostname, timeout);
+    SOCKADDR_RELEASE(cb, ca);
+    /* Give back the reference taken above. Without this the count
+     * never returns to zero and close0 can never free the descriptor.
+     */
+    if (InterlockedDecrement(&sd->refs) == 0)
+        AcrFree(sd);
+    return rc;
+}
+
+
+/**
+ * Close a client or accepted endpoint from Java.
+ * The socket is detached from the descriptor, the descriptor's
+ * reference is dropped and the socket itself is closed.
+ * @param fp descriptor handle of the socket.
+ * @return ACR_EBADF for a null descriptor, otherwise the result of
+ *         AcrIpcClose (WSAENOTSOCK if the socket was already detached).
+ */
+ACR_NET_EXPORT(jint, IpcEndpoint, close0)(JNI_STDARGS, jlong fp)
+{
+    acr_sd_t *sd = J2P(fp, acr_sd_t *);
+    LPIPCSOCK cp;
+
+    if (sd == 0)
+        return ACR_EBADF;
+    /* Detach the socket while our reference is still held, the same
+     * way IpcServerEndpoint.close0 does. Clearing sd->p after the
+     * decrement could write into a descriptor that another holder
+     * has just released and freed.
+     */
+    cp = (LPIPCSOCK)InterlockedExchangePointer(&sd->p, 0);
+    if (InterlockedDecrement(&sd->refs) == 0)
+        AcrFree(sd);
+    return AcrIpcClose(cp);
+}
Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c?rev=1154526&r1=1154525&r2=1154526&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c
(original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/ipcsstream.c Sat Aug
6 15:05:02 2011
@@ -28,22 +28,85 @@
#include "arch_sync.h"
#include "arch_ipcs.h"
+/* Native state behind a Java IpcStream handle: it only records the
+ * socket descriptor the stream operates on.
+ */
+typedef struct acr_ss_t acr_ss_t;
+struct acr_ss_t {
+    acr_sd_t *sd;   /* descriptor supplied to alloc0 */
+};
+
+/* Take a reference on the stream's descriptor and return the socket it
+ * currently points to (0 once the socket has been closed).
+ * A null stream, or a stream without a descriptor, yields 0 and takes
+ * no reference; stream handles arrive unchecked from Java.
+ * Every call must be paired with sdrelease().
+ */
+ACR_INLINE(LPIPCSOCK) sdretain(acr_ss_t *ss)
+{
+    if (ss == 0 || ss->sd == 0)
+        return 0;
+    InterlockedIncrement(&ss->sd->refs);
+    return (LPIPCSOCK)ss->sd->p;
+}
+
+/* Drop one reference on the stream's descriptor.
+ * When that was the last reference the descriptor is freed and the
+ * stream's pointer to it is cleared.
+ * Returns 1 while the descriptor is still alive, 0 when it was freed
+ * or when there was nothing to release (null stream or descriptor).
+ */
+ACR_INLINE(int) sdrelease(acr_ss_t *ss)
+{
+    if (ss == 0 || ss->sd == 0)
+        return 0;
+    if (InterlockedDecrement(&ss->sd->refs) == 0) {
+        AcrFree(ss->sd);
+        ss->sd = 0;
+        return 0;
+    }
+    return 1;
+}
+
+/**
+ * Create the native stream object for a socket descriptor.
+ * @param fp descriptor handle of an open socket.
+ * @return the stream handle as a jlong, or 0 on failure. ACR_EBADF is
+ *         thrown when the descriptor is null or already closed.
+ * NOTE(review): the descriptor pointer is stored without taking a
+ * reference on it -- confirm the Java side keeps the socket alive for
+ * as long as the stream exists.
+ */
+ACR_NET_EXPORT(jlong, IpcStream, alloc0)(JNI_STDARGS, jlong fp)
+{
+    acr_sd_t *pDesc = J2P(fp, acr_sd_t *);
+    acr_ss_t *pStream;
+
+    if (pDesc == 0 || pDesc->p == 0) {
+        ACR_THROW_NET_ERROR(ACR_EBADF);
+        return 0;
+    }
+
+    pStream = ACR_TALLOC(acr_ss_t);
+    if (pStream == 0)
+        return 0;
+    pStream->sd = pDesc;
+    return P2J(pStream);
+}
+
+/**
+ * Free the native stream object created by alloc0.
+ * Only the stream structure is released; the socket descriptor it
+ * points to is left untouched.
+ * @param sp stream handle.
+ * @return always zero.
+ */
+ACR_NET_EXPORT(jint, IpcStream, close0)(JNI_STDARGS, jlong sp)
+{
+    AcrFree(J2P(sp, acr_ss_t *));
+    return 0;
+}
+
+/**
+ * Read a single byte from the stream's socket.
+ * The descriptor is retained with sdretain() for the duration of the
+ * call and released with sdrelease() before returning. A failed read's
+ * error code is captured before the release and thrown afterwards.
+ * @param sp stream handle created by alloc0.
+ * @return the byte read, or -1 when the descriptor already carries
+ *         ACR_SO_RDEOF, when the read returned neither 1 nor -1 (the
+ *         flag is then set), or after a network error has been thrown
+ *         (ACR_EBADF if the socket is gone).
+ */
ACR_NET_EXPORT(jint, IpcStream, read0)(JNI_STDARGS, jlong sp)
{
+ int rc = 0;
int rv = -1;
int rd;
BYTE ch;
- LPIPCSOCK cp = J2P(sp, LPIPCSOCK);
+ acr_ss_t *ss = J2P(sp, acr_ss_t *);
+ LPIPCSOCK cp;
- if (ACR_HASFLAG(cp, ACR_SO_RDEOF))
- return -1;
+ if ((cp = sdretain(ss)) == 0) {
+ rc = ACR_EBADF;
+ goto finally;
+ }
+ if (ACR_HASFLAG(ss->sd, ACR_SO_RDEOF))
+ goto finally;
rd = AcrIpcRead(cp, &ch, 1);
if (rd == -1)
- ACR_THROW_NET_ERRNO();
+ rc = ACR_GET_OS_ERROR();
else if (rd == 1)
rv = ch;
else
- cp->flags |= ACR_SO_RDEOF;
+ ss->sd->flags |= ACR_SO_RDEOF;
+finally:
+ sdrelease(ss);
+ if (rc != 0) {
+ ACR_THROW_NET_ERROR(rc);
+ return -1;
+ }
return rv;
}