Repository: qpid-jms
Updated Branches:
  refs/heads/master 55516e477 -> 60f86b37e


QPIDJMS-61: add ability to configure the contents of the SASL Init and Open 
frame hostname field, enabling vhost usage


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/60f86b37
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/60f86b37
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/60f86b37

Branch: refs/heads/master
Commit: 60f86b37e0a0a143c7c844acb5455604208a2c4a
Parents: 55516e4
Author: Robert Gemmell <[email protected]>
Authored: Mon Jun 8 17:10:03 2015 +0100
Committer: Robert Gemmell <[email protected]>
Committed: Mon Jun 8 17:10:20 2015 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  9 +++-
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 38 ++++++++++++++++
 .../integration/ConnectionIntegrationTest.java  | 47 ++++++++++++++++++++
 .../integration/IdleTimeoutIntegrationTest.java |  4 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 20 +++++++--
 5 files changed, 111 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index e20045a..4467aab 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -84,8 +84,15 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
 
     @Override
     protected void doOpen() {
+        String hostname = provider.getVhost();
+        if(hostname == null) {
+            hostname = remoteURI.getHost();
+        } else if (hostname.isEmpty()) {
+            hostname = null;
+        }
+
+        getEndpoint().setHostname(hostname);
         getEndpoint().setContainer(resource.getClientId());
-        getEndpoint().setHostname(remoteURI.getHost());
         getEndpoint().setDesiredCapabilities(new Symbol[] { 
SOLE_CONNECTION_CAPABILITY });
         super.doOpen();
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index b229cd6..5b532d1 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.security.Principal;
@@ -103,6 +104,7 @@ public class AmqpProvider implements Provider, 
TransportListener {
     private AmqpConnection connection;
     private org.apache.qpid.jms.transports.Transport transport;
     private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE;
+    private String vhost;
     private boolean traceFrames;
     private boolean traceBytes;
     private boolean presettleConsumers;
@@ -271,6 +273,15 @@ public class AmqpProvider implements Provider, 
TransportListener {
                             Sasl sasl = protonTransport.sasl();
                             if (sasl != null) {
                                 sasl.client();
+
+                                String hostname = getVhost();
+                                if(hostname == null) {
+                                    hostname = remoteURI.getHost();
+                                } else if (hostname.isEmpty()) {
+                                    hostname = null;
+                                }
+
+                                setHostname(sasl, hostname);
                             }
                             connection = new AmqpConnection(AmqpProvider.this, 
protonConnection, sasl, connectionInfo);
                             connection.open(new AsyncResult() {
@@ -877,6 +888,22 @@ public class AmqpProvider implements Provider, 
TransportListener {
         return this.traceBytes;
     }
 
+    public String getVhost() {
+        return vhost;
+    }
+
+    /**
+     * Sets the hostname to be used in the AMQP SASL Init and Open frames.
+     *
+     * If set to null, the host from the remoteURI is used. If set to the
+     * empty string, the hostname field is left unset (null) in those frames.
+     *
+     * @param vhost the hostname to include in SASL Init and Open frames.
+     */
+    public void setVhost(String vhost) {
+        this.vhost = vhost;
+    }
+
     public int getIdleTimeout() {
         return idleTimeout;
     }
@@ -1016,4 +1043,15 @@ public class AmqpProvider implements Provider, 
TransportListener {
 
         return null;
     }
+
+    private static void setHostname(Sasl sasl, String hostname) {
+        //TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
+        try {
+            Field field = sasl.getClass().getDeclaredField("_hostname");
+            field.setAccessible(true);
+            field.set(sasl, hostname);
+        } catch (Exception e) { // best effort: a failure is only logged at trace level
+            LOG.trace("Failed to set SASL hostname", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 08a853a..fb8f89b 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -24,11 +24,15 @@ import static 
org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
 import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -36,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.ConnectionMetaData;
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
@@ -45,6 +50,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
@@ -54,6 +60,7 @@ import 
org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.hamcrest.Matcher;
 import org.junit.Test;
 
 // TODO find a way to make the test abort immediately if the TestAmqpPeer 
throws an exception
@@ -117,6 +124,46 @@ public class ConnectionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 5000)
+    public void testAmqpHostnameSetByDefault() throws Exception {
+        doAmqpHostnameTestImpl("localhost", false, equalTo("localhost"));
+    }
+
+    @Test(timeout = 5000)
+    public void testAmqpHostnameSetByVhostOption() throws Exception {
+        String vhost = "myAmqpHost";
+        doAmqpHostnameTestImpl(vhost, true, equalTo(vhost));
+    }
+
+    @Test(timeout = 5000)
+    public void testAmqpHostnameNotSetWithEmptyVhostOption() throws Exception {
+        doAmqpHostnameTestImpl("", true, nullValue()); // empty amqp.vhost => hostname must be absent
+    }
+
+    private void doAmqpHostnameTestImpl(String amqpHostname, boolean 
setHostnameOption, Matcher<?> hostnameMatcher) throws JMSException, 
InterruptedException, Exception, IOException {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.expectAnonymousConnect(true, null, hostnameMatcher);
+            // Each connection creates a session for managing temporary 
destinations etc
+            testPeer.expectBegin(true);
+
+            String uri = "amqp://localhost:" + testPeer.getServerPort();
+            if(setHostnameOption) {
+                uri += "?amqp.vhost=" + amqpHostname;
+            }
+
+            ConnectionFactory factory = new JmsConnectionFactory(uri);
+            Connection connection = factory.createConnection();
+            // Set a clientID to provoke the actual AMQP connection process to 
occur.
+            connection.setClientID("clientName");
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertNull(testPeer.getThrowable());
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
     @Test(timeout = 10000)
     public void testRemotelyEndConnectionListenerInvoked() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
index 9fe598c..f4ef08a 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
@@ -52,7 +52,7 @@ public class IdleTimeoutIntegrationTest extends 
QpidJmsTestCase {
     @Test(timeout = 5000)
     public void testIdleTimeoutIsAdvertisedByDefault() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            testPeer.expectAnonymousConnect(true, 
greaterThan(UnsignedInteger.valueOf(0)));
+            testPeer.expectAnonymousConnect(true, 
greaterThan(UnsignedInteger.valueOf(0)), null);
             // Each connection creates a session for managing temporary 
destinations etc
             testPeer.expectBegin(true);
 
@@ -75,7 +75,7 @@ public class IdleTimeoutIntegrationTest extends 
QpidJmsTestCase {
             int configuredTimeout = 54320;
             int advertisedValue = configuredTimeout / 2;
 
-            testPeer.expectAnonymousConnect(true, 
equalTo(UnsignedInteger.valueOf(advertisedValue)));
+            testPeer.expectAnonymousConnect(true, 
equalTo(UnsignedInteger.valueOf(advertisedValue)), null);
             // Each connection creates a session for managing temporary 
destinations etc
             testPeer.expectBegin(true);
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 7822c80..cbfe514 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -391,10 +391,10 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectAnonymousConnect(boolean authorize)
     {
-        expectAnonymousConnect(authorize, null);
+        expectAnonymousConnect(authorize, null, null);
     }
 
-    public void expectAnonymousConnect(boolean authorize, Matcher<?> 
idleTimeoutMatcher)
+    public void expectAnonymousConnect(boolean authorize, Matcher<?> 
idleTimeoutMatcher, Matcher<?> hostnameMatcher)
     {
         SaslMechanismsFrame saslMechanismsFrame = new 
SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("ANONYMOUS"));
         addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, 
AmqpHeader.SASL_HEADER,
@@ -402,7 +402,7 @@ public class TestAmqpPeer implements AutoCloseable
                                                     this, FrameType.SASL, 0,
                                                     saslMechanismsFrame, 
null)));
 
-        addHandler(new SaslInitMatcher()
+        SaslInitMatcher saslInitMatcher = new SaslInitMatcher()
             .withMechanism(equalTo(Symbol.valueOf("ANONYMOUS")))
             .withInitialResponse(equalTo(new Binary(new byte[0])))
             .onSuccess(new AmqpPeerRunnable()
@@ -417,7 +417,14 @@ public class TestAmqpPeer implements AutoCloseable
                             false);
                     _driverRunnable.expectHeader();
                 }
-            }));
+            });
+
+        if(hostnameMatcher != null)
+        {
+            saslInitMatcher.withHostname(hostnameMatcher);
+        }
+
+        addHandler(saslInitMatcher);
 
         addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, 
AmqpHeader.HEADER));
 
@@ -435,6 +442,11 @@ public class TestAmqpPeer implements AutoCloseable
             openMatcher.withIdleTimeOut(idleTimeoutMatcher);
         }
 
+        if(hostnameMatcher != null)
+        {
+            openMatcher.withHostname(hostnameMatcher);
+        }
+
         addHandler(openMatcher);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to