Repository: qpid-jms Updated Branches: refs/heads/master 7596c7b7f -> d7a1e25e9
QPIDJMS-191 Some additional cleanups simplify the handler chain by creating a single common handler and specializing for socket vs web socket. Add explicit pong handling to the WS case. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d7a1e25e Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d7a1e25e Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d7a1e25e Branch: refs/heads/master Commit: d7a1e25e9362a9f9a34666730ed295c550ba87b3 Parents: 7596c7b Author: Timothy Bish <[email protected]> Authored: Fri Jul 15 18:44:36 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Jul 15 18:44:36 2016 -0400 ---------------------------------------------------------------------- .../jms/transports/netty/NettyTcpTransport.java | 85 ++++++++++---------- .../jms/transports/netty/NettyWsTransport.java | 15 ++-- 2 files changed, 50 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7a1e25e/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java index 43f44e8..76d6481 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -71,8 +71,7 @@ public class NettyTcpTransport implements Transport { private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); private final CountDownLatch connectLatch = new CountDownLatch(1); - private IOException failureCause; - private Throwable pendingFailure; + private volatile IOException failureCause; /** * Create a new transport instance @@ -118,7 +117,7 @@ public class NettyTcpTransport implements Transport { } final SslHandler sslHandler; - if(isSecure()) { + if (isSecure()) { try { sslHandler = TransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); } catch (Exception ex) { @@ -144,9 +143,15 @@ public class NettyTcpTransport implements Transport { configureNetty(bootstrap, getTransportOptions()); ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); + future.addListener(new ChannelFutureListener() { - // Route all events through the channel handlers for consistent processing. - future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + handleException(future.channel(), IOExceptionSupport.create(future.cause())); + } + } + }); try { connectLatch.await(); @@ -174,8 +179,8 @@ public class NettyTcpTransport implements Transport { @Override public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); + if (failureCause != null) { + channel.pipeline().fireExceptionCaught(failureCause); } } }); @@ -297,20 +302,20 @@ public class NettyTcpTransport implements Transport { LOG.trace("Exception on channel! Channel is {}", channel); if (connected.compareAndSet(true, false) && !closed.get()) { LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); + if (failureCause != null) { + listener.onTransportError(failureCause); } else { listener.onTransportError(cause); } } else { // Hold the first failure for later dispatch if connect succeeds. // This will then trigger disconnect using the first error reported. - if (pendingFailure == null) { + if (failureCause == null) { LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; + failureCause = IOExceptionSupport.create(cause); } - connectionFailed(channel, IOExceptionSupport.create(pendingFailure)); + connectionFailed(channel, failureCause); } } @@ -367,22 +372,7 @@ public class NettyTcpTransport implements Transport { } private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception { - channel.pipeline().addLast(new NettyDefaultHandler()); - if (isSecure()) { - sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { - @Override - public void operationComplete(Future<Channel> future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - handleConnected(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - handleException(channel, future.cause()); - } - } - }); - channel.pipeline().addLast(sslHandler); } @@ -393,7 +383,7 @@ public class NettyTcpTransport implements Transport { //----- Handle connection errors -----------------------------------------// - private final class NettyDefaultHandler extends ChannelInboundHandlerAdapter { + protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> { @Override public void channelRegistered(ChannelHandlerContext context) throws Exception { @@ -401,6 +391,29 @@ public class NettyTcpTransport implements Transport { } @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + // In the Secure case we need to let the handshake complete before we + // trigger the connected event. + if (!isSecure()) { + handleConnected(context.channel()); + } else { + SslHandler sslHandler = context.pipeline().get(SslHandler.class); + sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + handleConnected(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + handleException(channel, future.cause()); + } + } + }); + } + } + + @Override public void channelInactive(ChannelHandlerContext context) throws Exception { handleChannelInactive(context.channel()); } @@ -413,21 +426,7 @@ public class NettyTcpTransport implements Transport { //----- Handle connection events -----------------------------------------// - protected class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> { - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - handleException(context.channel(), cause); - } - - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - // In the Secure case we need to let the handshake complete before we - // trigger the connected event. - if (!isSecure()) { - handleConnected(context.channel()); - } - } + protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7a1e25e/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java index 5de56e1..0539495 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java @@ -30,13 +30,14 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; @@ -108,7 +109,7 @@ public class NettyWsTransport extends NettyTcpTransport { //----- Handle connection events -----------------------------------------// - private class NettyWebSocketTransportHandler extends SimpleChannelInboundHandler<Object> { + private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> { private final WebSocketClientHandshaker handshaker; @@ -120,6 +121,8 @@ public class NettyWsTransport extends NettyTcpTransport { @Override public void channelActive(ChannelHandlerContext context) throws Exception { handshaker.handshake(context.channel()); + + super.channelActive(context); } @Override @@ -152,15 +155,13 @@ public class NettyWsTransport extends NettyTcpTransport { BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); listener.onData(binaryFrame.content()); + } else if (frame instanceof PingWebSocketFrame) { + LOG.trace("WebSocket Client received ping, response with pong"); + ch.write(new PongWebSocketFrame(frame.content())); } else if (frame instanceof CloseWebSocketFrame) { LOG.trace("WebSocket Client received closing"); ch.close(); } } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - handleException(context.channel(), cause); - } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
