http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java index 63702c9..b26cb5f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java @@ -75,54 +75,54 @@ public class AmqpProviderTest extends QpidJmsTestCase { @Test(timeout=20000) public void testCreate() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); } @Test(timeout=20000, expected=RuntimeException.class) public void testGetMessageFactoryTrowsWhenNotConnected() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); provider.getMessageFactory(); } @Test(timeout=20000) public void testUnInitializedProviderReturnsDefaultConnectTimeout() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); assertEquals(JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT, provider.getConnectTimeout()); } @Test(timeout=20000) public void testUnInitializedProviderReturnsDefaultCloseTimeout() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); assertEquals(JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT, provider.getCloseTimeout()); } @Test(timeout=20000) public void testUnInitializedProviderReturnsDefaultSendTimeout() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new 
AmqpProviderFactory().createProvider(getDefaultURI()); assertEquals(JmsConnectionInfo.DEFAULT_SEND_TIMEOUT, provider.getSendTimeout()); } @Test(timeout=20000) public void testUnInitializedProviderReturnsDefaultRequestTimeout() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); assertEquals(JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT, provider.getRequestTimeout()); } @Test(timeout=20000) public void testGetDefaultDrainTimeout() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); assertEquals(TimeUnit.MINUTES.toMillis(1), provider.getDrainTimeout()); } @Test(timeout=20000) public void testGetDefaultIdleTimeout() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); assertEquals(TimeUnit.MINUTES.toMillis(1), provider.getIdleTimeout()); } @Test(timeout=20000) public void testEnableTraceFrames() throws Exception { - provider = new AmqpProvider(getDefaultURI()); + provider = new AmqpProviderFactory().createProvider(getDefaultURI()); TransportImpl transport = (TransportImpl) provider.getProtonTransport(); assertNotNull(transport); assertNull(transport.getProtocolTracer()); @@ -131,19 +131,13 @@ public class AmqpProviderTest extends QpidJmsTestCase { } @Test(timeout=20000) - public void testConnectWithUnknownProtocol() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer()) { - provider = new AmqpProvider(getPeerURI(testPeer)); - provider.setTransportType("ftp"); - try { - provider.connect(connectionInfo); - fail("Should have failed to connect."); - } catch (Exception ex) { - } - - provider.close(); - - testPeer.waitForAllHandlersToComplete(1000); + public void testCreateFailsWithUnknownProtocol() throws Exception { + try { + AmqpProviderFactory factory = new AmqpProviderFactory(); + 
factory.setTransportScheme("ftp"); + factory.createProvider(new URI("ftp://localhost:5672")); + fail("Should have failed to connect."); + } catch (Exception ex) { } } @@ -153,7 +147,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { URI peerURI = getPeerURI(testPeer); testPeer.close(); - provider = new AmqpProvider(peerURI); + provider = new AmqpProviderFactory().createProvider(peerURI); try { provider.connect(connectionInfo); fail("Should have failed to connect."); @@ -166,7 +160,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { public void testDisableSaslLayer() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); provider.setSaslLayer(false); provider.connect(connectionInfo); @@ -185,7 +179,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { testPeer.expectSaslAnonymous(); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); TransportImpl transport = (TransportImpl) provider.getProtonTransport(); @@ -210,7 +204,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { testPeer.expectSaslAnonymous(); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); TransportImpl transport = (TransportImpl) provider.getProtonTransport(); @@ -235,7 +229,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { testPeer.expectSaslAnonymous(); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); TransportImpl transport = (TransportImpl) provider.getProtonTransport(); @@ -259,7 +253,7 @@ public class AmqpProviderTest extends 
QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { testPeer.expectSaslAnonymous(); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); provider.connect(connectionInfo); assertNull(provider.getProviderListener()); @@ -294,7 +288,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { testPeer.expectSaslAnonymous(); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); provider.connect(connectionInfo); assertTrue(provider.toString().contains("localhost")); assertTrue(provider.toString().contains(String.valueOf(testPeer.getServerPort()))); @@ -315,7 +309,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { testPeer.expectOpen(); testPeer.expectClose(); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); provider.connect(connectionInfo); provider.close(); @@ -352,7 +346,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { connectionInfo.setUsername(TEST_USERNAME); connectionInfo.setPassword(TEST_PASSWORD); - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); testPeer.expectSaslPlain(TEST_USERNAME, TEST_PASSWORD); testPeer.expectOpen(); testPeer.expectBegin(); @@ -403,7 +397,7 @@ public class AmqpProviderTest extends QpidJmsTestCase { private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { - provider = new AmqpProvider(getPeerURI(testPeer)); + provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer)); final AtomicBoolean errorThrown = new AtomicBoolean(); JmsResource resourceInfo = new JmsAbstractResource() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java index 70ebd11..15f29d8 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; @@ -30,46 +32,55 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.junit.Test; +import org.mockito.Mockito; public class AmqpSupportTest { @Test - public void testCreateRedirectionException() { + public void testCreateRedirectionException() throws URISyntaxException { ErrorCondition condition = new ErrorCondition(); + AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class); + Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672")); + Map<Symbol, Object> info = new HashMap<>(); info.put(AmqpSupport.PORT, "5672"); info.put(AmqpSupport.OPEN_HOSTNAME, "localhost.localdomain"); info.put(AmqpSupport.NETWORK_HOST, "localhost"); info.put(AmqpSupport.SCHEME, "amqp"); - info.put(AmqpSupport.PATH, "websocket"); + info.put(AmqpSupport.PATH, "/websocket"); condition.setInfo(info); Symbol error = AmqpError.INTERNAL_ERROR; String message = "Failed to connect"; - Exception result = AmqpSupport.createRedirectException(error, message, condition); + Exception result = 
AmqpSupport.createRedirectException(mockProvider, error, message, condition); assertNotNull(result); assertTrue(result instanceof ProviderRedirectedException); ProviderRedirectedException pre = (ProviderRedirectedException) result; - assertEquals(5672, pre.getPort()); - assertEquals("localhost.localdomain", pre.getHostname()); - assertEquals("localhost", pre.getNetworkHost()); - assertEquals("amqp", pre.getScheme()); - assertEquals("websocket", pre.getPath()); + URI redirection = pre.getRedirectionURI(); + + assertEquals(5672, redirection.getPort()); + assertTrue("localhost.localdomain", redirection.getQuery().contains("amqp.vhost=localhost.localdomain")); + assertEquals("localhost", redirection.getHost()); + assertEquals("amqp", redirection.getScheme()); + assertEquals("/websocket", redirection.getPath()); } @Test - public void testCreateRedirectionExceptionWithNoRedirectInfo() { + public void testCreateRedirectionExceptionWithNoRedirectInfo() throws URISyntaxException { + AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class); + Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672")); + ErrorCondition condition = new ErrorCondition(); Symbol error = AmqpError.INTERNAL_ERROR; String message = "Failed to connect"; - Exception result = AmqpSupport.createRedirectException(error, message, condition); + Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition); assertNotNull(result); assertFalse(result instanceof ProviderRedirectedException); @@ -77,7 +88,10 @@ public class AmqpSupportTest { } @Test - public void testCreateRedirectionExceptionWithNoNetworkHost() { + public void testCreateRedirectionExceptionWithNoNetworkHost() throws URISyntaxException { + AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class); + Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672")); + ErrorCondition condition = new ErrorCondition(); Map<Symbol, Object> info = new 
HashMap<>(); @@ -91,7 +105,7 @@ public class AmqpSupportTest { Symbol error = AmqpError.INTERNAL_ERROR; String message = "Failed to connect"; - Exception result = AmqpSupport.createRedirectException(error, message, condition); + Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition); assertNotNull(result); assertFalse(result instanceof ProviderRedirectedException); @@ -99,7 +113,10 @@ public class AmqpSupportTest { } @Test - public void testCreateRedirectionExceptionWithEmptyNetworkHost() { + public void testCreateRedirectionExceptionWithEmptyNetworkHost() throws URISyntaxException { + AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class); + Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672")); + ErrorCondition condition = new ErrorCondition(); Map<Symbol, Object> info = new HashMap<>(); @@ -114,7 +131,7 @@ public class AmqpSupportTest { Symbol error = AmqpError.INTERNAL_ERROR; String message = "Failed to connect"; - Exception result = AmqpSupport.createRedirectException(error, message, condition); + Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition); assertNotNull(result); assertFalse(result instanceof ProviderRedirectedException); @@ -122,7 +139,10 @@ public class AmqpSupportTest { } @Test - public void testCreateRedirectionExceptionWithInvalidPort() { + public void testCreateRedirectionExceptionWithInvalidPort() throws URISyntaxException { + AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class); + Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672")); + ErrorCondition condition = new ErrorCondition(); Map<Symbol, Object> info = new HashMap<>(); @@ -137,7 +157,7 @@ public class AmqpSupportTest { Symbol error = AmqpError.INTERNAL_ERROR; String message = "Failed to connect"; - Exception result = AmqpSupport.createRedirectException(error, message, condition); + Exception result = 
AmqpSupport.createRedirectException(mockProvider, error, message, condition); assertNotNull(result); assertFalse(result instanceof ProviderRedirectedException); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java index e543eec..1120285 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java @@ -419,4 +419,31 @@ public class FailoverProviderTest extends FailoverProviderTestSupport { assertEquals(SEND_TIMEOUT, provider.getSendTimeout()); assertEquals(REQUEST_TIMEOUT, provider.getRequestTimeout()); } + + @Test(timeout = 30000) + public void testAmqpOpenServerListBehaviourDefault() { + provider = new FailoverProvider(uris); + assertEquals("REPLACE", provider.getAmqpOpenServerListBehaviour()); + } + + @Test(timeout = 30000) + public void testSetGetAmqpOpenServerListBehaviour() { + provider = new FailoverProvider(uris); + String behaviour = "ADD"; + assertFalse(behaviour.equals(provider.getAmqpOpenServerListBehaviour())); + + provider.setAmqpOpenServerListBehaviour(behaviour); + assertEquals(behaviour, provider.getAmqpOpenServerListBehaviour()); + } + + @Test(timeout = 30000) + public void testSetInvalidAmqpOpenServerListBehaviourThrowsIAE() { + provider = new FailoverProvider(uris); + try { + provider.setAmqpOpenServerListBehaviour("invalid"); + fail("no exception was thrown"); + } catch (IllegalArgumentException iae) { + // Expected + } + } } 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java index 5c9bef6..bcf9d2c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java @@ -22,6 +22,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT; import static org.junit.Assert.assertTrue; import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -29,7 +30,6 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; -import javax.jms.JMSException; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; @@ -57,7 +57,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { TestAmqpPeer redirectedPeer = new TestAmqpPeer();) { final CountDownLatch connected = new CountDownLatch(1); - final String redirectURI = createPeerURI(redirectedPeer); + final URI redirectURI = createPeerURI(redirectedPeer); LOG.info("Backup peer is at: {}", redirectURI); redirectedPeer.expectSaslAnonymous(); @@ -76,7 +76,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { @Override public void onConnectionEstablished(URI remoteURI) { LOG.info("Connection Established: {}", remoteURI); - if (redirectURI.equals(remoteURI.toString())) { + if (isExpectedHost(redirectURI, remoteURI)) { connected.countDown(); } } @@ -100,8 +100,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase { final CountDownLatch 
connectedToPrimary = new CountDownLatch(1); final CountDownLatch connectedToBackup = new CountDownLatch(1); - final String rejectingURI = createPeerURI(rejectingPeer); - final String redirectURI = createPeerURI(redirectedPeer); + final URI rejectingURI = createPeerURI(rejectingPeer); + final URI redirectURI = createPeerURI(redirectedPeer); LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI); redirectedPeer.expectSaslAnonymous(); @@ -123,7 +123,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { @Override public void onConnectionEstablished(URI remoteURI) { LOG.info("Connection Established: {}", remoteURI); - if (remoteURI.toString().equals(rejectingURI)) { + if (isExpectedHost(rejectingURI, remoteURI)) { connectedToPrimary.countDown(); } } @@ -131,7 +131,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase { @Override public void onConnectionRestored(URI remoteURI) { LOG.info("Connection Reestablished: {}", remoteURI); - if (remoteURI.toString().equals(redirectURI)) { + if (isExpectedHost(redirectURI, remoteURI)) { connectedToBackup.countDown(); } } @@ -149,8 +149,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase { } } - private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws JMSException { - final String remoteURI = "failover:(" + createPeerURI(testPeer) + ")"; + private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws Exception { + final String remoteURI = "failover:(" + createPeerURI(testPeer).toString() + ")"; ConnectionFactory factory = new JmsConnectionFactory(remoteURI); Connection connection = factory.createConnection(); @@ -158,7 +158,22 @@ public class FailoverRedirectTest extends QpidJmsTestCase { return (JmsConnection) connection; } - private String createPeerURI(TestAmqpPeer peer) { - return "amqp://localhost:" + peer.getServerPort(); + private boolean isExpectedHost(URI expected, URI actual) { + if 
(!expected.getHost().equals(actual.getHost())) { + LOG.info("Expected host {} but got host {}", expected.getHost(), actual.getHost()); + return false; + } + + if (expected.getPort() != actual.getPort()) { + LOG.info("Expected host {} on port {} but got host {} on port {}", + expected.getHost(), expected.getPort(), actual.getHost(), actual.getPort()); + return false; + } + + return true; + } + + private URI createPeerURI(TestAmqpPeer peer) throws URISyntaxException { + return new URI("amqp://localhost:" + peer.getServerPort()); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java index 39917cf..b59e1b6 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java @@ -29,6 +29,7 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -299,6 +300,33 @@ public class FailoverUriPoolTest extends QpidJmsTestCase { } @Test + public void testAddAllHandlesNulls() throws URISyntaxException { + FailoverUriPool pool = new FailoverUriPool(uris, null); + pool.setRandomize(false); + pool.addAll(null); + + assertEquals(uris.size(), pool.size()); + } + + @Test + public void testAddAllHandlesEmpty() throws URISyntaxException { + FailoverUriPool pool = new FailoverUriPool(uris, null); + pool.setRandomize(false); + pool.addAll(Collections.emptyList()); + + assertEquals(uris.size(), pool.size()); + } + + @Test + 
public void testAddAll() throws URISyntaxException { + FailoverUriPool pool = new FailoverUriPool(null, null); + pool.setRandomize(false); + pool.addAll(uris); + + assertEquals(uris.size(), pool.size()); + } + + @Test public void testRemoveURIFromPool() throws URISyntaxException { FailoverUriPool pool = new FailoverUriPool(uris, null); pool.setRandomize(false); @@ -526,4 +554,16 @@ public class FailoverUriPoolTest extends QpidJmsTestCase { return resolutionWorks; } + + @Test + public void testRemoveAll() throws URISyntaxException { + FailoverUriPool pool = new FailoverUriPool(uris, null); + assertEquals(uris.size(), pool.size()); + + pool.removeAll(); + assertTrue(pool.isEmpty()); + assertEquals(0, pool.size()); + + pool.removeAll(); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java new file mode 100644 index 0000000..bdfa3e5 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java @@ -0,0 +1,1142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.failover; + +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.net.ssl.SSLContext; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.transports.TransportSslOptions; +import org.apache.qpid.jms.transports.TransportSupport; +import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.qpid.jms.util.URISupport; +import org.apache.qpid.proton.amqp.Symbol; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverWithAmqpOpenProvidedServerListIntegrationTest.class); + + private static final String BROKER_JKS_KEYSTORE = 
"src/test/resources/broker-jks.keystore"; + private static final String BROKER_JKS_TRUSTSTORE = "src/test/resources/broker-jks.truststore"; + private static final String PASSWORD = "password"; + private static final String CLIENT_JKS_KEYSTORE = "src/test/resources/client-jks.keystore"; + private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore"; + + private static final String JAVAX_NET_SSL_KEY_STORE = "javax.net.ssl.keyStore"; + private static final String JAVAX_NET_SSL_KEY_STORE_PASSWORD = "javax.net.ssl.keyStorePassword"; + private static final String JAVAX_NET_SSL_TRUST_STORE = "javax.net.ssl.trustStore"; + private static final String JAVAX_NET_SSL_TRUST_STORE_PASSWORD = "javax.net.ssl.trustStorePassword"; + + private static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list"); + private static final Symbol NETWORK_HOST = Symbol.valueOf("network-host"); + private static final Symbol HOSTNAME = Symbol.valueOf("hostname"); + private static final Symbol PORT = Symbol.valueOf("port"); + + /* + * Verify that when the Open frame contains a failover server list, and the client is configured to + * replace the servers in its existing URI pool, it does so, leaving the server successfully connected + * to plus the announced failover servers. + */ + @Test(timeout = 20000) + public void testFailoverHandlesServerProvidedFailoverListReplace() throws Exception { + doFailoverHandlesServerProvidedFailoverListTestImpl(true); + } + + /* + * Verify that when the Open frame contains a failover server list, and the client is configured to + * add the servers to its existing URI pool, it does so. 
+ */ + @Test(timeout = 20000) + public void testFailoverHandlesServerProvidedFailoverListAdd() throws Exception { + doFailoverHandlesServerProvidedFailoverListTestImpl(false); + } + + private void doFailoverHandlesServerProvidedFailoverListTestImpl(boolean replace) throws Exception { + try (TestAmqpPeer primaryPeer = new TestAmqpPeer(); + TestAmqpPeer backupPeer1 = new TestAmqpPeer(); + TestAmqpPeer backupPeer2 = new TestAmqpPeer();) { + + final URI primaryPeerURI = createPeerURI(primaryPeer); + final URI backupPeer1URI = createPeerURI(backupPeer1); + final URI backupPeer2URI = createPeerURI(backupPeer2); + LOG.info("Primary is at: {}", primaryPeerURI); + LOG.info("Backup1 is at: {}", backupPeer1URI); + LOG.info("Backup2 is at: {}", backupPeer2URI); + + final CountDownLatch connectedToPrimary = new CountDownLatch(1); + final CountDownLatch connectedToBackup1 = new CountDownLatch(1); + final CountDownLatch connectedToBackup2 = new CountDownLatch(1); + + // Expect the authentication as soon as the connection object is created + primaryPeer.expectSaslAnonymous(); + + String failoverParams = null; + if (replace) { + failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=REPLACE"; + } else { + failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=ADD"; + } + + // We only give it the primary/dropping peer details. It can only connect to the backup + // peer by identifying the details in the announced failover-server-list. 
+            final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeer1URI, remoteURI)) {
+                        connectedToBackup1.countDown();
+                    } else if (isExpectedHost(backupPeer2URI, remoteURI)) {
+                        connectedToBackup2.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, and have it advertise a failover list containing backup1
+            Map<Symbol,Object> backupPeer1Details = new HashMap<>();
+            backupPeer1Details.put(NETWORK_HOST, "localhost");
+            backupPeer1Details.put(PORT, backupPeer1.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeer1Details);
+
+            Map<Symbol,Object> server1ConnectionProperties = new HashMap<Symbol, Object>();
+            server1ConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(server1ConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing the initial peer and backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(backupPeer1URI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Set backup1 up to expect a connection, and have it advertise a failover list containing backup2
+            Map<Symbol,Object> backupPeer2Details = new HashMap<>();
+            backupPeer2Details.put(NETWORK_HOST, "localhost");
+            backupPeer2Details.put(PORT, backupPeer2.getServerPort());
+
+            List<Map<Symbol, Object>> backup1FailoverServerList = new ArrayList<Map<Symbol, Object>>();
+            backup1FailoverServerList.add(backupPeer2Details);
+
+            Map<Symbol,Object> backup1serverConnectionProperties = new HashMap<Symbol, Object>();
+            backup1serverConnectionProperties.put(FAILOVER_SERVER_LIST, backup1FailoverServerList);
+
+            backupPeer1.expectSaslAnonymous();
+            backupPeer1.expectOpen(backup1serverConnectionProperties);
+            backupPeer1.expectBegin();
+
+            // Kill the primary peer
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup1 peer", connectedToBackup1.await(5, TimeUnit.SECONDS));
+            assertEquals("Should not yet connect to backup2 peer", 1, connectedToBackup2.getCount());
+
+            // Verify the failover URIs are as expected
+            List<URI> afterFirstReconnectFailoverURIs = new ArrayList<>();
+            if (replace) {
+                // Now containing backup1 and backup2 peers
+                afterFirstReconnectFailoverURIs.add(backupPeer1URI);
+                afterFirstReconnectFailoverURIs.add(backupPeer2URI);
+            } else {
+                // Now containing primary, backup1, and backup2 peers
+                afterFirstReconnectFailoverURIs.add(primaryPeerURI);
+                afterFirstReconnectFailoverURIs.add(backupPeer1URI);
+                afterFirstReconnectFailoverURIs.add(backupPeer2URI);
+            }
+
+            assertFailoverURIList(connection, afterFirstReconnectFailoverURIs);
+
+            // Set the backup2 to expect a connection
+            backupPeer2.expectSaslAnonymous();
+            backupPeer2.expectOpen();
+            backupPeer2.expectBegin();
+
+            // Kill the backup1 peer
+            backupPeer1.close();
+
+            assertTrue("Should connect to backup2 peer", connectedToBackup2.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected
+            List<URI> afterSecondReconnectFailoverURIs = new ArrayList<>();
+            if (replace) {
+                // Still containing backup1 and backup2 peers
+                afterSecondReconnectFailoverURIs.add(backupPeer1URI);
+                afterSecondReconnectFailoverURIs.add(backupPeer2URI);
+            } else {
+                // Still containing primary, backup1, and backup2 peers
+                afterSecondReconnectFailoverURIs.add(primaryPeerURI);
+                afterSecondReconnectFailoverURIs.add(backupPeer1URI);
+                afterSecondReconnectFailoverURIs.add(backupPeer2URI);
+            }
+
+            assertFailoverURIList(connection, afterSecondReconnectFailoverURIs);
+
+            backupPeer2.expectClose();
+            connection.close();
+            backupPeer2.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list, and the client is configured to ignore it,
+     * no change occurs in the failover URIs in use by the client after connecting.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverHandlesServerProvidedFailoverListIgnore() throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            LOG.info("Peer is at: {}", primaryPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            String failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=IGNORE";
+
+            // We only give it the primary peer details. Because the server list behaviour is set to
+            // IGNORE above, the server advertised in the Open frame must not be added to the URI pool.
+            final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> primaryPeerOnlyFailoverURIs = new ArrayList<>();
+            primaryPeerOnlyFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, primaryPeerOnlyFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing another server.
+            // No peer listens at this address; it only has to show up (or not) in the URI pool.
+            Map<Symbol,Object> otherPeerDetails = new HashMap<>();
+            otherPeerDetails.put(NETWORK_HOST, "testhost");
+            // NOTE(review): the port is a String here, while the other tests in this file put an int
+            // (e.g. the literal 5673) - confirm this difference is intentional.
+            otherPeerDetails.put(PORT, "4567");
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(otherPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the existing failover URIs are as expected, still the initial peer only
+            assertFailoverURIList(connection, primaryPeerOnlyFailoverURIs);
+
+            primaryPeer.expectClose();
+            connection.close();
+            primaryPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list, and it specifies an AMQP hostname in
+     * a particular server's details, the hostname is used when failover occurs.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverHandlesServerProvidedFailoverListWithHostname() throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+             TestAmqpPeer backupPeer = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            // Released by the connection listener below when the matching peer is reported
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, and have it advertise a failover list whose
+            // single entry is the backup peer together with an AMQP hostname (vhost) to use for it
+            Map<Symbol, Object> backupPeer1Details = new HashMap<>();
+            backupPeer1Details.put(NETWORK_HOST, "localhost");
+            backupPeer1Details.put(PORT, backupPeer.getServerPort());
+            String myAmqpVhost = "myAmqpHostname";
+            backupPeer1Details.put(HOSTNAME, myAmqpVhost);
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<>();
+            failoverServerList.add(backupPeer1Details);
+
+            Map<Symbol, Object> server1ConnectionProperties = new HashMap<>();
+            server1ConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(server1ConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer and the backup (with vhost details)
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(new URI(backupPeerURI.toString() + "?amqp.vhost=" + myAmqpVhost));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Verify the client fails over to the advertised backup, and uses the correct AMQP hostname when doing so
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen(null, Matchers.equalTo(myAmqpVhost), false);
+            backupPeer.expectBegin();
+
+            // Kill the primary peer to provoke the failover
+            primaryPeer.close();
+
+            // The latch was previously counted down but never checked; assert the restore event was
+            // actually reported for the backup, as the SSL failover tests in this class do.
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
+     * system properties the redirect uses those properties to connect to the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredBySystemProperties() throws Exception {
+        // SSL context used by both test peers (server side)
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        // The client side is configured only through the javax.net.ssl system properties
+        setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, null, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup advertised
+            Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing the initial peer and the backup
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(backupPeerURI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Set the backup up to expect the failover connection (no AMQP hostname is
+            // advertised or matched in this test)
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            // Kill the primary peer to provoke the failover
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
+     * URI options on the AMQP URI the redirect uses those properties to connect to the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredByTransportOptions() throws Exception {
+        // SSL context used by both test peers (server side); only a key store is configured
+        TransportSslOptions sslOptions = new TransportSslOptions();
+        sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        sslOptions.setKeyStorePassword(PASSWORD);
+        sslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(sslOptions);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // Trust store settings passed as options on the peer (amqps) URI itself
+            String connectionOptions = "transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+                                       "transport.trustStorePassword=" + PASSWORD;
+
+            // The same options, in the same order, as they are expected to appear on each pooled URI
+            Map<String, String> expectedUriOptions = new LinkedHashMap<>();
+            expectedUriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
+            expectedUriOptions.put("transport.trustStorePassword", PASSWORD);
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(connectionOptions, null, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup advertised
+            Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing the initial peer and the
+            // backup, the latter carrying the same transport options as the initial peer URI
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+            afterOpenFailoverURIs.add(URISupport.applyParameters(backupPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Set the backup up to expect the failover connection (no AMQP hostname is
+            // advertised or matched in this test)
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            // Kill the primary peer to provoke the failover
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * configured with the Failover URI with nested options the redirect uses those properties to
+     * connect to the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredByNestedTransportOptions() throws Exception {
+        // SSL context used by both test peers (server side); only a key store is configured
+        TransportSslOptions sslOptions = new TransportSslOptions();
+        sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        sslOptions.setKeyStorePassword(PASSWORD);
+        sslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(sslOptions);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // Trust store settings passed as "failover.nested." options on the outer failover URI
+            String failoverOptions = "?failover.nested.transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+                                     "failover.nested.transport.trustStorePassword=" + PASSWORD;
+
+            // The options, without the "failover.nested." prefix, as expected on each pooled URI
+            Map<String, String> expectedUriOptions = new LinkedHashMap<>();
+            expectedUriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
+            expectedUriOptions.put("transport.trustStorePassword", PASSWORD);
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup advertised
+            Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing the initial peer and the
+            // backup, both carrying the nested transport options
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+            afterOpenFailoverURIs.add(URISupport.applyParameters(backupPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Set the backup up to expect the failover connection (no AMQP hostname is
+            // advertised or matched in this test)
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            // Kill the primary peer to provoke the failover
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * configured with a custom SSLContext the redirect uses that context to connect to
+     * the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredByCustomSSLContext() throws Exception {
+        // SSL context used by both test peers (server side)
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        // SSL context handed directly to the connection factory (client side)
+        TransportSslOptions clientSslOptions = new TransportSslOptions();
+        clientSslOptions.setKeyStoreLocation(CLIENT_JKS_KEYSTORE);
+        clientSslOptions.setTrustStoreLocation(CLIENT_JKS_TRUSTSTORE);
+        clientSslOptions.setKeyStorePassword(PASSWORD);
+        clientSslOptions.setTrustStorePassword(PASSWORD);
+
+        SSLContext clientSslContext = TransportSupport.createSslContext(clientSslOptions);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // We only give it the primary/dropping peer details. It can only
+            // connect to the backup peer by identifying the details in the announced
+            // failover-server-list. The factory is built here, rather than through
+            // establishAnonymousConnecton, so the custom SSLContext can be set on it.
+            final JmsConnectionFactory factory = new JmsConnectionFactory(
+                "failover:(amqps://localhost:" + primaryPeer.getServerPort() + ")");
+            factory.setSslContext(clientSslContext);
+
+            JmsConnection connection = (JmsConnection) factory.createConnection();
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover
+            // list containing the backup advertised
+            Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing the initial
+            // peer and the backup
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(backupPeerURI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Set the backup up to expect the failover connection (no AMQP hostname is
+            // advertised or matched in this test)
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            // Kill the primary peer to provoke the failover
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * that a remote listed in the open frame failover list is ignored when insecure redirects are
+     * prohibited.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverIgnoresInsecureServerWhenNotConfiguredToAllow() throws Exception {
+        doTestFailoverHandlingOfInsecureRedirectAdvertisement(false);
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * that a remote listed in the open frame failover list is accepted when insecure redirects are
+     * allowed.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverAcceptsInsecureServerWhenConfiguredToAllow() throws Exception {
+        doTestFailoverHandlingOfInsecureRedirectAdvertisement(true);
+    }
+
+    /*
+     * Connects over SSL to a single peer that advertises a non-secure ('amqp' scheme) server in its
+     * failover list, then checks that the advertised server is added to the failover URI pool only
+     * when 'allow' is true. 'allow' is passed to the client as the nested
+     * amqp.allowNonSecureRedirects option.
+     */
+    private void doTestFailoverHandlingOfInsecureRedirectAdvertisement(boolean allow) throws Exception {
+
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false)) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // failoverOptions is what is put on the failover URI; connectionOptions is the same
+            // setting without the "failover.nested." prefix, as expected on each pooled URI
+            String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=" + allow;
+            String connectionOptions = "amqp.allowNonSecureRedirects=" + allow;
+
+            // We only give it the primary peer details. No backup peer is started in this test; only
+            // the content of the failover URI pool is checked.
+            final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, and have it advertise a failover list
+            // containing a server that uses the non-secure 'amqp' scheme
+            Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, 5673);
+            backupPeerDetails.put(SCHEME, "amqp");
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+            primaryPeer.expectClose();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected: always the initial peer, plus the advertised
+            // non-secure server only when insecure redirects are allowed
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+            if (allow) {
+                afterOpenFailoverURIs.add(new URI("amqp://localhost:5673?" + connectionOptions));
+            }
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            connection.close();
+
+            primaryPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via
+     * the 'amqp' transport and the redirect contains a 'ws' scheme that failover reconnect list
+     * is updated to contain the 'amqpws' redirect.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverAcceptsUpdateUsingTransportSchemeWS() throws Exception {
+        doTestFailoverAcceptsUpdateUsingTransportSchemes("ws", "amqpws");
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via
+     * the 'amqp' transport and the redirect contains a 'wss' scheme that failover reconnect list
+     * is updated to contain the 'amqpwss' redirect.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverAcceptsUpdateUsingTransportSchemeWSS() throws Exception {
+        doTestFailoverAcceptsUpdateUsingTransportSchemes("wss", "amqpwss");
+    }
+
+    /*
+     * Connects over SSL to a single peer that advertises a server with the given 'transportScheme'
+     * in its failover list, then checks that the failover URI pool gains an entry for that server
+     * whose URI scheme is 'expected'.
+     */
+    private void doTestFailoverAcceptsUpdateUsingTransportSchemes(String transportScheme, String expected) throws Exception {
+
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false)) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // Allow non-secure redirects for this test for simplicity.
+            String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=true";
+            String connectionOptions = "amqp.allowNonSecureRedirects=true";
+
+            // We only give it the primary peer details. No backup peer is started in this test; only
+            // the content of the failover URI pool is checked.
+            final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, and have it advertise a failover list
+            // containing a server that uses the transport scheme under test
+            Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, 5673);
+            backupPeerDetails.put(SCHEME, transportScheme);
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+            primaryPeer.expectClose();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing the initial peer and the
+            // advertised server under the expected URI scheme
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+            afterOpenFailoverURIs.add(new URI(expected + "://localhost:5673?" + connectionOptions));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            connection.close();
+
+            primaryPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Sets the four javax.net.ssl key store / trust store system properties through
+     * setTestSystemProperty (defined outside this section; presumably it restores the previous
+     * values once the test finishes - confirm there).
+     */
+    private void setSslSystemPropertiesForCurrentTest(String keystore, String keystorePassword, String truststore, String truststorePassword) {
+        setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE, keystore);
+        setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE_PASSWORD, keystorePassword);
+        setTestSystemProperty(JAVAX_NET_SSL_TRUST_STORE, truststore);
+        setTestSystemProperty(JAVAX_NET_SSL_TRUST_STORE_PASSWORD, truststorePassword);
+    }
+
+    /*
+     * Asserts that the failover URI pool of the connection's provider holds exactly 'expectedURIs',
+     * in the same order. The pool is read reflectively from the provider's "uris" field.
+     */
+    private void assertFailoverURIList(JmsConnection connection, List<URI> expectedURIs) throws Exception {
+        FailoverProvider provider = getFailoverProvider(connection);
+
+        // The pool is not reachable through an accessor here, so read the field directly
+        Field urisField = provider.getClass().getDeclaredField("uris");
+        urisField.setAccessible(true);
+        Object urisObj = urisField.get(provider);
+
+        assertNotNull("Expected to get a uri pool instance", urisObj);
+        assertTrue("Unexpected uri pool type: " + urisObj.getClass(), urisObj instanceof FailoverUriPool);
+        FailoverUriPool uriPool = (FailoverUriPool) urisObj;
+
+        List<URI> current = uriPool.getList();
+        assertEquals(expectedURIs, current);
+    }
+
+    /*
+     * Reads the connection's "provider" field reflectively and returns it as a FailoverProvider,
+     * failing the test if the field is null or holds some other type.
+     */
+    private FailoverProvider getFailoverProvider(JmsConnection connection) throws Exception {
+        Field field = connection.getClass().getDeclaredField("provider");
+        field.setAccessible(true);
+        Object providerObj = field.get(connection);
+
+        assertNotNull("Expected to get a provdier instance", providerObj);
+        assertTrue("Unexpected provider type: " + providerObj.getClass(), providerObj instanceof FailoverProvider);
+        FailoverProvider provider = (FailoverProvider) providerObj;
+        return provider;
+    }
+
+    /*
+     * Convenience overload: no per-peer connection options, no SSL flag.
+     */
+    private JmsConnection establishAnonymousConnecton(String failoverParams, TestAmqpPeer... peers) throws Exception {
+        return establishAnonymousConnecton(null, failoverParams, peers);
+    }
+
+    /*
+     * Convenience overload: passes 'false' for the ssl flag.
+     */
+    private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws Exception {
+        return establishAnonymousConnecton(connectionParams, failoverParams, false, peers);
+    }
+
+    /*
+     * Builds a "failover:(uri1,uri2,...)" URI from the given peers and creates a connection from it.
+     *
+     * connectionParams - options applied to each peer URI via createPeerURI, may be null
+     * failoverParams   - options appended after the closing parenthesis; a leading "?" is added when
+     *                    missing, and "?failover.maxReconnectAttempts=10" is used when this is null
+     * ssl              - not referenced in this method body
+     * peers            - at least one peer is required, otherwise IllegalArgumentException is thrown
+     */
+    private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, boolean ssl, TestAmqpPeer... peers) throws Exception {
+        if (peers.length == 0) {
+            throw new IllegalArgumentException("No test peers were given, at least 1 required");
+        }
+
+        String remoteURI = "failover:(";
+        boolean first = true;
+        for (TestAmqpPeer peer : peers) {
+            if (!first) {
+                remoteURI += ",";
+            }
+            remoteURI += createPeerURI(peer, connectionParams).toString();
+            first = false;
+        }
+
+        if (failoverParams == null) {
+            remoteURI += ")?failover.maxReconnectAttempts=10";
+        } else {
+            remoteURI += ")" + (failoverParams.startsWith("?") ? "" : "?") + failoverParams;
+        }
+
+        ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        Connection connection = factory.createConnection();
+
+        return (JmsConnection) connection;
+    }
+
+    /*
+     * Convenience overload: peer URI without any options.
+     */
+    private URI createPeerURI(TestAmqpPeer peer) throws Exception {
+        return createPeerURI(peer, null);
+    }
+
+    private URI createPeerURI(TestAmqpPeer peer, String params) throws Exception {
+        String scheme = peer.isSSL() ?
"amqps" : "amqp"; + URI result = new URI(scheme, "localhost:" + peer.getServerPort(), null, null, null); + + Map<String, String> queryParameters = PropertyUtil.parseQuery(params); + + return URISupport.applyParameters(result, queryParameters); + } + + private boolean isExpectedHost(URI expected, URI actual) { + if (!expected.getHost().equals(actual.getHost())) { + LOG.info("Expected host {} but got host {}", expected.getHost(), actual.getHost()); + return false; + } + + if (expected.getPort() != actual.getPort()) { + LOG.info("Expected host {} on port {} but got host {} on port {}", + expected.getHost(), expected.getPort(), actual.getHost(), actual.getPort()); + return false; + } + + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index f751beb..719fbc7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -19,8 +19,8 @@ package org.apache.qpid.jms.test.testpeer; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY; -import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.GLOBAL; +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; @@ -227,6 +227,11 @@ public class TestAmqpPeer implements AutoCloseable return _driverRunnable.getClientSocket(); } + public boolean isSSL() + { + 
return _driverRunnable.isSSL(); + } + public int getAdvertisedIdleTimeout() { return advertisedIdleTimeout; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java index 0f01256..eb0b2a0 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java @@ -258,4 +258,8 @@ class TestAmqpPeerRunner implements Runnable public boolean isSendSaslHeaderPreEmptively() { return _sendSaslHeaderPreEmptively; } + + public boolean isSSL() { + return _serverSocket instanceof SSLServerSocket; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java index 718c422..14418ef 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java @@ -217,7 +217,7 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase { } } - @Test(timeout = 60 * 1000) + @Test(timeout = 20 * 1000) public void testConnectToWSServerWhenRedirectedWithNewPath() throws Exception { try (NettySimpleAmqpServer primary = createWSServer(createServerOptions()); NettySimpleAmqpServer redirect = 
createWSServer(createServerOptions())) { @@ -314,7 +314,7 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase { protected URI createFailoverURI(NettyServer server) throws Exception { URI serverURI = createConnectionURI(server, null); - String failoverURI = "failover:(" + serverURI.toString() + ")"; + String failoverURI = "failover:(" + serverURI.toString() + "?amqp.vhost=localhost)?failover.maxReconnectAttempts=3"; return new URI(failoverURI); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java index ba2674e..3e16c50 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java @@ -272,14 +272,29 @@ public class URISupportTest { parameters.put("t.proxyHost", "localhost"); parameters.put("t.proxyPort", "80"); + uri = URISupport.applyParameters(uri, parameters, "t."); + Map<String,String> appliedParameters = URISupport.parseParameters(uri); + assertEquals("all params applied with no prefix", 3, appliedParameters.size()); + verifyParams(appliedParameters); + } + + @Test + public void testApplyParametersOverwritesOriginalParameters() throws Exception { + URI uri = new URI("http://0.0.0.0:61616?proxyHost=host&proxyPort=21&timeout=1000"); + + Map<String,String> parameters = new HashMap<String, String>(); + parameters.put("proxyHost", "localhost"); + parameters.put("proxyPort", "80"); + uri = URISupport.applyParameters(uri, parameters); Map<String,String> appliedParameters = URISupport.parseParameters(uri); assertEquals("all params applied with no prefix", 3, appliedParameters.size()); + verifyParams(appliedParameters); } private void 
verifyParams(Map<String,String> parameters) { - assertEquals(parameters.get("proxyHost"), "localhost"); - assertEquals(parameters.get("proxyPort"), "80"); + assertEquals("localhost", parameters.get("proxyHost")); + assertEquals("80", parameters.get("proxyPort")); } //---- isCompositeURI ----------------------------------------------------// --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
