QPIDJMS-95: if a [transport] failure is reported while we are still trying to open a new connection, then fail the attempt to open it
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9aabc27a Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9aabc27a Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9aabc27a Branch: refs/heads/master Commit: 9aabc27a4de515377cd149c41b6983435a241e3d Parents: e46354b Author: Robert Gemmell <[email protected]> Authored: Fri Aug 21 17:29:13 2015 +0100 Committer: Robert Gemmell <[email protected]> Committed: Fri Aug 21 18:45:13 2015 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 18 ++- .../failover/FailoverIntegrationTest.java | 122 +++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 11 ++ .../jms/test/testpeer/TestAmqpPeerRunner.java | 12 ++ 4 files changed, 159 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 3a2e709..7383352 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -126,6 +126,7 @@ public class AmqpProvider implements Provider, TransportListener { private final Transport protonTransport = Transport.Factory.create(); private final Collector protonCollector = new CollectorImpl(); + private AsyncResult connectionOpenRequest; private ScheduledFuture<?> nextIdleTimeoutCheck; /** @@ -240,7 +241,6 @@ public class AmqpProvider implements Provider, TransportListener { try { checkClosed(); 
resource.visit(new JmsResourceVistor() { - @Override public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception { AmqpSession session = connection.createSession(sessionInfo); @@ -296,8 +296,7 @@ public class AmqpProvider implements Provider, TransportListener { } connection = new AmqpConnection(AmqpProvider.this, protonConnection, authenticator, connectionInfo); - connection.open(new AsyncResult() { - + AsyncResult wrappedOpenRequest = new AsyncResult() { @Override public void onSuccess() { fireConnectionEstablished(); @@ -313,7 +312,11 @@ public class AmqpProvider implements Provider, TransportListener { public boolean isComplete() { return request.isComplete(); } - }); + }; + + connectionOpenRequest = wrappedOpenRequest; + + connection.open(wrappedOpenRequest); } @Override @@ -838,6 +841,8 @@ public class AmqpProvider implements Provider, TransportListener { } void fireConnectionEstablished() { + //The request onSuccess calls this method + connectionOpenRequest = null; long now = System.currentTimeMillis(); long deadline = protonTransport.tick(now); @@ -854,6 +859,11 @@ public class AmqpProvider implements Provider, TransportListener { } void fireProviderException(Throwable ex) { + if(connectionOpenRequest != null) { + connectionOpenRequest.onFailure(ex); + connectionOpenRequest = null; + } + ProviderListener listener = this.listener; if (listener != null) { listener.onConnectionFailure(IOExceptionSupport.create(ex)); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java new file mode 100644 index 0000000..6a086b3 --- /dev/null +++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.failover; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailoverIntegrationTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class); + + @Test(timeout = 20000) + public void testFailoverHandlesImmediateTransportDropAfterConnect() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer rejectingPeer = new 
TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + + // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String rejectingURI = createPeerURI(rejectingPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Rejecting peer is at: {}", rejectingURI); + LOG.info("Final peer is at: {}", finalURI); + + // Connect to the first + originalPeer.expectSaslAnonymousConnect(); + originalPeer.expectBegin(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); + + assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); + assertEquals("should not yet have connected to final peer", 1L, finalConnected.getCount()); + + // Set expectations on rejecting and final peer + rejectingPeer.expectSaslHeaderThenDrop(); + + finalPeer.expectSaslAnonymousConnect(); + finalPeer.expectBegin(); + + // Close the original peer and wait for things to shake out. 
+ originalPeer.close(); + + rejectingPeer.waitForAllHandlersToComplete(2000); + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + //Shut it down + finalPeer.expectClose(); + connection.close(); + finalPeer.waitForAllHandlersToComplete(1000); + } + } + + private JmsConnection establishAnonymousConnecton(TestAmqpPeer origPeer, TestAmqpPeer rejectingPeer, TestAmqpPeer finalPeer) throws JMSException { + final String remoteURI = "failover:(" + createPeerURI(origPeer) + "," + + createPeerURI(rejectingPeer) + "," + + createPeerURI(finalPeer) + ")"; + + ConnectionFactory factory = new JmsConnectionFactory(remoteURI); + Connection connection = factory.createConnection(); + + return (JmsConnection) connection; + } + + private String createPeerURI(TestAmqpPeer peer) { + return "amqp://localhost:" + peer.getServerPort(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 6f34ea5..62694d7 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -1562,4 +1562,15 @@ public class TestAmqpPeer implements AutoCloseable _firstAssertionError = ae; } } + + public void expectSaslHeaderThenDrop() { + AmqpPeerRunnable exitAfterHeader = new AmqpPeerRunnable() { + @Override + public void run() { + _driverRunnable.exitReadLoopEarly(); + } + }; + + addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, exitAfterHeader)); + } } 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java index 17ec2f6..854e64c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java @@ -48,6 +48,7 @@ class TestAmqpPeerRunner implements Runnable private final Object _inputHandlingLock = new Object(); private final TestFrameParser _testFrameParser; private volatile boolean _suppressReadExceptionOnClose; + private volatile boolean _exitReadLoopEarly; private volatile Throwable _throwable; @@ -108,6 +109,13 @@ class TestAmqpPeerRunner implements Runnable _testFrameParser.input(networkInputByteBuffer); } + + if(_exitReadLoopEarly) + { + LOGGER.trace("Exiting read loop early"); + break; + } + LOGGER.trace("Attempting read"); attemptingRead = true; } @@ -217,4 +225,8 @@ class TestAmqpPeerRunner implements Runnable public boolean isNeedClientCert() { return needClientCert; } + + public void exitReadLoopEarly() { + _exitReadLoopEarly = true; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
