Author: mheath
Date: Mon Jan  8 22:23:54 2007
New Revision: 494327

URL: http://svn.apache.org/viewvc?view=rev&rev=494327
Log:
Added write support

Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
    mina/sandbox/mheath/aioj/trunk/todo.txt

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile Mon Jan  8 22:23:54 2007
@@ -28,6 +28,11 @@
 $(TARGET): org_apache_aio.cpp 
$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h 
$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h
        g++ -shared -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
 
-$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h 
$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h: 
$(TARGET_DIR)/classes/org/apache/aio/AsynchronousFileChannel.class 
$(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureReadWrite.class
+$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h: 
$(TARGET_DIR)/classes/org/apache/aio/AsynchronousFileChannel.class
        mkdir -p $(TARGET_DIR)/jni
-       javah -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) 
org.apache.aio.AsynchronousFileChannel 
org.apache.aio.posix.PosixAioFutureReadWrite
\ No newline at end of file
+       javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) 
org.apache.aio.AsynchronousFileChannel
+
+$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h: 
$(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureReadWrite.class
+       mkdir -p $(TARGET_DIR)/jni
+       javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) 
org.apache.aio.posix.PosixAioFutureReadWrite
+       
\ No newline at end of file

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Jan  8 
22:23:54 2007
@@ -16,10 +16,19 @@
 #define LOG_DEBUG(s)
 #endif
 
+struct aio_request
+{
+       struct aiocb aio;
+       jobject operation;
+       jobject future;
+       jint allocated_buffer_size;
+       jbyte buffer[];
+};
+
 // --- jvm handler ---
+static JavaVM *jvm;
 
 // --- Exception classes ---
-static JavaVM *jvm;
 static jclass ioException;
 static jclass nullPointerException;
 
@@ -31,9 +40,11 @@
 static jmethodID abstractAioFuture_processFutureListenersID;
 static jmethodID abstractAioFuture_handleErrorID;
 
+// --- IDs for file descriptors ---
 static jfieldID fdID; // ID for java.io.FileDescriptor.fd
 static jfieldID fieldDescID; // ID for 
org.apache.aio.AsynchronousFileChannel.fd
 
+// --- Operation enums ---
 static jobject operationRead;
 static jobject operationWrite;
 static jobject operationBatchRead;
@@ -43,12 +54,11 @@
 {
        JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved);
        JNIEXPORT void JNICALL JNI_OnUnload(JavaVM *vm, void *reserved);
-       void aio_read_completion_handler(sigval_t sigval);
-       void aio_write_completion_handler(sigval_t sigval);
+       void aio_read_write_completion_handler(sigval_t sigval);
 }
 
 // --- Utility Functions ----------------------------------------------------
-jint getFD(JNIEnv *env, jobject asynchFileChannel)
+inline jint getFD(JNIEnv *env, jobject asynchFileChannel)
 {
        jobject fieldDesc = env->GetObjectField(asynchFileChannel, fieldDescID);
        return env->GetIntField(fieldDesc, fdID);
@@ -58,13 +68,16 @@
 {
        jclass cls = env->GetObjectClass(buffer);
        jmethodID MID_position = env->GetMethodID(cls, "position", "()I");
-       return env->CallIntMethod(buffer, MID_position); 
+       env->DeleteLocalRef(cls);
+       jint ret = env->CallIntMethod(buffer, MID_position);
+       return  ret;
 }
 
 inline void setBufferPosition(JNIEnv *env, jobject buffer, jint position)
 {
        jclass cls = env->GetObjectClass(buffer);
        jmethodID MID_position = env->GetMethodID(cls, "position", 
"(I)Ljava/nio/Buffer;");
+       env->DeleteLocalRef(cls);
        env->CallVoidMethod(buffer, MID_position, position);
 }
 
@@ -72,6 +85,7 @@
 {
        jclass cls = env->GetObjectClass(buffer);
        jmethodID MID_limit = env->GetMethodID(cls, "limit", "()I");
+       env->DeleteLocalRef(cls);
        return env->CallIntMethod(buffer, MID_limit); 
 }
 
@@ -79,10 +93,11 @@
 {
        jclass cls = env->GetObjectClass(buffer);
        jmethodID MID_limit = env->GetMethodID(cls, "limit", 
"(I)Ljava/nio/Buffer;");
+       env->DeleteLocalRef(cls);
        env->CallVoidMethod(buffer, MID_limit, limit);
 }
 
-jobject createPosixAioFutureReadWrite(JNIEnv *env, jobject channel, jobject 
operation, void *aiocb, jobject buffer, long position)
+inline jobject createPosixAioFutureReadWrite(JNIEnv *env, jobject channel, 
jobject operation, void *aiocb, jobject buffer, long position)
 {
        jvalue values[5];
        values[0].l = channel;
@@ -94,43 +109,58 @@
        return env->NewGlobalRef(future);
 }
 
-struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, 
jobject buffer, jlong position, jobject operation)
+struct aio_request *setupAioRequest(JNIEnv *env, jobject 
asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
 {
-               // Get address and capacity of buffer
+       // Get address and capacity of buffer
        if (buffer == NULL) {
                env->ThrowNew(nullPointerException, "buffer cannot be null");
+               return NULL;
        }
-       // TODO: Add support for non direct byte buffers
+       // Adjust bufferAddress by the position of the buffer
+       jint bufferPosition = getBufferPosition(env, buffer);
+       jint bufferLimit = getBufferLimit(env, buffer)
+       jint bufferSize = bufferLimit - bufferPosition;
+
+       jint bufferToAllocate;
        jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
        if (bufferAddress == NULL) {
-               env->ThrowNew(ioException, "Must use direct ByteBuffer");
-               return NULL;
+               bufferToAllocate = bufferSize;
+       } else {
+               bufferAddress += bufferPosition; 
+               bufferToAllocate = 0;
        }
-       // Adjust bufferAddress by the position of the buffer
-       jint bufferPosition = getBufferPosition(env, buffer); 
-       bufferAddress += bufferPosition; 
-       jint bufferSize = getBufferLimit(env, buffer) - position;
 
        // Allocate aiocb
-       struct aiocb *req = (aiocb*)malloc(sizeof(aiocb));
-       bzero(req, sizeof(struct aiocb));
+       int aio_request_size = sizeof(aio_request) + bufferToAllocate;
+       struct aio_request *req = (aio_request *)malloc(aio_request_size);
+       bzero(req, aio_request_size);
+       
+       if (bufferAddress == NULL) {
+               req->allocated_buffer_size = bufferToAllocate;
+               bufferAddress = req->buffer;
+               
+               //jbytearray env->NewByteArray(bufferSize);
+               
+               // TODO: For write, read data from buffer and put into 
allocated buffer and forward position
+       }
+       
+       // TODO: For write, increment position
        
-       jobject future = createPosixAioFutureReadWrite(env, 
asynchronousFileChannel, operation, req, buffer, position);
+       req->future = createPosixAioFutureReadWrite(env, 
asynchronousFileChannel, operation, req, buffer, position);
 
        // Setup aiocb
-       req->aio_fildes = getFD(env, asynchronousFileChannel);
-       req->aio_offset = position;
-       req->aio_buf = bufferAddress;
-       req->aio_nbytes = bufferSize;
-       
-       req->aio_sigevent.sigev_notify = SIGEV_THREAD;
-       if (operation == operationRead) {
-               req->aio_sigevent.sigev_notify_function = 
aio_read_completion_handler;
-       } else if (operation == operationWrite) {
-               req->aio_sigevent.sigev_notify_function = 
aio_write_completion_handler;
-       }
-       req->aio_sigevent.sigev_notify_attributes = NULL;
-       req->aio_sigevent.sigev_value.sival_ptr = future;
+       req->aio.aio_fildes = getFD(env, asynchronousFileChannel);
+       req->aio.aio_offset = position;
+       req->aio.aio_buf = bufferAddress;
+       req->aio.aio_nbytes = bufferSize;
+       
+       // Set other aio request fields
+       req->operation = operation;
+       
+       req->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
+       req->aio.aio_sigevent.sigev_notify_function = 
aio_read_write_completion_handler;
+       req->aio.aio_sigevent.sigev_notify_attributes = NULL;
+       req->aio.aio_sigevent.sigev_value.sival_ptr = req;
        
        return req;
 }
@@ -214,14 +244,15 @@
        fprintf(stdout, "aio write at file position %d\n", position);
        fflush(stdout);
 #endif
-       struct aiocb *req = setupAioRequest(env, obj, buffer, position, 
operationWrite);
-       int ret = aio_write(req);
+       struct aio_request *req = setupAioRequest(env, obj, buffer, position, 
operationWrite);
+       
+       int ret = aio_write(&req->aio);
        if (ret) {
                // TODO Handle errors from ret
                // return a null on error.
                return NULL;
        }
-       return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
+       return req->future;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
@@ -231,9 +262,9 @@
        fprintf(stdout, "aio read at file position %d\n", position);
        fflush(stdout);
 #endif
-       struct aiocb *req = setupAioRequest(env, obj, buffer, position, 
operationRead);
+       struct aio_request *req = setupAioRequest(env, obj, buffer, position, 
operationRead);
        LOG_DEBUG("Do aio read\n");
-       int ret = aio_read(req);
+       int ret = aio_read(&req->aio);
        if (ret) {
                // TODO Handle errors from ret
                // return a null on error.
@@ -241,7 +272,7 @@
        }
 
        // Return future object 
-       return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
+       return req->future;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
@@ -268,8 +299,8 @@
 JNIEXPORT jboolean JNICALL 
Java_org_apache_aio_posix_PosixAioFutureReadWrite_cancel
   (JNIEnv *env, jobject obj)
 {
-       struct aiocb *req = (struct aiocb *)env->GetLongField(obj, 
posixAioFutureReadWrite_aiocbPtrID);
-       int ret = aio_cancel(req->aio_fildes, req);
+       struct aio_request *req = (struct aio_request *)env->GetLongField(obj, 
posixAioFutureReadWrite_aiocbPtrID);
+       int ret = aio_cancel(req->aio.aio_fildes, &req->aio);
        if (ret == AIO_CANCELED) {
                free(req);
                return 1;
@@ -278,65 +309,48 @@
 }
 
 // --- Completion handlers 
----------------------------------------------------------------
-enum operation_mode {READ, WRITE};
-
-void aio_read_write_completion_handler(sigval_t sigval, operation_mode mode, 
char *jvmAttachErrorMessage, char *failureMessage)
+void aio_read_write_completion_handler(sigval_t sigval)
 {
+       LOG_DEBUG("In AIO completion handler\n")
+
        JNIEnv *env;
        jint res = jvm->AttachCurrentThread((void**)&env, NULL);
        if (res < 0) {
-               fprintf(stderr, jvmAttachErrorMessage);
+               fprintf(stderr, "Could not attach JVM to AIO thread");
                fflush(stderr);
                return;
        }
        
-       jobject future = (jobject)sigval.sival_ptr;
-       struct aiocb *req = (struct aiocb *)env->GetLongField(future, 
posixAioFutureReadWrite_aiocbPtrID);
+       struct aio_request *req = (aio_request *)sigval.sival_ptr;
        
        /* Did the request complete? */
-       if (aio_error(req) == 0) {
+       if (aio_error(&req->aio) == 0) {
                /* Request completed successfully, get number of bytes 
processed */
-               int ret = aio_return( req );
-               
-               if (mode == READ) {
+               int ret = aio_return(&req->aio);
+
+               if (req->operation == operationRead) {
+                       jobject buffer = env->GetObjectField(req->future, 
posixAioFutureReadWrite_bufferID);
                        // Adjust buffer limit
-                       jobject buffer = env->GetObjectField(future, 
posixAioFutureReadWrite_bufferID);
+                       // TODO: Add support for writing read data to heap byte 
buffer 
+
+                       // Get buffer position
                        int limit = ret + getBufferPosition(env, buffer);
                        setBufferLimit(env, buffer, limit);
-               } else if (mode == WRITE) {
-                       // Adjust buffer position
-                       jobject buffer = env->GetObjectField(future, 
posixAioFutureReadWrite_bufferID);
-                       int position = getBufferPosition(env, buffer) + ret;
-                       setBufferPosition(env, buffer, position);
                }
                
                // Call aio listeners
-               env->CallVoidMethod(future, 
abstractAioFuture_processFutureListenersID);
+               env->CallVoidMethod(req->future, 
abstractAioFuture_processFutureListenersID);
        } else {
                // TODO Write a unit test for this
-               env->ThrowNew(ioException, failureMessage);
+               env->ThrowNew(ioException, "Could not complete AIO request");
                jthrowable e = env->ExceptionOccurred();
-               env->CallVoidMethod(future, abstractAioFuture_handleErrorID, e);
+               env->CallVoidMethod(req->future, 
abstractAioFuture_handleErrorID, e);
        }
                
        // Free resources               
+       env->DeleteGlobalRef(req->future);
        free(req);
-       env->DeleteGlobalRef(future);
 
        jvm->DetachCurrentThread();
        return;
 } 
-
-void aio_read_completion_handler(sigval_t sigval)
-{
-       LOG_DEBUG("In read completion handler\n")
-       
-       aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM 
to AIO read thread\n", "Could not complete read request");
-}
-
-void aio_write_completion_handler(sigval_t sigval)
-{
-       LOG_DEBUG("In write completion handler\n")
-
-       aio_read_write_completion_handler(sigval, WRITE, "Failed to attach JVM 
to AIO write thread\n", "Could not complete write request");
-}

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java 
(original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java 
Mon Jan  8 22:23:54 2007
@@ -4,6 +4,12 @@
 
 public interface AioFuture<T extends AioFuture> {
 
+       public boolean cancel();
+
+       public boolean isCancelled();
+       
+       public boolean isDone();
+
        public void addListener(AioFutureListener<T> ioFutureListener);
        
        public void removeListener(AioFutureListener<T> ioFutureListener);
@@ -14,17 +20,11 @@
        
        public AsynchronousFileChannel getChannel();
        
-       public boolean isCompleted();
-
-       public boolean isSuccessful();
-       
        public Throwable getException();
        
        public void join() throws InterruptedException;
        
        public boolean join(long timeout, TimeUnit timeUnit) throws 
InterruptedException;
-       
-       public boolean cancel();
        
        public Operation getOperation();
        

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
 Mon Jan  8 22:23:54 2007
@@ -1,12 +1,19 @@
 package org.apache.aio;
 
+import java.io.Closeable;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
 
-public class AsynchronousFileChannel {
+/**
+ * 
+ * @author Mike Heath <[EMAIL PROTECTED]>
+ */
+public class AsynchronousFileChannel implements Closeable, Channel {
 
        static {
+               // TODO Set up some loader so we can store the .so in the 
aioj.jar
                System.loadLibrary("aioj");
        }
 
@@ -16,9 +23,8 @@
                if (fd == null) {
                        throw new NullPointerException("fd cannot be null");
                }
-               SecurityManager sm = System.getSecurityManager();
-               if (sm != null) {
-                       sm.checkWrite(fd);
+               if (!fd.valid()) {
+                       throw new IOException("fd must be a valid 
FileDescriptor");
                }
                this.fd = fd;
        }
@@ -31,11 +37,20 @@
                return fd.valid();
        }
        
-       public native AioFutureReadWrite write(ByteBuffer buffer, long 
position) throws IOException;
+       /**
+        * Calling close closes the AsynchronousFileChannel but does not 
actually close
+        * the file.  You must close the file object where the file descriptor 
originated from.
+        *
+        */
+       public void close() {
+               fd = new FileDescriptor();
+       }
        
        public native AioFutureReadWrite read(ByteBuffer buffer, long position) 
throws IOException;
        
        public native AioFutureBatch batchRead(BatchRequest... reads) throws 
IOException;
+       
+       public native AioFutureReadWrite write(ByteBuffer buffer, long 
position) throws IOException;
        
        public native AioFutureBatch batchWrite(BatchRequest... writes) throws 
IOException;
        

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
 Mon Jan  8 22:23:54 2007
@@ -57,11 +57,11 @@
                return operation;
        }
 
-       public boolean isCompleted() {
+       public boolean isDone() {
                return completed;
        }
        
-       public boolean isSuccessful() {
+       public boolean isCancelled() {
                return exception == null;
        }
        
@@ -79,7 +79,7 @@
                if (!completed) {
                        wait(timeUnit.toMillis(timeout));
                }
-               return isCompleted();
+               return isDone();
        }
 
        @SuppressWarnings("unchecked")

Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Mon Jan  8 22:23:54 2007
@@ -1,13 +1,23 @@
 - Add support for heap buffers
 - Throw IO Exceptions when aio calls return an error
 - Figure out batch requests
-- Add support for timeouts
+- Add support for timeouts -- does AIO itself have support for timeouts?
 - Add support for receiving file modification events
 - Make sure we're checking for errors on all JNI method calls
+- Modify AIOFuture to extend Java 5 Future
+- Look into simplifying AioFuture using generics
+- Update ByteBuffer immediately
+- Add support for aio_fsync
+- Provide an asynchronous open
 
 === Testing ===
 - Make sure the AsynchronousFileChannel can be unloaded without holding onto 
native references
 - Test for memory leaks
 - Do performance tests
 - Write unit tests
-- Determine if doing AttachCurrentThread or AttachCurrentThreadAsDaemon is 
faster in callback
\ No newline at end of file
+- Determine if doing AttachCurrentThread or AttachCurrentThreadAsDaemon is 
faster in callback
+- Do a test using util.concurrent asynch I/O
+
+=== Things to ask Alan Bateman ===
+- What about asynchronous file opens?
+- Is there an easy way to get to the ByteBuffer in a completion handler when 
doing a read?


Reply via email to