Repository: qpid-jms Updated Branches: refs/heads/master 55516e477 -> 60f86b37e
QPIDJMS-61: add ability to configure the contents of the SASL Init and Open frame hostname field, enabling vhost usage Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/60f86b37 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/60f86b37 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/60f86b37 Branch: refs/heads/master Commit: 60f86b37e0a0a143c7c844acb5455604208a2c4a Parents: 55516e4 Author: Robert Gemmell <[email protected]> Authored: Mon Jun 8 17:10:03 2015 +0100 Committer: Robert Gemmell <[email protected]> Committed: Mon Jun 8 17:10:20 2015 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConnection.java | 9 +++- .../qpid/jms/provider/amqp/AmqpProvider.java | 38 ++++++++++++++++ .../integration/ConnectionIntegrationTest.java | 47 ++++++++++++++++++++ .../integration/IdleTimeoutIntegrationTest.java | 4 +- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 20 +++++++-- 5 files changed, 111 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index e20045a..4467aab 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -84,8 +84,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn @Override protected void doOpen() { + String hostname = provider.getVhost(); + if(hostname == null) { + hostname = remoteURI.getHost(); + } else if (hostname.isEmpty()) { + hostname = null; + } + + getEndpoint().setHostname(hostname); getEndpoint().setContainer(resource.getClientId()); - getEndpoint().setHostname(remoteURI.getHost()); getEndpoint().setDesiredCapabilities(new Symbol[] { SOLE_CONNECTION_CAPABILITY }); super.doOpen(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index b229cd6..5b532d1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.util.ReferenceCountUtil; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.nio.ByteBuffer; import java.security.Principal; @@ -103,6 +104,7 @@ public class AmqpProvider implements Provider, TransportListener { private AmqpConnection connection; private org.apache.qpid.jms.transports.Transport transport; private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE; + private String vhost; private boolean traceFrames; private boolean traceBytes; private boolean presettleConsumers; @@ -271,6 +273,15 @@ public class AmqpProvider implements Provider, TransportListener { Sasl sasl = protonTransport.sasl(); if (sasl != null) { sasl.client(); + + String hostname = getVhost(); + if(hostname == null) { + hostname = remoteURI.getHost(); + } else if (hostname.isEmpty()) { + hostname = null; + } + + setHostname(sasl, hostname); } connection = new AmqpConnection(AmqpProvider.this, protonConnection, sasl, connectionInfo); connection.open(new AsyncResult() { @@ -877,6 +888,22 @@ public class AmqpProvider implements Provider, TransportListener { return this.traceBytes; } + public String getVhost() { + return vhost; + } + + /** + * Sets the hostname to be used in the AMQP SASL Init and Open frames. + * + * If set null, the host provided in the remoteURI will be used. If set to + * the empty string, the hostname field of the frames will be cleared. + * + * @param vhost the hostname to include in SASL Init and Open frames. + */ + public void setVhost(String vhost) { + this.vhost = vhost; + } + public int getIdleTimeout() { return idleTimeout; } @@ -1016,4 +1043,15 @@ public class AmqpProvider implements Provider, TransportListener { return null; } + + private static void setHostname(Sasl sasl, String hostname) { + //TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method. + try { + Field field = sasl.getClass().getDeclaredField("_hostname"); + field.setAccessible(true); + field.set(sasl, hostname); + } catch (Exception e) { + LOG.trace("Failed to set SASL hostname", e); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 08a853a..fb8f89b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -24,11 +24,15 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT; import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -36,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.ConnectionMetaData; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; @@ -45,6 +50,7 @@ import javax.jms.Queue; import javax.jms.Session; import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.provider.ProviderRedirectedException; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; @@ -54,6 +60,7 @@ import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.TxnCapability; +import org.hamcrest.Matcher; import org.junit.Test; // TODO find a way to make the test abort immediately if the TestAmqpPeer throws an exception @@ -117,6 +124,46 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 5000) + public void testAmqpHostnameSetByDefault() throws Exception { + doAmqpHostnameTestImpl("localhost", false, equalTo("localhost")); + } + + @Test(timeout = 5000) + public void testAmqpHostnameSetByVhostOption() throws Exception { + String vhost = "myAmqpHost"; + doAmqpHostnameTestImpl(vhost, true, equalTo(vhost)); + } + + @Test(timeout = 500000) + public void testAmqpHostnameNotSetWithEmptyVhostOption() throws Exception { + doAmqpHostnameTestImpl("", true, nullValue()); + } + + private void doAmqpHostnameTestImpl(String amqpHostname, boolean setHostnameOption, Matcher<?> hostnameMatcher) throws JMSException, InterruptedException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.expectAnonymousConnect(true, null, hostnameMatcher); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + String uri = "amqp://localhost:" + testPeer.getServerPort(); + if(setHostnameOption) { + uri += "?amqp.vhost=" + amqpHostname; + } + + ConnectionFactory factory = new JmsConnectionFactory(uri); + Connection connection = factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + + testPeer.waitForAllHandlersToComplete(1000); + assertNull(testPeer.getThrowable()); + + testPeer.expectClose(); + connection.close(); + } + } + @Test(timeout = 10000) public void testRemotelyEndConnectionListenerInvoked() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java index 9fe598c..f4ef08a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java @@ -52,7 +52,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { @Test(timeout = 5000) public void testIdleTimeoutIsAdvertisedByDefault() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - testPeer.expectAnonymousConnect(true, greaterThan(UnsignedInteger.valueOf(0))); + testPeer.expectAnonymousConnect(true, greaterThan(UnsignedInteger.valueOf(0)), null); // Each connection creates a session for managing temporary destinations etc testPeer.expectBegin(true); @@ -75,7 +75,7 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { int configuredTimeout = 54320; int advertisedValue = configuredTimeout / 2; - testPeer.expectAnonymousConnect(true, equalTo(UnsignedInteger.valueOf(advertisedValue))); + testPeer.expectAnonymousConnect(true, equalTo(UnsignedInteger.valueOf(advertisedValue)), null); // Each connection creates a session for managing temporary destinations etc testPeer.expectBegin(true); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/60f86b37/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 7822c80..cbfe514 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -391,10 +391,10 @@ public class TestAmqpPeer implements AutoCloseable public void expectAnonymousConnect(boolean authorize) { - expectAnonymousConnect(authorize, null); + expectAnonymousConnect(authorize, null, null); } - public void expectAnonymousConnect(boolean authorize, Matcher<?> idleTimeoutMatcher) + public void expectAnonymousConnect(boolean authorize, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher) { SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("ANONYMOUS")); addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, @@ -402,7 +402,7 @@ public class TestAmqpPeer implements AutoCloseable this, FrameType.SASL, 0, saslMechanismsFrame, null))); - addHandler(new SaslInitMatcher() + SaslInitMatcher saslInitMatcher = new SaslInitMatcher() .withMechanism(equalTo(Symbol.valueOf("ANONYMOUS"))) .withInitialResponse(equalTo(new Binary(new byte[0]))) .onSuccess(new AmqpPeerRunnable() @@ -417,7 +417,14 @@ public class TestAmqpPeer implements AutoCloseable false); _driverRunnable.expectHeader(); } - })); + }); + + if(hostnameMatcher != null) + { + saslInitMatcher.withHostname(hostnameMatcher); + } + + addHandler(saslInitMatcher); addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); @@ -435,6 +442,11 @@ public class TestAmqpPeer implements AutoCloseable openMatcher.withIdleTimeOut(idleTimeoutMatcher); } + if(hostnameMatcher != null) + { + openMatcher.withHostname(hostnameMatcher); + } + addHandler(openMatcher); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
