Author: mheath
Date: Fri Jan 26 23:02:32 2007
New Revision: 500492

URL: http://svn.apache.org/viewvc?view=rev&rev=500492
Log:
Implemented AIO support through concurrent library.

Modified:
    mina/sandbox/mheath/aioj/trunk/pom.xml
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
    
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java

Modified: mina/sandbox/mheath/aioj/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/pom.xml?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/pom.xml (original)
+++ mina/sandbox/mheath/aioj/trunk/pom.xml Fri Jan 26 23:02:32 2007
@@ -65,7 +65,7 @@
                                <executions>
                                    <execution>
                                        <id>javadoc-jar</id>
-                                       <phase>package</phase>
+                                       <phase>install</phase>
                                        <goals>
                                            <goal>jar</goal>
                                        </goals>
@@ -90,6 +90,7 @@
                        <artifactId>testng</artifactId>
                        <version>5.1</version>
                        <classifier>jdk15</classifier>
+                       <scope>test</scope>
                </dependency>
        </dependencies>
 

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java 
(original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java 
Fri Jan 26 23:02:32 2007
@@ -41,6 +41,9 @@
     
     static interface BatchFuture extends AioFuture<Long> {
         ByteBufferPosition[] getBatch();
+        ByteBufferPosition[] getUsedBatch();
+        int getOffset();
+        int getLength();
     }
 
     static interface ByteBufferFuture extends AioFuture<Integer>, 
ByteBufferPosition {}

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
 Fri Jan 26 23:02:32 2007
@@ -155,7 +155,9 @@
      * @see     #lock(long,long,boolean)
      * @see     #tryLock(long,long,boolean)
      */
-    public abstract FileLock tryLock() throws AioException;
+    public FileLock tryLock() throws AioException {
+        return tryLock(0L, Long.MAX_VALUE, false);
+    }
 
     /**
      * Attempts to acquire a lock on the given region of this channel's file.

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
 Fri Jan 26 23:02:32 2007
@@ -39,6 +39,6 @@
      * 
      * @return
      */
-    long position();
+    long getPosition();
 
 }

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
 Fri Jan 26 23:02:32 2007
@@ -20,6 +20,7 @@
 package org.apache.aio.concurrent;
 
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -38,15 +39,44 @@
     public static class BatchFutureImpl extends AioFutureImpl<Long> implements 
BatchFuture {
 
         private final ByteBufferPosition[] batch;
+        private final int offset;
+        private final int length;
         
-        public BatchFutureImpl(ByteBufferPosition[] batch) {
+        public BatchFutureImpl(ByteBufferPosition[] batch, int offset, int 
length) {
+            if (offset < 0) {
+                throw new IllegalArgumentException("offset cannot be 
negative");
+            }
+            if (length < 0) {
+                throw new IllegalArgumentException("length cannot be 
negative");
+            }
             this.batch = batch;
+            this.offset = offset;
+            this.length = length;
         }
         
         public ByteBufferPosition[] getBatch() {
             return batch;
         }
+        
+        public ByteBufferPosition[] getUsedBatch() {
+            if (offset == 0 && length == batch.length) {
+                return batch;
+            }
+            ByteBufferPosition[] usedBatch = new ByteBufferPosition[length];
+            for (int i = 0; i < length; i++) {
+                usedBatch[i] = batch[offset + i];
+            }
+            return usedBatch;
+        }
 
+        public int getOffset() {
+            return offset;
+        }
+        
+        public int getLength() {
+            return length;
+        }
+        
     }
     
     public static class ByteBufferFutureImpl extends AioFutureImpl<Integer> 
implements ByteBufferFuture {
@@ -63,7 +93,7 @@
             return byteBuffer;
         }
 
-        public long position() {
+        public long getPosition() {
             return position;
         }
         
@@ -86,7 +116,15 @@
     private AsynchronousFileChannel asynchronousFileChannel = null;
     private final List<AioCompletionHandler<AioFuture>> completionHandlers = 
new ArrayList<AioCompletionHandler<AioFuture>>();
     private ExecutionException exception;
+
+    public AioFutureImpl() {
+        // Default constructor
+    }
     
+    public AioFutureImpl(AsynchronousFileChannel channel) {
+        this.asynchronousFileChannel = channel;
+    }
+
     public synchronized void 
addCompletionHandler(AioCompletionHandler<AioFuture> completionHandler) {
         completionHandlers.add(completionHandler);
     }

Modified: 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
URL: 
http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java?view=diff&rev=500492&r1=500491&r2=500492
==============================================================================
--- 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
 (original)
+++ 
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
 Fri Jan 26 23:02:32 2007
@@ -4,7 +4,9 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.aio.AioException;
 import org.apache.aio.AioFuture;
@@ -13,17 +15,19 @@
 import org.apache.aio.Modes;
 import org.apache.aio.AioFuture.BatchFuture;
 import org.apache.aio.AioFuture.ByteBufferFuture;
+import org.apache.aio.concurrent.AioFutureImpl.BatchFutureImpl;
+import org.apache.aio.concurrent.AioFutureImpl.ByteBufferFutureImpl;
 
 public class ConcurrentAsynchronousFileChannel extends AsynchronousFileChannel 
{
 
     private final FileChannel channel;
-    private final ExecutorService service;
+    private final ExecutorService executorService;
     private final Modes mode;
     
-    public ConcurrentAsynchronousFileChannel(FileChannel channel, Modes mode, 
ExecutorService service) {
+    public ConcurrentAsynchronousFileChannel(FileChannel channel, Modes mode, 
ExecutorService executorService) {
         this.channel = channel;
         this.mode = mode;
-        this.service = service;
+        this.executorService = executorService;
     }
     
 
@@ -53,67 +57,140 @@
     }
 
     @Override
-    public AioFuture<FileLock> lock(long position, long size, boolean shared) 
throws AioException {
-        // TODO Auto-generated method stub
-        return null;
+    public AioFuture<FileLock> lock(final long position, final long size, 
final boolean shared) throws AioException {
+        final AioFutureImpl<FileLock> aioFuture = new 
AioFutureImpl<FileLock>(this);
+        Future<FileLock> future = executorService.submit(new 
Callable<FileLock>() {
+           public FileLock call() throws Exception {
+                FileLock lock = channel.lock(position, size, shared);
+                callCompletionHandlers(aioFuture);
+                return lock;
+            }
+
+        });
+        aioFuture.setFuture(future);
+        return aioFuture;
     }
 
     @Override
-    public ByteBufferFuture read(ByteBuffer buffer, long position) throws 
AioException {
-        // TODO Auto-generated method stub
-        return null;
+    public ByteBufferFuture read(final ByteBuffer buffer, final long position) 
throws AioException {
+        final ByteBufferFutureImpl byteBufferFuture = new 
ByteBufferFutureImpl(buffer, position);
+        Future<Integer> future = executorService.submit(new 
Callable<Integer>() {
+           public Integer call() throws Exception {
+               int bytesRead = channel.read(buffer, position);
+               callCompletionHandlers(byteBufferFuture);
+               return bytesRead;
+            } 
+        });
+        byteBufferFuture.setFuture(future);
+        return byteBufferFuture;
     }
 
     @Override
-    public BatchFuture read(ByteBufferPosition[] byteBufferPositions, int 
offset, int length) throws AioException {
-        // TODO Auto-generated method stub
-        return null;
+    public BatchFuture read(final ByteBufferPosition[] byteBufferPositions, 
final int offset, final int length) throws AioException {
+        final BatchFutureImpl batchFuture = new 
BatchFutureImpl(byteBufferPositions, offset, length);
+        Future<Long> future = executorService.submit(new Callable<Long>() {
+           public Long call() throws Exception {
+               long totalBytes = 0; // deliberately not named 'length': that would shadow the batch length used as the loop bound
+               for (int i = 0; i < length; i++) { // 'length' is the method parameter: number of batch entries to process
+                   ByteBufferPosition byteBufferPosition = 
byteBufferPositions[i + offset];
+                   totalBytes += channel.read(byteBufferPosition.getByteBuffer(), 
byteBufferPosition.getPosition());
+               }
+               callCompletionHandlers(batchFuture); // handlers are dispatched as a separate executor task
+               return totalBytes; // sum of the per-buffer read results
+            } 
+        });
+        batchFuture.setFuture(future);
+        return batchFuture;
     }
 
     @Override
     public long size() throws AioException {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public AioFuture<Void> sync(boolean metaData) throws AioException {
-        // TODO Auto-generated method stub
-        return null;
+        try {
+            return channel.size();
+        } catch (IOException e) {
+            throw new AioException(e);
+        }
     }
 
     @Override
-    public AioFuture<Void> truncate(long size) throws AioException {
-        // TODO Auto-generated method stub
-        return null;
+    public AioFuture<Void> sync(final boolean metaData) throws AioException {
+        final AioFutureImpl<Void> aioFuture = new AioFutureImpl<Void>();
+        Future<Void> future = executorService.submit(new Callable<Void>() {
+            public Void call() throws Exception {
+                channel.force(metaData);
+                callCompletionHandlers(aioFuture);
+                return null;
+            } 
+        });
+        aioFuture.setFuture(future);
+        return aioFuture;
     }
 
     @Override
-    public FileLock tryLock() throws AioException {
-        // TODO Auto-generated method stub
-        return null;
+    public AioFuture<Void> truncate(final long size) throws AioException {
+        final AioFutureImpl<Void> aioFuture = new AioFutureImpl<Void>();
+        Future<Void> future = executorService.submit(new Callable<Void>() {
+            public Void call() throws Exception {
+                channel.truncate(size);
+                callCompletionHandlers(aioFuture);
+                return null;
+            } 
+        });
+        aioFuture.setFuture(future);
+        return aioFuture;
     }
 
     @Override
     public FileLock tryLock(long position, long size, boolean shared) throws 
AioException {
-        // TODO Auto-generated method stub
-        return null;
+        try {
+            return channel.tryLock(position, size, shared); // honor the requested region and mode instead of locking the whole file exclusively
+        } catch (IOException e) {
+            throw new AioException(e); // surface I/O failures through the channel's declared exception type
+        }
     }
 
     @Override
-    public ByteBufferFuture write(ByteBuffer buffer, long position) {
-        // TODO Auto-generated method stub
-        return null;
+    public ByteBufferFuture write(final ByteBuffer buffer, final long 
position) {
+        final ByteBufferFutureImpl byteBufferFuture = new 
ByteBufferFutureImpl(buffer, position);
+        Future<Integer> future = executorService.submit(new 
Callable<Integer>() {
+           public Integer call() throws Exception {
+               int bytesWritten = channel.write(buffer, position);
+               callCompletionHandlers(byteBufferFuture);
+               return bytesWritten;
+            } 
+        });
+        byteBufferFuture.setFuture(future);
+        return byteBufferFuture;
     }
 
     @Override
-    public BatchFuture write(ByteBufferPosition[] byteBufferPositions, int 
offset, int length) {
-        // TODO Auto-generated method stub
-        return null;
+    public BatchFuture write(final ByteBufferPosition[] byteBufferPositions, 
final int offset, final int length) {
+        final BatchFutureImpl batchFuture = new 
BatchFutureImpl(byteBufferPositions, offset, length);
+        Future<Long> future = executorService.submit(new Callable<Long>() {
+           public Long call() throws Exception {
+               long totalBytes = 0; // deliberately not named 'length': that would shadow the batch length used as the loop bound
+               for (int i = 0; i < length; i++) { // 'length' is the method parameter: number of batch entries to process
+                   ByteBufferPosition byteBufferPosition = 
byteBufferPositions[i + offset];
+                   totalBytes += channel.write(byteBufferPosition.getByteBuffer(), 
byteBufferPosition.getPosition());
+               }
+               callCompletionHandlers(batchFuture); // handlers are dispatched as a separate executor task
+               return totalBytes; // sum of the per-buffer write results
+            } 
+        });
+        batchFuture.setFuture(future);
+        return batchFuture;
     }
 
     public boolean isOpen() {
         return channel.isOpen();
     }
+
+    private void callCompletionHandlers(final AioFutureImpl aioFuture) {
+        executorService.submit(new Runnable() {
+            public void run() {
+                aioFuture.callCompletionHandlers();
+            }
+        });
+    } 
 
 }


Reply via email to