Add tests for the SSL version of the Transport. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/49b3b1ba Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/49b3b1ba Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/49b3b1ba
Branch: refs/heads/master Commit: 49b3b1baa4dbb9d54d8926271720ce88a3396b17 Parents: ffdf437 Author: Timothy Bish <[email protected]> Authored: Fri Jan 23 16:09:09 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Jan 23 16:09:09 2015 -0500 ---------------------------------------------------------------------- .../jms/transports/netty/NettyEchoServer.java | 38 +++++++++++ .../transports/netty/NettySslTransportTest.java | 68 ++++++++++++++++++++ .../transports/netty/NettyTcpTransportTest.java | 66 +++++++++++-------- 3 files changed, 144 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/49b3b1ba/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java index 592ed1d..2144d03 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java @@ -27,7 +27,9 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.net.ServerSocket; @@ -35,7 +37,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.TransportSslOptions; +import org.apache.qpid.jms.transports.TransportSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +58,19 @@ public class NettyEchoServer implements AutoCloseable { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private Channel serverChannel; + private final TransportOptions options; private int serverPort; private final AtomicBoolean started = new AtomicBoolean(); + public NettyEchoServer() { + this.options = TransportOptions.DEFAULT_OPTIONS; + } + + public NettyEchoServer(TransportOptions options) { + this.options = options; + } + public void start() throws Exception { if (started.compareAndSet(false, true)) { @@ -71,6 +87,14 @@ public class NettyEchoServer implements AutoCloseable { server.childHandler(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { + if (options instanceof TransportSslOptions) { + TransportSslOptions sslOptions = (TransportSslOptions) options; + SSLContext context = TransportSupport.createSslContext(sslOptions); + SSLEngine engine = TransportSupport.createSslEngine(context, sslOptions); + engine.setUseClientMode(false); + SslHandler sslHandler = new SslHandler(engine); + ch.pipeline().addLast(sslHandler); + } ch.pipeline().addLast(new EchoServerHandler()); } }); @@ -87,6 +111,7 @@ public class NettyEchoServer implements AutoCloseable { serverChannel.close().sync(); } catch (InterruptedException e) { } + // Shut down all event loops to terminate all threads. LOG.info("Shutting down boss group"); Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS); @@ -136,6 +161,19 @@ public class NettyEchoServer implements AutoCloseable { private class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override + public void channelActive(final ChannelHandlerContext ctx) { + SslHandler handler = ctx.pipeline().get(SslHandler.class); + if (handler != null) { + handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + LOG.info("SSL handshake completed successfully"); + } + }); + } + } + + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/49b3b1ba/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java new file mode 100644 index 0000000..9fe7a54 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.net.URI; + +import org.apache.qpid.jms.transports.Transport; +import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.transports.TransportSslOptions; + +/** + * Test basic functionality of the Netty based SSL Transport. + */ +public class NettySslTransportTest extends NettyTcpTransportTest { + + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/broker-jks.keystore"; + public static final String SERVER_TRUSTSTORE = "src/test/resources/broker-jks.truststore"; + public static final String CLINET_KEYSTORE = "src/test/resources/client-jks.keystore"; + public static final String CLINET_TRUSTSTORE = "src/test/resources/client-jks.truststore"; + public static final String KEYSTORE_TYPE = "jks"; + + @Override + protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) { + return new NettySslTransport(listener, serverLocation, options); + } + + @Override + protected TransportSslOptions createClientOptions() { + TransportSslOptions options = TransportSslOptions.INSTANCE.clone(); + + options.setKeyStoreLocation(CLINET_KEYSTORE); + options.setTrustStoreLocation(CLINET_TRUSTSTORE); + options.setStoreType(KEYSTORE_TYPE); + options.setKeyStorePassword(PASSWORD); + options.setTrustStorePassword(PASSWORD); + + return options; + } + + @Override + protected TransportSslOptions createServerOptions() { + TransportSslOptions options = TransportSslOptions.INSTANCE.clone(); + + options.setKeyStoreLocation(SERVER_KEYSTORE); + options.setTrustStoreLocation(SERVER_TRUSTSTORE); + options.setStoreType(KEYSTORE_TYPE); + options.setKeyStorePassword(PASSWORD); + options.setTrustStorePassword(PASSWORD); + + return options; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/49b3b1ba/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java index e65a1b7..94de183 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java @@ -31,9 +31,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; +import org.apache.qpid.jms.transports.Transport; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.transports.TransportOptions; -import org.apache.qpid.jms.transports.netty.NettyTcpTransport; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,23 +47,22 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { private static final int SEND_BYTE_COUNT = 1024; - private boolean transportClosed; - private final List<Throwable> exceptions = new ArrayList<Throwable>(); - private final List<ByteBuf> data = new ArrayList<ByteBuf>(); - private final AtomicInteger bytesRead = new AtomicInteger(); + protected boolean transportClosed; + protected final List<Throwable> exceptions = new ArrayList<Throwable>(); + protected final List<ByteBuf> data = new ArrayList<ByteBuf>(); + protected final AtomicInteger bytesRead = new AtomicInteger(); - private final TransportListener testListener = new NettyTransportListener(); - private final TransportOptions testOptions = new TransportOptions(); + protected final TransportListener testListener = new NettyTransportListener(); @Test(timeout = 60 * 1000) public void testConnectToServer() throws Exception { - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); LOG.info("Connected to test server."); @@ -85,16 +84,16 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { public void testMultipleConnectionsToServer() throws Exception { final int CONNECTION_COUNT = 25; - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); + List<Transport> transports = new ArrayList<Transport>(); for (int i = 0; i < CONNECTION_COUNT; ++i) { - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); assertTrue(transport.isConnected()); @@ -105,7 +104,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } } - for (NettyTcpTransport transport : transports) { + for (Transport transport : transports) { transport.close(); } } @@ -125,16 +124,16 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { sendBuffer.writeByte('A'); } - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>(); + List<Transport> transports = new ArrayList<Transport>(); for (int i = 0; i < CONNECTION_COUNT; ++i) { - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); transport.send(sendBuffer.copy()); @@ -153,7 +152,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } })); - for (NettyTcpTransport transport : transports) { + for (Transport transport : transports) { transport.close(); } } @@ -163,15 +162,15 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Test(timeout = 60 * 1000) public void testDetectServerClose() throws Exception { - NettyTcpTransport transport = null; + Transport transport = null; - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); LOG.info("Connected to test server."); @@ -204,14 +203,13 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Test(timeout = 60 * 1000) public void testDataSentIsReceived() throws Exception { - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); LOG.info("Connected to test server."); @@ -258,13 +256,13 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception { - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); LOG.info("Connected to test server."); @@ -300,15 +298,15 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Test(timeout = 60 * 1000) public void testSendToClosedTransportFails() throws Exception { - NettyTcpTransport transport = null; + Transport transport = null; - try (NettyEchoServer server = new NettyEchoServer()) { + try (NettyEchoServer server = new NettyEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - transport = new NettyTcpTransport(testListener, serverLocation, testOptions); + transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(); LOG.info("Connected to test server."); @@ -329,6 +327,18 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } } + protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) { + return new NettyTcpTransport(listener, serverLocation, options); + } + + protected TransportOptions createClientOptions() { + return TransportOptions.DEFAULT_OPTIONS.clone(); + } + + protected TransportOptions createServerOptions() { + return TransportOptions.DEFAULT_OPTIONS.clone(); + } + private class NettyTransportListener implements TransportListener { @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
