Repository: qpid-jms Updated Branches: refs/heads/master 997c5aec5 -> 72ab8a358
more extensive test case. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/72ab8a35 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/72ab8a35 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/72ab8a35 Branch: refs/heads/master Commit: 72ab8a3585b362b5c918d307c36c7ac6d70d35e7 Parents: 997c5ae Author: Timothy Bish <[email protected]> Authored: Fri Jan 16 12:54:22 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Jan 16 12:57:14 2015 -0500 ---------------------------------------------------------------------- .../jms/test/netty/NettyTcpTransportTest.java | 111 +++++++++++++++++-- 1 file changed, 100 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/72ab8a35/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java index 7e1e497..448b1be 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java @@ -45,6 +45,8 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class); + private static final int SEND_BYTE_COUNT = 1024; + private boolean transportClosed; private final List<Throwable> exceptions = new ArrayList<Throwable>(); private final List<ByteBuf> data = new ArrayList<ByteBuf>(); @@ -80,6 +82,86 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } @Test(timeout = 60 * 1000) + public void testMultipleConnectionsToServer() throws Exception { + final int CONNECTION_COUNT = 25; + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); + + for (int i = 0; i < CONNECTION_COUNT; ++i) { + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + assertTrue(transport.isConnected()); + LOG.info("Connected to test server."); + transports.add(transport); + } catch (Exception e) { + fail("Should have connected to the server"); + } + } + + for (NettyTcpTransport transport : transports) { + transport.close(); + } + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. + assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testMultipleConnectionsSendReceive() throws Exception { + final int CONNECTION_COUNT = 25; + final int FRAME_SIZE = 8; + + ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE); + for (int i = 0; i < 8; ++i) { + sendBuffer.writeByte('A'); + } + + try (NettyEchoServer server = new NettyEchoServer()) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); + + for (int i = 0; i < CONNECTION_COUNT; ++i) { + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + try { + transport.connect(); + transport.send(sendBuffer.copy()); + transports.add(transport); + } catch (Exception e) { + fail("Should have connected to the server"); + } + } + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT)); + return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT); + } + })); + + for (NettyTcpTransport transport : transports) { + transport.close(); + } + } + + assertTrue(exceptions.isEmpty()); + } + + @Test(timeout = 60 * 1000) public void testDetectServerClose() throws Exception { NettyTcpTransport transport = null; @@ -122,14 +204,13 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Test(timeout = 60 * 1000) public void testDataSentIsReceived() throws Exception { - final int SEND_BYTE_COUNT = 1024; - try (NettyEchoServer server = new NettyEchoServer()) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); + NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); try { transport.connect(); @@ -164,10 +245,18 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { assertTrue(exceptions.isEmpty()); } + @Test(timeout = 60 * 1000) public void testMultipleDataPacketsSentAreReceived() throws Exception { - final int SEND_BYTE_COUNT = 1024; - final int SEND_PACKETS_COUNT = 3; + doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1); + } + + @Test(timeout = 60 * 1000) + public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception { + doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10); + } + + public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception { try (NettyEchoServer server = new NettyEchoServer()) { server.start(); @@ -185,12 +274,12 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { assertTrue(transport.isConnected()); - ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT); - for (int i = 0; i < SEND_BYTE_COUNT; ++i) { + ByteBuf sendBuffer = Unpooled.buffer(byteCount); + for (int i = 0; i < byteCount; ++i) { sendBuffer.writeByte('A'); } - for (int i = 0; i < SEND_PACKETS_COUNT; ++i) { + for (int i = 0; i < iterations; ++i) { transport.send(sendBuffer.copy()); } @@ -198,7 +287,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Override public boolean isSatisified() throws Exception { - return bytesRead.get() != SEND_BYTE_COUNT * SEND_PACKETS_COUNT; + return bytesRead.get() == (byteCount * iterations); } })); @@ -244,20 +333,20 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Override public void onData(ByteBuf incoming) { - LOG.info("Client has new incoming data of size: {}", incoming.readableBytes()); + LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes()); data.add(incoming); bytesRead.addAndGet(incoming.readableBytes()); } @Override public void onTransportClosed() { - LOG.info("Transport reports that it has closed."); + LOG.debug("Transport reports that it has closed."); transportClosed = true; } @Override public void onTransportError(Throwable cause) { - LOG.info("Transport error caught: {}", cause.getMessage()); + LOG.debug("Transport error caught: {}", cause.getMessage()); exceptions.add(cause); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
