Repository: activemq
Updated Branches:
  refs/heads/master 19c940491 -> f6d25842c


NO-JIRA Adds some small fixes to the AMQP test client around SSL
handling that were found in Qpid JMS where some of this came from.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f6d25842
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f6d25842
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f6d25842

Branch: refs/heads/master
Commit: f6d25842ccb276ffe90f0c8ffdf3ee0fd0aca748
Parents: 19c9404
Author: Timothy Bish <[email protected]>
Authored: Mon Feb 29 16:13:28 2016 -0500
Committer: Timothy Bish <[email protected]>
Committed: Mon Feb 29 16:13:28 2016 -0500

----------------------------------------------------------------------
 .../amqp/client/transport/NettyTransport.java   | 47 ++++++++++----------
 .../client/transport/NettyTransportFactory.java |  2 +-
 2 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f6d25842/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
index 81e3a1d..4084780 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -126,9 +126,9 @@ public class NettyTransport {
                 if (future.isSuccess()) {
                     handleConnected(future.channel());
                 } else if (future.isCancelled()) {
-                    connectionFailed(new IOException("Connection attempt was 
cancelled"));
+                    connectionFailed(future.channel(), new 
IOException("Connection attempt was cancelled"));
                 } else {
-                    
connectionFailed(IOExceptionSupport.create(future.cause()));
+                    connectionFailed(future.channel(), 
IOExceptionSupport.create(future.cause()));
                 }
             }
         });
@@ -144,7 +144,7 @@ public class NettyTransport {
         if (failureCause != null) {
             // Close out any Netty resources now as they are no longer needed.
             if (channel != null) {
-                channel.close();
+                channel.close().syncUninterruptibly();
                 channel = null;
             }
             if (group != null) {
@@ -167,14 +167,14 @@ public class NettyTransport {
         }
     }
 
-    public boolean isSSL() {
-        return secure;
-    }
-
     public boolean isConnected() {
         return connected.get();
     }
 
+    public boolean isSSL() {
+        return secure;
+    }
+
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
             connected.set(false);
@@ -279,20 +279,10 @@ public class NettyTransport {
         }
     }
 
-    protected void configureChannel(Channel channel) throws Exception {
-        if (isSSL()) {
-            
channel.pipeline().addLast(NettyTransportSupport.createSslHandler(getRemoteLocation(),
 getSslOptions()));
-        }
-
-        channel.pipeline().addLast(new NettyTcpTransportHandler());
-    }
-
-    protected void handleConnected(final Channel channel) throws Exception {
+    protected void configureChannel(final Channel channel) throws Exception {
         if (isSSL()) {
-            SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
-
-            Future<Channel> channelFuture = sslHandler.handshakeFuture();
-            channelFuture.addListener(new 
GenericFutureListener<Future<Channel>>() {
+            SslHandler sslHandler = 
NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+            sslHandler.handshakeFuture().addListener(new 
GenericFutureListener<Future<Channel>>() {
                 @Override
                 public void operationComplete(Future<Channel> future) throws 
Exception {
                     if (future.isSuccess()) {
@@ -300,11 +290,19 @@ public class NettyTransport {
                         connectionEstablished(channel);
                     } else {
                         LOG.trace("SSL Handshake has failed: {}", channel);
-                        
connectionFailed(IOExceptionSupport.create(future.cause()));
+                        connectionFailed(channel, 
IOExceptionSupport.create(future.cause()));
                     }
                 }
             });
-        } else {
+
+            channel.pipeline().addLast(sslHandler);
+        }
+
+        channel.pipeline().addLast(new NettyTcpTransportHandler());
+    }
+
+    protected void handleConnected(final Channel channel) throws Exception {
+        if (!isSSL()) {
             connectionEstablished(channel);
         }
     }
@@ -323,11 +321,14 @@ public class NettyTransport {
     /**
      * Called when the transport connection failed and an error should be 
returned.
      *
+     * @param failedChannel
+     *      The Channel instance that failed.
      * @param cause
      *      An IOException that describes the cause of the failed connection.
      */
-    protected void connectionFailed(IOException cause) {
+    protected void connectionFailed(Channel failedChannel, IOException cause) {
         failureCause = IOExceptionSupport.create(cause);
+        channel = failedChannel;
         connected.set(false);
         connectLatch.countDown();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f6d25842/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
index a002eae..fd50890 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -19,7 +19,7 @@ package org.apache.activemq.transport.amqp.client.transport;
 import java.net.URI;
 import java.util.Map;
 
-import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.activemq.transport.amqp.client.util.PropertyUtil;
 
 /**
  * Factory for creating the Netty based TCP Transport.

Reply via email to