This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new fa67499 NO-JIRA Avoiding Intermittent failures on FederatedQueueTest
fa67499 is described below
commit fa67499509129409a2c7755d2176277d7303eb7e
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 8 16:49:57 2020 -0400
NO-JIRA Avoiding Intermittent failures on FederatedQueueTest
---
.../integration/federation/FederatedQueueTest.java | 75 ++++++++--------------
.../integration/federation/FederatedTestBase.java | 20 ++++++
2 files changed, 47 insertions(+), 48 deletions(-)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
index 7dcd81e..5af8de1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
@@ -26,13 +26,14 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import
org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
@@ -51,6 +52,12 @@ public class FederatedQueueTest extends FederatedTestBase {
}
+ @Override
+ protected void configureQueues(ActiveMQServer server) throws Exception {
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
+ createSimpleQueue(server, getName());
+ }
+
protected ConnectionFactory getCF(int i) throws Exception {
return new ActiveMQConnectionFactory("vm://" + i);
}
@@ -110,10 +117,8 @@ public class FederatedQueueTest extends FederatedTestBase {
MessageConsumer consumer0 = session0.createConsumer(queue0);
MessageConsumer consumer1 = session1.createConsumer(queue1);
- Wait.assertTrue(() ->
getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))
!= null);
- //Wait for local and federated consumer to be established on Server 1
- assertTrue(Wait.waitFor(() ->
getServer(1).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount()
== 2,
- 5000, 100));
+
+ Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 2));
MessageProducer producer1 = session1.createProducer(queue1);
producer1.send(session1.createTextMessage("hello"));
@@ -265,10 +270,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalUpstream() throws Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1",
queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
@@ -283,10 +284,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalDownstream() throws Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1",
queueName, "server0");
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
getServer(0).getFederationManager().deploy();
@@ -301,10 +298,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalDownstreamUpstream() throws
Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, false, "server0");
@@ -319,10 +312,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void
testFederatedQueueBiDirectionalDownstreamUpstreamSharedConnection() throws
Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, true, "server0");
@@ -338,10 +327,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueShareUpstreamConnectionFalse() throws
Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, false, "server0");
@@ -355,10 +340,6 @@ public class FederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueShareUpstreamConnectionTrue() throws
Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
"server1", queueName, null, true, "server0");
@@ -459,11 +440,6 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueChainOfBrokers() throws Exception {
String queueName = getName();
- //Set queue up on all three brokers
- for (int i = 0; i < 3; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
-
//Connect broker 0 (consumer will be here at end of chain) to broker 1
FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1",
queueName, true);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
@@ -502,11 +478,6 @@ public class FederatedQueueTest extends FederatedTestBase {
public void testFederatedQueueRemoteBrokerRestart() throws Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
-
FederationConfiguration federationConfiguration =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1",
queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@@ -538,6 +509,8 @@ public class FederatedQueueTest extends FederatedTestBase {
assertNull(consumer0.receiveNoWait());
getServer(1).start();
+ Wait.assertTrue(getServer(1)::isActive);
+ createSimpleQueue(getServer(1), getName());
connection1 = cf1.createConnection();
connection1.start();
@@ -546,22 +519,27 @@ public class FederatedQueueTest extends FederatedTestBase {
producer = session1.createProducer(queue1);
producer.send(session1.createTextMessage("hello"));
- Wait.assertTrue(() ->
getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))
!= null);
-
- Wait.waitFor(() -> ((QueueBinding)
getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))).consumerCount()
== 1);
+ Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 1));
assertNotNull(consumer0.receive(1000));
}
+ private boolean getConsumerCount(ActiveMQServer server, String queueName,
int count) {
+ QueueBinding binding =
(QueueBinding)server.getPostOffice().getBinding(SimpleString.toSimpleString(queueName));
+ if (binding == null) {
+ return false;
+ }
+ if (binding.consumerCount() != count) {
+ return false;
+ }
+
+ return true;
+ }
+
@Test
public void testFederatedQueueLocalBrokerRestart() throws Exception {
String queueName = getName();
- //Set queue up on both brokers
- for (int i = 0; i < 2; i++) {
- getServer(i).createQueue(SimpleString.toSimpleString(queueName),
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
- }
-
FederationConfiguration federationConfiguration =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1",
queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
@@ -594,6 +572,7 @@ public class FederatedQueueTest extends FederatedTestBase {
getServer(0).start();
Wait.waitFor(() -> getServer(0).isActive());
+ createSimpleQueue(getServer(0), getName());
connection0 = getCF(0).createConnection();
connection0.start();
@@ -608,7 +587,7 @@ public class FederatedQueueTest extends FederatedTestBase {
.getBinding(SimpleString.toSimpleString(queueName)))
.consumerCount() == 1);
- assertNotNull(consumer0.receive(1000));
+ assertNotNull(consumer0.receive(5000));
}
private Message createTextMessage(Session session1, String group) throws
JMSException {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
index 00f711d..1cb6f19 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
@@ -20,9 +20,13 @@ import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
@@ -47,11 +51,27 @@ public class FederatedTestBase extends ActiveMQTestBase {
config.addConnectorConfiguration("server" + j, "vm://" + j);
}
ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, false));
+
servers.add(server);
server.start();
+
+ configureQueues(server);
}
}
+ protected void configureQueues(ActiveMQServer server) throws Exception {
+ }
+
+ protected void createSimpleQueue(ActiveMQServer server, String queueName)
throws Exception {
+ SimpleString simpleStringQueueName =
SimpleString.toSimpleString(queueName);
+ try {
+ server.addAddressInfo(new AddressInfo(simpleStringQueueName,
RoutingType.ANYCAST));
+ server.createQueue(simpleStringQueueName, RoutingType.ANYCAST,
simpleStringQueueName, null, true, false);
+ } catch (Exception ignored) {
+ }
+
+ }
+
protected int numberOfServers() {
return 3;
}