Author: mheath
Date: Mon Nov 27 12:47:37 2006
New Revision: 479760

URL: http://svn.apache.org/viewvc?view=rev&rev=479760
Log:
Add error handling callbacks.

Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java
    mina/sandbox/mheath/aioj/trunk/todo.txt

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Nov 27 12:47:37 2006
@@ -6,8 +6,19 @@
 #include "org_apache_aio_AsynchronousFileChannel.h"
 #include "org_apache_aio_posix_PosixAioFutureReadWrite.h"
 
-#define DEBUG 1
+#define DEBUG
 
+#ifdef DEBUG
+#define LOG_DEBUG(s) { fprintf(stdout, s); fflush(stdout); }
+#endif
+
+#ifndef DEBUG
+#define LOG_DEBUG(s)
+#endif
+
+// --- jvm handler ---
+
+// --- Exception classes ---
 static JavaVM *jvm;
 static jclass ioException;
 static jclass nullPointerException;
@@ -15,9 +26,10 @@
 // --- Classes and IDs for PosixAioFutureReadWrite ---
 static jclass posixAioFutureReadWrite;
 static jmethodID CID_posixAioFutureReadWrite;
-static jmethodID posixAioFutureReadWrite_processFutureListenersID;
 static jfieldID posixAioFutureReadWrite_aiocbPtrID;
 static jfieldID posixAioFutureReadWrite_bufferID;
+static jmethodID abstractAioFuture_processFutureListenersID;
+static jmethodID abstractAioFuture_handleErrorID;
 
 static jfieldID fdID; // ID for java.io.FileDescriptor.fd
 static jfieldID fieldDescID; // ID for 
org.apache.aio.AsynchronousFileChannel.fd
@@ -85,14 +97,12 @@
 struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, 
jobject buffer, jlong position, jobject operation)
 {
                // Get address and capacity of buffer
-       if (buffer == NULL)
-       {
+       if (buffer == NULL) {
                env->ThrowNew(nullPointerException, "buffer cannot be null");
        }
        // TODO: Add support for non direct byte buffers
        jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
-       if (bufferAddress == NULL)
-       {
+       if (bufferAddress == NULL) {
                env->ThrowNew(ioException, "Must use direct ByteBuffer");
                return NULL;
        }
@@ -114,8 +124,7 @@
        req->aio_nbytes = bufferSize;
        
        req->aio_sigevent.sigev_notify = SIGEV_THREAD;
-       if (operation == operationRead)
-       {
+       if (operation == operationRead) {
                req->aio_sigevent.sigev_notify_function = 
aio_read_completion_handler;
        } else if (operation == operationWrite) {
                req->aio_sigevent.sigev_notify_function = 
aio_write_completion_handler;
@@ -130,18 +139,16 @@
 JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved)
 {
        jint JNIversion = JNI_VERSION_1_4;
-       if (DEBUG)
-       {
-               fprintf(stdout, "Initializing native code for JNI version 
0x%x\n", JNIversion);
-               fflush(stdout);
-       }
+#ifdef DEBUG
+       fprintf(stdout, "Initializing native code for JNI version 0x%x\n", 
JNIversion);
+       fflush(stdout);
+#endif
        
        // Initialize static jvm pointer.
        jvm = vm;
        
        JNIEnv *env;
-       if (vm->GetEnv((void**)&env, JNIversion))
-       {
+       if (vm->GetEnv((void**)&env, JNIversion)) {
                return JNI_ERR;
        }
        
@@ -155,10 +162,13 @@
        cls = env->FindClass("org/apache/aio/posix/PosixAioFutureReadWrite");
        posixAioFutureReadWrite = (jclass)env->NewWeakGlobalRef(cls);
        CID_posixAioFutureReadWrite = env->GetMethodID(posixAioFutureReadWrite, 
"<init>", 
"(Lorg/apache/aio/AsynchronousFileChannel;Lorg/apache/aio/Operation;JLjava/nio/ByteBuffer;J)V");
-       posixAioFutureReadWrite_processFutureListenersID = 
env->GetMethodID(posixAioFutureReadWrite, "processFutureListeners", "()V");
        posixAioFutureReadWrite_aiocbPtrID = 
env->GetFieldID(posixAioFutureReadWrite, "aiocbPtr", "J"); 
        posixAioFutureReadWrite_bufferID = 
env->GetFieldID(posixAioFutureReadWrite, "buffer", "Ljava/nio/ByteBuffer;");
        
+       cls = env->FindClass("org/apache/aio/common/AbstractAioFuture");
+       abstractAioFuture_processFutureListenersID = env->GetMethodID(cls, 
"processFutureListeners", "()V");
+       abstractAioFuture_handleErrorID = env->GetMethodID(cls, "handleError", 
"(Ljava/lang/Throwable;)V");
+       
        cls = env->FindClass("java/io/FileDescriptor");
        fdID = env->GetFieldID(cls, "fd", "I");
        
@@ -183,8 +193,7 @@
 {
        jint JNIversion = JNI_VERSION_1_4;
        JNIEnv *env;
-       if (vm->GetEnv((void**)&env, JNIversion))
-       {
+       if (vm->GetEnv((void**)&env, JNIversion)) {
                return;
        }
        
@@ -201,15 +210,13 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_write
   (JNIEnv *env, jobject obj, jobject buffer, jlong position)
 {
-       if (DEBUG)
-       {
-               fprintf(stdout, "aio write at file position %d\n", position);
-               fflush(stdout);
-       }
+#ifdef DEBUG
+       fprintf(stdout, "aio write at file position %d\n", position);
+       fflush(stdout);
+#endif
        struct aiocb *req = setupAioRequest(env, obj, buffer, position, 
operationWrite);
        int ret = aio_write(req);
-       if (ret)
-       {
+       if (ret) {
                // TODO Handle errors from ret
                // return a null on error.
                return NULL;
@@ -220,20 +227,14 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
   (JNIEnv *env, jobject obj, jobject buffer, jlong position)
 {
-       if (DEBUG)
-       {
-               fprintf(stdout, "aio read at file position %d\n", position);
-               fflush(stdout);
-       }
+#ifdef DEBUG
+       fprintf(stdout, "aio read at file position %d\n", position);
+       fflush(stdout);
+#endif
        struct aiocb *req = setupAioRequest(env, obj, buffer, position, 
operationRead);
-       if (DEBUG)
-       {
-               fprintf(stdout, "Do aio read\n");
-               fflush(stdout);
-       }
+       LOG_DEBUG("Do aio read\n");
        int ret = aio_read(req);
-       if (ret)
-       {
+       if (ret) {
                // TODO Handle errors from ret
                // return a null on error.
                return NULL;
@@ -246,21 +247,21 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
   (JNIEnv *env, jobject obj, jobjectArray batch)
 {
-       printf("batchRead\n");
+       LOG_DEBUG("batchRead\n")
        return NULL;
 }
 
 JNIEXPORT jobject JNICALL 
Java_org_apache_aio_AsynchronousFileChannel_batchWrite
   (JNIEnv *env, jobject obj, jobjectArray batch)
 {
-       printf("batchWrite\n");
+       LOG_DEBUG("batchWrite\n");
        return NULL;
 }
 
 JNIEXPORT void JNICALL Java_org_apache_aio_AsynchronousFileChannel_suspend
   (JNIEnv *env, jobject obj, jobjectArray futures)
 {
-       printf("suspend\n");
+       LOG_DEBUG("suspend\n");
 }
 
 // --- PosixAioFutureReadWrite methods 
---------------------------------------------------
@@ -269,8 +270,7 @@
 {
        struct aiocb *req = (struct aiocb *)env->GetLongField(obj, 
posixAioFutureReadWrite_aiocbPtrID);
        int ret = aio_cancel(req->aio_fildes, req);
-       if (ret == AIO_CANCELED)
-       {
+       if (ret == AIO_CANCELED) {
                free(req);
                return 1;
        } 
@@ -278,19 +278,16 @@
 }
 
 // --- Completion handlers 
----------------------------------------------------------------
-void aio_read_completion_handler(sigval_t sigval)
-{
-       if (DEBUG)
-       {
-               fprintf(stdout, "In read completion handler\n");
-               fflush(stdout);
-       }
+// TODO: Consolodate completion handlers
+enum operation_mode {READ, WRITE};
 
+void aio_read_write_completion_handler(sigval_t sigval, operation_mode mode, 
char *jvmAttachErrorMessage, char *failureMessage)
+{
        JNIEnv *env;
        jint res = jvm->AttachCurrentThread((void**)&env, NULL);
-       if (res < 0)
-       {
-               fprintf(stderr, "Failed to attach JVM to AIO read thread\n");
+       if (res < 0) {
+               fprintf(stderr, jvmAttachErrorMessage);
+               fflush(stderr);
                return;
        }
        
@@ -302,68 +299,45 @@
                /* Request completed successfully, get number of bytes 
processed */
                int ret = aio_return( req );
                
-               // Adjust buffer limit
-               jobject buffer = env->GetObjectField(future, 
posixAioFutureReadWrite_bufferID);
-               int limit = ret + getBufferPosition(env, buffer);
-               setBufferLimit(env, buffer, limit);
+               if (mode == READ) {
+                       // Adjust buffer limit
+                       jobject buffer = env->GetObjectField(future, 
posixAioFutureReadWrite_bufferID);
+                       int limit = ret + getBufferPosition(env, buffer);
+                       setBufferLimit(env, buffer, limit);
+               } else if (mode == WRITE) {
+                       // Adjust buffer position
+                       jobject buffer = env->GetObjectField(future, 
posixAioFutureReadWrite_bufferID);
+                       int position = getBufferPosition(env, buffer) + ret;
+                       setBufferPosition(env, buffer, position);
+               }
                
                // Call aio listeners
-               env->CallVoidMethod(future, 
posixAioFutureReadWrite_processFutureListenersID);
-               
-               // Free resources               
-               free(req);
-               env->DeleteGlobalRef(future);
+               env->CallVoidMethod(future, 
abstractAioFuture_processFutureListenersID);
        } else {
-               // TODO Find a way to handle exception here
-               fprintf(stderr, "ERROR: AIO read request did NOT complete\n");
-               fflush(stderr);
+               // TODO Write a unit test for this
+               env->ThrowNew(ioException, failureMessage);
+               jthrowable e = env->ExceptionOccurred();
+               env->CallVoidMethod(future, abstractAioFuture_handleErrorID, e);
        }
+               
+       // Free resources               
+       free(req);
+       env->DeleteGlobalRef(future);
 
        jvm->DetachCurrentThread();
        return;
+} 
+
+void aio_read_completion_handler(sigval_t sigval)
+{
+       LOG_DEBUG("In read completion handler\n")
+       
+       aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM 
to AIO read thread\n", "Could not complete read request");
 }
 
 void aio_write_completion_handler(sigval_t sigval)
 {
-       if (DEBUG)
-       {
-               fprintf(stdout, "In write completion handler\n");
-               fflush(stdout);
-       }
+       LOG_DEBUG("In write completion handler\n")
 
-       JNIEnv *env;
-       jint res = jvm->AttachCurrentThread((void**)&env, NULL);
-       if (res < 0)
-       {
-               fprintf(stderr, "Failed to attach JVM to AIO write thread\n");
-               return;
-       }
-       
-       jobject future = (jobject)sigval.sival_ptr;
-       struct aiocb *req = (struct aiocb *)env->GetLongField(future, 
posixAioFutureReadWrite_aiocbPtrID);
-       
-       /* Did the request complete? */
-       if (aio_error(req) == 0) {
-               /* Request completed successfully, get number of bytes 
processed */
-               int ret = aio_return( req );
-               
-               // Adjust buffer position
-               jobject buffer = env->GetObjectField(future, 
posixAioFutureReadWrite_bufferID);
-               int position = getBufferPosition(env, buffer) + ret;
-               setBufferPosition(env, buffer, position);
-               
-               // Call aio listeners
-               env->CallVoidMethod(future, 
posixAioFutureReadWrite_processFutureListenersID);
-               
-               // Free resources               
-               free(req);
-               env->DeleteGlobalRef(future);
-       } else {
-               // TODO Find a way to handle exception here
-               fprintf(stderr, "ERROR: AIO write request did NOT complete\n");
-               fflush(stderr);
-       }
-
-       jvm->DetachCurrentThread();
-       return;
+       aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM 
to AIO write thread\n", "Could not complete write request");
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Mon Nov 27 12:47:37 2006
@@ -8,9 +8,17 @@
        
        public void removeListener(AioFutureListener<T> ioFutureListener);
        
+       public void addErrorListener(AioFutureListener<T> ioFutureListener);
+       
+       public void removeErrorListener(AioFutureListener<T> ioFutureListener);
+       
        public AsynchronousFileChannel getChannel();
        
        public boolean isCompleted();
+
+       public boolean isSuccessful();
+       
+       public Throwable getException();
        
        public void join() throws InterruptedException;
        

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java Mon Nov 27 12:47:37 2006
@@ -1,10 +1,7 @@
 package org.apache.aio;
 
-public interface AioFutureBatch extends AioFuture {
-       
-       public void addListener(AioFutureListener<AioFutureBatch> 
ioFutureListener);
-       
-       public void removeListener(AioFutureListener<AioFutureBatch> 
ioFutureListener);
-       
+public interface AioFutureBatch extends AioFuture<AioFutureBatch> {
+
        public BatchRequest[] getRequests();
+
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java Mon Nov 27 12:47:37 2006
@@ -14,7 +14,9 @@
        private final AsynchronousFileChannel channel;
        private final Operation operation;
        private final List<AioFutureListener<T>> listeners = new 
LinkedList<AioFutureListener<T>>();
+       private final List<AioFutureListener<T>> errorListeners = new 
LinkedList<AioFutureListener<T>>();
        private volatile boolean completed = false;
+       private volatile Throwable exception = null;
 
        protected AbstractAioFuture(AsynchronousFileChannel channel, Operation 
operation) {
                this.channel = channel;
@@ -30,6 +32,23 @@
                }
        }
 
+       public synchronized void removeListener(AioFutureListener<T> 
ioFutureListener) {
+               listeners.remove(ioFutureListener);
+       }
+
+       @SuppressWarnings("unchecked")
+       public synchronized void addErrorListener(AioFutureListener<T> 
ioFutureListener) {
+               if (exception != null) {
+                       ioFutureListener.onCompletion((T)this);
+               } else {
+                       errorListeners.add(ioFutureListener);
+               }
+       }
+
+       public synchronized void removeErrorListener(AioFutureListener<T> 
ioFutureListener) {
+               errorListeners.remove(ioFutureListener);
+       }
+
        public AsynchronousFileChannel getChannel() {
                return channel;
        }
@@ -41,6 +60,14 @@
        public boolean isCompleted() {
                return completed;
        }
+       
+       public boolean isSuccessful() {
+               return exception == null;
+       }
+       
+       public Throwable getException() {
+               return exception;
+       }
 
        public synchronized void join() throws InterruptedException {
                if (!completed) {
@@ -55,10 +82,6 @@
                return isCompleted();
        }
 
-       public synchronized void removeListener(AioFutureListener<T> 
ioFutureListener) {
-               listeners.remove(ioFutureListener);
-       }
-
        @SuppressWarnings("unchecked")
        protected synchronized void processFutureListeners() {
                completed = true;
@@ -68,4 +91,14 @@
                }
        }
 
+       @SuppressWarnings("unchecked")
+       protected synchronized void handleError(Throwable exception) {
+               this.exception = exception;
+               completed = true;
+               notifyAll();
+               for (AioFutureListener<T> listener : errorListeners) {
+                       listener.onCompletion((T)this);
+               }
+       }
+       
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java Mon Nov 27 12:47:37 2006
@@ -30,9 +30,12 @@
                achannel = new AsynchronousFileChannel(out.getFD());
                buffer.clear();
                buffer.put("Have a really nice day!\n".getBytes());
+               System.out.println("Position: " + buffer.position());
                buffer.flip();
+               System.out.println("Position: " + buffer.position());
                future = achannel.write(buffer, 0);
-               System.out.println(future);
+               future.join();
+               System.out.println("Position: " + buffer.position());
        }
                
 }

Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Mon Nov 27 12:47:37 2006
@@ -1,7 +1,12 @@
 - Add support for heap buffers
 - Throw IO Exceptions when aio calls return an error
 - Figure out batch requests
+- Add support for timeouts and error handling
+- Add support for receiving file modification events
+- Make sure we're checking for errors on all JNI method calls
 
 === Testing ===
 - Make sure the AsynchronousFileChannel can be unloaded without holding onto 
native references
-- Test for memory leaks
\ No newline at end of file
+- Test for memory leaks
+- Do performance tests
+- Write unit tests
\ No newline at end of file


Reply via email to