This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f6bde0  QPIDJMS-549: use retained ref that isn't nulled during 
failover, and avoid multiple signals to the related code that let it NPE more 
than once
2f6bde0 is described below

commit 2f6bde0a388cd1431cacb154ba55a78c22e263af
Author: Robbie Gemmell <[email protected]>
AuthorDate: Fri Oct 15 17:37:08 2021 +0100

    QPIDJMS-549: use retained ref that isn't nulled during failover, and avoid 
multiple signals to the related code that let it NPE more than once
---
 .../qpid/jms/provider/amqp/AmqpProvider.java       | 22 ++++--
 .../jms/provider/failover/FailoverProvider.java    |  5 +-
 ...hAmqpOpenProvidedServerListIntegrationTest.java | 88 ++++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       | 15 +++-
 4 files changed, 122 insertions(+), 8 deletions(-)

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 8058678..d7f2f35 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -448,15 +448,21 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
 
                         AmqpConnectionBuilder builder = new 
AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
                         connectionRequest = new AsyncResult() {
+                            AtomicBoolean signalled = new AtomicBoolean();
+
                             @Override
                             public void onSuccess() {
-                                fireConnectionEstablished();
-                                request.onSuccess();
+                                if (signalled.compareAndSet(false, true)) {
+                                    fireConnectionEstablished();
+                                    request.onSuccess();
+                                }
                             }
 
                             @Override
                             public void onFailure(ProviderException result) {
-                                request.onFailure(result);
+                                if (signalled.compareAndSet(false, true)) {
+                                    request.onFailure(result);
+                                }
                             }
 
                             @Override
@@ -1470,12 +1476,14 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
 
     @Override
     public List<URI> getAlternateURIs() {
-        List<URI> alternates = new ArrayList<>();
+        List<URI> alternates = null;
 
         if (connection != null) {
             // If there are failover servers in the open then we signal that 
to the listeners
             List<AmqpRedirect> failoverList = 
connection.getProperties().getFailoverServerList();
             if (!failoverList.isEmpty()) {
+                alternates = new ArrayList<>();
+
                 for (AmqpRedirect redirect : failoverList) {
                     try {
                         alternates.add(redirect.toURI());
@@ -1486,7 +1494,11 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
             }
         }
 
-        return alternates;
+        if (alternates != null) {
+            return alternates;
+        } else {
+            return Collections.emptyList();
+        }
     }
 
     public org.apache.qpid.proton.engine.Transport getProtonTransport() {
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 238e3fc..ce13e7b 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -940,8 +940,9 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
         Provider provider = this.provider;
         if (provider != null) {
             return provider.getAlternateURIs();
+        } else {
+            return Collections.emptyList();
         }
-        return null;
     };
 
     @Override
@@ -1296,7 +1297,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                     LOG.debug("Request received error: {}", 
result.getMessage());
                     // If we managed to receive an Open frame it might contain
                     // a failover update so process it before handling the 
error.
-                    processAlternates(provider.getAlternateURIs());
+                    processAlternates(activeProvider.getAlternateURIs());
                     handleProviderFailure(activeProvider, 
ProviderExceptionSupport.createOrPassthroughFatal(result));
                 }
             } finally {
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
index 01ecbf8..742afcc 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -41,6 +42,7 @@ import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
 import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.transports.TransportSupport;
 import org.apache.qpid.jms.util.PropertyUtil;
@@ -393,6 +395,92 @@ public class 
FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJ
         }
     }
 
+    @Test(timeout = 20000)
+    public void 
testFailoverHandlesRemoteCloseWithRedirectDuringConnectionSessionEstablishment()
 throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+             TestAmqpPeer backupPeer = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at {}: Backup peer is at: {}", 
primaryPeerURI, backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>();
+            redirectInfo.put(NETWORK_HOST, "localhost");
+            redirectInfo.put(PORT, backupPeer.getServerPort());
+
+            // Have the failover-server-list containing a third (non-existent) 
server's details
+            String thirdPeerTestHost = "test-host";
+            int thirdPeerTestPort = 45678;
+
+            Map<Symbol,Object> thirdPeerTestDetails = new HashMap<>();
+            thirdPeerTestDetails.put(NETWORK_HOST, thirdPeerTestHost);
+            thirdPeerTestDetails.put(PORT, thirdPeerTestPort);
+
+            List<Map<Symbol, Object>> failoverServerList = new 
ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(thirdPeerTestDetails);
+
+            Map<Symbol,Object> forcingPeerConnectionProperties = new 
HashMap<Symbol, Object>();
+            forcingPeerConnectionProperties.put(FAILOVER_SERVER_LIST, 
failoverServerList);
+
+            primaryPeer.expectSaslAnonymous();
+            primaryPeer.expectOpen(true);
+            // Prepare a bare Open without any failure hint, suggesting 
success,
+            // but defer writing it until the Close is ready to send too.
+            primaryPeer.sendOpenFrameAfterLastAction(true, 
forcingPeerConnectionProperties);
+            // Then send a redirecting Close, prompting the Open to actually 
be written.
+            primaryPeer.remotelyCloseConnection(false, 
ConnectionError.REDIRECT, "Server is going away", redirectInfo);
+            primaryPeer.expectBegin(false);// From the connection-session, 
prompted by 'successful' Open.
+            primaryPeer.expectClose(false);
+
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(null, 
primaryPeer);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (primaryPeerURI.equals(remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+
+                    if (backupPeerURI.equals(remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial 
peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            connection.start();
+
+            primaryPeer.waitForAllHandlersToComplete(3000);
+
+            assertTrue("Should connect to backup peer", 
connectedToBackup.await(5, TimeUnit.SECONDS));
+            assertFalse("Should not connect to primary peer", 
connectedToPrimary.await(10, TimeUnit.MILLISECONDS));
+
+            // Verify the failover URIs are as expected, now containing 
initial peer, its advertised third test-details peer, and the peer it then 
redirected to.
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(new URI("amqp://" + thirdPeerTestHost + 
":" + thirdPeerTestPort));
+            afterOpenFailoverURIs.add(backupPeerURI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     /*
      * Verify that when the Open frame contains a failover server list and we 
are connected via SSL configured with
      * system properties the redirect uses those properties to connect to the 
new host.
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 388323b..69581bf 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -928,11 +928,24 @@ public class TestAmqpPeer implements AutoCloseable
     }
 
     public void sendPreemptiveServerOpenFrame() {
+        sendOpenFrameAfterLastAction(false, null);
+    }
+
+    public void sendOpenFrameAfterLastAction(boolean deferWrite, Map<Symbol, 
Object> serverProperties) {
         // Arrange to send the Open frame after the previous handler
         OpenFrame open = createOpenFrame();
+        if (serverProperties != null) {
+            open.setProperties(serverProperties);
+        }
 
         CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
-        comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null));
+
+        FrameSender openSender = new FrameSender(this, FrameType.AMQP, 0, 
open, null);
+        if (deferWrite) {
+            openSender.setDeferWrite(true);
+        }
+
+        comp.add(openSender);
     }
 
     public void expectOpen(Symbol[] desiredCapabilities, Symbol[] 
serverCapabilities,

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to