This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/main by this push:
new 2f6bde0 QPIDJMS-549: use retained ref that isnt nulled during
failover, and avoid multiple signals to the related code that let it NPE more
than once
2f6bde0 is described below
commit 2f6bde0a388cd1431cacb154ba55a78c22e263af
Author: Robbie Gemmell <[email protected]>
AuthorDate: Fri Oct 15 17:37:08 2021 +0100
QPIDJMS-549: use retained ref that isnt nulled during failover, and avoid
multiple signals to the related code that let it NPE more than once
---
.../qpid/jms/provider/amqp/AmqpProvider.java | 22 ++++--
.../jms/provider/failover/FailoverProvider.java | 5 +-
...hAmqpOpenProvidedServerListIntegrationTest.java | 88 ++++++++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 15 +++-
4 files changed, 122 insertions(+), 8 deletions(-)
diff --git
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 8058678..d7f2f35 100644
---
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -448,15 +448,21 @@ public class AmqpProvider implements Provider,
TransportListener , AmqpResourceP
AmqpConnectionBuilder builder = new
AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
connectionRequest = new AsyncResult() {
+ AtomicBoolean signalled = new AtomicBoolean();
+
@Override
public void onSuccess() {
- fireConnectionEstablished();
- request.onSuccess();
+ if (signalled.compareAndSet(false, true)) {
+ fireConnectionEstablished();
+ request.onSuccess();
+ }
}
@Override
public void onFailure(ProviderException result) {
- request.onFailure(result);
+ if (signalled.compareAndSet(false, true)) {
+ request.onFailure(result);
+ }
}
@Override
@@ -1470,12 +1476,14 @@ public class AmqpProvider implements Provider,
TransportListener , AmqpResourceP
@Override
public List<URI> getAlternateURIs() {
- List<URI> alternates = new ArrayList<>();
+ List<URI> alternates = null;
if (connection != null) {
// If there are failover servers in the open then we signal that
to the listeners
List<AmqpRedirect> failoverList =
connection.getProperties().getFailoverServerList();
if (!failoverList.isEmpty()) {
+ alternates = new ArrayList<>();
+
for (AmqpRedirect redirect : failoverList) {
try {
alternates.add(redirect.toURI());
@@ -1486,7 +1494,11 @@ public class AmqpProvider implements Provider,
TransportListener , AmqpResourceP
}
}
- return alternates;
+ if (alternates != null) {
+ return alternates;
+ } else {
+ return Collections.emptyList();
+ }
}
public org.apache.qpid.proton.engine.Transport getProtonTransport() {
diff --git
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 238e3fc..ce13e7b 100644
---
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -940,8 +940,9 @@ public class FailoverProvider extends
DefaultProviderListener implements Provide
Provider provider = this.provider;
if (provider != null) {
return provider.getAlternateURIs();
+ } else {
+ return Collections.emptyList();
}
- return null;
};
@Override
@@ -1296,7 +1297,7 @@ public class FailoverProvider extends
DefaultProviderListener implements Provide
LOG.debug("Request received error: {}",
result.getMessage());
// If we managed to receive an Open frame it might contain
// a failover update so process it before handling the
error.
- processAlternates(provider.getAlternateURIs());
+ processAlternates(activeProvider.getAlternateURIs());
handleProviderFailure(activeProvider,
ProviderExceptionSupport.createOrPassthroughFatal(result));
}
} finally {
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
index 01ecbf8..742afcc 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -41,6 +42,7 @@ import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;
import org.apache.qpid.jms.util.PropertyUtil;
@@ -393,6 +395,92 @@ public class
FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJ
}
}
+ @Test(timeout = 20000)
+ public void
testFailoverHandlesRemoteCloseWithRedirectDuringConnectionSessionEstablishment()
throws Exception {
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+ TestAmqpPeer backupPeer = new TestAmqpPeer();) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeerURI = createPeerURI(backupPeer);
+ LOG.info("Primary is at {}: Backup peer is at: {}",
primaryPeerURI, backupPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+ Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>();
+ redirectInfo.put(NETWORK_HOST, "localhost");
+ redirectInfo.put(PORT, backupPeer.getServerPort());
+
+ // Have the failover-server-list containing a third (non-existent)
servers details
+ String thirdPeerTestHost = "test-host";
+ int thirdPeerTestPort = 45678;
+
+ Map<Symbol,Object> thirdPeerTestDetails = new HashMap<>();
+ thirdPeerTestDetails.put(NETWORK_HOST, thirdPeerTestHost);
+ thirdPeerTestDetails.put(PORT, thirdPeerTestPort);
+
+ List<Map<Symbol, Object>> failoverServerList = new
ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(thirdPeerTestDetails);
+
+ Map<Symbol,Object> forcingPeerConnectionProperties = new
HashMap<Symbol, Object>();
+ forcingPeerConnectionProperties.put(FAILOVER_SERVER_LIST,
failoverServerList);
+
+ primaryPeer.expectSaslAnonymous();
+ primaryPeer.expectOpen(true);
+ // Prepare a bare Open without any failure hint, suggesting
success,
+ // but defer writing it until the Close is ready to send too.
+ primaryPeer.sendOpenFrameAfterLastAction(true,
forcingPeerConnectionProperties);
+ // Then send a redirecting Close, prompting the Open to actually
be written.
+ primaryPeer.remotelyCloseConnection(false,
ConnectionError.REDIRECT, "Server is going away", redirectInfo);
+ primaryPeer.expectBegin(false);// From the connection-session,
prompted by 'successful' Open.
+ primaryPeer.expectClose(false);
+
+ backupPeer.expectSaslAnonymous();
+ backupPeer.expectOpen();
+ backupPeer.expectBegin();
+
+ final JmsConnection connection = establishAnonymousConnecton(null,
primaryPeer);
+ connection.addConnectionListener(new
JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (primaryPeerURI.equals(remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+
+ if (backupPeerURI.equals(remoteURI)) {
+ connectedToBackup.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial
peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(primaryPeerURI);
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ connection.start();
+
+ primaryPeer.waitForAllHandlersToComplete(3000);
+
+ assertTrue("Should connect to backup peer",
connectedToBackup.await(5, TimeUnit.SECONDS));
+ assertFalse("Should not connect to primary peer",
connectedToPrimary.await(10, TimeUnit.MILLISECONDS));
+
+ // Verify the failover URIs are as expected, now containing
initial peer, its advertised third test-details peer, and the peer it then
redirected to.
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(primaryPeerURI);
+ afterOpenFailoverURIs.add(new URI("amqp://" + thirdPeerTestHost +
":" + thirdPeerTestPort));
+ afterOpenFailoverURIs.add(backupPeerURI);
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ backupPeer.expectClose();
+ connection.close();
+ backupPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
/*
* Verify that when the Open frame contains a failover server list and we
are connected via SSL configured with
* system properties the redirect uses those properties to connect to the
new host.
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 388323b..69581bf 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -928,11 +928,24 @@ public class TestAmqpPeer implements AutoCloseable
}
public void sendPreemptiveServerOpenFrame() {
+ sendOpenFrameAfterLastAction(false, null);
+ }
+
+ public void sendOpenFrameAfterLastAction(boolean deferWrite, Map<Symbol,
Object> serverProperties) {
// Arrange to send the Open frame after the previous handler
OpenFrame open = createOpenFrame();
+ if (serverProperties != null) {
+ open.setProperties(serverProperties);
+ }
CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
- comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null));
+
+ FrameSender openSender = new FrameSender(this, FrameType.AMQP, 0,
open, null);
+ if (deferWrite) {
+ openSender.setDeferWrite(true);
+ }
+
+ comp.add(openSender);
}
public void expectOpen(Symbol[] desiredCapabilities, Symbol[]
serverCapabilities,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]