Author: mheath
Date: Thu Mar 15 21:33:35 2007
New Revision: 518862

URL: http://svn.apache.org/viewvc?view=rev&rev=518862
Log:
Fixed write support

Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=518862&r1=518861&r2=518862
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Thu Mar 15 
21:33:35 2007
@@ -20,9 +20,9 @@
 #include <aio.h>
 #include <errno.h>
 #include <jni.h>
-#include <malloc.h>
 #include <signal.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 
 #include "org_apache_aio_test_Test.h"
@@ -244,7 +244,7 @@
                        cleanupPosixAio(env, request);
                        return; 
                }
-               if (request->flags & READ_FLAG) {
+               if ((request->flags & READ_FLAG) == READ_FLAG) {
                        LOG_DEBUG("Handling read\n");
                        // Advance buffer position
                        jobject buffer = env->GetObjectField(request->future, 
FID_posixAioReadWriteFuture_buffer);
@@ -357,6 +357,97 @@
        return;
 }
 
+/**
+ * Shared implementation behind the single-buffer asynchronous read and write
+ * entry points: builds an aio_request for the given ByteBuffer, wraps it in a
+ * Java future object and submits it with aio_read() or aio_write().
+ *
+ * @param env      JNI environment of the calling thread
+ * @param channel  channel object the request is issued on; its file
+ *                 descriptor and notification mode are read from it
+ * @param buffer   ByteBuffer to transfer; the offset comes from
+ *                 getByteBufferPosition() and the length from
+ *                 getByteBufferRemaining()
+ * @param position file offset stored in aio_offset
+ * @param flag     READ_FLAG or WRITE_FLAG, selects the operation
+ * @return the global reference to the future stored in the request, or NULL
+ *         if the request could not be created or submitted (a Java exception
+ *         may be pending in that case)
+ */
+jobject doReadWrite(JNIEnv *env, jobject channel, jobject buffer, jlong position, int flag)
+{
+       jclass bufferClass = env->GetObjectClass(buffer);
+
+       // Get buffer limit and position
+       jint bufferPosition = getByteBufferPosition(env, buffer, bufferClass);
+
+       jint bufferSize = getByteBufferRemaining(env, buffer, bufferClass);
+       jint bufferSizeToAllocate = 0;
+
+       // GetDirectBufferAddress returns NULL for non-direct (heap) buffers; in
+       // that case native space is allocated together with the request.
+       jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
+       if (address == NULL) {
+               bufferSizeToAllocate = bufferSize;
+       } else {
+               address += bufferPosition;
+       }
+
+       struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
+
+       if (req == NULL) {
+               // Allocation failed. TODO: should this throw an OutOfMemoryError?
+               return NULL;
+       }
+
+       // Setup aio_request fields
+       req->buffers[0].bufferSize = bufferSize;
+       req->buffers[0].allocatedBufferSize = bufferSizeToAllocate;
+       if (address == NULL) {
+               address = getBufferAddress(req, 0);
+       }
+       req->buffers[0].buffer = address;
+
+       // TODO: If doing write with heap buffer, copy data from byteBuffer to
+       // allocated space (until then a heap-buffer write sends whatever the
+       // freshly allocated space contains).
+
+       // Create future object
+       jobject future = createPosixReadWriteAioFuture(env, channel, req, buffer, position);
+       if (future == NULL) {
+               // Error occurred, return and let Java throw the pending exception.
+               // Checked before NewGlobalRef so no further JNI call is made
+               // while an exception may be pending.
+               free(req);
+               return NULL;
+       }
+       req->future = env->NewGlobalRef(future);
+       if (req->future == NULL) {
+               // Error occurred, return and let Java throw exception
+               free(req);
+               return NULL;
+       }
+
+       // Set flag
+       req->flags = flag;
+
+       // Setup aiocb
+       req->cb.aio_fildes = getPosixChannelFD(env, channel);
+       req->cb.aio_offset = position;
+       req->cb.aio_buf = address;
+       req->cb.aio_nbytes = bufferSize;
+
+       // The completion handler gets the request back through sival_ptr
+       req->cb.aio_sigevent.sigev_value.sival_ptr = req;
+       if (env->GetBooleanField(channel, FID_posixChannelThreadNotify)) {
+               // Do thread notification
+               req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
+               req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
+       } else {
+               // Do SIG notification
+               req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+               req->cb.aio_sigevent.sigev_signo = SIG_AIO;
+       }
+
+       if ((flag & READ_FLAG) == READ_FLAG) {
+               LOG_DEBUG("Calling aio_read\n");
+               int ret = aio_read(&req->cb);
+               LOG_DEBUG("Aio read returned\n");
+               if (ret) {
+                       // TODO Handle each error return type as appropriate
+                       perror("Error issuing aio_read");
+                       cleanupPosixAio(env, req);
+                       env->ThrowNew(aioException, "error issuing aio_read request");
+                       // req has been handed to cleanupPosixAio (presumably
+                       // released there), so it must not be dereferenced again.
+                       return NULL;
+               }
+       } else if ((flag & WRITE_FLAG) == WRITE_FLAG) {
+               LOG_DEBUG("Calling aio_write\n");
+               int ret = aio_write(&req->cb);
+               LOG_DEBUG("Aio write returned\n");
+               if (ret) {
+                       // TODO Handle each error return type as appropriate
+                       perror("Error issuing aio_write");
+                       cleanupPosixAio(env, req);
+                       env->ThrowNew(aioException, "error issuing aio_write request");
+                       // See the read branch: req is no longer ours to touch.
+                       return NULL;
+               }
+       } else {
+               // Neither operation was requested: nothing was submitted, so the
+               // future would never complete. Release the request and report it.
+               LOG_DEBUG("Oops doing nothing\n");
+               cleanupPosixAio(env, req);
+               env->ThrowNew(aioException, "unsupported aio operation flag");
+               return NULL;
+       }
+
+       return req->future;
+}
+
 
/**********************************************************************************************************************
  * 
  * JNI OnLoad and OnUnload functions
@@ -463,86 +554,7 @@
 {
        LOG_DEBUG("In read function\n");
 
-       jclass bufferClass = env->GetObjectClass(buffer);
-       
-       // Get buffer limit and position
-       jint bufferPosition = getByteBufferPosition(env, buffer, bufferClass);
-
-       jint bufferSize = getByteBufferRemaining(env, buffer, bufferClass);
-       jint bufferSizeToAllocate = 0;
-
-       jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
-       if (address == NULL) {
-               LOG_DEBUG("Using heap ByteBuffer\n");
-               bufferSizeToAllocate = bufferSize;
-       } else {
-               LOG_DEBUG("Using direct ByteBuffer\n");
-               address += bufferPosition;
-       }
-       
-       LOG_DEBUG_PARAM("Buffer size to allocate %d\n", bufferSizeToAllocate);
-       
-       struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
-
-       if (req == NULL) {
-               // Error occured ??? Do with throw an OutOfMemory exception?
-               return NULL;
-       }
-       
-       // Setup aio_request fields
-       req->buffers[0].bufferSize = bufferSize;
-       req->buffers[0].allocatedBufferSize = bufferSizeToAllocate;
-       if (address == NULL) {
-               address = getBufferAddress(req, 0);
-       }
-       req->buffers[0].buffer = address;
-       
-       // Create future object
-       jobject future = createPosixReadWriteAioFuture(env, self, req, buffer, 
position);
-       if (future == NULL) {
-               // Error occured, return and let Java throw exception
-               free(req);
-               return NULL;
-       }
-       req->future = env->NewGlobalRef(future);
-       if (req->future == NULL) {
-               // Error occured, return and let Java throw exception
-               free(req);
-               return NULL;
-       }
-
-       // Set flag
-       req->flags = READ_FLAG;
-       
-       // Setup aiocb
-       req->cb.aio_fildes = getPosixChannelFD(env, self);
-       req->cb.aio_offset = position;
-       req->cb.aio_buf = address;
-       req->cb.aio_nbytes = bufferSize;
-
-       req->cb.aio_sigevent.sigev_value.sival_ptr = req;
-       if (env->GetBooleanField(self, FID_posixChannelThreadNotify)) {
-               // Do thread notification
-               LOG_DEBUG("Doing thread notification\n");
-               req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
-               req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
-       } else {
-               // Do SIG notification
-               LOG_DEBUG("Doing signal notification\n");
-               req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
-               req->cb.aio_sigevent.sigev_signo = SIG_AIO;
-       }
-       
-       LOG_DEBUG("Calling aio_read\n");
-       int ret = aio_read(&req->cb);
-       LOG_DEBUG("Aio read returned\n");
-       if (ret) {
-               // TODO Handle each error return type as appropriate
-               perror("Error issuing aio_read");
-               cleanupPosixAio(env, req);
-               env->ThrowNew(aioException, "error issuing aio_read request");
-       }
-       return req->future;
+       return doReadWrite(env, self, buffer, position, READ_FLAG);
 }
 
 /*
@@ -575,7 +587,9 @@
 JNIEXPORT jobject JNICALL 
Java_org_apache_aio_posix_PosixAsynchronousFileChannel_write__Ljava_nio_ByteBuffer_2J
   (JNIEnv *env, jobject self, jobject buffer, jlong position)
 {
-       LOG_DEBUG("In write function");
+       LOG_DEBUG("In write function\n");
+       
+       return doReadWrite(env, self, buffer, position, WRITE_FLAG);
 }
 
 /*

Modified: 
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=518862&r1=518861&r2=518862
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
 Thu Mar 15 21:33:35 2007
@@ -1,12 +1,14 @@
 package org.apache.aio.posix;
 
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
 
 import org.apache.aio.AioCompletionHandler;
 import org.apache.aio.AioFuture;
 import org.apache.aio.Modes;
+import org.apache.aio.AioFuture.ByteBufferFuture;
 
 public class TestPosixFileChannel {
 
@@ -15,31 +17,50 @@
     }
     
     public static void main(String[] args) throws Exception {
-        FileInputStream in = new FileInputStream("/etc/passwd");
+        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+
+//        FileInputStream in = new FileInputStream("/etc/passwd");
+//        PosixAsynchronousFileChannel channel = new 
PosixAsynchronousFileChannel(
+//                in.getFD(),
+//                in.getChannel(),
+//                Modes.READ_ONLY,
+//                Executors.newCachedThreadPool(), true);
+//        AioFuture<Integer> future = channel.read(buffer, 0L);
+//        future.addCompletionHandler(new AioCompletionHandler<Integer>() {
+//            public void onCompletion(AioFuture<Integer> future) {
+//                try {
+//                    System.out.println(String.format("Completion handler 
read %d bytes", future.get()));
+//                } catch (Exception e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        });
+//        future.get();
+//        System.out.println(buffer.position());
+//        buffer.flip();
+//        byte[] bytes = new byte[buffer.remaining()];
+//        buffer.get(bytes);
+//        System.out.println(new String(bytes));
+//
+//        channel.close();
+//        
+        // Test output
+        FileOutputStream out = new FileOutputStream("/tmp/aiotest");
         PosixAsynchronousFileChannel channel = new 
PosixAsynchronousFileChannel(
-                in.getFD(),
-                in.getChannel(),
-                Modes.READ_ONLY,
+                out.getFD(),
+                out.getChannel(),
+                Modes.READ_WRITE,
                 Executors.newCachedThreadPool(), true);
-        ByteBuffer buffer = ByteBuffer.allocate(1000);
-        AioFuture<Integer> future = channel.read(buffer, 0L);
-        System.out.println("read returned");
-        future.addCompletionHandler(new AioCompletionHandler<Integer>() {
-            public void onCompletion(AioFuture<Integer> future) {
-                try {
-                    System.out.println(String.format("Completion handler read 
%d bytes", future.get()));
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-        future.get();
-        System.out.println("********");
-        System.out.println(buffer.position());
+        
+        buffer.put("Have a nice day!\n".getBytes());
         buffer.flip();
-        byte[] bytes = new byte[buffer.remaining()];
-        buffer.get(bytes);
-        System.out.println(new String(bytes));
+
+        ByteBufferFuture writeFuture = channel.write(buffer, 0);
+        System.out.println("Done with write");
+        writeFuture.get();
+        
+        channel.close();
+        
         System.out.println("Done!");
     }
 


Reply via email to