Author: trustin
Date: Sun Nov  4 21:48:42 2007
New Revision: 591883

URL: http://svn.apache.org/viewvc?rev=591883&view=rev
Log:
* Fixed a problem that scheduledWrite* per service doesn't decrease.
* Added IoBuffer.free() which is completely optional. (I might remove it; still 
testing it..)
* Updated EchoServer example


Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java
    
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
    
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java 
Sun Nov  4 21:48:42 2007
@@ -105,6 +105,11 @@
     public boolean isAutoExpand() {
         return autoExpand && autoExpandAllowed;
     }
+    
+    @Override
+    public boolean isDerived() {
+        return !autoExpandAllowed;
+    }
 
     @Override
     public IoBuffer setAutoExpand(boolean autoExpand) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java 
Sun Nov  4 21:48:42 2007
@@ -414,6 +414,7 @@
         if (!failedRequests.isEmpty()) {
             WriteToClosedSessionException cause = new 
WriteToClosedSessionException(failedRequests);
             for (WriteRequest r: failedRequests) {
+                session.decreaseScheduledBytesAndMessages(r);
                 r.getFuture().setException(cause);
             }
             session.getFilterChain().fireExceptionCaught(cause);

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java 
Sun Nov  4 21:48:42 2007
@@ -189,6 +189,10 @@
         scheduledWriteMessages.incrementAndGet();
     }
 
+    protected void decreaseScheduledWriteMessages() {
+        scheduledWriteMessages.decrementAndGet();
+    }
+
     public long getActivationTime() {
         return activationTime;
     }
@@ -203,7 +207,6 @@
 
     protected void increaseWrittenBytes(long increment) {
         writtenBytes.addAndGet(increment);
-        scheduledWriteBytes.addAndGet(-increment);
     }
 
     public long getWrittenMessages() {
@@ -212,7 +215,6 @@
 
     protected void increaseWrittenMessages() {
         writtenMessages.incrementAndGet();
-        scheduledWriteMessages.decrementAndGet();
     }
 
     public Set<WriteFuture> broadcast(Object message) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
Sun Nov  4 21:48:42 2007
@@ -381,7 +381,21 @@
             }
         }
     }
-
+    
+    protected void increaseWrittenBytesAndMessages(WriteRequest request) {
+        Object message = request.getMessage();
+        if (message instanceof IoBuffer) {
+            IoBuffer b = (IoBuffer) message;
+            if (b.hasRemaining()) {
+                increaseWrittenBytes(((IoBuffer) message).remaining());
+            } else {
+                increaseWrittenMessages();
+            }
+        } else {
+            increaseWrittenMessages();
+        }
+    }
+    
     protected void increaseWrittenBytes(long increment) {
         if (increment > 0) {
             writtenBytes += increment;
@@ -389,11 +403,11 @@
             idleCountForBoth = 0;
             idleCountForWrite = 0;
 
-            scheduledWriteBytes.addAndGet(-increment);
-
             if (getService() instanceof AbstractIoService) {
                 ((AbstractIoService) 
getService()).increaseWrittenBytes(increment);
             }
+
+            increaseScheduledWriteBytes(-increment);
         }
     }
 
@@ -406,10 +420,11 @@
 
     protected void increaseWrittenMessages() {
         writtenMessages++;
-        scheduledWriteMessages.decrementAndGet();
         if (getService() instanceof AbstractIoService) {
             ((AbstractIoService) getService()).increaseWrittenMessages();
         }
+
+        decreaseScheduledWriteMessages();
     }
 
     protected void increaseScheduledWriteBytes(long increment) {
@@ -423,6 +438,27 @@
         scheduledWriteMessages.incrementAndGet();
         if (getService() instanceof AbstractIoService) {
             ((AbstractIoService) 
getService()).increaseScheduledWriteMessages();
+        }
+    }
+
+    protected void decreaseScheduledWriteMessages() {
+        scheduledWriteMessages.decrementAndGet();
+        if (getService() instanceof AbstractIoService) {
+            ((AbstractIoService) 
getService()).decreaseScheduledWriteMessages();
+        }
+    }
+
+    protected void decreaseScheduledBytesAndMessages(WriteRequest request) {
+        Object message = request.getMessage();
+        if (message instanceof IoBuffer) {
+            IoBuffer b = (IoBuffer) message;
+            if (b.hasRemaining()) {
+                increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
+            } else {
+                decreaseScheduledWriteMessages();
+            }
+        } else {
+            decreaseScheduledWriteMessages();
         }
     }
 

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java 
Sun Nov  4 21:48:42 2007
@@ -59,7 +59,7 @@
  * @version $Rev$, $Date$
  */
 public class CachedBufferAllocator implements IoBufferAllocator {
-    private static final int MAX_POOL_SIZE = 4;
+    private static final int MAX_POOL_SIZE = 8;
     
     private final ThreadLocal<Map<Integer, Queue<ByteBuffer>>> 
localRecyclables =
         new ThreadLocal<Map<Integer, Queue<ByteBuffer>>>() {
@@ -150,14 +150,7 @@
             newBuf.put(oldBuf);
             this.buf = newBuf;
             
-            // Add to the cache.
-            if (!buf.isDirect() && !buf.isReadOnly()) {
-                Queue<ByteBuffer> pool = 
localRecyclables.get().get(buf.capacity());
-                // Restrict the size of the pool to prevent OOM.
-                if (pool.size() < MAX_POOL_SIZE) {
-                    pool.offer(buf);
-                }
-            }
+            free(oldBuf);
         }
 
         @Override
@@ -189,6 +182,22 @@
         public boolean hasArray() {
             return buf.hasArray();
         }
-    }
 
+        @Override
+        public void free() {
+            free(buf);
+            buf = null; // FIXME better sanity check scheme.
+        }
+        
+        private void free(ByteBuffer buf) {
+            // Add to the cache.
+            if (!buf.isDirect() && !buf.isReadOnly() && !isDerived()) {
+                Queue<ByteBuffer> pool = 
localRecyclables.get().get(buf.capacity());
+                // Restrict the size of the pool to prevent OOM.
+                if (pool.size() < MAX_POOL_SIZE) {
+                    pool.offer(buf);
+                }
+            }
+        }
+    }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
Sun Nov  4 21:48:42 2007
@@ -404,17 +404,7 @@
     }
 
     public void fireMessageSent(WriteRequest request) {
-        Object message = request.getMessage();
-        if (message instanceof IoBuffer) {
-            IoBuffer b = (IoBuffer) message;
-            if (b.hasRemaining()) {
-                session.increaseWrittenBytes(((IoBuffer) message).remaining());
-            } else {
-                session.increaseWrittenMessages();
-            }
-        } else {
-            session.increaseWrittenMessages();
-        }
+        session.increaseWrittenBytesAndMessages(request);
 
         try {
             request.getFuture().setWritten();

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java Sun Nov  
4 21:48:42 2007
@@ -245,6 +245,14 @@
      */
     protected IoBuffer() {
     }
+    
+    /**
+     * Declares this buffer and all its derived buffers are not used anymore
+     * so that it can be reused by some [EMAIL PROTECTED] IoBufferAllocator} 
implementations.
+     * It is not mandatory to call this method, but you might want to invoke 
this
+     * method for maximum performance. 
+     */
+    public abstract void free();
 
     /**
      * Returns the underlying NIO buffer instance.
@@ -255,6 +263,12 @@
      * @see ByteBuffer#isDirect()
      */
     public abstract boolean isDirect();
+    
+    /**
+     * returns <tt>true</tt> if and only if this buffer is derived from other 
buffer
+     * via [EMAIL PROTECTED] #duplicate()}, [EMAIL PROTECTED] #slice()} or 
[EMAIL PROTECTED] #asReadOnlyBuffer()}.
+     */
+    public abstract boolean isDerived();
 
     /**
      * @see ByteBuffer#isReadOnly()

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java 
Sun Nov  4 21:48:42 2007
@@ -636,4 +636,14 @@
     public boolean hasArray() {
         return buf.hasArray();
     }
+
+    @Override
+    public void free() {
+        buf.free();
+    }
+
+    @Override
+    public boolean isDerived() {
+        return buf.isDerived();
+    }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java 
Sun Nov  4 21:48:42 2007
@@ -133,5 +133,9 @@
         public boolean hasArray() {
             return buf.hasArray();
         }
+
+        @Override
+        public void free() {
+        }
     }
 }

Modified: 
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- 
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
 (original)
+++ 
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
 Sun Nov  4 21:48:42 2007
@@ -21,10 +21,12 @@
 
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.WriteFuture;
 import org.apache.mina.filter.ssl.SslFilter;
 
 /**
@@ -43,6 +45,16 @@
     }
 
     @Override
+    public void sessionClosed(IoSession session) throws Exception {
+        IoSessionLogger.getLogger(session).info("CLOSED");
+    }
+
+    @Override
+    public void sessionOpened(IoSession session) throws Exception {
+        IoSessionLogger.getLogger(session).info("OPENED");
+    }
+
+    @Override
     public void sessionIdle(IoSession session, IdleStatus status) {
         IoSessionLogger.getLogger(session).info(
                 "*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE) + " 
***");
@@ -57,6 +69,11 @@
     public void messageReceived(IoSession session, Object message)
             throws Exception {
         // Write the received data back to remote peer
-        session.write(((IoBuffer) message).duplicate());
+        final IoBuffer src = (IoBuffer) message;
+        session.write(src.duplicate()).addListener(new 
IoFutureListener<WriteFuture>() {
+            public void operationComplete(WriteFuture future) {
+                src.free();
+            }
+        });
     }
 }

Modified: 
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- 
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java 
(original)
+++ 
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java 
Sun Nov  4 21:48:42 2007
@@ -22,7 +22,9 @@
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
+import org.apache.mina.common.CachedBufferAllocator;
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoBuffer;
 import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filter.ssl.SslFilter;
@@ -45,6 +47,8 @@
     public static void main(String[] args) throws Exception {
         SocketAcceptor acceptor = new 
NioSocketAcceptor(Executors.newCachedThreadPool());
         DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
+        
+        IoBuffer.setAllocator(new CachedBufferAllocator());
 
         // Add SSL filter if SSL is enabled.
         if (USE_SSL) {
@@ -56,6 +60,7 @@
         // Bind
         acceptor.setLocalAddress(new InetSocketAddress(PORT));
         acceptor.setHandler(new EchoProtocolHandler());
+        // acceptor.getFilterChain().addLast("x", new 
WriteThrottleFilter(WriteThrottlePolicy.LOG, 0, 1048576, 0, 0, 0, 0));
         acceptor.bind();
 
         System.out.println("Listening on port " + PORT);


Reply via email to