Author: mheath
Date: Fri Jan 26 23:02:32 2007
New Revision: 500492
URL: http://svn.apache.org/viewvc?view=rev&rev=500492
Log:
Implemented AIO support through concurrent library.
Modified:
mina/sandbox/mheath/aioj/trunk/pom.xml
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
Modified: mina/sandbox/mheath/aioj/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/pom.xml?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/pom.xml (original)
+++ mina/sandbox/mheath/aioj/trunk/pom.xml Fri Jan 26 23:02:32 2007
@@ -65,7 +65,7 @@
<executions>
<execution>
<id>javadoc-jar</id>
- <phase>package</phase>
+ <phase>install</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -90,6 +90,7 @@
<artifactId>testng</artifactId>
<version>5.1</version>
<classifier>jdk15</classifier>
+ <scope>test</scope>
</dependency>
</dependencies>
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
(original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
Fri Jan 26 23:02:32 2007
@@ -41,6 +41,9 @@
static interface BatchFuture extends AioFuture<Long> {
ByteBufferPosition[] getBatch();
+ ByteBufferPosition[] getUsedBatch();
+ int getOffset();
+ int getLength();
}
static interface ByteBufferFuture extends AioFuture<Integer>,
ByteBufferPosition {}
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
Fri Jan 26 23:02:32 2007
@@ -155,7 +155,9 @@
* @see #lock(long,long,boolean)
* @see #tryLock(long,long,boolean)
*/
- public abstract FileLock tryLock() throws AioException;
+ public FileLock tryLock() throws AioException {
+ return tryLock(0L, Long.MAX_VALUE, false);
+ }
/**
* Attempts to acquire a lock on the given region of this channel's file.
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
Fri Jan 26 23:02:32 2007
@@ -39,6 +39,6 @@
*
* @return
*/
- long position();
+ long getPosition();
}
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
Fri Jan 26 23:02:32 2007
@@ -20,6 +20,7 @@
package org.apache.aio.concurrent;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -38,15 +39,44 @@
public static class BatchFutureImpl extends AioFutureImpl<Long> implements
BatchFuture {
private final ByteBufferPosition[] batch;
+ private final int offset;
+ private final int length;
- public BatchFutureImpl(ByteBufferPosition[] batch) {
+ public BatchFutureImpl(ByteBufferPosition[] batch, int offset, int
length) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("offset cannot be
negative");
+ }
+ if (length < 0) {
+ throw new IllegalArgumentException("length cannot be
negative");
+ }
this.batch = batch;
+ this.offset = offset;
+ this.length = length;
}
public ByteBufferPosition[] getBatch() {
return batch;
}
+
+ public ByteBufferPosition[] getUsedBatch() {
+ if (offset == 0 && length == batch.length) {
+ return batch;
+ }
+ ByteBufferPosition[] usedBatch = new ByteBufferPosition[length];
+ for (int i = 0; i < length; i++) {
+ usedBatch[i] = batch[offset + i];
+ }
+ return usedBatch;
+ }
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
}
public static class ByteBufferFutureImpl extends AioFutureImpl<Integer>
implements ByteBufferFuture {
@@ -63,7 +93,7 @@
return byteBuffer;
}
- public long position() {
+ public long getPosition() {
return position;
}
@@ -86,7 +116,15 @@
private AsynchronousFileChannel asynchronousFileChannel = null;
private final List<AioCompletionHandler<AioFuture>> completionHandlers =
new ArrayList<AioCompletionHandler<AioFuture>>();
private ExecutionException exception;
+
+ public AioFutureImpl() {
+ // Default constructor
+ }
+ public AioFutureImpl(AsynchronousFileChannel channel) {
+ this.asynchronousFileChannel = channel;
+ }
+
public synchronized void
addCompletionHandler(AioCompletionHandler<AioFuture> completionHandler) {
completionHandlers.add(completionHandler);
}
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
---
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
(original)
+++
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
Fri Jan 26 23:02:32 2007
@@ -4,7 +4,9 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.aio.AioException;
import org.apache.aio.AioFuture;
@@ -13,17 +15,19 @@
import org.apache.aio.Modes;
import org.apache.aio.AioFuture.BatchFuture;
import org.apache.aio.AioFuture.ByteBufferFuture;
+import org.apache.aio.concurrent.AioFutureImpl.BatchFutureImpl;
+import org.apache.aio.concurrent.AioFutureImpl.ByteBufferFutureImpl;
public class ConcurrentAsynchronousFileChannel extends AsynchronousFileChannel
{
private final FileChannel channel;
- private final ExecutorService service;
+ private final ExecutorService executorService;
private final Modes mode;
- public ConcurrentAsynchronousFileChannel(FileChannel channel, Modes mode,
ExecutorService service) {
+ public ConcurrentAsynchronousFileChannel(FileChannel channel, Modes mode,
ExecutorService executorService) {
this.channel = channel;
this.mode = mode;
- this.service = service;
+ this.executorService = executorService;
}
@@ -53,67 +57,140 @@
}
@Override
- public AioFuture<FileLock> lock(long position, long size, boolean shared)
throws AioException {
- // TODO Auto-generated method stub
- return null;
+ public AioFuture<FileLock> lock(final long position, final long size,
final boolean shared) throws AioException {
+ final AioFutureImpl<FileLock> aioFuture = new
AioFutureImpl<FileLock>(this);
+ Future<FileLock> future = executorService.submit(new
Callable<FileLock>() {
+ public FileLock call() throws Exception {
+ FileLock lock = channel.lock(position, size, shared);
+ callCompletionHandlers(aioFuture);
+ return lock;
+ }
+
+ });
+ aioFuture.setFuture(future);
+ return aioFuture;
}
@Override
- public ByteBufferFuture read(ByteBuffer buffer, long position) throws
AioException {
- // TODO Auto-generated method stub
- return null;
+ public ByteBufferFuture read(final ByteBuffer buffer, final long position)
throws AioException {
+ final ByteBufferFutureImpl byteBufferFuture = new
ByteBufferFutureImpl(buffer, position);
+ Future<Integer> future = executorService.submit(new
Callable<Integer>() {
+ public Integer call() throws Exception {
+ int bytesRead = channel.read(buffer, position);
+ callCompletionHandlers(byteBufferFuture);
+ return bytesRead;
+ }
+ });
+ byteBufferFuture.setFuture(future);
+ return byteBufferFuture;
}
+ /**
+ * Asynchronously reads a batch: for each of the <code>length</code> entries
+ * starting at <code>offset</code>, reads from the channel into the entry's
+ * buffer at the entry's position. The future yields the sum of the values
+ * returned by the individual channel reads.
+ */
@Override
- public BatchFuture read(ByteBufferPosition[] byteBufferPositions, int
offset, int length) throws AioException {
- // TODO Auto-generated method stub
- return null;
+ public BatchFuture read(final ByteBufferPosition[] byteBufferPositions,
final int offset, final int length) throws AioException {
+ final BatchFutureImpl batchFuture = new
BatchFutureImpl(byteBufferPositions, offset, length);
+ Future<Long> future = executorService.submit(new Callable<Long>() {
+ public Long call() throws Exception {
+ // Keep the accumulator name distinct from the "length" parameter: a
+ // local named "length" shadows it and the loop below never executes.
+ long totalBytesRead = 0;
+ for (int i = 0; i < length; i++) {
+ ByteBufferPosition byteBufferPosition =
byteBufferPositions[i + offset];
+ totalBytesRead += channel.read(byteBufferPosition.getByteBuffer(),
byteBufferPosition.getPosition());
+ }
+ callCompletionHandlers(batchFuture);
+ return totalBytesRead;
+ }
+ });
+ batchFuture.setFuture(future);
+ return batchFuture;
}
@Override
public long size() throws AioException {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public AioFuture<Void> sync(boolean metaData) throws AioException {
- // TODO Auto-generated method stub
- return null;
+ try {
+ return channel.size();
+ } catch (IOException e) {
+ throw new AioException(e);
+ }
}
@Override
- public AioFuture<Void> truncate(long size) throws AioException {
- // TODO Auto-generated method stub
- return null;
+ public AioFuture<Void> sync(final boolean metaData) throws AioException {
+ final AioFutureImpl<Void> aioFuture = new AioFutureImpl<Void>();
+ Future<Void> future = executorService.submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ channel.force(metaData);
+ callCompletionHandlers(aioFuture);
+ return null;
+ }
+ });
+ aioFuture.setFuture(future);
+ return aioFuture;
}
@Override
- public FileLock tryLock() throws AioException {
- // TODO Auto-generated method stub
- return null;
+ public AioFuture<Void> truncate(final long size) throws AioException {
+ final AioFutureImpl<Void> aioFuture = new AioFutureImpl<Void>();
+ Future<Void> future = executorService.submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ channel.truncate(size);
+ callCompletionHandlers(aioFuture);
+ return null;
+ }
+ });
+ aioFuture.setFuture(future);
+ return aioFuture;
}
@Override
public FileLock tryLock(long position, long size, boolean shared) throws
AioException {
- // TODO Auto-generated method stub
- return null;
+ try {
+ // Forward the requested region and sharing mode; the no-arg
+ // channel.tryLock() would always request an exclusive whole-file lock.
+ return channel.tryLock(position, size, shared);
+ } catch (IOException e) {
+ throw new AioException(e);
+ }
}
@Override
- public ByteBufferFuture write(ByteBuffer buffer, long position) {
- // TODO Auto-generated method stub
- return null;
+ public ByteBufferFuture write(final ByteBuffer buffer, final long
position) {
+ final ByteBufferFutureImpl byteBufferFuture = new
ByteBufferFutureImpl(buffer, position);
+ Future<Integer> future = executorService.submit(new
Callable<Integer>() {
+ public Integer call() throws Exception {
+ int bytesWritten = channel.write(buffer, position);
+ callCompletionHandlers(byteBufferFuture);
+ return bytesWritten;
+ }
+ });
+ byteBufferFuture.setFuture(future);
+ return byteBufferFuture;
}
+ /**
+ * Asynchronously writes a batch: for each of the <code>length</code> entries
+ * starting at <code>offset</code>, writes the entry's buffer to the channel
+ * at the entry's position. The future yields the sum of the values returned
+ * by the individual channel writes.
+ */
@Override
- public BatchFuture write(ByteBufferPosition[] byteBufferPositions, int
offset, int length) {
- // TODO Auto-generated method stub
- return null;
+ public BatchFuture write(final ByteBufferPosition[] byteBufferPositions,
final int offset, final int length) {
+ final BatchFutureImpl batchFuture = new
BatchFutureImpl(byteBufferPositions, offset, length);
+ Future<Long> future = executorService.submit(new Callable<Long>() {
+ public Long call() throws Exception {
+ // Keep the accumulator name distinct from the "length" parameter: a
+ // local named "length" shadows it and the loop below never executes.
+ long totalBytesWritten = 0;
+ for (int i = 0; i < length; i++) {
+ ByteBufferPosition byteBufferPosition =
byteBufferPositions[i + offset];
+ totalBytesWritten += channel.write(byteBufferPosition.getByteBuffer(),
byteBufferPosition.getPosition());
+ }
+ callCompletionHandlers(batchFuture);
+ return totalBytesWritten;
+ }
+ });
+ batchFuture.setFuture(future);
+ return batchFuture;
}
public boolean isOpen() {
return channel.isOpen();
}
+
+ private void callCompletionHandlers(final AioFutureImpl aioFuture) {
+ executorService.submit(new Runnable() {
+ public void run() {
+ aioFuture.callCompletionHandlers();
+ }
+ });
+ }
}