Repository: qpid-jms
Updated Branches:
  refs/heads/master 1144acaf8 -> 945a48985


QPIDJMS-191 Refactor the Netty based test server

Extract the Netty server logic to allow for custom servers to be created
for other types of tests.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/945a4898
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/945a4898
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/945a4898

Branch: refs/heads/master
Commit: 945a489853a13396ed32e2ebb99eb043110f187b
Parents: 1144aca
Author: Timothy Bish <[email protected]>
Authored: Tue Aug 9 17:17:32 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Aug 9 17:17:32 2016 -0400

----------------------------------------------------------------------
 .../jms/transports/netty/NettyEchoServer.java   | 225 +-------------
 .../qpid/jms/transports/netty/NettyServer.java  | 293 +++++++++++++++++++
 2 files changed, 302 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/945a4898/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
index 0884fc4..e14fdae 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
@@ -16,219 +16,45 @@
  */
 package org.apache.qpid.jms.transports.netty;
 
-import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ServerSocketFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
 import org.apache.qpid.jms.transports.TransportOptions;
-import org.apache.qpid.jms.transports.TransportSslOptions;
-import org.apache.qpid.jms.transports.TransportSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 
 /**
  * Simple Netty Server used to echo all data.
  */
-public class NettyEchoServer implements AutoCloseable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEchoServer.class);
+public class NettyEchoServer extends NettyServer {
 
-    static final int PORT = Integer.parseInt(System.getProperty("port", 
"8007"));
-    static final String WEBSOCKET_PATH = "/";
-
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-    private Channel serverChannel;
-    private final TransportOptions options;
-    private int serverPort;
-    private final boolean needClientAuth;
-    private final boolean webSocketServer;
-    private String webSocketPath = WEBSOCKET_PATH;
-    private volatile SslHandler sslHandler;
-
-    private final AtomicBoolean started = new AtomicBoolean();
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEchoServer.class);
 
     public NettyEchoServer(TransportOptions options) {
-        this(options, false);
+        super(options);
     }
 
     public NettyEchoServer(TransportOptions options, boolean needClientAuth) {
-        this(options, needClientAuth, false);
+        super(options, needClientAuth);
     }
 
     public NettyEchoServer(TransportOptions options, boolean needClientAuth, 
boolean webSocketServer) {
-        this.options = options;
-        this.needClientAuth = needClientAuth;
-        this.webSocketServer = webSocketServer;
-    }
-
-    public String getWebSocketPath() {
-        return webSocketPath;
-    }
-
-    public void setWebSocketPath(String webSocketPath) {
-        this.webSocketPath = webSocketPath;
-    }
-
-    public void start() throws Exception {
-
-        if (started.compareAndSet(false, true)) {
-
-            // Configure the server.
-            bossGroup = new NioEventLoopGroup(1);
-            workerGroup = new NioEventLoopGroup();
-
-            ServerBootstrap server = new ServerBootstrap();
-            server.group(bossGroup, workerGroup);
-            server.channel(NioServerSocketChannel.class);
-            server.option(ChannelOption.SO_BACKLOG, 100);
-            server.handler(new LoggingHandler(LogLevel.INFO));
-            server.childHandler(new ChannelInitializer<Channel>() {
-
-                @Override
-                public void initChannel(Channel ch) throws Exception {
-                    if (options instanceof TransportSslOptions) {
-                        TransportSslOptions sslOptions = (TransportSslOptions) 
options;
-                        SSLContext context = 
TransportSupport.createSslContext(sslOptions);
-                        SSLEngine engine = 
TransportSupport.createSslEngine(context, sslOptions);
-                        engine.setUseClientMode(false);
-                        engine.setNeedClientAuth(needClientAuth);
-                        sslHandler = new SslHandler(engine);
-                        ch.pipeline().addLast(sslHandler);
-                    }
-
-                    if (webSocketServer) {
-                        ch.pipeline().addLast(new HttpServerCodec());
-                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
-                        ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true));
-                    }
-
-                    ch.pipeline().addLast(new EchoServerHandler());
-                }
-            });
-
-            // Start the server.
-            serverChannel = server.bind(getServerPort()).sync().channel();
-        }
-    }
-
-    public void stop() throws InterruptedException {
-        if (started.compareAndSet(true, false)) {
-            try {
-                LOG.info("Syncing channel close");
-                serverChannel.close().sync();
-            } catch (InterruptedException e) {
-            }
-
-            // Shut down all event loops to terminate all threads.
-            LOG.info("Shutting down boss group");
-            bossGroup.shutdownGracefully(10, 100, TimeUnit.MILLISECONDS);
-            LOG.info("Shutting down worker group");
-            workerGroup.shutdownGracefully(10, 100, TimeUnit.MILLISECONDS);
-        }
+        super(options, needClientAuth, webSocketServer);
     }
 
     @Override
-    public void close() throws InterruptedException {
-        stop();
-    }
-
-    public int getServerPort() {
-        if (serverPort == 0) {
-            ServerSocket ss = null;
-            try {
-                ss = ServerSocketFactory.getDefault().createServerSocket(0);
-                serverPort = ss.getLocalPort();
-            } catch (IOException e) { // revert back to default
-                serverPort = PORT;
-            } finally {
-                try {
-                    if (ss != null ) {
-                        ss.close();
-                    }
-                } catch (IOException e) { // ignore
-                }
-            }
-        }
-        return serverPort;
+    protected ChannelHandler getServerHandler() {
+        return new EchoServerHandler();
     }
 
     private class EchoServerHandler extends 
SimpleChannelInboundHandler<Object>  {
 
         @Override
-        public void channelActive(final ChannelHandlerContext ctx) {
-            LOG.info("Server -> New active channel: {}", ctx.channel());
-            SslHandler handler = ctx.pipeline().get(SslHandler.class);
-            if (handler != null) {
-                handler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
-                    @Override
-                    public void operationComplete(Future<Channel> future) 
throws Exception {
-                        LOG.info("Server -> SSL handshake completed. 
Succeeded: {}", future.isSuccess());
-                        if (!future.isSuccess()) {
-                            sslHandler.close();
-                            ctx.close();
-                        }
-                    }
-                });
-            }
-        }
-
-        @Override
-        public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
-            LOG.info("channel has gone inactive: {}", ctx.channel());
-            ctx.close();
-        }
-
-        @Override
         public void channelRead0(ChannelHandlerContext ctx, Object msg) {
             LOG.trace("Channel read: {}", msg);
-            if (webSocketServer) {
-                if (msg instanceof WebSocketFrame) {
-                    WebSocketFrame frame = (WebSocketFrame) msg;
-                    ctx.write(frame.copy());
-                    return;
-                } else if (msg instanceof FullHttpRequest) {
-                    // Reject anything not on the WebSocket path
-                    FullHttpRequest request = (FullHttpRequest) msg;
-                    sendHttpResponse(ctx, request, new 
DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
-                    return;
-                }
-            } else if (msg instanceof ByteBuf) {
+            if (msg instanceof ByteBuf) {
                 ctx.write(((ByteBuf) msg).copy());
                 return;
             }
@@ -236,38 +62,5 @@ public class NettyEchoServer implements AutoCloseable {
             String message = "unsupported frame type: " + 
msg.getClass().getName();
             throw new UnsupportedOperationException(message);
         }
-
-        @Override
-        public void channelReadComplete(ChannelHandlerContext ctx) {
-            ctx.flush();
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
-            LOG.info("Exception caught on channel: {}", ctx.channel());
-            // Close the connection when an exception is raised.
-            cause.printStackTrace();
-            ctx.close();
-        }
-    }
-
-    private static void sendHttpResponse(ChannelHandlerContext ctx, 
FullHttpRequest request, FullHttpResponse response) {
-        // Generate an error page if response getStatus code is not OK (200).
-        if (response.getStatus().code() != 200) {
-            ByteBuf buf = 
Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);
-            response.content().writeBytes(buf);
-            buf.release();
-            HttpHeaders.setContentLength(response, 
response.content().readableBytes());
-        }
-
-        // Send the response and close the connection if necessary.
-        ChannelFuture f = ctx.channel().writeAndFlush(response);
-        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() 
!= 200) {
-            f.addListener(ChannelFutureListener.CLOSE);
-        }
-    }
-
-    protected SslHandler getSslHandler() {
-        return sslHandler;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/945a4898/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
new file mode 100644
index 0000000..5379880
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyServer.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportSslOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * Base Server implementation used to create Netty based server 
implementations for
+ * unit testing aspects of the client code.
+ */
+public abstract class NettyServer implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEchoServer.class);
+
+    static final int PORT = Integer.parseInt(System.getProperty("port", 
"8007"));
+    static final String WEBSOCKET_PATH = "/";
+
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private Channel serverChannel;
+    private final TransportOptions options;
+    private int serverPort;
+    private final boolean needClientAuth;
+    private final boolean webSocketServer;
+    private String webSocketPath = WEBSOCKET_PATH;
+    private volatile SslHandler sslHandler;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    public NettyServer(TransportOptions options) {
+        this(options, false);
+    }
+
+    public NettyServer(TransportOptions options, boolean needClientAuth) {
+        this(options, needClientAuth, false);
+    }
+
+    public NettyServer(TransportOptions options, boolean needClientAuth, 
boolean webSocketServer) {
+        this.options = options;
+        this.needClientAuth = needClientAuth;
+        this.webSocketServer = webSocketServer;
+    }
+
+    public boolean isWebSocketServer() {
+        return webSocketServer;
+    }
+
+    public String getWebSocketPath() {
+        return webSocketPath;
+    }
+
+    public void setWebSocketPath(String webSocketPath) {
+        this.webSocketPath = webSocketPath;
+    }
+
+    public void start() throws Exception {
+
+        if (started.compareAndSet(false, true)) {
+
+            // Configure the server.
+            bossGroup = new NioEventLoopGroup(1);
+            workerGroup = new NioEventLoopGroup();
+
+            ServerBootstrap server = new ServerBootstrap();
+            server.group(bossGroup, workerGroup);
+            server.channel(NioServerSocketChannel.class);
+            server.option(ChannelOption.SO_BACKLOG, 100);
+            server.handler(new LoggingHandler(LogLevel.INFO));
+            server.childHandler(new ChannelInitializer<Channel>() {
+
+                @Override
+                public void initChannel(Channel ch) throws Exception {
+                    if (options instanceof TransportSslOptions) {
+                        TransportSslOptions sslOptions = (TransportSslOptions) 
options;
+                        SSLContext context = 
TransportSupport.createSslContext(sslOptions);
+                        SSLEngine engine = 
TransportSupport.createSslEngine(context, sslOptions);
+                        engine.setUseClientMode(false);
+                        engine.setNeedClientAuth(needClientAuth);
+                        sslHandler = new SslHandler(engine);
+                        ch.pipeline().addLast(sslHandler);
+                    }
+
+                    if (webSocketServer) {
+                        ch.pipeline().addLast(new HttpServerCodec());
+                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
+                        ch.pipeline().addLast(new 
WebSocketServerProtocolHandler(getWebSocketPath(), "amqp", true));
+                    }
+
+                    ch.pipeline().addLast(new NettyServerOutboundHandler());
+                    ch.pipeline().addLast(new NettyServerInboundHandler());
+                    ch.pipeline().addLast(getServerHandler());
+                }
+            });
+
+            // Start the server.
+            serverChannel = server.bind(getServerPort()).sync().channel();
+        }
+    }
+
+    protected abstract ChannelHandler getServerHandler();
+
+    public void stop() throws InterruptedException {
+        if (started.compareAndSet(true, false)) {
+            try {
+                LOG.info("Syncing channel close");
+                serverChannel.close().sync();
+            } catch (InterruptedException e) {
+            }
+
+            // Shut down all event loops to terminate all threads.
+            LOG.info("Shutting down boss group");
+            bossGroup.shutdownGracefully(10, 100, TimeUnit.MILLISECONDS);
+            LOG.info("Shutting down worker group");
+            workerGroup.shutdownGracefully(10, 100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        stop();
+    }
+
+    public int getServerPort() {
+        if (serverPort == 0) {
+            ServerSocket ss = null;
+            try {
+                ss = ServerSocketFactory.getDefault().createServerSocket(0);
+                serverPort = ss.getLocalPort();
+            } catch (IOException e) { // revert back to default
+                serverPort = PORT;
+            } finally {
+                try {
+                    if (ss != null ) {
+                        ss.close();
+                    }
+                } catch (IOException e) { // ignore
+                }
+            }
+        }
+        return serverPort;
+    }
+
+    private class NettyServerOutboundHandler extends 
ChannelOutboundHandlerAdapter  {
+
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
+            if (isWebSocketServer() && msg instanceof ByteBuf) {
+                BinaryWebSocketFrame frame = new 
BinaryWebSocketFrame((ByteBuf) msg);
+                ctx.write(frame, promise);
+            } else {
+                ctx.write(msg, promise);
+            }
+        }
+    }
+
+    private class NettyServerInboundHandler extends 
ChannelInboundHandlerAdapter  {
+
+        @Override
+        public void channelActive(final ChannelHandlerContext ctx) {
+            LOG.info("NettyServerHandler -> New active channel: {}", 
ctx.channel());
+            SslHandler handler = ctx.pipeline().get(SslHandler.class);
+            if (handler != null) {
+                handler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
+                    @Override
+                    public void operationComplete(Future<Channel> future) 
throws Exception {
+                        LOG.info("Server -> SSL handshake completed. 
Succeeded: {}", future.isSuccess());
+                        if (!future.isSuccess()) {
+                            sslHandler.close();
+                            ctx.close();
+                        }
+                    }
+                });
+            }
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
+            LOG.info("NettyServerHandler: channel has gone inactive: {}", 
ctx.channel());
+            ctx.close();
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            LOG.trace("NettyServerHandler: Channel read: {}", msg);
+            if (msg instanceof WebSocketFrame) {
+                WebSocketFrame frame = (WebSocketFrame) msg;
+                ctx.fireChannelRead(frame.content());
+            } else if (msg instanceof FullHttpRequest) {
+                // Reject anything not on the WebSocket path
+                FullHttpRequest request = (FullHttpRequest) msg;
+                sendHttpResponse(ctx, request, new 
DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
+            } else {
+                // Forward anything else along to the next handler.
+                ctx.fireChannelRead(msg);
+            }
+        }
+
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) {
+            ctx.flush();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
+            LOG.info("NettyServerHandler: NettyServerHandlerException caught 
on channel: {}", ctx.channel());
+            // Close the connection when an exception is raised.
+            cause.printStackTrace();
+            ctx.close();
+        }
+    }
+
+    private static void sendHttpResponse(ChannelHandlerContext ctx, 
FullHttpRequest request, FullHttpResponse response) {
+        // Generate an error page if response getStatus code is not OK (200).
+        if (response.getStatus().code() != 200) {
+            ByteBuf buf = 
Unpooled.copiedBuffer(response.getStatus().toString(), StandardCharsets.UTF_8);
+            response.content().writeBytes(buf);
+            buf.release();
+            HttpHeaders.setContentLength(response, 
response.content().readableBytes());
+        }
+
+        // Send the response and close the connection if necessary.
+        ChannelFuture f = ctx.channel().writeAndFlush(response);
+        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() 
!= 200) {
+            f.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    protected SslHandler getSslHandler() {
+        return sslHandler;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to