This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new b9bcd2f AMQ-7165 - ensure failover updated uris are additive such
that statically configured uris are respected
b9bcd2f is described below
commit b9bcd2fcc37837cafa0ff4caa54ae5a04a26ab99
Author: gtully <[email protected]>
AuthorDate: Tue Mar 12 12:24:20 2019 +0000
AMQ-7165 - ensure failover updated uris are additive such that statically
configured uris are respected
---
.../transport/failover/FailoverTransport.java | 29 ++++++++++-------
.../failover/FailoverClusterTestSupport.java | 20 +++++++++++-
.../failover/FailoverComplexClusterTest.java | 38 +++++++++++++++++++++-
.../failover/TwoBrokerFailoverClusterTest.java | 5 ++-
4 files changed, 77 insertions(+), 15 deletions(-)
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 9c24828..6b6f518 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
@@ -277,7 +278,7 @@ public class FailoverTransport implements
CompositeTransport {
reconnectOk = true;
}
- LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",
+ LOG.warn("Transport ({}) failed{} attempting to automatically reconnect",
connectedTransportURI, (reconnectOk ? "," : ", not"), e);
failedConnectTransportURI = connectedTransportURI;
@@ -290,7 +291,6 @@ public class FailoverTransport implements
CompositeTransport {
transportListener.transportInterupted();
}
- updated.remove(failedConnectTransportURI);
reconnectTask.wakeup();
} else if (!isDisposed()) {
propagateFailureToExceptionListener(e);
@@ -791,14 +791,16 @@ public class FailoverTransport implements
CompositeTransport {
}
private List<URI> getConnectList() {
- if (!updated.isEmpty()) {
- return updated;
- }
- ArrayList<URI> l = new ArrayList<URI>(uris);
+ // updated have precedence
+ LinkedHashSet<URI> uniqueUris = new LinkedHashSet<URI>(updated);
+ uniqueUris.addAll(uris);
+
boolean removed = false;
if (failedConnectTransportURI != null) {
- removed = l.remove(failedConnectTransportURI);
+ removed = uniqueUris.remove(failedConnectTransportURI);
}
+
+ ArrayList<URI> l = new ArrayList<URI>(uniqueUris);
if (randomize) {
// Randomly, reorder the list by random swapping
for (int i = 0; i < l.size(); i++) {
@@ -813,7 +815,7 @@ public class FailoverTransport implements
CompositeTransport {
l.add(failedConnectTransportURI);
}
- LOG.debug("urlList connectionList:{}, from: {}", l, uris);
+ LOG.debug("urlList connectionList:{}, from: {}", l, uniqueUris);
return l;
}
@@ -926,7 +928,7 @@ public class FailoverTransport implements
CompositeTransport {
final boolean doReconnect() {
Exception failure = null;
synchronized (reconnectMutex) {
-
+ List<URI> connectList = null;
// First ensure we are up to date.
doUpdateURIsFromDisk();
@@ -936,7 +938,7 @@ public class FailoverTransport implements
CompositeTransport {
if ((connectedTransport.get() != null && !doRebalance &&
!priorityBackupAvailable) || disposed || connectionFailure != null) {
return false;
} else {
- List<URI> connectList = getConnectList();
+ connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
@@ -1077,7 +1079,7 @@ public class FailoverTransport implements
CompositeTransport {
connectFailures++;
if (reconnectLimit != INFINITE && connectFailures >=
reconnectLimit) {
- LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures);
+ LOG.error("Failed to connect to {} after: {} attempt(s)", connectList, connectFailures);
connectionFailure = failure;
// Make sure on initial startup, that the transportListener
has been
@@ -1098,7 +1100,7 @@ public class FailoverTransport implements
CompositeTransport {
int warnInterval = getWarnAfterReconnectAttempts();
if (warnInterval > 0 && (connectFailures == 1 || (connectFailures
% warnInterval) == 0)) {
LOG.warn("Failed to connect to {} after: {} attempt(s) with {}, continuing to retry.",
- uris, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage()));
+ connectList, connectFailures, (failure == null ? "?" : failure.getLocalizedMessage()));
}
}
@@ -1286,6 +1288,9 @@ public class FailoverTransport implements
CompositeTransport {
for (URI uri : updatedURIs) {
if (uri != null && !updated.contains(uri)) {
updated.add(uri);
+ if (failedConnectTransportURI != null &&
failedConnectTransportURI.equals(uri)) {
+ failedConnectTransportURI = null;
+ }
}
}
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
index 01dcce4..70d5b31 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.PublishedAddressPolicy;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
@@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class FailoverClusterTestSupport extends TestCase {
protected final Logger logger = LoggerFactory.getLogger(getClass());
- private static final int NUMBER_OF_CLIENTS = 30;
+ protected static final int NUMBER_OF_CLIENTS = 30;
private String clientUrl;
@@ -104,6 +105,22 @@ public class FailoverClusterTestSupport extends TestCase {
}
}
+ protected void assertAllConnected(final int expected) throws Exception {
+ assertTrue("All connections connected!", Wait.waitFor(new
Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ int connectedCount = 0;
+ for (ActiveMQConnection c : connections) {
+ if(c.getTransportChannel().isConnected()) {
+ connectedCount++;
+ }
+ }
+ logger.info("Found " + connectedCount + " of " + expected + " connected");
+ return connectedCount == expected;
+ }
+ }));
+ }
+
protected void assertAllConnectedTo(String url) throws Exception {
for (ActiveMQConnection c : connections) {
assertEquals(url, c.getTransportChannel().getRemoteAddress());
@@ -175,6 +192,7 @@ public class FailoverClusterTestSupport extends TestCase {
connector.setUpdateClusterClients(false);
connector.setUpdateClusterClientsOnRemove(false);
}
+ connector.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS);
}
protected void addNetworkBridge(BrokerService answer, String bridgeName,
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
index a92ceec..5b0a945 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.failover;
+import org.apache.activemq.broker.PublishedAddressPolicy;
import org.apache.activemq.broker.TransportConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,7 +174,7 @@ public class FailoverComplexClusterTest extends
FailoverClusterTestSupport {
initSingleTcBroker("", null, null);
Thread.sleep(2000);
- setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500");
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500&randomize=false");
createClients(100);
Thread.sleep(5000);
@@ -236,6 +237,41 @@ public class FailoverComplexClusterTest extends
FailoverClusterTestSupport {
assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
}
+ public void testStaticInfoAvailableAfterPattialUpdate() throws Exception {
+
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ TransportConnector connectorA =
getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
+ connectorA.setName("openwire");
+ connectorA.setRebalanceClusterClients(true);
+ connectorA.setUpdateClusterClients(true);
+ connectorA.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS);
+
+ getBroker(BROKER_A_NAME).start();
+
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "?trace=true," + BROKER_B_CLIENT_TC_ADDRESS + "?trace=true)?useExponentialBackOff=false&initialReconnectDelay=500");
+ createClients(1);
+
+ assertAllConnectedTo(BROKER_A_CLIENT_TC_ADDRESS);
+
+ getBroker(BROKER_A_NAME).stop();
+
+
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ TransportConnector connectorB =
getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
+ connectorB.setName("openwire");
+ connectorB.setRebalanceClusterClients(true);
+ connectorB.setUpdateClusterClients(true);
+ connectorB.getPublishedAddressPolicy().setPublishedHostStrategy(PublishedAddressPolicy.PublishedHostStrategy.IPADDRESS);
+
+ getBroker(BROKER_B_NAME).start();
+
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ Thread.sleep(1000);
+
+ // verify can connect?
+ assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+ }
+
/**
* Runs a 3 Broker dynamic failover test: <br/>
* <ul>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index 19addc3..32bfdc9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -31,7 +31,7 @@ public class TwoBrokerFailoverClusterTest extends
FailoverClusterTestSupport {
getBroker(BROKER_B_NAME).waitUntilStarted();
Thread.sleep(2000);
- setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&jms.watchTopicAdvisories=false");
createClients();
Thread.sleep(5000);
@@ -46,14 +46,17 @@ public class TwoBrokerFailoverClusterTest extends
FailoverClusterTestSupport {
Thread.sleep(1000);
+ assertAllConnected(NUMBER_OF_CLIENTS);
assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
Thread.sleep(5000);
+ logger.info("Restarting A");
createBrokerA(false, "", null, null);
getBroker(BROKER_A_NAME).waitUntilStarted();
Thread.sleep(5000);
+ assertAllConnected(NUMBER_OF_CLIENTS);
assertClientsConnectedToTwoBrokers();
assertClientsConnectionsEvenlyDistributed(.35);
}