Updated Branches:
  refs/heads/trunk fe3d2e8ec -> ccc7fc3c9

o The buffer is copied into a DirectBuffer if it's bigger than the
SendBufferSize, so that we don't copy it many times when sending big
files;
o Added a method to convert a HeapBuffer to a DirectBuffer
o Added some fields in the TcpSession (The SendBufferSize, and a
pre-allocated DirectBuffer to send chunks of data)
o Minimized the number of loops for the 64Mb buffer on Netty
o The IoSession.writeDirect() method has been removed from the interface
o Some more Javadoc

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/ccc7fc3c
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/ccc7fc3c
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/ccc7fc3c

Branch: refs/heads/trunk
Commit: ccc7fc3c99c1dc0f3526015004089a9960b8d97a
Parents: fe3d2e8
Author: Emmanuel Lécharny <[email protected]>
Authored: Fri Jan 11 00:39:25 2013 +0100
Committer: Emmanuel Lécharny <[email protected]>
Committed: Fri Jan 11 00:39:25 2013 +0100

----------------------------------------------------------------------
 benchmarks/pom.xml                                 |    2 +-
 ...ina3ClientVsMina3ServerBenchmarkBinaryTest.java |   10 ++--
 ...ettyClientVsNettyServerBenchmarkBinaryTest.java |    2 +-
 .../main/java/org/apache/mina/api/IoSession.java   |   10 ---
 .../org/apache/mina/session/AbstractIoSession.java |   36 +++++++-----
 .../apache/mina/transport/nio/NioTcpSession.java   |   45 ++++++++++++++-
 .../apache/mina/transport/nio/NioUdpSession.java   |   15 ++++-
 .../service/idlecheker/IndexedIdleChekerTest.java  |   19 ++++++-
 .../apache/mina/session/AbstractIoSessionTest.java |   16 +++++
 9 files changed, 117 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 768f7d8..41d99dd 100755
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -31,7 +31,7 @@
   <groupId>org.apache.mina</groupId>
   <name>Apache MINA Benchmarks tests</name>
   
-<properties>
+  <properties>
      <!-- defined in order to run against a different MINA version -->
      <mina.version>${project.version}</mina.version>
      <netty.version>3.5.9.Final</netty.version>

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java
----------------------------------------------------------------------
diff --git 
a/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java
 
b/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java
index 7efe62a..3c18609 100755
--- 
a/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java
+++ 
b/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java
@@ -56,12 +56,12 @@ public class Mina3ClientVsMina3ServerBenchmarkBinaryTest 
extends BenchmarkBinary
 
     @Parameters
     public static Collection<Object[]> getParameters() {
-        Object[][] parameters = new Object[][] { 
+        Object[][] parameters = new Object[][] {
                 { 1000000, 10, 2 * 60 }, 
-                { 1000000, 1 * 1024, 2 * 60 }, 
-                { 1000000, 10 * 1024, 2 * 60 },
-                { 100, 64 * 1024 * 1024, 10 * 60 }
-        };
+                { 1000000, 1 * 1024, 2 * 60 },
+                { 1000000, 10 * 1024, 2 * 60 }, 
+                { 100, 64 * 1024 * 1024, 10 * 60 } 
+                };
         return Arrays.asList(parameters);
     }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java
----------------------------------------------------------------------
diff --git 
a/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java
 
b/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java
index 1921ca4..b583b91 100644
--- 
a/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java
+++ 
b/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java
@@ -60,7 +60,7 @@ public class NettyClientVsNettyServerBenchmarkBinaryTest
                 { 1000000, 10, 2 * 60 }, 
                 { 1000000, 1 * 1024, 2 * 60 }, 
                 { 1000000, 10 * 1024, 2 * 60 },
-                { 100, 64 * 1024 * 1024, 10 * 60 }
+                { 10, 64 * 1024 * 1024, 10 * 60 }
         };
         return Arrays.asList(parameters);
     }

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/api/IoSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/api/IoSession.java 
b/core/src/main/java/org/apache/mina/api/IoSession.java
index b654547..7104fdd 100644
--- a/core/src/main/java/org/apache/mina/api/IoSession.java
+++ b/core/src/main/java/org/apache/mina/api/IoSession.java
@@ -347,16 +347,6 @@ public interface IoSession {
     public void write(Object message);
 
     /**
-     * Writes the message immediately. If we can't write all the message, we 
will get back the number of
-     * written bytes.
-     * 
-     * @param message the message to write
-     * @return the number of written bytes
-     * 
-     */
-    public int writeDirect(Object message);
-
-    /**
      * Same as {@link IoSession#write(Object)}, but provide a {@link IoFuture} 
for tracking the completion of this
      * write.
      * 

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java 
b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
index d2ad113..f24dd02 100644
--- a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
+++ b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
@@ -523,12 +523,15 @@ public abstract class AbstractIoSession implements 
IoSession, ReadFilterChainCon
     }
 
     /**
-     * {@inheritDoc}
+     * Writes the message immediately. If we can't write all the message, we 
will get back the number of
+     * written bytes.
+     * 
+     * @param message the message to write
+     * @return the number of written bytes
      */
-    public int writeDirect(Object message) {
-        // Default to 0 : this method should be overwritten if needed
-        return 0;
-    }
+    protected abstract int writeDirect(Object message);
+
+    protected abstract ByteBuffer convertToDirectBuffer(WriteRequest 
writeRequest);
 
     /**
      * {@inheritDoc}
@@ -547,8 +550,11 @@ public abstract class AbstractIoSession implements 
IoSession, ReadFilterChainCon
         }
 
         synchronized (writeQueue) {
+            ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+
             if (writeQueue.isEmpty()) {
-                ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+                // Transfer the buffer in a DirectByteBuffer if it's a 
HeapByteBuffer and if it's too big
+                message = convertToDirectBuffer(writeRequest);
 
                 // We don't have anything in the writeQueue, let's try to 
write the
                 // data in the channel immediately if we can
@@ -562,8 +568,9 @@ public abstract class AbstractIoSession implements 
IoSession, ReadFilterChainCon
 
                 // Update the idle status for this session
                 idleChecker.sessionWritten(this, System.currentTimeMillis());
+                int remaining = message.remaining();
 
-                if ((written < 0) || message.remaining() > 0) {
+                if ((written < 0) || (remaining > 0)) {
                     // We have to push the request on the writeQueue
                     writeQueue.add(writeRequest);
 
@@ -588,6 +595,12 @@ public abstract class AbstractIoSession implements 
IoSession, ReadFilterChainCon
                         processMessageSent(highLevel);
                     }
                 }
+            } else {
+                // Transfer the buffer in a DirectByteBuffer if it's a 
HeapByteBuffer
+                message = convertToDirectBuffer(writeRequest);
+
+                // We have to push the request on the writeQueue
+                writeQueue.add(writeRequest);
             }
         }
 
@@ -841,15 +854,6 @@ public abstract class AbstractIoSession implements 
IoSession, ReadFilterChainCon
             if (future != null) {
                 writeRequest.setFuture(future);
             }
-            /*
-            final WriteRequest request = lastWriteRequest;
-            if (request != null) {
-                if (future != null) {
-                    ((DefaultWriteRequest) request).setFuture(future);
-                }
-                ((DefaultWriteRequest) request).setHighLevelMessage(message);
-            }
-                */
         } catch (final RuntimeException e) {
             processException(e);
         }

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java 
b/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
index 1dbfd8d..f36f572 100644
--- a/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
+++ b/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
@@ -64,14 +64,23 @@ public class NioTcpSession extends AbstractIoSession 
implements SelectorListener
     /** the future representing this session connection operation (client 
only) */
     private ConnectFuture connectFuture;
 
+    /** The associated selectionKey */
     private SelectionKey selectionKey;
 
+    /** The Direct Buffer used to send data */
+    private ByteBuffer sendBuffer;
+
+    /** The size of the buffer configured in the socket to send data */
+    private int sendBufferSize;
+
     NioTcpSession(final IoService service, final SocketChannel channel, final 
SelectorLoop selectorLoop,
             final IdleChecker idleChecker) {
         super(service, idleChecker);
         this.channel = channel;
         this.selectorLoop = selectorLoop;
         this.configuration = new ProxyTcpSessionConfig(channel.socket());
+        sendBufferSize = configuration.getSendBufferSize();
+        sendBuffer = ByteBuffer.allocateDirect(sendBufferSize);
     }
 
     void setConnectFuture(ConnectFuture connectFuture) {
@@ -140,8 +149,11 @@ public class NioTcpSession extends AbstractIoSession 
implements SelectorListener
         throw new RuntimeException("Not implemented");
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public int writeDirect(Object message) {
+    protected int writeDirect(Object message) {
         try {
             // Check that we can write into the channel
             if (!isRegisteredForWrite()) {
@@ -162,6 +174,37 @@ public class NioTcpSession extends AbstractIoSession 
implements SelectorListener
      * {@inheritDoc}
      */
     @Override
+    protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) {
+        ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
+
+        if (!message.isDirect()) {
+            //int sendBufferSize = configuration.getSendBufferSize();
+            int remaining = message.remaining();
+
+            if (remaining > sendBufferSize) {
+                ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining);
+                directBuffer.put(message);
+                directBuffer.flip();
+                writeRequest.setMessage(directBuffer);
+
+                return directBuffer;
+            } else {
+                sendBuffer.clear();
+                sendBuffer.put(message);
+                sendBuffer.flip();
+                writeRequest.setMessage(sendBuffer);
+
+                return sendBuffer;
+            }
+        }
+
+        return message;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void resumeRead() {
         // TODO
         throw new RuntimeException("Not implemented");

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java 
b/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
index 44ba069..a902b9e 100644
--- a/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
+++ b/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
@@ -27,6 +27,7 @@ import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSessionConfig;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.session.WriteRequest;
 import org.apache.mina.util.AbstractIoFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -200,9 +201,19 @@ public class NioUdpSession extends AbstractIoSession {
         idleChecker.sessionRead(this, System.currentTimeMillis());
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public int writeDirect(Object message) {
-        // TODO
+    protected int writeDirect(Object message) {
         return 0;
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
 
b/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
index f1ca8d3..3d903d0 100644
--- 
a/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
+++ 
b/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 
 import org.apache.mina.api.IdleStatus;
 import org.apache.mina.api.IoFuture;
@@ -34,6 +35,7 @@ import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.session.AbstractIoSession;
 import org.apache.mina.session.AbstractIoSessionConfig;
+import org.apache.mina.session.WriteRequest;
 import org.junit.Test;
 
 /**
@@ -173,7 +175,6 @@ public class IndexedIdleChekerTest {
 
         @Override
         public boolean isClosed() {
-            // TODO Auto-generated method stub
             return false;
         }
 
@@ -182,7 +183,6 @@ public class IndexedIdleChekerTest {
          */
         @Override
         protected void channelClose() {
-
         }
 
         /**
@@ -190,7 +190,22 @@ public class IndexedIdleChekerTest {
          */
         @Override
         public void flushWriteQueue() {
+        }
 
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected int writeDirect(Object message) {
+            return 0;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) {
+            return null;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java 
b/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
index 28a7138..d71f83b 100644
--- a/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
+++ b/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
@@ -136,6 +136,22 @@ public class AbstractIoSessionTest {
         @Override
         public void flushWriteQueue() {
         }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected int writeDirect(Object message) {
+            return 0;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) {
+            return null;
+        }
     }
 
     private IoService service = null;

Reply via email to