Author: trustin
Date: Mon Nov  5 05:48:47 2007
New Revision: 591994

URL: http://svn.apache.org/viewvc?rev=591994&view=rev
Log:
Improved CachedBufferAllocator
* Direct buffer is supported
* free() is supported


Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java?rev=591994&r1=591993&r2=591994&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java 
Mon Nov  5 05:48:47 2007
@@ -53,53 +53,70 @@
  * environment.
  * <p>
  * [EMAIL PROTECTED] CachedBufferAllocator} uses [EMAIL PROTECTED] 
ThreadLocal} to store the cached
- * buffer, allocates buffers whose capacity is power of 2 only, and doesn't
- * provide any caching for direct buffers.
+ * buffer, allocates buffers whose capacity is power of 2 only and provides
+ * performance advantage if [EMAIL PROTECTED] IoBuffer#free()} is called 
properly.
  *
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
 public class CachedBufferAllocator implements IoBufferAllocator {
+
     private static final int MAX_POOL_SIZE = 8;
     
-    private final ThreadLocal<Map<Integer, Queue<ByteBuffer>>> 
localRecyclables =
-        new ThreadLocal<Map<Integer, Queue<ByteBuffer>>>() {
+    private static Map<Integer, Queue<CachedBuffer>> newPoolMap() {
+        Map<Integer, Queue<CachedBuffer>> poolMap =
+            new HashMap<Integer, Queue<CachedBuffer>>();
+        for (int i = 0; i < 31; i ++) {
+            poolMap.put(1 << i, new 
CircularQueue<CachedBuffer>(MAX_POOL_SIZE));
+        }
+        poolMap.put(0, new CircularQueue<CachedBuffer>(MAX_POOL_SIZE));
+        poolMap.put(Integer.MAX_VALUE, new 
CircularQueue<CachedBuffer>(MAX_POOL_SIZE));
+        return poolMap;
+    }
+
+    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers =
+        new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
             @Override
-            protected Map<Integer, Queue<ByteBuffer>> initialValue() {
-                Map<Integer, Queue<ByteBuffer>> queues =
-                    new HashMap<Integer, Queue<ByteBuffer>>();
-                for (int i = 0; i < 31; i ++) {
-                    queues.put(1 << i, new 
CircularQueue<ByteBuffer>(MAX_POOL_SIZE));
-                }
-                queues.put(Integer.MAX_VALUE, new 
CircularQueue<ByteBuffer>(MAX_POOL_SIZE));
-                return queues;
+            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
+                return newPoolMap();
             }
-        };
-
+    };
+        
+    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers 
=
+        new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
+            @Override
+            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
+                return newPoolMap();
+            }
+    };
+            
     public IoBuffer allocate(int capacity, boolean direct) {
-        return wrap(allocate0(capacity, direct));
-    }
-    
-    private ByteBuffer allocate0(int capacity, boolean direct) {
         capacity = normalizeCapacity(capacity);
-        ByteBuffer buf;
+        Queue<CachedBuffer> pool;
         if (direct) {
-            buf = ByteBuffer.allocateDirect(capacity);
+            pool = directBuffers.get().get(capacity);
         } else {
-            // Recycle if possible.
-            Queue<ByteBuffer> pool = localRecyclables.get().get(capacity);
-            buf = pool.poll();
-            if (buf != null) {
-                buf.clear();
+            pool = heapBuffers.get().get(capacity);
+        }
+        
+        // Recycle if possible.
+        IoBuffer buf = pool.poll();
+        if (buf != null) {
+            buf.clear();
+            buf.setAutoExpand(false);
+            buf.order(ByteOrder.BIG_ENDIAN);
+        } else {
+            if (direct) {
+                buf = wrap(ByteBuffer.allocateDirect(capacity));
             } else {
-                buf = ByteBuffer.allocate(capacity);
+                buf = wrap(ByteBuffer.allocate(capacity));
             }
         }
         return buf;
     }
-
+    
     public IoBuffer wrap(ByteBuffer nioBuffer) {
-        return new DirtyBuffer(nioBuffer, true);
+        return new CachedBuffer(nioBuffer, true);
     }
 
     public void dispose() {
@@ -107,6 +124,7 @@
     
     private static int normalizeCapacity(int requestedCapacity) {
         switch (requestedCapacity) {
+        case 0:
         case 1 <<  0: case 1 <<  1: case 1 <<  2: case 1 <<  3: case 1 <<  4:
         case 1 <<  5: case 1 <<  6: case 1 <<  7: case 1 <<  8: case 1 <<  9:
         case 1 << 10: case 1 << 11: case 1 << 12: case 1 << 13: case 1 << 14:
@@ -127,17 +145,22 @@
         return newCapacity;
     }
 
-    private class DirtyBuffer extends AbstractIoBuffer {
+    private class CachedBuffer extends AbstractIoBuffer {
+        private final Thread ownerThread;
         private ByteBuffer buf;
 
-        protected DirtyBuffer(ByteBuffer buf, boolean autoExpandAllowed) {
+        protected CachedBuffer(ByteBuffer buf, boolean autoExpandAllowed) {
             super(autoExpandAllowed);
+            this.ownerThread = Thread.currentThread();
             this.buf = buf;
             buf.order(ByteOrder.BIG_ENDIAN);
         }
 
         @Override
         public ByteBuffer buf() {
+            if (buf == null) {
+                throw new IllegalStateException("Buffer has been freed 
already.");
+            }
             return buf;
         }
 
@@ -145,59 +168,75 @@
         protected void capacity0(int requestedCapacity) {
             int newCapacity = normalizeCapacity(requestedCapacity);
 
-            ByteBuffer oldBuf = this.buf;
-            ByteBuffer newBuf = allocate0(newCapacity, isDirect());
+            ByteBuffer oldBuf = buf();
+            ByteBuffer newBuf = allocate(newCapacity, isDirect()).buf();
             oldBuf.clear();
             newBuf.put(oldBuf);
             this.buf = newBuf;
             
             free(oldBuf);
-            free(buf);
         }
 
         @Override
         protected IoBuffer duplicate0() {
-            return new DirtyBuffer(this.buf.duplicate(), false);
+            return new CachedBuffer(buf().duplicate(), false);
         }
 
         @Override
         protected IoBuffer slice0() {
-            return new DirtyBuffer(this.buf.slice(), false);
+            return new CachedBuffer(buf().slice(), false);
         }
 
         @Override
         protected IoBuffer asReadOnlyBuffer0() {
-            return new DirtyBuffer(this.buf.asReadOnlyBuffer(), false);
+            return new CachedBuffer(buf().asReadOnlyBuffer(), false);
         }
 
         @Override
         public byte[] array() {
-            return buf.array();
+            return buf().array();
         }
 
         @Override
         public int arrayOffset() {
-            return buf.arrayOffset();
+            return buf().arrayOffset();
         }
 
         @Override
         public boolean hasArray() {
-            return buf.hasArray();
+            return buf().hasArray();
         }
 
         @Override
         public void free() {
             free(buf);
-            buf = null; // FIXME better sanity check scheme.
+            buf = null;
         }
         
-        private void free(ByteBuffer buf) {
+        private void free(ByteBuffer oldBuf) {
+            if (oldBuf == null) {
+                return;
+            }
+            if (Thread.currentThread() != ownerThread) {
+                return;
+            }
+
             // Add to the cache.
-            if (!buf.isDirect() && !buf.isReadOnly() && !isDerived()) {
-                Queue<ByteBuffer> pool = 
localRecyclables.get().get(buf.capacity());
+            if (!oldBuf.isReadOnly() && !isDerived()) {
+                Queue<CachedBuffer> pool;
+                if (oldBuf.isDirect()) {
+                    pool = directBuffers.get().get(oldBuf.capacity());
+                } else {
+                    pool = heapBuffers.get().get(oldBuf.capacity());
+                }
+                
+                if (pool == null) {
+                    return;
+                }
+
                 // Restrict the size of the pool to prevent OOM.
                 if (pool.size() < MAX_POOL_SIZE) {
-                    pool.offer(buf);
+                    pool.offer(new CachedBuffer(oldBuf, true));
                 }
             }
         }


Reply via email to