Updated Branches: refs/heads/trunk fbc3ba2af -> 363f108c1
Repair Netty 4 benchmarks tests, update Netty 4 version and include benchmarks2 in build Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/363f108c Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/363f108c Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/363f108c Branch: refs/heads/trunk Commit: 363f108c1986086fa7a797aee42b0f1070a51a01 Parents: fbc3ba2 Author: Jeff MAURY <[email protected]> Authored: Wed Jul 24 11:48:16 2013 +0200 Committer: Jeff MAURY <[email protected]> Committed: Wed Jul 24 11:48:16 2013 +0200 ---------------------------------------------------------------------- benchmarks2/pom.xml | 2 +- .../mina/core/BenchmarkClientFactory.java | 3 - .../core/nio/tcp/Netty4TcpBenchmarkClient.java | 72 +++++---- .../core/nio/tcp/Netty4TcpBenchmarkServer.java | 39 ++--- .../core/nio/udp/Netty4UdpBenchmarkClient.java | 71 ++++----- .../core/nio/udp/Netty4UdpBenchmarkServer.java | 146 +++++++------------ pom.xml | 1 + 7 files changed, 153 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/benchmarks2/pom.xml ---------------------------------------------------------------------- diff --git a/benchmarks2/pom.xml b/benchmarks2/pom.xml index 2b03467..292e957 100755 --- a/benchmarks2/pom.xml +++ b/benchmarks2/pom.xml @@ -34,7 +34,7 @@ <properties> <!-- defined in order to run against a different MINA version --> <mina.version>${project.version}</mina.version> - <netty.version>4.0.0.CR3</netty.version> + <netty.version>4.0.4.Final</netty.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java ---------------------------------------------------------------------- diff --git a/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java b/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java index b866d18..ecf00f7 100755 --- a/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java +++ b/benchmarks2/src/test/java/org/apache/mina/core/BenchmarkClientFactory.java @@ -19,7 +19,6 @@ */ package org.apache.mina.core; -import org.apache.mina.core.nio.tcp.Mina3TcpBenchmarkClient; import org.apache.mina.core.nio.tcp.Netty4TcpBenchmarkClient; import org.apache.mina.core.nio.udp.Netty4UdpBenchmarkClient; @@ -36,8 +35,6 @@ public class BenchmarkClientFactory implements BenchmarkFactory<BenchmarkClient> return new Netty4TcpBenchmarkClient(); case Netty4_udp: return new Netty4UdpBenchmarkClient(); - case Mina3_tcp: - return new Mina3TcpBenchmarkClient(); default: throw new IllegalArgumentException("Invalid type " + type); } http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java ---------------------------------------------------------------------- diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java index 7edde48..fcc8782 100644 --- a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java +++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkClient.java @@ -19,13 +19,19 @@ */ package org.apache.mina.core.nio.tcp; -import io.netty.bootstrap.ChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.Channels; import java.util.concurrent.CountDownLatch; import org.apache.mina.core.BenchmarkClient; @@ -36,8 +42,8 @@ import org.apache.mina.core.BenchmarkClient; */ public class Netty4TcpBenchmarkClient implements BenchmarkClient { - private ChannelFactory factory; - + private EventLoopGroup group = new NioEventLoopGroup(); + /** * */ @@ -48,42 +54,48 @@ public class Netty4TcpBenchmarkClient implements BenchmarkClient { * {@inheritedDoc} */ public void start(final int port, final CountDownLatch counter, final byte[] data) throws IOException { - factory = new NioClientSocketChannelFactory(); - ClientBootstrap bootstrap = new ClientBootstrap(factory); - bootstrap.setOption("sendBufferSize", 64 * 1024); - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new SimpleChannelUpstreamHandler() { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.option(ChannelOption.SO_SNDBUF, 64 * 1024); + bootstrap.option(ChannelOption.TCP_NODELAY, true); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<SocketChannel>() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { private void sendMessage(ChannelHandlerContext ctx, byte[] data) { - ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); - ctx.getChannel().write(buffer); + ByteBuf buf = ctx.alloc().buffer(data.length); + buf.writeBytes(data); + ctx.writeAndFlush(buf); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - if (e.getMessage() instanceof ChannelBuffer) { - ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); - for (int i = 0; i < buffer.readableBytes(); ++i) { - counter.countDown(); - if (counter.getCount() > 0) { - sendMessage(ctx, data); - } else { - ctx.getChannel().close(); - } + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { + ByteBuf buf = (ByteBuf)message; + for(int i=0; i < buf.readableBytes();i++) { + counter.countDown(); + if (counter.getCount() > 0) { + sendMessage(ctx, data); + } else { + ctx.channel().close(); } - } else { - throw new IllegalArgumentException(e.getMessage().getClass().getName()); } + buf.release(); } @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - sendMessage(ctx, data); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + sendMessage(ctx, data); } - }); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } }); bootstrap.connect(new InetSocketAddress(port)); } @@ -92,6 +104,6 @@ public class Netty4TcpBenchmarkClient implements BenchmarkClient { * {@inheritedDoc} */ public void stop() throws IOException { - factory.releaseExternalResources(); + group.shutdownGracefully(); } } http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java ---------------------------------------------------------------------- diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java index 1fc1f12..13a8a78 100644 --- a/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java +++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/tcp/Netty4TcpBenchmarkServer.java @@ -21,16 +21,14 @@ package org.apache.mina.core.nio.tcp; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -59,7 +57,9 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { private static final AttributeKey<Integer> LENGTH_ATTRIBUTE = new AttributeKey<Integer>("length"); - private class TestServerHandler extends ChannelInboundByteHandlerAdapter { + private ServerBootstrap bootstrap = null; + + private class TestServerHandler extends ChannelInboundHandlerAdapter { public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("childChannelOpen"); ctx.attr(STATE_ATTRIBUTE).set(State.WAIT_FOR_FIRST_BYTE_LENGTH); @@ -75,12 +75,13 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { } @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { + ByteBuf buffer = (ByteBuf)message; State state = ctx.attr(STATE_ATTRIBUTE).get(); int length = 0; Attribute<Integer> lengthAttribute = ctx.attr(LENGTH_ATTRIBUTE); - if (lengthAttribute != null) { + if (lengthAttribute.get() != null) { length = lengthAttribute.get(); } @@ -106,9 +107,7 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { state = State.READING; if ((length == 0) && (buffer.readableBytes() == 0)) { - MessageBuf<Object> messageOut = ctx.nextOutboundMessageBuffer(); - messageOut.add(ACK.slice()); - ctx.flush(); + ctx.writeAndFlush(ACK.retain(1).resetReaderIndex()); state = State.WAIT_FOR_FIRST_BYTE_LENGTH; } @@ -122,9 +121,7 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { buffer.skipBytes(remaining); } else { buffer.skipBytes(length); - MessageBuf<Object> messageOut = ctx.nextOutboundMessageBuffer(); - messageOut.add(ACK.slice()); - ctx.flush(); + ctx.writeAndFlush(ACK.retain(1).resetReaderIndex()); state = State.WAIT_FOR_FIRST_BYTE_LENGTH; length = 0; } @@ -133,6 +130,7 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { ctx.attr(STATE_ATTRIBUTE).set(state); ctx.attr(LENGTH_ATTRIBUTE).set(length); + buffer.release(); } } @@ -141,9 +139,6 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { * @throws */ public void start(int port) throws IOException { - ServerBootstrap bootstrap = null; - ; - try { bootstrap = new ServerBootstrap(); bootstrap.option(ChannelOption.SO_RCVBUF, 128 * 1024); @@ -158,15 +153,11 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { }; }); ChannelFuture bindFuture = bootstrap.bind(); - bindFuture.sync(); - Channel channel = bindFuture.channel(); - ChannelFuture closeFuture = channel.closeFuture(); + //bindFuture.sync(); + //Channel channel = bindFuture.channel(); + //ChannelFuture closeFuture = channel.closeFuture(); //closeFuture.sync(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } finally { - bootstrap.shutdown(); } } @@ -174,5 +165,7 @@ public class Netty4TcpBenchmarkServer implements BenchmarkServer { * {@inheritedDoc} */ public void stop() throws IOException { + bootstrap.childGroup().shutdownGracefully(); + bootstrap.group().shutdownGracefully(); } } http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java ---------------------------------------------------------------------- diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java index 2030e5e..267dffb 100644 --- a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java +++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkClient.java @@ -19,13 +19,19 @@ */ package org.apache.mina.core.nio.udp; -import io.netty.bootstrap.ChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.Channels; import java.util.concurrent.CountDownLatch; import org.apache.mina.core.BenchmarkClient; @@ -36,7 +42,7 @@ import org.apache.mina.core.BenchmarkClient; */ public class Netty4UdpBenchmarkClient implements BenchmarkClient { - private ChannelFactory factory; + private Bootstrap bootstrap; /** * @@ -48,50 +54,47 @@ public class Netty4UdpBenchmarkClient implements BenchmarkClient { * {@inheritedDoc} */ public void start(final int port, final CountDownLatch counter, final byte[] data) throws IOException { - factory = new NioDatagramChannelFactory(); - ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(factory); - bootstrap.setOption("sendBufferSize", 65536); - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new SimpleChannelUpstreamHandler() { - private void sendMessage(ChannelHandlerContext ctx, byte[] data) { - ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); - ctx.getChannel().write(buffer); - } + bootstrap = new Bootstrap(); + bootstrap.option(ChannelOption.SO_SNDBUF, 65536); + bootstrap.group(new NioEventLoopGroup()); + bootstrap.channel(NioDatagramChannel.class); + bootstrap.handler(new ChannelInitializer<DatagramChannel>() { + @Override + protected void initChannel(DatagramChannel ch) throws Exception { + ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() { + private void sendMessage(ChannelHandlerContext ctx, byte[] data, InetSocketAddress address) { + ByteBuf buf = ctx.alloc().buffer(data.length); + buf.writeBytes(data); + ctx.writeAndFlush(new DatagramPacket(buf, address)); + } + @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - if (e.getMessage() instanceof ChannelBuffer) { - ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); - for (int i = 0; i < buffer.readableBytes(); ++i) { - counter.countDown(); - if (counter.getCount() > 0) { - sendMessage(ctx, data); - } else { - ctx.getChannel().close(); - } - } - } else { - throw new IllegalArgumentException(e.getMessage().getClass().getName()); - } + public void channelActive(ChannelHandlerContext ctx) throws Exception { + sendMessage(ctx, data, new InetSocketAddress("localhost", port)); } @Override - public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - sendMessage(ctx, data); + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket message) throws Exception { + for(int i=0; i < message.content().readableBytes();i++) { + counter.countDown(); + if (counter.getCount() > 0) { + sendMessage(ctx, data, message.sender()); + } else { + ctx.channel().close(); + } + } } - }); } }); - bootstrap.connect(new InetSocketAddress(port)); + bootstrap.bind(port+1); } /** * {@inheritedDoc} */ public void stop() throws IOException { - factory.shutdown(); - factory.releaseExternalResources(); + bootstrap.group().shutdownGracefully(); } } http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java ---------------------------------------------------------------------- diff --git a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java index a8c3b18..be1c625 100644 --- a/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java +++ b/benchmarks2/src/test/java/org/apache/mina/core/nio/udp/Netty4UdpBenchmarkServer.java @@ -19,23 +19,24 @@ */ package org.apache.mina.core.nio.udp; -import io.netty.bootstrap.ChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.channels.Channels; -import java.util.HashMap; -import java.util.Map; - import org.apache.mina.core.BenchmarkServer; -import com.sun.jdi.event.ExceptionEvent; - /** * A Netty 3 based UDP server * @@ -47,137 +48,102 @@ public class Netty4UdpBenchmarkServer implements BenchmarkServer { WAIT_FOR_FIRST_BYTE_LENGTH, WAIT_FOR_SECOND_BYTE_LENGTH, WAIT_FOR_THIRD_BYTE_LENGTH, WAIT_FOR_FOURTH_BYTE_LENGTH, READING } - private static final ChannelBuffer ACK = ChannelBuffers.buffer(1); - - static { - ACK.writeByte(0); - } - - private static final String STATE_ATTRIBUTE = Netty4UdpBenchmarkServer.class.getName() + ".state"; - - private static final String LENGTH_ATTRIBUTE = Netty4UdpBenchmarkServer.class.getName() + ".length"; - - private ChannelFactory factory; + private static final ByteBuf ACK = Unpooled.buffer(1); - private ChannelGroup allChannels = new DefaultChannelGroup(); + private static final AttributeKey<State> STATE_ATTRIBUTE = new AttributeKey<State>("state"); - /** - * Allocate a map as attachment for storing attributes. - * - * @param ctx the channel context - * @return the map from the attachment - */ - protected static Map<String, Object> getAttributesMap(ChannelHandlerContext ctx) { - Map<String, Object> map = (Map<String, Object>) ctx.getAttachment(); - if (map == null) { - map = new HashMap<String, Object>(); - ctx.setAttachment(map); - } - return map; - } - - private static void setAttribute(ChannelHandlerContext ctx, String name, Object value) { - getAttributesMap(ctx).put(name, value); - } + private static final AttributeKey<Integer> LENGTH_ATTRIBUTE = new AttributeKey<Integer>("length"); + + static { + ACK.writeByte(0); + } - private static Object getAttribute(ChannelHandlerContext ctx, String name) { - return getAttributesMap(ctx).get(name); - } + private Bootstrap bootstrap; /** * {@inheritDoc} */ public void start(int port) throws IOException { - factory = new NioDatagramChannelFactory(); - ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(factory); - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline(new SimpleChannelUpstreamHandler() { + bootstrap = new Bootstrap(); + bootstrap.group(new NioEventLoopGroup()); + bootstrap.channel(NioDatagramChannel.class); + bootstrap.option(ChannelOption.SO_RCVBUF, 65536); + bootstrap.handler(new ChannelInitializer<DatagramChannel>() { + + @Override + protected void initChannel(DatagramChannel ch) throws Exception { + ch.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() { @Override - public void childChannelOpen(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { - System.out.println("childChannelOpen"); - setAttribute(ctx, STATE_ATTRIBUTE, State.WAIT_FOR_FIRST_BYTE_LENGTH); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.attr(STATE_ATTRIBUTE).set(State.WAIT_FOR_FIRST_BYTE_LENGTH); } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - System.out.println("channelOpen"); - setAttribute(ctx, STATE_ATTRIBUTE, State.WAIT_FOR_FIRST_BYTE_LENGTH); - allChannels.add(ctx.getChannel()); - } + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket message) throws Exception { + ByteBuf buffer = message.content(); + State state = ctx.attr(STATE_ATTRIBUTE).get(); + int length = 0; + Attribute<Integer> lengthAttribute = ctx.attr(LENGTH_ATTRIBUTE); + + if (lengthAttribute.get() != null) { + length = lengthAttribute.get(); + } - @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - if (e.getMessage() instanceof ChannelBuffer) { - ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); - - State state = (State) getAttribute(ctx, STATE_ATTRIBUTE); - int length = 0; - if (getAttributesMap(ctx).containsKey(LENGTH_ATTRIBUTE)) { - length = (Integer) getAttribute(ctx, LENGTH_ATTRIBUTE); - } - while (buffer.readableBytes() > 0) { - switch (state) { + while (buffer.readableBytes() > 0) { + switch (state) { case WAIT_FOR_FIRST_BYTE_LENGTH: length = (buffer.readByte() & 255) << 24; state = State.WAIT_FOR_SECOND_BYTE_LENGTH; break; + case WAIT_FOR_SECOND_BYTE_LENGTH: length += (buffer.readByte() & 255) << 16; state = State.WAIT_FOR_THIRD_BYTE_LENGTH; break; + case WAIT_FOR_THIRD_BYTE_LENGTH: length += (buffer.readByte() & 255) << 8; state = State.WAIT_FOR_FOURTH_BYTE_LENGTH; break; + case WAIT_FOR_FOURTH_BYTE_LENGTH: length += (buffer.readByte() & 255); state = State.READING; + if ((length == 0) && (buffer.readableBytes() == 0)) { - ctx.getChannel().write(ACK.slice()); + ctx.writeAndFlush(new DatagramPacket(ACK.retain(1).resetReaderIndex(), message.sender())); state = State.WAIT_FOR_FIRST_BYTE_LENGTH; } break; + case READING: int remaining = buffer.readableBytes(); + if (length > remaining) { length -= remaining; buffer.skipBytes(remaining); } else { buffer.skipBytes(length); - SocketAddress remoteAddress = e.getRemoteAddress(); - ctx.getChannel().write(ACK.slice(), remoteAddress); + ctx.writeAndFlush(new DatagramPacket(ACK.retain(1).resetReaderIndex(), message.sender())); state = State.WAIT_FOR_FIRST_BYTE_LENGTH; length = 0; } - } } - setAttribute(ctx, STATE_ATTRIBUTE, state); - setAttribute(ctx, LENGTH_ATTRIBUTE, length); } - } - @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - allChannels.remove(ctx.getChannel()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - e.getCause().printStackTrace(); - } + ctx.attr(STATE_ATTRIBUTE).set(state); + ctx.attr(LENGTH_ATTRIBUTE).set(length); + } }); } }); - allChannels.add(bootstrap.bind(new InetSocketAddress(port))); + bootstrap.bind(port); } /** * {@inheritedDoc} */ public void stop() throws IOException { - allChannels.disconnect().awaitUninterruptibly(); - factory.shutdown(); - factory.releaseExternalResources(); + bootstrap.group().shutdownGracefully(); } } http://git-wip-us.apache.org/repos/asf/mina/blob/363f108c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 99bc8af..5f94a3a 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,7 @@ <module>thrift</module> <module>protobuf</module> <module>benchmarks</module> + <module>benchmarks2</module> </modules> <dependencyManagement>
