Author: mheath
Date: Wed Mar 14 22:04:16 2007
New Revision: 518484
URL: http://svn.apache.org/viewvc?view=rev&rev=518484
Log:
Fixed support for signal completion notification.
Fixed support for heap buffers.
Added:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Wed Mar 14
22:04:16 2007
@@ -96,7 +96,6 @@
static jfieldID FID_posixAioFuture_aioRequestPtr;
static jmethodID MID_posixAioFuture_setValue;
static jmethodID MID_posixAioFuture_setException;
-static jmethodID MID_posixAioFtgure_setValueInExecutorService;
// --- org.apache.aio.posix.PosixAioFutureImpl.PosixAioReadWriteFuture
static jclass posixAioReadWriteFuture;
@@ -150,11 +149,14 @@
jbyte* getBufferAddress(struct aio_request *request, int bufferIndex)
{
- jbyte *address = (jbyte *)request;
- for (int i = 0; i < bufferIndex && i < request->bufferCount; i++) {
+ if (bufferIndex > request->bufferCount) {
+ return NULL;
+ }
+ jbyte *address = (jbyte *)request + sizeof(aio_request) +
sizeof(buffer_size) * request->bufferCount;
+ for (int i = 0; i < bufferIndex; i++) {
address += request->buffers[i].allocatedBufferSize;
}
- return NULL;
+ return address;
}
void cleanupPosixAio(JNIEnv *env, struct aio_request *request)
@@ -225,7 +227,7 @@
*
*********************************************************************************************************************/
-void processPosixAioCallback(JNIEnv *env, struct aio_request *request, bool
callbackInExecutorService)
+void processPosixAioCallback(JNIEnv *env, struct aio_request *request)
{
LOG_DEBUG("Processing callback\n");
@@ -252,6 +254,7 @@
jclass bufferClass =
env->GetObjectClass(buffer);
int remaining = getByteBufferRemaining(env,
buffer, bufferClass);
if (remaining < totalBytes) {
+ LOG_DEBUG("Error remaining bytes is
less than total bytes read\n");
// TODO: Throw exception
cleanupPosixAio(env, request);
@@ -265,7 +268,11 @@
jint position = getByteBufferPosition(env,
buffer, bufferClass);
jbyte *address = (jbyte
*)env->GetDirectBufferAddress(buffer);
if (address == NULL) {
+ LOG_DEBUG("Copying data for heap
buffer\n");
if (byteBufferHasArray(env, buffer,
bufferClass)) {
+ LOG_DEBUG("Buffer has array\n");
+ setByteBufferPosition(env,
buffer, bufferClass, position + totalBytes);
+
jbyteArray array =
getByteBufferArray(env, buffer, bufferClass);
jbyte *arrayPtr = (jbyte
*)env->GetPrimitiveArrayCritical(array, NULL);
memcpy(arrayPtr,
request->buffers[0].buffer, totalBytes);
@@ -273,9 +280,9 @@
env->DeleteLocalRef(array);
} else {
// TODO: Handle byte buffers
that don't support arrays
+ LOG_DEBUG("Buffer does not have
array\n");
}
} else {
- LOG_DEBUG("Setting buffer position in
direct buffer\n");
setByteBufferPosition(env, buffer,
bufferClass, position + totalBytes);
}
@@ -286,11 +293,7 @@
LOG_DEBUG("Invoke callbacks\n");
jobject totalBytesObj = env->CallStaticObjectMethod(Integer,
MID_Integer_valueOf, totalBytes);
- if (callbackInExecutorService) {
- env->CallVoidMethod(request->future,
MID_posixAioFtgure_setValueInExecutorService, totalBytesObj);
- } else {
- env->CallVoidMethod(request->future,
MID_posixAioFuture_setValue, totalBytesObj);
- }
+ env->CallVoidMethod(request->future,
MID_posixAioFuture_setValue, totalBytesObj);
env->DeleteLocalRef(totalBytesObj);
LOG_DEBUG("Done with callbacks\n");
} else {
@@ -331,7 +334,7 @@
perror("Unable to obtain reference to JNI Environment,
can not handle AIO completion\n");
} else {
struct aio_request *req = (struct aio_request
*)info->si_value.sival_ptr;
- processPosixAioCallback(env, req, true);
+ processPosixAioCallback(env, req);
}
}
}
@@ -348,7 +351,7 @@
}
struct aio_request *req = (aio_request *)sigval.sival_ptr;
- processPosixAioCallback(env, req, false);
+ processPosixAioCallback(env, req);
jvm->DetachCurrentThread();
return;
@@ -409,12 +412,11 @@
cls = env->FindClass("org/apache/aio/posix/PosixAioFutureImpl");
NULL_CHECK(cls)
FID_posixAioFuture_aioRequestPtr = env->GetFieldID(cls,
"aioRequestPtr", "J"); NULL_CHECK(FID_posixAioFuture_aioRequestPtr)
MID_posixAioFuture_setValue = env->GetMethodID(cls, "setValue",
"(Ljava/lang/Object;)V"); NULL_CHECK(MID_posixAioFuture_setValue)
- MID_posixAioFtgure_setValueInExecutorService = env->GetMethodID(cls,
"setValueInExecutorService", "(Ljava/lang/Object;)V");
NULL_CHECK(MID_posixAioFuture_setValue)
MID_posixAioFuture_setException = env->GetMethodID(cls, "setException",
"(Ljava/lang/Throwable;)V"); NULL_CHECK(MID_posixAioFuture_setException)
// --- org.apache.aio.posix.PosixAioFutureImpl.PosixReadWriteAioFuture
--------------
LOG_DEBUG("Getting ids for
PosixAioFutureImpl.PosixReadWriteAioFuture\n")
- cls =
env->FindClass("org/apache/aio/posix/PosixAioFutureImpl$PosixReadWriteAioFuture");
NULL_CHECK(cls)
+ cls = env->FindClass("org/apache/aio/posix/PosixReadWriteAioFuture");
NULL_CHECK(cls)
posixAioReadWriteFuture = (jclass)env->NewWeakGlobalRef(cls);
NULL_CHECK(posixAioReadWriteFuture)
CID_posixAioReadWriteFuture = env->GetMethodID(posixAioReadWriteFuture,
"<init>",
"(Lorg/apache/aio/posix/PosixAsynchronousFileChannel;JLjava/nio/ByteBuffer;J)V");
NULL_CHECK(CID_posixAioReadWriteFuture)
FID_posixAioReadWriteFuture_buffer =
env->GetFieldID(posixAioReadWriteFuture, "buffer", "Ljava/nio/ByteBuffer;");
NULL_CHECK(FID_posixAioReadWriteFuture_buffer)
@@ -478,7 +480,14 @@
address += bufferPosition;
}
+ LOG_DEBUG_PARAM("Buffer size to allocate %d\n", bufferSizeToAllocate);
+
struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
+
+ if (req == NULL) {
+ // Error occured ??? Do with throw an OutOfMemory exception?
+ return NULL;
+ }
// Setup aio_request fields
req->buffers[0].bufferSize = bufferSize;
@@ -492,11 +501,13 @@
jobject future = createPosixReadWriteAioFuture(env, self, req, buffer,
position);
if (future == NULL) {
// Error occured, return and let Java throw exception
+ free(req);
return NULL;
}
req->future = env->NewGlobalRef(future);
if (req->future == NULL) {
// Error occured, return and let Java throw exception
+ free(req);
return NULL;
}
@@ -512,16 +523,19 @@
req->cb.aio_sigevent.sigev_value.sival_ptr = req;
if (env->GetBooleanField(self, FID_posixChannelThreadNotify)) {
// Do thread notification
+ LOG_DEBUG("Doing thread notification\n");
req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
} else {
// Do SIG notification
+ LOG_DEBUG("Doing signal notification\n");
req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
req->cb.aio_sigevent.sigev_signo = SIG_AIO;
}
LOG_DEBUG("Calling aio_read\n");
int ret = aio_read(&req->cb);
+ LOG_DEBUG("Aio read returned\n");
if (ret) {
// TODO Handle each error return type as appropriate
perror("Error issuing aio_read");
Added:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java?view=auto&rev=518484
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
(added)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
Wed Mar 14 22:04:16 2007
@@ -0,0 +1,5 @@
+package org.apache.aio.posix;
+
+public class AbstractPosixAioFuture {
+
+}
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
Wed Mar 14 22:04:16 2007
@@ -19,49 +19,20 @@
*/
package org.apache.aio.posix;
-import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.aio.AioCallbackException;
-import org.apache.aio.AsynchronousFileChannel;
import org.apache.aio.concurrent.AbstractAioFutureCallBackHandler;
public class PosixAioFutureImpl<V> extends AbstractAioFutureCallBackHandler<V>
{
- public static class PosixReadWriteAioFuture extends
PosixAioFutureImpl<Integer> implements ByteBufferFuture {
-
- private final ByteBuffer buffer;
- private final long position;
-
- public PosixReadWriteAioFuture(PosixAsynchronousFileChannel channel,
long aioRequestPtr, ByteBuffer buffer, long position) {
- super(channel, aioRequestPtr);
- this.buffer = buffer;
- this.position = position;
- }
-
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- public long getPosition() {
- return position;
- }
-
- }
-
private final PosixAsynchronousFileChannel channel;
@SuppressWarnings("unused")
private final long aioRequestPtr; // The pointer to the aio request struct
(used by native code)
- private final Lock lock = new ReentrantLock();
- private final Condition condition = lock.newCondition();
-
private volatile Throwable exception;
private volatile boolean done = false;
private volatile V value = null;
@@ -72,34 +43,40 @@
}
public V get() throws InterruptedException, ExecutionException,
AioCallbackException {
- lock.lockInterruptibly();
- try {
- // TOOD - Come up with a better blocking mechanism than a spin
lock that works with signals
- while (!done) {
- Thread.yield();
- System.out.print(".");
+ if (!done) {
+ if (channel.isThreadNotify()) {
+ synchronized (this) {
+ wait();
+ }
+ } else {
+ while (!done) {
+ Thread.yield();
+ }
}
- System.out.println("Done with get!");
- return getValue();
- } finally {
- lock.unlock();
}
+ return getValue();
}
public V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
- lock.lockInterruptibly();
- try {
- if (done) {
- return getValue();
+ if (!done) {
+ if (channel.isThreadNotify()) {
+ synchronized (this) {
+ wait(unit.toMillis(timeout));
+ }
+ } else {
+ long start = System.currentTimeMillis();
+ while (!done) {
+ Thread.yield();
+ if ((System.currentTimeMillis() - start) >
unit.toMillis(timeout)) {
+ throw new TimeoutException();
+ }
+ }
}
- condition.await(timeout, unit);
- return getValue();
- } finally {
- lock.unlock();
}
+ return getValue();
}
- public AsynchronousFileChannel getChannel() {
+ public PosixAsynchronousFileChannel getChannel() {
return channel;
}
@@ -111,11 +88,8 @@
if (canceled) {
done = true;
exception = new CancellationException();
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
+ synchronized (this) {
+ notifyAll();
}
}
return canceled;
@@ -131,6 +105,13 @@
return done;
}
+ /**
+ * If there's a valid assocaited with this future, return it. If there's
an exception associated with this future,
+ * throw it.
+ *
+ * @return The value associated with tis future.
+ * @throws ExecutionException If an error occured while processing this
future, an exception is thrown
+ */
protected V getValue() throws ExecutionException {
if (exception != null) {
if (exception instanceof ExecutionException) {
@@ -141,36 +122,40 @@
return value;
}
+ /**
+ * Sets the value assocaited with this future.
+ *
+ * @param value The value to assocaited with this future.
+ */
protected void setValue(V value) {
this.value = value;
done = true;
signal();
}
- protected void setValueInExecutorService(final V value) {
- channel.getExecutorService().submit(new Runnable() {
- public void run() {
- setValue(value);
- }
- });
- }
-
+ /**
+ * Sets the exception that occured while processing this future.
+ *
+ * @param exception The exception that occurred.
+ */
public void setException(Throwable exception) {
this.exception = exception;
done = true;
signal();
}
- private void signal() {
- System.out.println("Signalling");
- lock.lock();
- try {
- condition.signalAll();
- } finally {
- lock.unlock();
+ /**
+ * Wakes up any threads waiting on this future's compeltion. Calls any
observing completion handlers.
+ *
+ */
+ private synchronized void signal() {
+ notifyAll();
+ if (channel.isThreadNotify()) {
+ // TODO: Make this configurable so even if thread callback
handlers are being used, the callback handlers can be called in the
ExecutorService
+ callCompletionHandlers();
+ } else {
+ channel.callCompletionHandlersInExecutorService(this);
}
-
- callCompletionHandlers();
}
}
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
Wed Mar 14 22:04:16 2007
@@ -15,8 +15,8 @@
public class PosixAsynchronousFileChannel extends
ConcurrentAsynchronousFileChannel {
- @SuppressWarnings("unused") // This is used by native code
private final boolean threadNotify;
+
@SuppressWarnings("unused")
private final FileDescriptor fd;
@@ -41,8 +41,7 @@
@Override
public native BatchFuture write(ByteBufferPosition[] byteBufferPositions,
int offset, int length);
- public void test() {
- System.out.println("This is from Java");
- System.out.println("I love JNI");
+ public boolean isThreadNotify() {
+ return threadNotify;
}
}
Added:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java?view=auto&rev=518484
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
(added)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
Wed Mar 14 22:04:16 2007
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.aio.posix;
+
+import java.nio.ByteBuffer;
+
+import org.apache.aio.AioFuture.ByteBufferFuture;
+
+public class PosixReadWriteAioFuture extends PosixAioFutureImpl<Integer>
implements ByteBufferFuture {
+
+ private final ByteBuffer buffer;
+ private final long position;
+
+ public PosixReadWriteAioFuture(PosixAsynchronousFileChannel channel, long
aioRequestPtr, ByteBuffer buffer, long position) {
+ super(channel, aioRequestPtr);
+ this.buffer = buffer;
+ this.position = position;
+ }
+
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ public long getPosition() {
+ return position;
+ }
+
+}
\ No newline at end of file
Modified:
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
Wed Mar 14 22:04:16 2007
@@ -4,6 +4,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
+import org.apache.aio.AioCompletionHandler;
import org.apache.aio.AioFuture;
import org.apache.aio.Modes;
@@ -20,9 +21,21 @@
in.getChannel(),
Modes.READ_ONLY,
Executors.newCachedThreadPool(), true);
- ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+ ByteBuffer buffer = ByteBuffer.allocate(1000);
AioFuture<Integer> future = channel.read(buffer, 0L);
+ System.out.println("read returned");
+ future.addCompletionHandler(new AioCompletionHandler<Integer>() {
+ public void onCompletion(AioFuture<Integer> future) {
+ try {
+ System.out.println(String.format("Completion handler read
%d bytes", future.get()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
future.get();
+ System.out.println("********");
+ System.out.println(buffer.position());
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);