Author: mheath
Date: Wed Mar 14 22:04:16 2007
New Revision: 518484

URL: http://svn.apache.org/viewvc?view=rev&rev=518484
Log:
Fixed support for signal completion notification.
Fixed support for heap buffers.

Added:
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
    
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Wed Mar 14 
22:04:16 2007
@@ -96,7 +96,6 @@
 static jfieldID FID_posixAioFuture_aioRequestPtr;
 static jmethodID MID_posixAioFuture_setValue;
 static jmethodID MID_posixAioFuture_setException;
-static jmethodID MID_posixAioFtgure_setValueInExecutorService;
 
 // --- org.apache.aio.posix.PosixAioFutureImpl.PosixAioReadWriteFuture
 static jclass posixAioReadWriteFuture;
@@ -150,11 +149,14 @@
 
 jbyte* getBufferAddress(struct aio_request *request, int bufferIndex)
 {
-       jbyte *address = (jbyte *)request;
-       for (int i = 0; i < bufferIndex && i < request->bufferCount; i++) {
+       if (bufferIndex > request->bufferCount) {
+               return NULL;
+       }
+       jbyte *address = (jbyte *)request + sizeof(aio_request) + 
sizeof(buffer_size) * request->bufferCount;
+       for (int i = 0; i < bufferIndex; i++) {
                address += request->buffers[i].allocatedBufferSize;
        }
-       return NULL;
+       return address;
 }
 
 void cleanupPosixAio(JNIEnv *env, struct aio_request *request)
@@ -225,7 +227,7 @@
  * 
  
*********************************************************************************************************************/
 
-void processPosixAioCallback(JNIEnv *env, struct aio_request *request, bool 
callbackInExecutorService)
+void processPosixAioCallback(JNIEnv *env, struct aio_request *request)
 {
        LOG_DEBUG("Processing callback\n");
 
@@ -252,6 +254,7 @@
                                jclass bufferClass = 
env->GetObjectClass(buffer);
                                int remaining = getByteBufferRemaining(env, 
buffer, bufferClass);
                                if (remaining < totalBytes) {
+                                       LOG_DEBUG("Error remaining bytes is 
less than total bytes read\n");
                                        // TODO: Throw exception
                                        cleanupPosixAio(env, request);
                                        
@@ -265,7 +268,11 @@
                                jint position = getByteBufferPosition(env, 
buffer, bufferClass);
                                jbyte *address = (jbyte 
*)env->GetDirectBufferAddress(buffer);
                                if (address == NULL) {
+                                       LOG_DEBUG("Copying data for heap 
buffer\n");
                                        if (byteBufferHasArray(env, buffer, 
bufferClass)) {
+                                               LOG_DEBUG("Buffer has array\n");
+                                               setByteBufferPosition(env, 
buffer, bufferClass, position + totalBytes);                         
+               
                                                jbyteArray array = 
getByteBufferArray(env, buffer, bufferClass);
                                                jbyte *arrayPtr = (jbyte 
*)env->GetPrimitiveArrayCritical(array, NULL);
                                                memcpy(arrayPtr, 
request->buffers[0].buffer, totalBytes);
@@ -273,9 +280,9 @@
                                                env->DeleteLocalRef(array);
                                        } else {
                                                // TODO: Handle byte buffers 
that don't support arrays
+                                               LOG_DEBUG("Buffer does not have 
array\n");
                                        }
                                } else {
-                                       LOG_DEBUG("Setting buffer position in 
direct buffer\n");
                                        setByteBufferPosition(env, buffer, 
bufferClass, position + totalBytes);                         
                                }
 
@@ -286,11 +293,7 @@
 
                LOG_DEBUG("Invoke callbacks\n");                
                jobject totalBytesObj = env->CallStaticObjectMethod(Integer, 
MID_Integer_valueOf, totalBytes);
-               if (callbackInExecutorService) {
-                       env->CallVoidMethod(request->future, 
MID_posixAioFtgure_setValueInExecutorService, totalBytesObj);
-               } else {
-                       env->CallVoidMethod(request->future, 
MID_posixAioFuture_setValue, totalBytesObj);
-               }
+               env->CallVoidMethod(request->future, 
MID_posixAioFuture_setValue, totalBytesObj);
                env->DeleteLocalRef(totalBytesObj);
                LOG_DEBUG("Done with callbacks\n");             
        } else {
@@ -331,7 +334,7 @@
                        perror("Unable to obtain reference to JNI Environment, 
can not handle AIO completion\n");
                } else {
                        struct aio_request *req = (struct aio_request 
*)info->si_value.sival_ptr;
-                       processPosixAioCallback(env, req, true);
+                       processPosixAioCallback(env, req);
                }
        }
 }
@@ -348,7 +351,7 @@
        }
        
        struct aio_request *req = (aio_request *)sigval.sival_ptr;
-       processPosixAioCallback(env, req, false);
+       processPosixAioCallback(env, req);
        
        jvm->DetachCurrentThread();
        return;
@@ -409,12 +412,11 @@
        cls = env->FindClass("org/apache/aio/posix/PosixAioFutureImpl"); 
NULL_CHECK(cls)
        FID_posixAioFuture_aioRequestPtr = env->GetFieldID(cls, 
"aioRequestPtr", "J"); NULL_CHECK(FID_posixAioFuture_aioRequestPtr)
        MID_posixAioFuture_setValue = env->GetMethodID(cls, "setValue", 
"(Ljava/lang/Object;)V"); NULL_CHECK(MID_posixAioFuture_setValue)
-       MID_posixAioFtgure_setValueInExecutorService = env->GetMethodID(cls, 
"setValueInExecutorService", "(Ljava/lang/Object;)V"); 
NULL_CHECK(MID_posixAioFuture_setValue)
        MID_posixAioFuture_setException = env->GetMethodID(cls, "setException", 
"(Ljava/lang/Throwable;)V"); NULL_CHECK(MID_posixAioFuture_setException)
        
        // --- org.apache.aio.posix.PosixAioFutureImpl.PosixReadWriteAioFuture 
--------------
        LOG_DEBUG("Getting ids for 
PosixAioFutureImpl.PosixReadWriteAioFuture\n")
-       cls = 
env->FindClass("org/apache/aio/posix/PosixAioFutureImpl$PosixReadWriteAioFuture");
 NULL_CHECK(cls)
+       cls = env->FindClass("org/apache/aio/posix/PosixReadWriteAioFuture"); 
NULL_CHECK(cls)
        posixAioReadWriteFuture = (jclass)env->NewWeakGlobalRef(cls); 
NULL_CHECK(posixAioReadWriteFuture)
        CID_posixAioReadWriteFuture = env->GetMethodID(posixAioReadWriteFuture, 
"<init>", 
"(Lorg/apache/aio/posix/PosixAsynchronousFileChannel;JLjava/nio/ByteBuffer;J)V");
 NULL_CHECK(CID_posixAioReadWriteFuture)
        FID_posixAioReadWriteFuture_buffer = 
env->GetFieldID(posixAioReadWriteFuture, "buffer", "Ljava/nio/ByteBuffer;"); 
NULL_CHECK(FID_posixAioReadWriteFuture_buffer)
@@ -478,7 +480,14 @@
                address += bufferPosition;
        }
        
+       LOG_DEBUG_PARAM("Buffer size to allocate %d\n", bufferSizeToAllocate);
+       
        struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
+
+       if (req == NULL) {
+               // Error occured ??? Do with throw an OutOfMemory exception?
+               return NULL;
+       }
        
        // Setup aio_request fields
        req->buffers[0].bufferSize = bufferSize;
@@ -492,11 +501,13 @@
        jobject future = createPosixReadWriteAioFuture(env, self, req, buffer, 
position);
        if (future == NULL) {
                // Error occured, return and let Java throw exception
+               free(req);
                return NULL;
        }
        req->future = env->NewGlobalRef(future);
        if (req->future == NULL) {
                // Error occured, return and let Java throw exception
+               free(req);
                return NULL;
        }
 
@@ -512,16 +523,19 @@
        req->cb.aio_sigevent.sigev_value.sival_ptr = req;
        if (env->GetBooleanField(self, FID_posixChannelThreadNotify)) {
                // Do thread notification
+               LOG_DEBUG("Doing thread notification\n");
                req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
                req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
        } else {
                // Do SIG notification
+               LOG_DEBUG("Doing signal notification\n");
                req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
                req->cb.aio_sigevent.sigev_signo = SIG_AIO;
        }
        
        LOG_DEBUG("Calling aio_read\n");
        int ret = aio_read(&req->cb);
+       LOG_DEBUG("Aio read returned\n");
        if (ret) {
                // TODO Handle each error return type as appropriate
                perror("Error issuing aio_read");

Added: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java?view=auto&rev=518484
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
 (added)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
 Wed Mar 14 22:04:16 2007
@@ -0,0 +1,5 @@
+package org.apache.aio.posix;
+
+public class AbstractPosixAioFuture {
+
+}

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
 Wed Mar 14 22:04:16 2007
@@ -19,49 +19,20 @@
  */
 package org.apache.aio.posix;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.aio.AioCallbackException;
-import org.apache.aio.AsynchronousFileChannel;
 import org.apache.aio.concurrent.AbstractAioFutureCallBackHandler;
 
 public class PosixAioFutureImpl<V> extends AbstractAioFutureCallBackHandler<V> 
{
 
-    public static class PosixReadWriteAioFuture extends 
PosixAioFutureImpl<Integer> implements ByteBufferFuture {
-
-        private final ByteBuffer buffer;
-        private final long position;
-        
-        public PosixReadWriteAioFuture(PosixAsynchronousFileChannel channel, 
long aioRequestPtr, ByteBuffer buffer, long position) {
-            super(channel, aioRequestPtr);
-            this.buffer = buffer;
-            this.position = position;
-        }
-        
-        public ByteBuffer getBuffer() {
-            return buffer;
-        }
-
-        public long getPosition() {
-            return position;
-        }
-
-    }
-    
     private final PosixAsynchronousFileChannel channel;
     @SuppressWarnings("unused")
     private final long aioRequestPtr; // The pointer to the aio request struct 
(used by native code)
 
-    private final Lock lock = new ReentrantLock();
-    private final Condition condition = lock.newCondition();
-    
     private volatile Throwable exception;
     private volatile boolean done = false;
     private volatile V value = null;
@@ -72,34 +43,40 @@
     }
     
     public V get() throws InterruptedException, ExecutionException, 
AioCallbackException {
-        lock.lockInterruptibly();
-        try {
-            // TOOD - Come up with a better blocking mechanism than a spin 
lock that works with signals
-            while (!done) {
-                Thread.yield();
-                System.out.print(".");
+        if (!done) {
+            if (channel.isThreadNotify()) {
+                synchronized (this) {
+                    wait();
+                }
+            } else {
+                while (!done) {
+                    Thread.yield();
+                }
             }
-            System.out.println("Done with get!");
-            return getValue();
-        } finally {
-            lock.unlock();
         }
+        return getValue();
     }
 
     public V get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
-        lock.lockInterruptibly();
-        try {
-            if (done) {
-                return getValue();
+        if (!done) {
+            if (channel.isThreadNotify()) {
+                synchronized (this) {
+                    wait(unit.toMillis(timeout));
+                }
+            } else {
+                long start = System.currentTimeMillis();
+                while (!done) {
+                    Thread.yield();
+                    if ((System.currentTimeMillis() - start) > 
unit.toMillis(timeout)) {
+                        throw new TimeoutException();
+                    }
+                }
             }
-            condition.await(timeout, unit);
-            return getValue();
-        } finally {
-            lock.unlock();
         }
+        return getValue();
     }
 
-    public AsynchronousFileChannel getChannel() {
+    public PosixAsynchronousFileChannel getChannel() {
         return channel;
     }
 
@@ -111,11 +88,8 @@
         if (canceled) {
             done = true;
             exception = new CancellationException();
-            lock.lock();
-            try {
-                condition.signalAll();
-            } finally {
-                lock.unlock();
+            synchronized (this) {
+                notifyAll();
             }
         }
         return canceled;
@@ -131,6 +105,13 @@
         return done;
     }
 
+    /**
+     * If there's a valid assocaited with this future, return it.  If there's 
an exception associated with this future,
+     * throw it.
+     * 
+     * @return  The value associated with tis future.
+     * @throws ExecutionException  If an error occured while processing this 
future, an exception is thrown
+     */
     protected V getValue() throws ExecutionException {
         if (exception != null) {
             if (exception instanceof ExecutionException) {
@@ -141,36 +122,40 @@
         return value;
     }
     
+    /**
+     * Sets the value assocaited with this future.
+     * 
+     * @param value  The value to assocaited with this future.
+     */
     protected void setValue(V value) {
         this.value = value;
         done = true;
         signal();
     }
     
-    protected void setValueInExecutorService(final V value) {
-        channel.getExecutorService().submit(new Runnable() {
-            public void run() {
-                setValue(value);
-            }
-        });
-    }
-    
+    /**
+     * Sets the exception that occured while processing this future.
+     * 
+     * @param exception  The exception that occurred.
+     */
     public void setException(Throwable exception) {
         this.exception = exception;
         done = true;
         signal();
     }
 
-    private void signal() {
-        System.out.println("Signalling");
-        lock.lock();
-        try {
-            condition.signalAll();
-        } finally {
-            lock.unlock();
+    /**
+     * Wakes up any threads waiting on this future's compeltion.  Calls any 
observing completion handlers.
+     *
+     */
+    private synchronized void signal() {
+        notifyAll();
+        if (channel.isThreadNotify()) {
+            // TODO: Make this configurable so even if thread callback 
handlers are being used, the callback handlers can be called in the 
ExecutorService
+            callCompletionHandlers();
+        } else {
+            channel.callCompletionHandlersInExecutorService(this);
         }
-        
-        callCompletionHandlers();
     }
     
 }

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
 Wed Mar 14 22:04:16 2007
@@ -15,8 +15,8 @@
 
 public class PosixAsynchronousFileChannel extends 
ConcurrentAsynchronousFileChannel {
 
-    @SuppressWarnings("unused") // This is used by native code
     private final boolean threadNotify;
+
     @SuppressWarnings("unused")
     private final FileDescriptor fd;
     
@@ -41,8 +41,7 @@
     @Override
     public native BatchFuture write(ByteBufferPosition[] byteBufferPositions, 
int offset, int length);
     
-    public void test() {
-        System.out.println("This is from Java");
-        System.out.println("I love JNI");
+    public boolean isThreadNotify() {
+        return threadNotify;
     }
 }

Added: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java?view=auto&rev=518484
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
 (added)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixReadWriteAioFuture.java
 Wed Mar 14 22:04:16 2007
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+
+package org.apache.aio.posix;
+
+import java.nio.ByteBuffer;
+
+import org.apache.aio.AioFuture.ByteBufferFuture;
+
+public class PosixReadWriteAioFuture extends PosixAioFutureImpl<Integer> 
implements ByteBufferFuture {
+
+    private final ByteBuffer buffer;
+    private final long position;
+    
+    public PosixReadWriteAioFuture(PosixAsynchronousFileChannel channel, long 
aioRequestPtr, ByteBuffer buffer, long position) {
+        super(channel, aioRequestPtr);
+        this.buffer = buffer;
+        this.position = position;
+    }
+    
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    public long getPosition() {
+        return position;
+    }
+
+}
\ No newline at end of file

Modified: 
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=518484&r1=518483&r2=518484
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
 Wed Mar 14 22:04:16 2007
@@ -4,6 +4,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
 
+import org.apache.aio.AioCompletionHandler;
 import org.apache.aio.AioFuture;
 import org.apache.aio.Modes;
 
@@ -20,9 +21,21 @@
                 in.getChannel(),
                 Modes.READ_ONLY,
                 Executors.newCachedThreadPool(), true);
-        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+        ByteBuffer buffer = ByteBuffer.allocate(1000);
         AioFuture<Integer> future = channel.read(buffer, 0L);
+        System.out.println("read returned");
+        future.addCompletionHandler(new AioCompletionHandler<Integer>() {
+            public void onCompletion(AioFuture<Integer> future) {
+                try {
+                    System.out.println(String.format("Completion handler read 
%d bytes", future.get()));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
         future.get();
+        System.out.println("********");
+        System.out.println(buffer.position());
         buffer.flip();
         byte[] bytes = new byte[buffer.remaining()];
         buffer.get(bytes);


Reply via email to