Repository: qpid-jms
Updated Branches:
  refs/heads/master 3d31992d5 -> 9d2ed0d66


move these to a matching test package

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/10981041
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/10981041
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/10981041

Branch: refs/heads/master
Commit: 10981041d41b2d7455813f90008d5f8efb98205b
Parents: 3d31992
Author: Timothy Bish <[email protected]>
Authored: Thu Jan 22 15:12:00 2015 -0500
Committer: Timothy Bish <[email protected]>
Committed: Thu Jan 22 15:12:00 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/test/netty/NettyEchoServer.java    | 155 --------
 .../jms/test/netty/NettyTcpTransportTest.java   | 353 -------------------
 .../jms/transports/netty/NettyEchoServer.java   | 155 ++++++++
 .../transports/netty/NettyTcpTransportTest.java | 353 +++++++++++++++++++
 4 files changed, 508 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
deleted file mode 100644
index 42bc535..0000000
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.test.netty;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.Future;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ServerSocketFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple Netty Server used to echo all data.
- */
-public class NettyEchoServer implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEchoServer.class);
-
-    static final int PORT = Integer.parseInt(System.getProperty("port", 
"8007"));
-    static final int TIMEOUT = 5000;
-
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-    private Channel serverChannel;
-    private int serverPort;
-
-    private final AtomicBoolean started = new AtomicBoolean();
-
-    public void start() throws Exception {
-
-        if (started.compareAndSet(false, true)) {
-
-            // Configure the server.
-            bossGroup = new NioEventLoopGroup(1);
-            workerGroup = new NioEventLoopGroup();
-
-            ServerBootstrap server = new ServerBootstrap();
-            server.group(bossGroup, workerGroup);
-            server.channel(NioServerSocketChannel.class);
-            server.option(ChannelOption.SO_BACKLOG, 100);
-            server.handler(new LoggingHandler(LogLevel.INFO));
-            server.childHandler(new ChannelInitializer<Channel>() {
-                @Override
-                public void initChannel(Channel ch) throws Exception {
-                    ch.pipeline().addLast(new EchoServerHandler());
-                }
-            });
-
-            // Start the server.
-            serverChannel = server.bind(getServerPort()).sync().channel();
-        }
-    }
-
-    public void stop() throws InterruptedException {
-        if (started.compareAndSet(true, false)) {
-            try {
-                LOG.info("Syncing channel close");
-                serverChannel.close().sync();
-            } catch (InterruptedException e) {
-            }
-            // Shut down all event loops to terminate all threads.
-            LOG.info("Shutting down boss group");
-            Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, 
TimeUnit.MILLISECONDS);
-            LOG.info("Shutting down worker group");
-            Future<?> workerFuture = workerGroup.shutdownGracefully(10, 
TIMEOUT, TimeUnit.MILLISECONDS);
-
-            LOG.info("Awaiting boss group shutdown");
-            boolean bossShutdown = bossFuture.await(TIMEOUT + 500);
-
-            LOG.info("Awaiting worker group shutdown");
-            boolean workerShutdown = workerFuture.await(TIMEOUT + 500);
-
-            if (!bossShutdown) {
-                throw new RuntimeException("Failed to shut down bossGroup in 
allotted time");
-            }
-            if (!workerShutdown) {
-                throw new RuntimeException("Failed to shut down workerGroup in 
allotted time");
-            }
-        }
-    }
-
-    @Override
-    public void close() throws InterruptedException {
-        stop();
-    }
-
-    public int getServerPort() {
-        if (serverPort == 0) {
-            ServerSocket ss = null;
-            try {
-                ss = ServerSocketFactory.getDefault().createServerSocket(0);
-                serverPort = ss.getLocalPort();
-            } catch (IOException e) { // revert back to default
-                serverPort = PORT;
-            } finally {
-                try {
-                    if (ss != null ) {
-                        ss.close();
-                    }
-                } catch (IOException e) { // ignore
-                }
-            }
-        }
-        return serverPort;
-    }
-
-    private class EchoServerHandler extends ChannelInboundHandlerAdapter {
-
-        @Override
-        public void channelRead(ChannelHandlerContext ctx, Object msg) {
-            ctx.write(msg);
-        }
-
-        @Override
-        public void channelReadComplete(ChannelHandlerContext ctx) {
-            ctx.flush();
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
-            // Close the connection when an exception is raised.
-            cause.printStackTrace();
-            ctx.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
deleted file mode 100644
index 448b1be..0000000
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.test.netty;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpid.jms.test.QpidJmsTestCase;
-import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.transports.TransportListener;
-import org.apache.qpid.jms.transports.TransportOptions;
-import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test basic functionality of the Netty based TCP transport.
- */
-public class NettyTcpTransportTest extends QpidJmsTestCase {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(NettyTcpTransportTest.class);
-
-    private static final int SEND_BYTE_COUNT = 1024;
-
-    private boolean transportClosed;
-    private final List<Throwable> exceptions = new ArrayList<Throwable>();
-    private final List<ByteBuf> data = new ArrayList<ByteBuf>();
-    private final AtomicInteger bytesRead = new AtomicInteger();
-
-    private final TransportListener testListener = new 
NettyTransportListener();
-    private final TransportOptions testOptions = new TransportOptions();
-
-    @Test(timeout = 60 * 1000)
-    public void testConnectToServer() throws Exception {
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            NettyTcpTransport transport = new NettyTcpTransport(testListener, 
serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            transport.close();
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
-        assertTrue(exceptions.isEmpty());
-        assertTrue(data.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleConnectionsToServer() throws Exception {
-        final int CONNECTION_COUNT = 25;
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            List<NettyTcpTransport> transports = new 
ArrayList<NettyTcpTransport>();
-
-            for (int i = 0; i < CONNECTION_COUNT; ++i) {
-                NettyTcpTransport transport = new 
NettyTcpTransport(testListener, serverLocation, testOptions);
-                try {
-                    transport.connect();
-                    assertTrue(transport.isConnected());
-                    LOG.info("Connected to test server.");
-                    transports.add(transport);
-                } catch (Exception e) {
-                    fail("Should have connected to the server");
-                }
-            }
-
-            for (NettyTcpTransport transport : transports) {
-                transport.close();
-            }
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
-        assertTrue(exceptions.isEmpty());
-        assertTrue(data.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleConnectionsSendReceive() throws Exception {
-        final int CONNECTION_COUNT = 25;
-        final int FRAME_SIZE = 8;
-
-        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
-        for (int i = 0; i < 8; ++i) {
-            sendBuffer.writeByte('A');
-        }
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            List<NettyTcpTransport> transports = new 
ArrayList<NettyTcpTransport>();
-
-            for (int i = 0; i < CONNECTION_COUNT; ++i) {
-                NettyTcpTransport transport = new 
NettyTcpTransport(testListener, serverLocation, testOptions);
-                try {
-                    transport.connect();
-                    transport.send(sendBuffer.copy());
-                    transports.add(transport);
-                } catch (Exception e) {
-                    fail("Should have connected to the server");
-                }
-            }
-
-            assertTrue(Wait.waitFor(new Wait.Condition() {
-
-                @Override
-                public boolean isSatisified() throws Exception {
-                    LOG.debug("Checking completion: read {} expecting {}", 
bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT));
-                    return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT);
-                }
-            }));
-
-            for (NettyTcpTransport transport : transports) {
-                transport.close();
-            }
-        }
-
-        assertTrue(exceptions.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testDetectServerClose() throws Exception {
-        NettyTcpTransport transport = null;
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            transport = new NettyTcpTransport(testListener, serverLocation, 
testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            server.close();
-        }
-
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return transportClosed;
-            }
-        }));
-        assertTrue(exceptions.isEmpty());
-        assertTrue(data.isEmpty());
-        assertFalse(transport.isConnected());
-
-        try {
-            transport.close();
-        } catch (Exception ex) {
-            fail("Close of a disconnect transport should not generate errors");
-        }
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testDataSentIsReceived() throws Exception {
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-
-            NettyTcpTransport transport = new NettyTcpTransport(testListener, 
serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
-            for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
-                sendBuffer.writeByte('A');
-            }
-
-            transport.send(sendBuffer);
-
-            assertTrue(Wait.waitFor(new Wait.Condition() {
-
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return !data.isEmpty();
-                }
-            }));
-
-            assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes());
-
-            transport.close();
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
-        assertTrue(exceptions.isEmpty());
-    }
-
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleDataPacketsSentAreReceived() throws Exception {
-        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1);
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws 
Exception {
-        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10);
-    }
-
-    public void doMultipleDataPacketsSentAndReceive(final int byteCount, final 
int iterations) throws Exception {
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            NettyTcpTransport transport = new NettyTcpTransport(testListener, 
serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            ByteBuf sendBuffer = Unpooled.buffer(byteCount);
-            for (int i = 0; i < byteCount; ++i) {
-                sendBuffer.writeByte('A');
-            }
-
-            for (int i = 0; i < iterations; ++i) {
-                transport.send(sendBuffer.copy());
-            }
-
-            assertTrue(Wait.waitFor(new Wait.Condition() {
-
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return bytesRead.get() == (byteCount * iterations);
-                }
-            }));
-
-            transport.close();
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
-        assertTrue(exceptions.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testSendToClosedTransportFails() throws Exception {
-        NettyTcpTransport transport = null;
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            transport = new NettyTcpTransport(testListener, serverLocation, 
testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            transport.close();
-
-            ByteBuf sendBuffer = Unpooled.buffer(10);
-            try {
-                transport.send(sendBuffer);
-                fail("Should throw on send of closed transport");
-            } catch (IOException ex) {
-            }
-        }
-    }
-
-    private class NettyTransportListener implements TransportListener {
-
-        @Override
-        public void onData(ByteBuf incoming) {
-            LOG.debug("Client has new incoming data of size: {}", 
incoming.readableBytes());
-            data.add(incoming);
-            bytesRead.addAndGet(incoming.readableBytes());
-        }
-
-        @Override
-        public void onTransportClosed() {
-            LOG.debug("Transport reports that it has closed.");
-            transportClosed = true;
-        }
-
-        @Override
-        public void onTransportError(Throwable cause) {
-            LOG.debug("Transport error caught: {}", cause.getMessage());
-            exceptions.add(cause);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
new file mode 100644
index 0000000..592ed1d
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Netty Server used to echo all data.
+ */
+public class NettyEchoServer implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEchoServer.class);
+
+    static final int PORT = Integer.parseInt(System.getProperty("port", 
"8007"));
+    static final int TIMEOUT = 5000;
+
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private Channel serverChannel;
+    private int serverPort;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    public void start() throws Exception {
+
+        if (started.compareAndSet(false, true)) {
+
+            // Configure the server.
+            bossGroup = new NioEventLoopGroup(1);
+            workerGroup = new NioEventLoopGroup();
+
+            ServerBootstrap server = new ServerBootstrap();
+            server.group(bossGroup, workerGroup);
+            server.channel(NioServerSocketChannel.class);
+            server.option(ChannelOption.SO_BACKLOG, 100);
+            server.handler(new LoggingHandler(LogLevel.INFO));
+            server.childHandler(new ChannelInitializer<Channel>() {
+                @Override
+                public void initChannel(Channel ch) throws Exception {
+                    ch.pipeline().addLast(new EchoServerHandler());
+                }
+            });
+
+            // Start the server.
+            serverChannel = server.bind(getServerPort()).sync().channel();
+        }
+    }
+
+    public void stop() throws InterruptedException {
+        if (started.compareAndSet(true, false)) {
+            try {
+                LOG.info("Syncing channel close");
+                serverChannel.close().sync();
+            } catch (InterruptedException e) {
+            }
+            // Shut down all event loops to terminate all threads.
+            LOG.info("Shutting down boss group");
+            Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, 
TimeUnit.MILLISECONDS);
+            LOG.info("Shutting down worker group");
+            Future<?> workerFuture = workerGroup.shutdownGracefully(10, 
TIMEOUT, TimeUnit.MILLISECONDS);
+
+            LOG.info("Awaiting boss group shutdown");
+            boolean bossShutdown = bossFuture.await(TIMEOUT + 500);
+
+            LOG.info("Awaiting worker group shutdown");
+            boolean workerShutdown = workerFuture.await(TIMEOUT + 500);
+
+            if (!bossShutdown) {
+                throw new RuntimeException("Failed to shut down bossGroup in 
allotted time");
+            }
+            if (!workerShutdown) {
+                throw new RuntimeException("Failed to shut down workerGroup in 
allotted time");
+            }
+        }
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        stop();
+    }
+
+    public int getServerPort() {
+        if (serverPort == 0) {
+            ServerSocket ss = null;
+            try {
+                ss = ServerSocketFactory.getDefault().createServerSocket(0);
+                serverPort = ss.getLocalPort();
+            } catch (IOException e) { // revert back to default
+                serverPort = PORT;
+            } finally {
+                try {
+                    if (ss != null ) {
+                        ss.close();
+                    }
+                } catch (IOException e) { // ignore
+                }
+            }
+        }
+        return serverPort;
+    }
+
+    private class EchoServerHandler extends ChannelInboundHandlerAdapter {
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            ctx.write(msg);
+        }
+
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) {
+            ctx.flush();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
+            // Close the connection when an exception is raised.
+            cause.printStackTrace();
+            ctx.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
new file mode 100644
index 0000000..e65a1b7
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic functionality of the Netty based TCP transport.
+ */
+public class NettyTcpTransportTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyTcpTransportTest.class);
+
+    private static final int SEND_BYTE_COUNT = 1024;
+
+    private boolean transportClosed;
+    private final List<Throwable> exceptions = new ArrayList<Throwable>();
+    private final List<ByteBuf> data = new ArrayList<ByteBuf>();
+    private final AtomicInteger bytesRead = new AtomicInteger();
+
+    private final TransportListener testListener = new 
NettyTransportListener();
+    private final TransportOptions testOptions = new TransportOptions();
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServer() throws Exception {
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, 
serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleConnectionsToServer() throws Exception {
+        final int CONNECTION_COUNT = 25;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<NettyTcpTransport> transports = new 
ArrayList<NettyTcpTransport>();
+
+            for (int i = 0; i < CONNECTION_COUNT; ++i) {
+                NettyTcpTransport transport = new 
NettyTcpTransport(testListener, serverLocation, testOptions);
+                try {
+                    transport.connect();
+                    assertTrue(transport.isConnected());
+                    LOG.info("Connected to test server.");
+                    transports.add(transport);
+                } catch (Exception e) {
+                    fail("Should have connected to the server");
+                }
+            }
+
+            for (NettyTcpTransport transport : transports) {
+                transport.close();
+            }
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleConnectionsSendReceive() throws Exception {
+        final int CONNECTION_COUNT = 25;
+        final int FRAME_SIZE = 8;
+
+        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
+        for (int i = 0; i < 8; ++i) {
+            sendBuffer.writeByte('A');
+        }
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<NettyTcpTransport> transports = new 
ArrayList<NettyTcpTransport>();
+
+            for (int i = 0; i < CONNECTION_COUNT; ++i) {
+                NettyTcpTransport transport = new 
NettyTcpTransport(testListener, serverLocation, testOptions);
+                try {
+                    transport.connect();
+                    transport.send(sendBuffer.copy());
+                    transports.add(transport);
+                } catch (Exception e) {
+                    fail("Should have connected to the server");
+                }
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.debug("Checking completion: read {} expecting {}", 
bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT));
+                    return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT);
+                }
+            }));
+
+            for (NettyTcpTransport transport : transports) {
+                transport.close();
+            }
+        }
+
+        assertTrue(exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDetectServerClose() throws Exception {
+        NettyTcpTransport transport = null;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            transport = new NettyTcpTransport(testListener, serverLocation, 
testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            server.close();
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return transportClosed;
+            }
+        }));
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+        assertFalse(transport.isConnected());
+
+        try {
+            transport.close();
+        } catch (Exception ex) {
+            fail("Close of a disconnect transport should not generate errors");
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDataSentIsReceived() throws Exception {
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, 
serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
+            for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
+                sendBuffer.writeByte('A');
+            }
+
+            transport.send(sendBuffer);
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !data.isEmpty();
+                }
+            }));
+
+            assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes());
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleDataPacketsSentAreReceived() throws Exception {
+        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws 
Exception {
+        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10);
+    }
+
+    public void doMultipleDataPacketsSentAndReceive(final int byteCount, final 
int iterations) throws Exception {
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, 
serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            ByteBuf sendBuffer = Unpooled.buffer(byteCount);
+            for (int i = 0; i < byteCount; ++i) {
+                sendBuffer.writeByte('A');
+            }
+
+            for (int i = 0; i < iterations; ++i) {
+                transport.send(sendBuffer.copy());
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return bytesRead.get() == (byteCount * iterations);
+                }
+            }));
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendToClosedTransportFails() throws Exception {
+        NettyTcpTransport transport = null;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            transport = new NettyTcpTransport(testListener, serverLocation, 
testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            transport.close();
+
+            ByteBuf sendBuffer = Unpooled.buffer(10);
+            try {
+                transport.send(sendBuffer);
+                fail("Should throw on send of closed transport");
+            } catch (IOException ex) {
+            }
+        }
+    }
+
+    private class NettyTransportListener implements TransportListener {
+
+        @Override
+        public void onData(ByteBuf incoming) {
+            LOG.debug("Client has new incoming data of size: {}", 
incoming.readableBytes());
+            data.add(incoming);
+            bytesRead.addAndGet(incoming.readableBytes());
+        }
+
+        @Override
+        public void onTransportClosed() {
+            LOG.debug("Transport reports that it has closed.");
+            transportClosed = true;
+        }
+
+        @Override
+        public void onTransportError(Throwable cause) {
+            LOG.debug("Transport error caught: {}", cause.getMessage());
+            exceptions.add(cause);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to