Repository: qpid-jms Updated Branches: refs/heads/master 2a5f89a1c -> 14e4a92b4
https://issues.apache.org/jira/browse/QPIDJMS-44 Add in support for handling connection redirection errors and auto reconnect for the failover provider when a redirect is detected. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/14e4a92b Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/14e4a92b Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/14e4a92b Branch: refs/heads/master Commit: 14e4a92b4621a746586f5b8911d1cef6f1673d2b Parents: 2a5f89a Author: Timothy Bish <[email protected]> Authored: Fri Apr 24 17:14:05 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Apr 24 17:14:05 2015 -0400 ---------------------------------------------------------------------- .../provider/ProviderRedirectedException.java | 64 ++++++++ .../jms/provider/amqp/AmqpAbstractResource.java | 46 ++++++ .../qpid/jms/provider/amqp/AmqpProvider.java | 2 - .../jms/provider/failover/FailoverProvider.java | 14 +- .../qpid/jms/JmsDefaultConnectionListener.java | 46 ++++++ .../integration/ConnectionIntegrationTest.java | 56 ++++++- .../FailedConnectionsIntegrationTest.java | 119 ++++++++++++++ .../provider/failover/FailoverRedirectTest.java | 159 +++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 64 +++++++- 9 files changed, 565 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java new file mode 100644 index 0000000..f521a38 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; + +/** + * {@link IOException} derivative that defines that the remote peer has requested that this + * connection be redirected to some alternative peer. + */ +public class ProviderRedirectedException extends IOException { + + private static final long serialVersionUID = 5872211116061710369L; + + private final String hostname; + private final String networkHost; + private final int port; + + /** + * @param reason + */ + public ProviderRedirectedException(String reason, String hostname, String networkHost, int port) { + super(reason); + + this.hostname = hostname; + this.networkHost = networkHost; + this.port = port; + } + + /** + * @return the host name of the container being redirected to. + */ + public String getHostname() { + return hostname; + } + + /** + * @return the DNS host name or IP address of the peer this connection is being redirected to. + */ + public String getNetworkHost() { + return networkHost; + } + + /** + * @return the port number on the peer this connection is being redirected to. + */ + public int getPort() { + return port; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 204f6fe..5216276 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; +import java.util.Map; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; @@ -25,8 +26,10 @@ import javax.jms.JMSSecurityException; import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.ProviderRedirectedException; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ConnectionError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Endpoint; import org.apache.qpid.proton.engine.EndpointState; @@ -241,6 +244,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp remoteError = new JMSSecurityException(message); } else if (error.equals(AmqpError.NOT_FOUND)) { remoteError = new InvalidDestinationException(message); + } else if (error.equals(ConnectionError.REDIRECT)) { + remoteError = createRedirectException(error, message, getEndpoint().getRemoteCondition()); } else { remoteError = new JMSException(message); } @@ -331,6 +336,47 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } /** + * When a redirect type exception is received this method is called to create the + * appropriate redirect exception type containing the error details needed. + * + * @param error + * the Symbol that defines the redirection error type. + * @param message + * the basic error message that should used or amended for the returned exception. + * @param condition + * the ErrorCondition that describes the redirection. + * + * @returns an Exception that captures the details of the redirection error. + */ + @SuppressWarnings("unchecked") + protected Exception createRedirectException(Symbol error, String message, ErrorCondition condition) { + Exception result = null; + Map<String, Object> info = condition.getInfo(); + + if (info == null) { + result = new IOException(message + " : Redirection information not set."); + } else { + String hostname = (String) info.get("hostname"); + + String networkHost = (String) info.get("network-host"); + if (networkHost == null || networkHost.isEmpty()) { + result = new IOException(message + " : Redirection information not set."); + } + + int port = 0; + try { + port = Integer.valueOf(info.get("port").toString()); + } catch (Exception ex) { + result = new IOException(message + " : Redirection information not set."); + } + + result = new ProviderRedirectedException(message, hostname, networkHost, port); + } + + return result; + } + + /** * When aborting the open operation, and there isnt an error condition, * provided by the peer, the returned exception will be used instead. * A subclass may override this method to provide alternative behaviour. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 7191ddb..2314269 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -734,12 +734,10 @@ public class AmqpProvider implements Provider, TransportListener { amqpResource.processRemoteOpen(this); break; case LINK_REMOTE_CLOSE: - LOG.info("Link closed: {}", protonEvent.getLink().getContext()); amqpResource = (AmqpResource) protonEvent.getLink().getContext(); amqpResource.processRemoteClose(this); break; case LINK_REMOTE_DETACH: - LOG.info("Link detach: {}", protonEvent.getLink().getContext()); amqpResource = (AmqpResource) protonEvent.getLink().getContext(); amqpResource.processRemoteDetach(this); break; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 4ee8bf3..924192e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -47,6 +48,7 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderFactory; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.provider.ProviderListener; +import org.apache.qpid.jms.provider.ProviderRedirectedException; import org.apache.qpid.jms.util.IOExceptionSupport; import org.apache.qpid.jms.util.ThreadPoolUtils; import org.slf4j.Logger; @@ -505,6 +507,16 @@ public class FailoverProvider extends DefaultProviderListener implements Provide this.provider = null; if (reconnectAllowed()) { + + if (cause instanceof ProviderRedirectedException) { + ProviderRedirectedException redirect = (ProviderRedirectedException) cause; + try { + uris.add(new URI(failedURI.getScheme() + "://" + redirect.getNetworkHost() + ":" + redirect.getPort())); + } catch (URISyntaxException ex) { + LOG.warn("Could not construct redirection URI from remote provided information"); + } + } + ProviderListener listener = this.listener; if (listener != null) { listener.onConnectionInterrupted(failedURI); @@ -890,7 +902,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide public void run() { requests.put(id, this); if (provider == null) { - whenOffline(IOExceptionSupport.create(new IOException("Connection failed."))); + whenOffline(new IOException("Connection failed.")); } else { try { LOG.debug("Executing Failover Task: {}", this); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java new file mode 100644 index 0000000..203677f --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms; + +import java.net.URI; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; + +public class JmsDefaultConnectionListener implements JmsConnectionListener { + + @Override + public void onConnectionEstablished(URI remoteURI) { + } + + @Override + public void onConnectionFailure(Throwable error) { + } + + @Override + public void onConnectionInterrupted(URI remoteURI) { + } + + @Override + public void onConnectionRestored(URI remoteURI) { + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) { + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 6d4e3f0..6027034 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -21,12 +21,16 @@ package org.apache.qpid.jms.integration; import static org.hamcrest.Matchers.arrayContaining; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.ConnectionMetaData; @@ -38,10 +42,12 @@ import javax.jms.Queue; import javax.jms.Session; import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.provider.ProviderRedirectedException; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.proton.amqp.transaction.TxnCapability; import org.junit.Test; @@ -118,7 +124,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { // Tell the test peer to close the connection when executing its last handler testPeer.remotelyCloseConnection(true); - //Add the exception listener + // Add the exception listener connection.setExceptionListener(new ExceptionListener() { @Override @@ -136,6 +142,54 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 10000) + public void testRemotelyEndConnectionWithRedirect() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final CountDownLatch done = new CountDownLatch(1); + final AtomicReference<JMSException> asyncError = new AtomicReference<JMSException>(); + + final String REDIRECTED_HOSTNAME = "vhost"; + final String REDIRECTED_NETWORK_HOST = "localhost"; + final int REDIRECTED_PORT = 5677; + + // Don't set a ClientId, so that the underlying AMQP connection isn't established yet + Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, false); + + // Tell the test peer to close the connection when executing its last handler + Map<String, Object> errorInfo = new HashMap<String, Object>(); + errorInfo.put("hostname", REDIRECTED_HOSTNAME); + errorInfo.put("network-host", REDIRECTED_NETWORK_HOST); + errorInfo.put("port", 5677); + + testPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Connection redirected", errorInfo); + + // Add the exception listener + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + asyncError.set(exception); + done.countDown(); + } + }); + + // Trigger the underlying AMQP connection + connection.start(); + + assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS)); + + assertTrue(asyncError.get() instanceof JMSException); + assertTrue(asyncError.get().getCause() instanceof ProviderRedirectedException); + + ProviderRedirectedException redirect = (ProviderRedirectedException) asyncError.get().getCause(); + assertEquals(REDIRECTED_HOSTNAME, redirect.getHostname()); + assertEquals(REDIRECTED_NETWORK_HOST, redirect.getNetworkHost()); + assertEquals(REDIRECTED_PORT, redirect.getPort()); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + @Test(timeout = 5000) public void testRemotelyEndConnectionWithSessionWithConsumer() throws Exception { final String BREAD_CRUMB = "ErrorMessage"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java new file mode 100644 index 0000000..7660632 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.provider.ProviderRedirectedException; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; +import org.junit.Test; + +/** + * Test the handling of various error on connection that should return + * specific error types to the JMS client. + */ +public class FailedConnectionsIntegrationTest extends QpidJmsTestCase { + + @Test(timeout = 5000) + public void testConnectWithInvalidClientId() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.rejectConnect(AmqpError.INVALID_FIELD, "Client ID already in use", null); + try { + establishAnonymousConnecton(testPeer, true); + fail("Should have thrown JMSException"); + } catch (JMSException jmsEx) { + } catch (Exception ex) { + fail("Should have thrown JMSException: " + ex); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 5000) + public void testConnectSecurityViolation() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.rejectConnect(AmqpError.UNAUTHORIZED_ACCESS, "Anonymous connections not allowed", null); + try { + establishAnonymousConnecton(testPeer, true); + fail("Should have thrown JMSSecurityException"); + } catch (JMSException jmsEx) { + } catch (Exception ex) { + fail("Should have thrown JMSSecurityException: " + ex); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 5000) + public void testConnectWithRedirect() throws Exception { + Map<String, Object> redirectInfo = new HashMap<String, Object>(); + + redirectInfo.put("hostname", "localhost"); + redirectInfo.put("network-host", "127.0.0.1"); + redirectInfo.put("port", 5672); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.rejectConnect(ConnectionError.REDIRECT, "Server is full, go away", redirectInfo); + try { + establishAnonymousConnecton(testPeer, true); + fail("Should have thrown JMSException"); + } catch (JMSException jmsex) { + assertTrue(jmsex.getCause() instanceof ProviderRedirectedException); + ProviderRedirectedException redirectEx = (ProviderRedirectedException) jmsex.getCause(); + assertEquals("localhost", redirectEx.getHostname()); + assertEquals("127.0.0.1", redirectEx.getNetworkHost()); + assertEquals(5672, redirectEx.getPort()); + } catch (Exception ex) { + fail("Should have thrown JMSException: " + ex); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId) throws JMSException { + + final String remoteURI = "amqp://localhost:" + testPeer.getServerPort(); + + ConnectionFactory factory = new JmsConnectionFactory(remoteURI); + Connection connection = factory.createConnection(); + + if (setClientId) { + // Set a clientId to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + } + + assertNull(testPeer.getThrowable()); + return connection; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java new file mode 100644 index 0000000..fb9e890 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.failover; + +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests that when failover is used and the remote sends a redirect error, the + * failover transport obtains the new peer connection info and attempts to connect + * there. + */ +public class FailoverRedirectTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverRedirectTest.class); + + @Test(timeout = 40000) + public void testFailoverHandlesRedirection() throws Exception { + try (TestAmqpPeer rejectingPeer = new TestAmqpPeer(); + TestAmqpPeer redirectedPeer = new TestAmqpPeer();) { + + final CountDownLatch connected = new CountDownLatch(1); + final String redirectURI = createPeerURI(redirectedPeer); + LOG.info("Backup peer is at: {}", redirectURI); + + redirectedPeer.expectAnonymousConnect(true); + redirectedPeer.expectBegin(true); + + Map<String, Object> redirectInfo = new HashMap<String, Object>(); + + redirectInfo.put("hostname", "localhost"); + redirectInfo.put("network-host", "localhost"); + redirectInfo.put("port", redirectedPeer.getServerPort()); + + rejectingPeer.rejectConnect(ConnectionError.REDIRECT, "Server is full, go away", redirectInfo); + + final JmsConnection connection = establishAnonymousConnecton(rejectingPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (redirectURI.equals(remoteURI.toString())) { + connected.countDown(); + } + } + }); + connection.start(); + + rejectingPeer.waitForAllHandlersToComplete(1000); + assertTrue("Should connect to backup peer", connected.await(15, TimeUnit.SECONDS)); + + redirectedPeer.expectClose(); + connection.close(); + redirectedPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 40000) + public void testFailoverHandlesRemotelyEndConnectionWithRedirection() throws Exception { + try (TestAmqpPeer rejectingPeer = new TestAmqpPeer(); + TestAmqpPeer redirectedPeer = new TestAmqpPeer();) { + + final CountDownLatch connectedToPrimary = new CountDownLatch(1); + final CountDownLatch connectedToBackup = new CountDownLatch(1); + + final String rejectingURI = createPeerURI(rejectingPeer); + final String redirectURI = createPeerURI(redirectedPeer); + LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI); + + redirectedPeer.expectAnonymousConnect(true); + redirectedPeer.expectBegin(true); + + Map<String, Object> redirectInfo = new HashMap<String, Object>(); + + redirectInfo.put("hostname", "localhost"); + redirectInfo.put("network-host", "localhost"); + redirectInfo.put("port", redirectedPeer.getServerPort()); + + rejectingPeer.expectAnonymousConnect(true); + rejectingPeer.expectBegin(true); + rejectingPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Server is full, go away", redirectInfo); + + final JmsConnection connection = establishAnonymousConnecton(rejectingPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (remoteURI.toString().equals(rejectingURI)) { + connectedToPrimary.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Reestablished: {}", remoteURI); + if (remoteURI.toString().equals(redirectURI)) { + connectedToBackup.countDown(); + } + } + }); + connection.start(); + + rejectingPeer.waitForAllHandlersToComplete(1000); + + assertTrue("Should connect to primary peer", connectedToPrimary.await(15, TimeUnit.SECONDS)); + assertTrue("Should connect to backup peer", connectedToBackup.await(15, TimeUnit.SECONDS)); + + redirectedPeer.expectClose(); + connection.close(); + redirectedPeer.waitForAllHandlersToComplete(1000); + } + } + + private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws JMSException { + final String remoteURI = "failover:(" + createPeerURI(testPeer) + ")"; + + ConnectionFactory factory = new JmsConnectionFactory(remoteURI); + Connection connection = factory.createConnection(); + + return (JmsConnection) connection; + } + + private String createPeerURI(TestAmqpPeer peer) { + return "amqp://localhost:" + peer.getServerPort(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 0c3bac5..7824b47 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.nullValue; import java.io.IOException; import java.net.Socket; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -458,6 +459,62 @@ public class TestAmqpPeer implements AutoCloseable addHandler(openMatcher); } + // TODO - Reject any incoming connection using the supplied information + public void rejectConnect(Symbol errorType, String errorMessage, Map<String, Object> errorInfo) { + + SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("ANONYMOUS")); + addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, + new FrameSender( + this, FrameType.SASL, 0, + saslMechanismsFrame, null))); + + addHandler(new SaslInitMatcher() + .withMechanism(equalTo(Symbol.valueOf("ANONYMOUS"))) + .withInitialResponse(nullValue()) + .onSuccess(new AmqpPeerRunnable() + { + @Override + public void run() + { + TestAmqpPeer.this.sendFrame( + FrameType.SASL, 0, + new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)), + null, + false); + _driverRunnable.expectHeader(); + } + })); + + addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); + + Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); + properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); + + OpenFrame open = new OpenFrame(); + open.setProperties(properties); + open.setContainerId("test-amqp-peer-container-id"); + + addHandler(new OpenMatcher() + .withContainerId(notNullValue(String.class)) + .onSuccess(new FrameSender(this, FrameType.AMQP, CONNECTION_CHANNEL, open, null))); + + // Now generate the Close with the supplied error + final CloseFrame closeFrame = new CloseFrame(); + if (errorType != null) { + org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); + detachError.setCondition(errorType); + detachError.setDescription(errorMessage); + detachError.setInfo(errorInfo); + closeFrame.setError(detachError); + } + + CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); + final FrameSender closeSender = new FrameSender(this, FrameType.AMQP, CONNECTION_CHANNEL, closeFrame, null); + comp.add(closeSender); + + addHandler(new CloseMatcher().withError(Matchers.nullValue())); + } + public void expectClose() { addHandler(new CloseMatcher() @@ -1296,10 +1353,14 @@ public class TestAmqpPeer implements AutoCloseable } public void remotelyCloseConnection(boolean expectCloseResponse) { - remotelyCloseConnection(expectCloseResponse, null, null); + remotelyCloseConnection(expectCloseResponse, null, null, null); } public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage) { + remotelyCloseConnection(expectCloseResponse, errorType, errorMessage, null); + } + + public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, Map<String, Object> info) { synchronized (_handlersLock) { // Prepare a composite to insert this action at the end of the handler sequence CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); @@ -1310,6 +1371,7 @@ public class TestAmqpPeer implements AutoCloseable org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); detachError.setCondition(errorType); detachError.setDescription(errorMessage); + detachError.setInfo(info); closeFrame.setError(detachError); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
