Repository: qpid-jms
Updated Branches:
  refs/heads/master 7596c7b7f -> d7a1e25e9


QPIDJMS-191 Some additional cleanups

Simplify the handler chain by creating a single common handler and
specializing it for the plain socket vs. WebSocket cases.  Add explicit
ping handling (replying with a pong) to the WS case.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d7a1e25e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d7a1e25e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d7a1e25e

Branch: refs/heads/master
Commit: d7a1e25e9362a9f9a34666730ed295c550ba87b3
Parents: 7596c7b
Author: Timothy Bish <[email protected]>
Authored: Fri Jul 15 18:44:36 2016 -0400
Committer: Timothy Bish <[email protected]>
Committed: Fri Jul 15 18:44:36 2016 -0400

----------------------------------------------------------------------
 .../jms/transports/netty/NettyTcpTransport.java | 85 ++++++++++----------
 .../jms/transports/netty/NettyWsTransport.java  | 15 ++--
 2 files changed, 50 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7a1e25e/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index 43f44e8..76d6481 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -71,8 +71,7 @@ public class NettyTcpTransport implements Transport {
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
     private final CountDownLatch connectLatch = new CountDownLatch(1);
-    private IOException failureCause;
-    private Throwable pendingFailure;
+    private volatile IOException failureCause;
 
     /**
      * Create a new transport instance
@@ -118,7 +117,7 @@ public class NettyTcpTransport implements Transport {
         }
 
         final SslHandler sslHandler;
-        if(isSecure()) {
+        if (isSecure()) {
             try {
                 sslHandler = 
TransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
             } catch (Exception ex) {
@@ -144,9 +143,15 @@ public class NettyTcpTransport implements Transport {
         configureNetty(bootstrap, getTransportOptions());
 
         ChannelFuture future = bootstrap.connect(getRemoteHost(), 
getRemotePort());
+        future.addListener(new ChannelFutureListener() {
 
-        // Route all events through the channel handlers for consistent 
processing.
-        future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            @Override
+            public void operationComplete(ChannelFuture future) throws 
Exception {
+                if (!future.isSuccess()) {
+                    handleException(future.channel(), 
IOExceptionSupport.create(future.cause()));
+                }
+            }
+        });
 
         try {
             connectLatch.await();
@@ -174,8 +179,8 @@ public class NettyTcpTransport implements Transport {
 
                 @Override
                 public void run() {
-                    if (pendingFailure != null) {
-                        channel.pipeline().fireExceptionCaught(pendingFailure);
+                    if (failureCause != null) {
+                        channel.pipeline().fireExceptionCaught(failureCause);
                     }
                 }
             });
@@ -297,20 +302,20 @@ public class NettyTcpTransport implements Transport {
         LOG.trace("Exception on channel! Channel is {}", channel);
         if (connected.compareAndSet(true, false) && !closed.get()) {
             LOG.trace("Firing onTransportError listener");
-            if (pendingFailure != null) {
-                listener.onTransportError(pendingFailure);
+            if (failureCause != null) {
+                listener.onTransportError(failureCause);
             } else {
                 listener.onTransportError(cause);
             }
         } else {
             // Hold the first failure for later dispatch if connect succeeds.
             // This will then trigger disconnect using the first error 
reported.
-            if (pendingFailure == null) {
+            if (failureCause == null) {
                 LOG.trace("Holding error until connect succeeds: {}", 
cause.getMessage());
-                pendingFailure = cause;
+                failureCause = IOExceptionSupport.create(cause);
             }
 
-            connectionFailed(channel, 
IOExceptionSupport.create(pendingFailure));
+            connectionFailed(channel, failureCause);
         }
     }
 
@@ -367,22 +372,7 @@ public class NettyTcpTransport implements Transport {
     }
 
     private void configureChannel(final Channel channel, final SslHandler 
sslHandler) throws Exception {
-        channel.pipeline().addLast(new NettyDefaultHandler());
-
         if (isSecure()) {
-            sslHandler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
-                @Override
-                public void operationComplete(Future<Channel> future) throws 
Exception {
-                    if (future.isSuccess()) {
-                        LOG.trace("SSL Handshake has completed: {}", channel);
-                        handleConnected(channel);
-                    } else {
-                        LOG.trace("SSL Handshake has failed: {}", channel);
-                        handleException(channel, future.cause());
-                    }
-                }
-            });
-
             channel.pipeline().addLast(sslHandler);
         }
 
@@ -393,7 +383,7 @@ public class NettyTcpTransport implements Transport {
 
     //----- Handle connection errors 
-----------------------------------------//
 
-    private final class NettyDefaultHandler extends 
ChannelInboundHandlerAdapter {
+    protected abstract class NettyDefaultHandler<E> extends 
SimpleChannelInboundHandler<E> {
 
         @Override
         public void channelRegistered(ChannelHandlerContext context) throws 
Exception {
@@ -401,6 +391,29 @@ public class NettyTcpTransport implements Transport {
         }
 
         @Override
+        public void channelActive(ChannelHandlerContext context) throws 
Exception {
+            // In the Secure case we need to let the handshake complete before 
we
+            // trigger the connected event.
+            if (!isSecure()) {
+                handleConnected(context.channel());
+            } else {
+                SslHandler sslHandler = 
context.pipeline().get(SslHandler.class);
+                sslHandler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
+                    @Override
+                    public void operationComplete(Future<Channel> future) 
throws Exception {
+                        if (future.isSuccess()) {
+                            LOG.trace("SSL Handshake has completed: {}", 
channel);
+                            handleConnected(channel);
+                        } else {
+                            LOG.trace("SSL Handshake has failed: {}", channel);
+                            handleException(channel, future.cause());
+                        }
+                    }
+                });
+            }
+        }
+
+        @Override
         public void channelInactive(ChannelHandlerContext context) throws 
Exception {
             handleChannelInactive(context.channel());
         }
@@ -413,21 +426,7 @@ public class NettyTcpTransport implements Transport {
 
     //----- Handle connection events 
-----------------------------------------//
 
-    protected class NettyTcpTransportHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, Throwable 
cause) throws Exception {
-            handleException(context.channel(), cause);
-        }
-
-        @Override
-        public void channelActive(ChannelHandlerContext context) throws 
Exception {
-            // In the Secure case we need to let the handshake complete before 
we
-            // trigger the connected event.
-            if (!isSecure()) {
-                handleConnected(context.channel());
-            }
-        }
+    protected class NettyTcpTransportHandler extends 
NettyDefaultHandler<ByteBuf> {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7a1e25e/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
index 5de56e1..0539495 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java
@@ -30,13 +30,14 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
@@ -108,7 +109,7 @@ public class NettyWsTransport extends NettyTcpTransport {
 
     //----- Handle connection events 
-----------------------------------------//
 
-    private class NettyWebSocketTransportHandler extends 
SimpleChannelInboundHandler<Object> {
+    private class NettyWebSocketTransportHandler extends 
NettyDefaultHandler<Object> {
 
         private final WebSocketClientHandshaker handshaker;
 
@@ -120,6 +121,8 @@ public class NettyWsTransport extends NettyTcpTransport {
         @Override
         public void channelActive(ChannelHandlerContext context) throws 
Exception {
             handshaker.handshake(context.channel());
+
+            super.channelActive(context);
         }
 
         @Override
@@ -152,15 +155,13 @@ public class NettyWsTransport extends NettyTcpTransport {
                 BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) 
frame;
                 LOG.trace("WebSocket Client received data: {} bytes", 
binaryFrame.content().readableBytes());
                 listener.onData(binaryFrame.content());
+            } else if (frame instanceof PingWebSocketFrame) {
+                LOG.trace("WebSocket Client received ping, response with 
pong");
+                ch.write(new PongWebSocketFrame(frame.content()));
             } else if (frame instanceof CloseWebSocketFrame) {
                 LOG.trace("WebSocket Client received closing");
                 ch.close();
             }
         }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, Throwable 
cause) throws Exception {
-            handleException(context.channel(), cause);
-        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to