Author: mheath
Date: Thu Mar 15 21:33:35 2007
New Revision: 518862
URL: http://svn.apache.org/viewvc?view=rev&rev=518862
Log:
Fixed write support
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=518862&r1=518861&r2=518862
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Thu Mar 15
21:33:35 2007
@@ -20,9 +20,9 @@
#include <aio.h>
#include <errno.h>
#include <jni.h>
-#include <malloc.h>
#include <signal.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#include "org_apache_aio_test_Test.h"
@@ -244,7 +244,7 @@
cleanupPosixAio(env, request);
return;
}
- if (request->flags & READ_FLAG) {
+ if ((request->flags & READ_FLAG) == READ_FLAG) {
LOG_DEBUG("Handling read\n");
// Advance buffer position
jobject buffer = env->GetObjectField(request->future,
FID_posixAioReadWriteFuture_buffer);
@@ -357,6 +357,97 @@
return;
}
+jobject doReadWrite(JNIEnv *env, jobject channel, jobject buffer, jlong
position, int flag)
+{
+ jclass bufferClass = env->GetObjectClass(buffer);
+
+ // Get buffer limit and position
+ jint bufferPosition = getByteBufferPosition(env, buffer, bufferClass);
+
+ jint bufferSize = getByteBufferRemaining(env, buffer, bufferClass);
+ jint bufferSizeToAllocate = 0;
+
+ jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
+ if (address == NULL) {
+ bufferSizeToAllocate = bufferSize;
+ } else {
+ address += bufferPosition;
+ }
+
+ struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
+
+ if (req == NULL) {
+ // Error occured ??? Do with throw an OutOfMemory exception?
+ return NULL;
+ }
+
+ // Setup aio_request fields
+ req->buffers[0].bufferSize = bufferSize;
+ req->buffers[0].allocatedBufferSize = bufferSizeToAllocate;
+ if (address == NULL) {
+ address = getBufferAddress(req, 0);
+ }
+ req->buffers[0].buffer = address;
+
+ // TODO: If doing write with heap buffer, copy data from byteBuffer to
allocated space
+
+ // Create future object
+ jobject future = createPosixReadWriteAioFuture(env, channel, req,
buffer, position);
+ req->future = env->NewGlobalRef(future);
+ if (req->future == NULL) {
+ // Error occured, return and let Java throw exception
+ free(req);
+ return NULL;
+ }
+
+ // Set flag
+ req->flags = flag;
+
+ // Setup aiocb
+ req->cb.aio_fildes = getPosixChannelFD(env, channel);
+ req->cb.aio_offset = position;
+ req->cb.aio_buf = address;
+ req->cb.aio_nbytes = bufferSize;
+
+ req->cb.aio_sigevent.sigev_value.sival_ptr = req;
+ if (env->GetBooleanField(channel, FID_posixChannelThreadNotify)) {
+ // Do thread notification
+ req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
+ req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
+ } else {
+ // Do SIG notification
+ req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+ req->cb.aio_sigevent.sigev_signo = SIG_AIO;
+ }
+
+ if ((flag & READ_FLAG) == READ_FLAG) {
+ LOG_DEBUG("Calling aio_read\n");
+ int ret = aio_read(&req->cb);
+ LOG_DEBUG("Aio read returned\n");
+ if (ret) {
+ // TODO Handle each error return type as appropriate
+ perror("Error issuing aio_read");
+ cleanupPosixAio(env, req);
+ env->ThrowNew(aioException, "error issuing aio_read
request");
+ }
+ } else if ((flag & WRITE_FLAG) == WRITE_FLAG) {
+ LOG_DEBUG("Calling aio_write\n");
+ int ret = aio_write(&req->cb);
+ LOG_DEBUG("Aio write returned\n");
+ if (ret) {
+ // TODO Handle each error return type as appropriate
+ perror("Error issuing aio_write");
+ cleanupPosixAio(env, req);
+ env->ThrowNew(aioException, "error issuing aio_read
request");
+ }
+ } else {
+ LOG_DEBUG("Oops doing nothing\n");
+ // Throw exception
+ }
+
+ return req->future;
+}
+
/**********************************************************************************************************************
*
* JNI OnLoad and OnUnload functions
@@ -463,86 +554,7 @@
{
LOG_DEBUG("In read function\n");
- jclass bufferClass = env->GetObjectClass(buffer);
-
- // Get buffer limit and position
- jint bufferPosition = getByteBufferPosition(env, buffer, bufferClass);
-
- jint bufferSize = getByteBufferRemaining(env, buffer, bufferClass);
- jint bufferSizeToAllocate = 0;
-
- jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
- if (address == NULL) {
- LOG_DEBUG("Using heap ByteBuffer\n");
- bufferSizeToAllocate = bufferSize;
- } else {
- LOG_DEBUG("Using direct ByteBuffer\n");
- address += bufferPosition;
- }
-
- LOG_DEBUG_PARAM("Buffer size to allocate %d\n", bufferSizeToAllocate);
-
- struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
-
- if (req == NULL) {
- // Error occured ??? Do with throw an OutOfMemory exception?
- return NULL;
- }
-
- // Setup aio_request fields
- req->buffers[0].bufferSize = bufferSize;
- req->buffers[0].allocatedBufferSize = bufferSizeToAllocate;
- if (address == NULL) {
- address = getBufferAddress(req, 0);
- }
- req->buffers[0].buffer = address;
-
- // Create future object
- jobject future = createPosixReadWriteAioFuture(env, self, req, buffer,
position);
- if (future == NULL) {
- // Error occured, return and let Java throw exception
- free(req);
- return NULL;
- }
- req->future = env->NewGlobalRef(future);
- if (req->future == NULL) {
- // Error occured, return and let Java throw exception
- free(req);
- return NULL;
- }
-
- // Set flag
- req->flags = READ_FLAG;
-
- // Setup aiocb
- req->cb.aio_fildes = getPosixChannelFD(env, self);
- req->cb.aio_offset = position;
- req->cb.aio_buf = address;
- req->cb.aio_nbytes = bufferSize;
-
- req->cb.aio_sigevent.sigev_value.sival_ptr = req;
- if (env->GetBooleanField(self, FID_posixChannelThreadNotify)) {
- // Do thread notification
- LOG_DEBUG("Doing thread notification\n");
- req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
- req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
- } else {
- // Do SIG notification
- LOG_DEBUG("Doing signal notification\n");
- req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
- req->cb.aio_sigevent.sigev_signo = SIG_AIO;
- }
-
- LOG_DEBUG("Calling aio_read\n");
- int ret = aio_read(&req->cb);
- LOG_DEBUG("Aio read returned\n");
- if (ret) {
- // TODO Handle each error return type as appropriate
- perror("Error issuing aio_read");
- cleanupPosixAio(env, req);
- env->ThrowNew(aioException, "error issuing aio_read request");
- }
- return req->future;
+ return doReadWrite(env, self, buffer, position, READ_FLAG);
}
/*
@@ -575,7 +587,9 @@
JNIEXPORT jobject JNICALL
Java_org_apache_aio_posix_PosixAsynchronousFileChannel_write__Ljava_nio_ByteBuffer_2J
(JNIEnv *env, jobject self, jobject buffer, jlong position)
{
- LOG_DEBUG("In write function");
+ LOG_DEBUG("In write function\n");
+
+ return doReadWrite(env, self, buffer, position, WRITE_FLAG);
}
/*
Modified:
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=518862&r1=518861&r2=518862
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
Thu Mar 15 21:33:35 2007
@@ -1,12 +1,14 @@
package org.apache.aio.posix;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import org.apache.aio.AioCompletionHandler;
import org.apache.aio.AioFuture;
import org.apache.aio.Modes;
+import org.apache.aio.AioFuture.ByteBufferFuture;
public class TestPosixFileChannel {
@@ -15,31 +17,50 @@
}
public static void main(String[] args) throws Exception {
- FileInputStream in = new FileInputStream("/etc/passwd");
+ ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+
+// FileInputStream in = new FileInputStream("/etc/passwd");
+// PosixAsynchronousFileChannel channel = new
PosixAsynchronousFileChannel(
+// in.getFD(),
+// in.getChannel(),
+// Modes.READ_ONLY,
+// Executors.newCachedThreadPool(), true);
+// AioFuture<Integer> future = channel.read(buffer, 0L);
+// future.addCompletionHandler(new AioCompletionHandler<Integer>() {
+// public void onCompletion(AioFuture<Integer> future) {
+// try {
+// System.out.println(String.format("Completion handler
read %d bytes", future.get()));
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// }
+// });
+// future.get();
+// System.out.println(buffer.position());
+// buffer.flip();
+// byte[] bytes = new byte[buffer.remaining()];
+// buffer.get(bytes);
+// System.out.println(new String(bytes));
+//
+// channel.close();
+//
+ // Test output
+ FileOutputStream out = new FileOutputStream("/tmp/aiotest");
PosixAsynchronousFileChannel channel = new
PosixAsynchronousFileChannel(
- in.getFD(),
- in.getChannel(),
- Modes.READ_ONLY,
+ out.getFD(),
+ out.getChannel(),
+ Modes.READ_WRITE,
Executors.newCachedThreadPool(), true);
- ByteBuffer buffer = ByteBuffer.allocate(1000);
- AioFuture<Integer> future = channel.read(buffer, 0L);
- System.out.println("read returned");
- future.addCompletionHandler(new AioCompletionHandler<Integer>() {
- public void onCompletion(AioFuture<Integer> future) {
- try {
- System.out.println(String.format("Completion handler read
%d bytes", future.get()));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- future.get();
- System.out.println("********");
- System.out.println(buffer.position());
+
+ buffer.put("Have a nice day!\n".getBytes());
buffer.flip();
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- System.out.println(new String(bytes));
+
+ ByteBufferFuture writeFuture = channel.write(buffer, 0);
+ System.out.println("Done with write");
+ writeFuture.get();
+
+ channel.close();
+
System.out.println("Done!");
}