This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch 0.x in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
commit 763316dc3028795d463c74a2e0ea1f41fa9de7bd Author: Robbie Gemmell <[email protected]> AuthorDate: Fri Oct 15 17:37:08 2021 +0100 QPIDJMS-549: use retained ref that isn't nulled during failover, and avoid multiple signals to the related code that let it NPE more than once (cherry picked from commit 2f6bde0a388cd1431cacb154ba55a78c22e263af) --- .../qpid/jms/provider/amqp/AmqpProvider.java | 22 ++++-- .../jms/provider/failover/FailoverProvider.java | 5 +- ...hAmqpOpenProvidedServerListIntegrationTest.java | 88 ++++++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 15 +++- 4 files changed, 122 insertions(+), 8 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 8058678..d7f2f35 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -448,15 +448,21 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo); connectionRequest = new AsyncResult() { + AtomicBoolean signalled = new AtomicBoolean(); + @Override public void onSuccess() { - fireConnectionEstablished(); - request.onSuccess(); + if (signalled.compareAndSet(false, true)) { + fireConnectionEstablished(); + request.onSuccess(); + } } @Override public void onFailure(ProviderException result) { - request.onFailure(result); + if (signalled.compareAndSet(false, true)) { + request.onFailure(result); + } } @Override @@ -1470,12 +1476,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP @Override public List<URI> getAlternateURIs() { - List<URI> alternates = new ArrayList<>(); + List<URI> alternates = null; if (connection != null) { // If there are failover servers in the open then 
we signal that to the listeners List<AmqpRedirect> failoverList = connection.getProperties().getFailoverServerList(); if (!failoverList.isEmpty()) { + alternates = new ArrayList<>(); + for (AmqpRedirect redirect : failoverList) { try { alternates.add(redirect.toURI()); @@ -1486,7 +1494,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } - return alternates; + if (alternates != null) { + return alternates; + } else { + return Collections.emptyList(); + } } public org.apache.qpid.proton.engine.Transport getProtonTransport() { diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index 238e3fc..ce13e7b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -940,8 +940,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide Provider provider = this.provider; if (provider != null) { return provider.getAlternateURIs(); + } else { + return Collections.emptyList(); } - return null; }; @Override @@ -1296,7 +1297,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide LOG.debug("Request received error: {}", result.getMessage()); // If we managed to receive an Open frame it might contain // a failover update so process it before handling the error. 
- processAlternates(provider.getAlternateURIs()); + processAlternates(activeProvider.getAlternateURIs()); handleProviderFailure(activeProvider, ProviderExceptionSupport.createOrPassthroughFatal(result)); } } finally { diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java index 01ecbf8..742afcc 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -41,6 +42,7 @@ import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportSupport; import org.apache.qpid.jms.util.PropertyUtil; @@ -393,6 +395,92 @@ public class FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJ } } + @Test(timeout = 20000) + public void testFailoverHandlesRemoteCloseWithRedirectDuringConnectionSessionEstablishment() throws Exception { + try (TestAmqpPeer primaryPeer = new TestAmqpPeer(); + TestAmqpPeer backupPeer = new TestAmqpPeer();) { + + final URI primaryPeerURI = createPeerURI(primaryPeer); + final URI backupPeerURI 
= createPeerURI(backupPeer); + LOG.info("Primary is at {}: Backup peer is at: {}", primaryPeerURI, backupPeerURI); + + final CountDownLatch connectedToPrimary = new CountDownLatch(1); + final CountDownLatch connectedToBackup = new CountDownLatch(1); + + Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>(); + redirectInfo.put(NETWORK_HOST, "localhost"); + redirectInfo.put(PORT, backupPeer.getServerPort()); + + // Have the failover-server-list containing a third (non-existent) servers details + String thirdPeerTestHost = "test-host"; + int thirdPeerTestPort = 45678; + + Map<Symbol,Object> thirdPeerTestDetails = new HashMap<>(); + thirdPeerTestDetails.put(NETWORK_HOST, thirdPeerTestHost); + thirdPeerTestDetails.put(PORT, thirdPeerTestPort); + + List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>(); + failoverServerList.add(thirdPeerTestDetails); + + Map<Symbol,Object> forcingPeerConnectionProperties = new HashMap<Symbol, Object>(); + forcingPeerConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList); + + primaryPeer.expectSaslAnonymous(); + primaryPeer.expectOpen(true); + // Prepare a bare Open without any failure hint, suggesting success, + // but defer writing it until the Close is ready to send too. + primaryPeer.sendOpenFrameAfterLastAction(true, forcingPeerConnectionProperties); + // Then send a redirecting Close, prompting the Open to actually be written. + primaryPeer.remotelyCloseConnection(false, ConnectionError.REDIRECT, "Server is going away", redirectInfo); + primaryPeer.expectBegin(false);// From the connection-session, prompted by 'successful' Open. 
+ primaryPeer.expectClose(false); + + backupPeer.expectSaslAnonymous(); + backupPeer.expectOpen(); + backupPeer.expectBegin(); + + final JmsConnection connection = establishAnonymousConnecton(null, primaryPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (primaryPeerURI.equals(remoteURI)) { + connectedToPrimary.countDown(); + } + + if (backupPeerURI.equals(remoteURI)) { + connectedToBackup.countDown(); + } + } + }); + + // Verify the existing failover URIs are as expected, the initial peer only + List<URI> beforeOpenFailoverURIs = new ArrayList<>(); + beforeOpenFailoverURIs.add(primaryPeerURI); + + assertFailoverURIList(connection, beforeOpenFailoverURIs); + + connection.start(); + + primaryPeer.waitForAllHandlersToComplete(3000); + + assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS)); + assertFalse("Should not connect to primary peer", connectedToPrimary.await(10, TimeUnit.MILLISECONDS)); + + // Verify the failover URIs are as expected, now containing initial peer, its advertised third test-details peer, and the peer it then redirected to. + List<URI> afterOpenFailoverURIs = new ArrayList<>(); + afterOpenFailoverURIs.add(primaryPeerURI); + afterOpenFailoverURIs.add(new URI("amqp://" + thirdPeerTestHost + ":" + thirdPeerTestPort)); + afterOpenFailoverURIs.add(backupPeerURI); + + assertFailoverURIList(connection, afterOpenFailoverURIs); + + backupPeer.expectClose(); + connection.close(); + backupPeer.waitForAllHandlersToComplete(1000); + } + } + /* * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with * system properties the redirect uses those properties to connect to the new host. 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 388323b..69581bf 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -928,11 +928,24 @@ public class TestAmqpPeer implements AutoCloseable } public void sendPreemptiveServerOpenFrame() { + sendOpenFrameAfterLastAction(false, null); + } + + public void sendOpenFrameAfterLastAction(boolean deferWrite, Map<Symbol, Object> serverProperties) { // Arrange to send the Open frame after the previous handler OpenFrame open = createOpenFrame(); + if (serverProperties != null) { + open.setProperties(serverProperties); + } CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); - comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null)); + + FrameSender openSender = new FrameSender(this, FrameType.AMQP, 0, open, null); + if (deferWrite) { + openSender.setDeferWrite(true); + } + + comp.add(openSender); } public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
