Repository: qpid-jms
Updated Branches:
  refs/heads/master 9d2ed0d66 -> 982a983a1


Allow the SSL transport to control when the transport is considered
connected, to account for the SSL handshake that must complete before
the connect completes.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/982a983a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/982a983a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/982a983a

Branch: refs/heads/master
Commit: 982a983a17e6492dd0260208ddff34b6efb87fee
Parents: 9d2ed0d
Author: Timothy Bish <[email protected]>
Authored: Thu Jan 22 18:34:55 2015 -0500
Committer: Timothy Bish <[email protected]>
Committed: Thu Jan 22 18:34:55 2015 -0500

----------------------------------------------------------------------
 .../jms/transports/netty/NettySslTransport.java | 21 +++++++
 .../jms/transports/netty/NettyTcpTransport.java | 66 ++++++++++++++++----
 2 files changed, 76 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/982a983a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
index 46d1a94..f1eeb8e 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
@@ -17,6 +17,9 @@
 package org.apache.qpid.jms.transports.netty;
 
 import io.netty.channel.Channel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 import java.net.URI;
 
@@ -24,6 +27,7 @@ import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.transports.TransportSslOptions;
 import org.apache.qpid.jms.transports.TransportSupport;
+import org.apache.qpid.jms.util.IOExceptionSupport;
 
 /**
  * Extends the Netty based TCP transport to add SSL support.
@@ -63,6 +67,23 @@ public class NettySslTransport extends NettyTcpTransport {
         super.configureChannel(channel);
     }
 
+    @Override
+    protected void handleConnected(final Channel channel) throws Exception {
+        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+
+        Future<Channel> channelFuture = sslHandler.handshakeFuture();
+        channelFuture.addListener(new GenericFutureListener<Future<Channel>>() 
{
+            @Override
+            public void operationComplete(Future<Channel> future) throws 
Exception {
+                if (future.isSuccess()) {
+                    connectionEstablished(channel);
+                } else {
+                    
connectionFailed(IOExceptionSupport.create(future.cause()));
+                }
+            }
+        });
+    }
+
     private TransportSslOptions getSslOptions() {
         return (TransportSslOptions) options;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/982a983a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index a958e45..1dff005 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -20,6 +20,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -31,6 +32,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.jms.transports.Transport;
@@ -57,6 +59,8 @@ public class NettyTcpTransport implements Transport {
 
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
+    private final CountDownLatch connectLatch = new CountDownLatch(1);
+    private IOException failureCause;
 
     /**
      * Create a new transport instance
@@ -102,22 +106,37 @@ public class NettyTcpTransport implements Transport {
 
             @Override
             public void initChannel(Channel connectedChannel) throws Exception 
{
-                channel = connectedChannel;
-                configureChannel(channel);
+                configureChannel(connectedChannel);
             }
         });
 
         configureNetty(bootstrap, getTransportOptions());
 
         ChannelFuture future = bootstrap.connect(remote.getHost(), 
remote.getPort());
-        future.awaitUninterruptibly();
-
-        if (future.isCancelled()) {
-            throw new IOException("Connection attempt was cancelled");
-        } else if (!future.isSuccess()) {
-            throw IOExceptionSupport.create(future.cause());
-        } else {
-            connected.set(true);
+        future.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws 
Exception {
+                if (future.isSuccess()) {
+                    handleConnected(future.channel());
+                } else if (future.isCancelled()) {
+                    connectionFailed(new IOException("Connection attempt was 
cancelled"));
+                } else {
+                    
connectionFailed(IOExceptionSupport.create(future.cause()));
+                }
+            }
+        });
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException ex) {
+            LOG.debug("Transport connection was interrupted.");
+            Thread.interrupted();
+            throw IOExceptionSupport.create(ex);
+        }
+
+        if (failureCause != null) {
+            throw failureCause;
         }
     }
 
@@ -170,7 +189,7 @@ public class NettyTcpTransport implements Transport {
         return options;
     }
 
-    //----- Internal implementation details 
----------------------------------//
+    //----- Internal implementation details, can be overridden as needed 
--//
 
     protected void configureNetty(Bootstrap bootstrap, TransportOptions 
options) {
         bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
@@ -197,6 +216,31 @@ public class NettyTcpTransport implements Transport {
         channel.pipeline().addLast(new NettyTcpTransportHandler());
     }
 
+    protected void handleConnected(Channel channel) throws Exception {
+        connectionEstablished(channel);
+    }
+
+    //----- State change handlers and checks 
---------------------------------//
+
+    /**
+     * Called when the transport has successfully connected and is ready for 
use.
+     */
+    protected void connectionEstablished(Channel connectedChannel) {
+        channel = connectedChannel;
+        connected.set(true);
+        connectLatch.countDown();
+    }
+
+    /**
+     * Called when the transport connection failed and an error should be 
returned.
+     * @param cause the error that caused the connection attempt to fail
+     */
+    protected void connectionFailed(IOException cause) {
+        failureCause = IOExceptionSupport.create(cause);
+        connected.set(false);
+        connectLatch.countDown();
+    }
+
     private void checkConnected() throws IOException {
         if (!connected.get()) {
             throw new IOException("Cannot send to a non-connected transport.");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to