Updated Branches: refs/heads/trunk fe3d2e8ec -> ccc7fc3c9
o The buffer is copied into a DirectBuffer if it's bigger than the SenBufferSize, so that we don't copy it many times when sending big files; o Added a method to convert a HeapBuffer to a DirectBuffer o Added some fields in the TcpSession (The SendBufferSize, and a pre-allocated DirectBuffer to send chu,ks of data) o Minimized the number of loops for the 64Mb buffer on Netty o The IoSession.writeDirect() method has been removed from the interface o Some more Javadoc Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/ccc7fc3c Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/ccc7fc3c Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/ccc7fc3c Branch: refs/heads/trunk Commit: ccc7fc3c99c1dc0f3526015004089a9960b8d97a Parents: fe3d2e8 Author: Emmanuel Lécharny <[email protected]> Authored: Fri Jan 11 00:39:25 2013 +0100 Committer: Emmanuel Lécharny <[email protected]> Committed: Fri Jan 11 00:39:25 2013 +0100 ---------------------------------------------------------------------- benchmarks/pom.xml | 2 +- ...ina3ClientVsMina3ServerBenchmarkBinaryTest.java | 10 ++-- ...ettyClientVsNettyServerBenchmarkBinaryTest.java | 2 +- .../main/java/org/apache/mina/api/IoSession.java | 10 --- .../org/apache/mina/session/AbstractIoSession.java | 36 +++++++----- .../apache/mina/transport/nio/NioTcpSession.java | 45 ++++++++++++++- .../apache/mina/transport/nio/NioUdpSession.java | 15 ++++- .../service/idlecheker/IndexedIdleChekerTest.java | 19 ++++++- .../apache/mina/session/AbstractIoSessionTest.java | 16 +++++ 9 files changed, 117 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/benchmarks/pom.xml ---------------------------------------------------------------------- diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 768f7d8..41d99dd 100755 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -31,7 +31,7 @@ <groupId>org.apache.mina</groupId> <name>Apache MINA Benchmarks tests</name> -<properties> + <properties> <!-- defined in order to run against a different MINA version --> <mina.version>${project.version}</mina.version> <netty.version>3.5.9.Final</netty.version> http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java ---------------------------------------------------------------------- diff --git a/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java b/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java index 7efe62a..3c18609 100755 --- a/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java +++ b/benchmarks/src/test/java/org/apache/mina/core/Mina3ClientVsMina3ServerBenchmarkBinaryTest.java @@ -56,12 +56,12 @@ public class Mina3ClientVsMina3ServerBenchmarkBinaryTest extends BenchmarkBinary @Parameters public static Collection<Object[]> getParameters() { - Object[][] parameters = new Object[][] { + Object[][] parameters = new Object[][] { { 1000000, 10, 2 * 60 }, - { 1000000, 1 * 1024, 2 * 60 }, - { 1000000, 10 * 1024, 2 * 60 }, - { 100, 64 * 1024 * 1024, 10 * 60 } - }; + { 1000000, 1 * 1024, 2 * 60 }, + { 1000000, 10 * 1024, 2 * 60 }, + { 100, 64 * 1024 * 1024, 10 * 60 } + }; return Arrays.asList(parameters); } } http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java ---------------------------------------------------------------------- diff --git a/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java b/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java index 1921ca4..b583b91 100644 --- a/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java +++ b/benchmarks/src/test/java/org/apache/mina/core/NettyClientVsNettyServerBenchmarkBinaryTest.java @@ -60,7 +60,7 @@ public class NettyClientVsNettyServerBenchmarkBinaryTest { 1000000, 10, 2 * 60 }, { 1000000, 1 * 1024, 2 * 60 }, { 1000000, 10 * 1024, 2 * 60 }, - { 100, 64 * 1024 * 1024, 10 * 60 } + { 10, 64 * 1024 * 1024, 10 * 60 } }; return Arrays.asList(parameters); } http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/api/IoSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/api/IoSession.java b/core/src/main/java/org/apache/mina/api/IoSession.java index b654547..7104fdd 100644 --- a/core/src/main/java/org/apache/mina/api/IoSession.java +++ b/core/src/main/java/org/apache/mina/api/IoSession.java @@ -347,16 +347,6 @@ public interface IoSession { public void write(Object message); /** - * Writes the message immediately. If we can't write all the message, we will get back the number of - * written bytes. - * - * @param message the message to write - * @return the number of written bytes - * - */ - public int writeDirect(Object message); - - /** * Same as {@link IoSession#write(Object)}, but provide a {@link IoFuture} for tracking the completion of this * write. * http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/session/AbstractIoSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java index d2ad113..f24dd02 100644 --- a/core/src/main/java/org/apache/mina/session/AbstractIoSession.java +++ b/core/src/main/java/org/apache/mina/session/AbstractIoSession.java @@ -523,12 +523,15 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon } /** - * {@inheritDoc} + * Writes the message immediately. If we can't write all the message, we will get back the number of + * written bytes. + * + * @param message the message to write + * @return the number of written bytes */ - public int writeDirect(Object message) { - // Default to 0 : this method should be overwritten if needed - return 0; - } + protected abstract int writeDirect(Object message); + + protected abstract ByteBuffer convertToDirectBuffer(WriteRequest writeRequest); /** * {@inheritDoc} @@ -547,8 +550,11 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon } synchronized (writeQueue) { + ByteBuffer message = (ByteBuffer) writeRequest.getMessage(); + if (writeQueue.isEmpty()) { - ByteBuffer message = (ByteBuffer) writeRequest.getMessage(); + // Transfer the buffer in a DirectByteBuffer if it's a HeapByteBuffer and if it's too big + message = convertToDirectBuffer(writeRequest); // We don't have anything in the writeQueue, let's try to write the // data in the channel immediately if we can @@ -562,8 +568,9 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon // Update the idle status for this session idleChecker.sessionWritten(this, System.currentTimeMillis()); + int remaining = message.remaining(); - if ((written < 0) || message.remaining() > 0) { + if ((written < 0) || (remaining > 0)) { // We have to push the request on the writeQueue writeQueue.add(writeRequest); @@ -588,6 +595,12 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon processMessageSent(highLevel); } } + } else { + // Transfer the buffer in a DirectByteBuffer if it's a HeapByteBuffer + message = convertToDirectBuffer(writeRequest); + + // We have to push the request on the writeQueue + writeQueue.add(writeRequest); } } @@ -841,15 +854,6 @@ public abstract class AbstractIoSession implements IoSession, ReadFilterChainCon if (future != null) { writeRequest.setFuture(future); } - /* - final WriteRequest request = lastWriteRequest; - if (request != null) { - if (future != null) { - ((DefaultWriteRequest) request).setFuture(future); - } - ((DefaultWriteRequest) request).setHighLevelMessage(message); - } - */ } catch (final RuntimeException e) { processException(e); } http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java b/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java index 1dbfd8d..f36f572 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java @@ -64,14 +64,23 @@ public class NioTcpSession extends AbstractIoSession implements SelectorListener /** the future representing this session connection operation (client only) */ private ConnectFuture connectFuture; + /** The associated selectionKey */ private SelectionKey selectionKey; + /** The Direct Buffer used to send data */ + private ByteBuffer sendBuffer; + + /** The size of the buffer configured in the socket to send data */ + private int sendBufferSize; + NioTcpSession(final IoService service, final SocketChannel channel, final SelectorLoop selectorLoop, final IdleChecker idleChecker) { super(service, idleChecker); this.channel = channel; this.selectorLoop = selectorLoop; this.configuration = new ProxyTcpSessionConfig(channel.socket()); + sendBufferSize = configuration.getSendBufferSize(); + sendBuffer = ByteBuffer.allocateDirect(sendBufferSize); } void setConnectFuture(ConnectFuture connectFuture) { @@ -140,8 +149,11 @@ public class NioTcpSession extends AbstractIoSession implements SelectorListener throw new RuntimeException("Not implemented"); } + /** + * {@inheritDoc} + */ @Override - public int writeDirect(Object message) { + protected int writeDirect(Object message) { try { // Check that we can write into the channel if (!isRegisteredForWrite()) { @@ -162,6 +174,37 @@ public class NioTcpSession extends AbstractIoSession implements SelectorListener * {@inheritDoc} */ @Override + protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) { + ByteBuffer message = (ByteBuffer) writeRequest.getMessage(); + + if (!message.isDirect()) { + //int sendBufferSize = configuration.getSendBufferSize(); + int remaining = message.remaining(); + + if (remaining > sendBufferSize) { + ByteBuffer directBuffer = ByteBuffer.allocateDirect(remaining); + directBuffer.put(message); + directBuffer.flip(); + writeRequest.setMessage(directBuffer); + + return directBuffer; + } else { + sendBuffer.clear(); + sendBuffer.put(message); + sendBuffer.flip(); + writeRequest.setMessage(sendBuffer); + + return sendBuffer; + } + } + + return message; + } + + /** + * {@inheritDoc} + */ + @Override public void resumeRead() { // TODO throw new RuntimeException("Not implemented"); http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java b/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java index 44ba069..a902b9e 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java @@ -27,6 +27,7 @@ import org.apache.mina.api.IoService; import org.apache.mina.api.IoSessionConfig; import org.apache.mina.service.idlechecker.IdleChecker; import org.apache.mina.session.AbstractIoSession; +import org.apache.mina.session.WriteRequest; import org.apache.mina.util.AbstractIoFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,9 +201,19 @@ public class NioUdpSession extends AbstractIoSession { idleChecker.sessionRead(this, System.currentTimeMillis()); } + /** + * {@inheritDoc} + */ @Override - public int writeDirect(Object message) { - // TODO + protected int writeDirect(Object message) { return 0; } + + /** + * {@inheritDoc} + */ + @Override + protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) { + return null; + } } http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java b/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java index f1ca8d3..3d903d0 100644 --- a/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java +++ b/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import java.net.SocketAddress; +import java.nio.ByteBuffer; import org.apache.mina.api.IdleStatus; import org.apache.mina.api.IoFuture; @@ -34,6 +35,7 @@ import org.apache.mina.service.idlechecker.IdleChecker; import org.apache.mina.service.idlechecker.IndexedIdleChecker; import org.apache.mina.session.AbstractIoSession; import org.apache.mina.session.AbstractIoSessionConfig; +import org.apache.mina.session.WriteRequest; import org.junit.Test; /** @@ -173,7 +175,6 @@ public class IndexedIdleChekerTest { @Override public boolean isClosed() { - // TODO Auto-generated method stub return false; } @@ -182,7 +183,6 @@ public class IndexedIdleChekerTest { */ @Override protected void channelClose() { - } /** @@ -190,7 +190,22 @@ public class IndexedIdleChekerTest { */ @Override public void flushWriteQueue() { + } + /** + * {@inheritDoc} + */ + @Override + protected int writeDirect(Object message) { + return 0; + } + + /** + * {@inheritDoc} + */ + @Override + protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) { + return null; } } } http://git-wip-us.apache.org/repos/asf/mina/blob/ccc7fc3c/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java b/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java index 28a7138..d71f83b 100644 --- a/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java +++ b/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java @@ -136,6 +136,22 @@ public class AbstractIoSessionTest { @Override public void flushWriteQueue() { } + + /** + * {@inheritDoc} + */ + @Override + protected int writeDirect(Object message) { + return 0; + } + + /** + * {@inheritDoc} + */ + @Override + protected ByteBuffer convertToDirectBuffer(WriteRequest writeRequest) { + return null; + } } private IoService service = null;
