Repository: qpid-jms Updated Branches: refs/heads/master 3d31992d5 -> 9d2ed0d66
move these to a matching test package Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/10981041 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/10981041 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/10981041 Branch: refs/heads/master Commit: 10981041d41b2d7455813f90008d5f8efb98205b Parents: 3d31992 Author: Timothy Bish <[email protected]> Authored: Thu Jan 22 15:12:00 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Thu Jan 22 15:12:00 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/test/netty/NettyEchoServer.java | 155 -------- .../jms/test/netty/NettyTcpTransportTest.java | 353 ------------------- .../jms/transports/netty/NettyEchoServer.java | 155 ++++++++ .../transports/netty/NettyTcpTransportTest.java | 353 +++++++++++++++++++ 4 files changed, 508 insertions(+), 508 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java deleted file mode 100644 index 42bc535..0000000 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.test.netty; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.util.concurrent.Future; - -import java.io.IOException; -import java.net.ServerSocket; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.ServerSocketFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple Netty Server used to echo all data. - */ -public class NettyEchoServer implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(NettyEchoServer.class); - - static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); - static final int TIMEOUT = 5000; - - private EventLoopGroup bossGroup; - private EventLoopGroup workerGroup; - private Channel serverChannel; - private int serverPort; - - private final AtomicBoolean started = new AtomicBoolean(); - - public void start() throws Exception { - - if (started.compareAndSet(false, true)) { - - // Configure the server. - bossGroup = new NioEventLoopGroup(1); - workerGroup = new NioEventLoopGroup(); - - ServerBootstrap server = new ServerBootstrap(); - server.group(bossGroup, workerGroup); - server.channel(NioServerSocketChannel.class); - server.option(ChannelOption.SO_BACKLOG, 100); - server.handler(new LoggingHandler(LogLevel.INFO)); - server.childHandler(new ChannelInitializer<Channel>() { - @Override - public void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new EchoServerHandler()); - } - }); - - // Start the server. - serverChannel = server.bind(getServerPort()).sync().channel(); - } - } - - public void stop() throws InterruptedException { - if (started.compareAndSet(true, false)) { - try { - LOG.info("Syncing channel close"); - serverChannel.close().sync(); - } catch (InterruptedException e) { - } - // Shut down all event loops to terminate all threads. - LOG.info("Shutting down boss group"); - Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS); - LOG.info("Shutting down worker group"); - Future<?> workerFuture = workerGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS); - - LOG.info("Awaiting boss group shutdown"); - boolean bossShutdown = bossFuture.await(TIMEOUT + 500); - - LOG.info("Awaiting worker group shutdown"); - boolean workerShutdown = workerFuture.await(TIMEOUT + 500); - - if (!bossShutdown) { - throw new RuntimeException("Failed to shut down bossGroup in allotted time"); - } - if (!workerShutdown) { - throw new RuntimeException("Failed to shut down workerGroup in allotted time"); - } - } - } - - @Override - public void close() throws InterruptedException { - stop(); - } - - public int getServerPort() { - if (serverPort == 0) { - ServerSocket ss = null; - try { - ss = ServerSocketFactory.getDefault().createServerSocket(0); - serverPort = ss.getLocalPort(); - } catch (IOException e) { // revert back to default - serverPort = PORT; - } finally { - try { - if (ss != null ) { - ss.close(); - } - } catch (IOException e) { // ignore - } - } - } - return serverPort; - } - - private class EchoServerHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - ctx.write(msg); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // Close the connection when an exception is raised. - cause.printStackTrace(); - ctx.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java deleted file mode 100644 index 448b1be..0000000 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java +++ /dev/null @@ -1,353 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.test.netty; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.qpid.jms.test.QpidJmsTestCase; -import org.apache.qpid.jms.test.Wait; -import org.apache.qpid.jms.transports.TransportListener; -import org.apache.qpid.jms.transports.TransportOptions; -import org.apache.qpid.jms.transports.netty.NettyTcpTransport; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test basic functionality of the Netty based TCP transport. - */ -public class NettyTcpTransportTest extends QpidJmsTestCase { - - private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class); - - private static final int SEND_BYTE_COUNT = 1024; - - private boolean transportClosed; - private final List<Throwable> exceptions = new ArrayList<Throwable>(); - private final List<ByteBuf> data = new ArrayList<ByteBuf>(); - private final AtomicInteger bytesRead = new AtomicInteger(); - - private final TransportListener testListener = new NettyTransportListener(); - private final TransportOptions testOptions = new TransportOptions(); - - @Test(timeout = 60 * 1000) - public void testConnectToServer() throws Exception { - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - LOG.info("Connected to test server."); - } catch (Exception e) { - fail("Should have connected to the server"); - } - - assertTrue(transport.isConnected()); - - transport.close(); - } - - assertTrue(!transportClosed); // Normal shutdown does not trigger the event. - assertTrue(exceptions.isEmpty()); - assertTrue(data.isEmpty()); - } - - @Test(timeout = 60 * 1000) - public void testMultipleConnectionsToServer() throws Exception { - final int CONNECTION_COUNT = 25; - - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); - - for (int i = 0; i < CONNECTION_COUNT; ++i) { - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - assertTrue(transport.isConnected()); - LOG.info("Connected to test server."); - transports.add(transport); - } catch (Exception e) { - fail("Should have connected to the server"); - } - } - - for (NettyTcpTransport transport : transports) { - transport.close(); - } - } - - assertTrue(!transportClosed); // Normal shutdown does not trigger the event. - assertTrue(exceptions.isEmpty()); - assertTrue(data.isEmpty()); - } - - @Test(timeout = 60 * 1000) - public void testMultipleConnectionsSendReceive() throws Exception { - final int CONNECTION_COUNT = 25; - final int FRAME_SIZE = 8; - - ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE); - for (int i = 0; i < 8; ++i) { - sendBuffer.writeByte('A'); - } - - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); - - for (int i = 0; i < CONNECTION_COUNT; ++i) { - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - transport.send(sendBuffer.copy()); - transports.add(transport); - } catch (Exception e) { - fail("Should have connected to the server"); - } - } - - assertTrue(Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT)); - return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT); - } - })); - - for (NettyTcpTransport transport : transports) { - transport.close(); - } - } - - assertTrue(exceptions.isEmpty()); - } - - @Test(timeout = 60 * 1000) - public void testDetectServerClose() throws Exception { - NettyTcpTransport transport = null; - - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - LOG.info("Connected to test server."); - } catch (Exception e) { - fail("Should have connected to the server"); - } - - assertTrue(transport.isConnected()); - - server.close(); - } - - assertTrue(Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return transportClosed; - } - })); - assertTrue(exceptions.isEmpty()); - assertTrue(data.isEmpty()); - assertFalse(transport.isConnected()); - - try { - transport.close(); - } catch (Exception ex) { - fail("Close of a disconnect transport should not generate errors"); - } - } - - @Test(timeout = 60 * 1000) - public void testDataSentIsReceived() throws Exception { - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - LOG.info("Connected to test server."); - } catch (Exception e) { - fail("Should have connected to the server"); - } - - assertTrue(transport.isConnected()); - - ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT); - for (int i = 0; i < SEND_BYTE_COUNT; ++i) { - sendBuffer.writeByte('A'); - } - - transport.send(sendBuffer); - - assertTrue(Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return !data.isEmpty(); - } - })); - - assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes()); - - transport.close(); - } - - assertTrue(!transportClosed); // Normal shutdown does not trigger the event. - assertTrue(exceptions.isEmpty()); - } - - - @Test(timeout = 60 * 1000) - public void testMultipleDataPacketsSentAreReceived() throws Exception { - doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1); - } - - @Test(timeout = 60 * 1000) - public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception { - doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10); - } - - public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception { - - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - LOG.info("Connected to test server."); - } catch (Exception e) { - fail("Should have connected to the server"); - } - - assertTrue(transport.isConnected()); - - ByteBuf sendBuffer = Unpooled.buffer(byteCount); - for (int i = 0; i < byteCount; ++i) { - sendBuffer.writeByte('A'); - } - - for (int i = 0; i < iterations; ++i) { - transport.send(sendBuffer.copy()); - } - - assertTrue(Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return bytesRead.get() == (byteCount * iterations); - } - })); - - transport.close(); - } - - assertTrue(!transportClosed); // Normal shutdown does not trigger the event. - assertTrue(exceptions.isEmpty()); - } - - @Test(timeout = 60 * 1000) - public void testSendToClosedTransportFails() throws Exception { - NettyTcpTransport transport = null; - - try (NettyEchoServer server = new NettyEchoServer()) { - server.start(); - - int port = server.getServerPort(); - URI serverLocation = new URI("tcp://localhost:" + port); - - transport = new NettyTcpTransport(testListener, serverLocation, testOptions); - try { - transport.connect(); - LOG.info("Connected to test server."); - } catch (Exception e) { - fail("Should have connected to the server"); - } - - assertTrue(transport.isConnected()); - - transport.close(); - - ByteBuf sendBuffer = Unpooled.buffer(10); - try { - transport.send(sendBuffer); - fail("Should throw on send of closed transport"); - } catch (IOException ex) { - } - } - } - - private class NettyTransportListener implements TransportListener { - - @Override - public void onData(ByteBuf incoming) { - LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes()); - data.add(incoming); - bytesRead.addAndGet(incoming.readableBytes()); - } - - @Override - public void onTransportClosed() { - LOG.debug("Transport reports that it has closed."); - transportClosed = true; - } - - @Override - public void onTransportError(Throwable cause) { - LOG.debug("Transport error caught: {}", cause.getMessage()); - exceptions.add(cause); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java new file mode 100644 index 0000000..592ed1d --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.concurrent.Future; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ServerSocketFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple Netty Server used to echo all data. + */ +public class NettyEchoServer implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(NettyEchoServer.class); + + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + static final int TIMEOUT = 5000; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel serverChannel; + private int serverPort; + + private final AtomicBoolean started = new AtomicBoolean(); + + public void start() throws Exception { + + if (started.compareAndSet(false, true)) { + + // Configure the server. + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + ServerBootstrap server = new ServerBootstrap(); + server.group(bossGroup, workerGroup); + server.channel(NioServerSocketChannel.class); + server.option(ChannelOption.SO_BACKLOG, 100); + server.handler(new LoggingHandler(LogLevel.INFO)); + server.childHandler(new ChannelInitializer<Channel>() { + @Override + public void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new EchoServerHandler()); + } + }); + + // Start the server. + serverChannel = server.bind(getServerPort()).sync().channel(); + } + } + + public void stop() throws InterruptedException { + if (started.compareAndSet(true, false)) { + try { + LOG.info("Syncing channel close"); + serverChannel.close().sync(); + } catch (InterruptedException e) { + } + // Shut down all event loops to terminate all threads. + LOG.info("Shutting down boss group"); + Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS); + LOG.info("Shutting down worker group"); + Future<?> workerFuture = workerGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS); + + LOG.info("Awaiting boss group shutdown"); + boolean bossShutdown = bossFuture.await(TIMEOUT + 500); + + LOG.info("Awaiting worker group shutdown"); + boolean workerShutdown = workerFuture.await(TIMEOUT + 500); + + if (!bossShutdown) { + throw new RuntimeException("Failed to shut down bossGroup in allotted time"); + } + if (!workerShutdown) { + throw new RuntimeException("Failed to shut down workerGroup in allotted time"); + } + } + } + + @Override + public void close() throws InterruptedException { + stop(); + } + + public int getServerPort() { + if (serverPort == 0) { + ServerSocket ss = null; + try { + ss = ServerSocketFactory.getDefault().createServerSocket(0); + serverPort = ss.getLocalPort(); + } catch (IOException e) { // revert back to default + serverPort = PORT; + } finally { + try { + if (ss != null ) { + ss.close(); + } + } catch (IOException e) { // ignore + } + } + } + return serverPort; + } + + private class EchoServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java new file mode 100644 index 0000000..e65a1b7 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.Wait; +import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.netty.NettyTcpTransport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic functionality of the Netty based TCP transport. + */ +public class NettyTcpTransportTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class); + + private static final int SEND_BYTE_COUNT = 1024; + + private boolean transportClosed; + private final List<Throwable> exceptions = new ArrayList<Throwable>(); + private final List<ByteBuf> data = new ArrayList<ByteBuf>(); + private final AtomicInteger bytesRead = new AtomicInteger(); + + private final TransportListener testListener = new NettyTransportListener(); + private final TransportOptions testOptions = new TransportOptions(); + + @Test(timeout = 60 * 1000) + public void testConnectToServer() throws Exception { + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testMultipleConnectionsToServer() throws Exception { + final int CONNECTION_COUNT = 25; + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); + + for (int i = 0; i < CONNECTION_COUNT; ++i) { + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + assertTrue(transport.isConnected()); + LOG.info("Connected to test server."); + transports.add(transport); + } catch (Exception e) { + fail("Should have connected to the server"); + } + } + + for (NettyTcpTransport transport : transports) { + transport.close(); + } + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testMultipleConnectionsSendReceive() throws Exception { + final int CONNECTION_COUNT = 25; + final int FRAME_SIZE = 8; + + ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE); + for (int i = 0; i < 8; ++i) { + sendBuffer.writeByte('A'); + } + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); + + for (int i = 0; i < CONNECTION_COUNT; ++i) { + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + transport.send(sendBuffer.copy()); + transports.add(transport); + } catch (Exception e) { + fail("Should have connected to the server"); + } + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT)); + return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT); + } + })); + + for (NettyTcpTransport transport : transports) { + transport.close(); + } + } + + assertTrue(exceptions.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testDetectServerClose() throws Exception { + NettyTcpTransport transport = null; + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + server.close(); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return transportClosed; + } + })); + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + assertFalse(transport.isConnected()); + + try { + transport.close(); + } catch (Exception ex) { + fail("Close of a disconnect transport should not generate errors"); + } + } + + @Test(timeout = 60 * 1000) + public void testDataSentIsReceived() throws Exception { + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT); + for (int i = 0; i < SEND_BYTE_COUNT; ++i) { + sendBuffer.writeByte('A'); + } + + transport.send(sendBuffer); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !data.isEmpty(); + } + })); + + assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes()); + + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + } + + + @Test(timeout = 60 * 1000) + public void testMultipleDataPacketsSentAreReceived() throws Exception { + doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1); + } + + @Test(timeout = 60 * 1000) + public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception { + doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10); + } + + public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception { + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + ByteBuf sendBuffer = Unpooled.buffer(byteCount); + for (int i = 0; i < byteCount; ++i) { + sendBuffer.writeByte('A'); + } + + for (int i = 0; i < iterations; ++i) { + transport.send(sendBuffer.copy()); + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return bytesRead.get() == (byteCount * iterations); + } + })); + + transport.close(); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testSendToClosedTransportFails() throws Exception { + NettyTcpTransport transport = null; + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + LOG.info("Connected to test server."); + } catch (Exception e) { + fail("Should have connected to the server"); + } + + assertTrue(transport.isConnected()); + + transport.close(); + + ByteBuf sendBuffer = Unpooled.buffer(10); + try { + transport.send(sendBuffer); + fail("Should throw on send of closed transport"); + } catch (IOException ex) { + } + } + } + + private class NettyTransportListener implements TransportListener { + + @Override + public void onData(ByteBuf incoming) { + LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes()); + data.add(incoming); + bytesRead.addAndGet(incoming.readableBytes()); + } + + @Override + public void onTransportClosed() { + LOG.debug("Transport reports that it has closed."); + transportClosed = true; + } + + @Override + public void onTransportError(Throwable cause) { + LOG.debug("Transport error caught: {}", cause.getMessage()); + exceptions.add(cause); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
