This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new fa67499  NO-JIRA Avoiding Intermittent failures on FederatedQueueTest
fa67499 is described below

commit fa67499509129409a2c7755d2176277d7303eb7e
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 8 16:49:57 2020 -0400

    NO-JIRA Avoiding Intermittent failures on FederatedQueueTest
---
 .../integration/federation/FederatedQueueTest.java | 75 ++++++++--------------
 .../integration/federation/FederatedTestBase.java  | 20 ++++++
 2 files changed, 47 insertions(+), 48 deletions(-)

diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
index 7dcd81e..5af8de1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
@@ -26,13 +26,14 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.FederationConfiguration;
 import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Before;
@@ -51,6 +52,12 @@ public class FederatedQueueTest extends FederatedTestBase {
    }
 
 
+   @Override
+   protected void configureQueues(ActiveMQServer server) throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
+      createSimpleQueue(server, getName());
+   }
+
    protected ConnectionFactory getCF(int i) throws Exception {
       return new ActiveMQConnectionFactory("vm://" + i);
    }
@@ -110,10 +117,8 @@ public class FederatedQueueTest extends FederatedTestBase {
          MessageConsumer consumer0 = session0.createConsumer(queue0);
          MessageConsumer consumer1 = session1.createConsumer(queue1);
 
-         Wait.assertTrue(() -> 
getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) 
!= null);
-         //Wait for local and federated consumer to be established on Server 1
-         assertTrue(Wait.waitFor(() -> 
getServer(1).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount()
 == 2,
-                                 5000, 100));
+
+         Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 2));
 
          MessageProducer producer1 = session1.createProducer(queue1);
          producer1.send(session1.createTextMessage("hello"));
@@ -265,10 +270,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    @Test
    public void testFederatedQueueBiDirectionalUpstream() throws Exception {
       String queueName = getName();
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", 
queueName);
       
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
       getServer(0).getFederationManager().deploy();
@@ -283,10 +284,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    @Test
    public void testFederatedQueueBiDirectionalDownstream() throws Exception {
       String queueName = getName();
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1", 
queueName, "server0");
       
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
       getServer(0).getFederationManager().deploy();
@@ -301,10 +298,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    @Test
    public void testFederatedQueueBiDirectionalDownstreamUpstream() throws 
Exception {
       String queueName = getName();
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
 
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
           "server1", queueName, null, false, "server0");
@@ -319,10 +312,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    @Test
    public void 
testFederatedQueueBiDirectionalDownstreamUpstreamSharedConnection() throws 
Exception {
       String queueName = getName();
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
 
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
           "server1", queueName, null, true, "server0");
@@ -338,10 +327,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    @Test
    public void testFederatedQueueShareUpstreamConnectionFalse() throws 
Exception {
       String queueName = getName();
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
 
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
           "server1", queueName, null, false, "server0");
@@ -355,10 +340,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    @Test
    public void testFederatedQueueShareUpstreamConnectionTrue() throws 
Exception {
       String queueName = getName();
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
 
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream",
           "server1", queueName, null, true, "server0");
@@ -459,11 +440,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    public void testFederatedQueueChainOfBrokers() throws Exception {
       String queueName = getName();
 
-      //Set queue up on all three brokers
-      for (int i = 0; i < 3; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
-
       //Connect broker 0 (consumer will be here at end of chain) to broker 1
       FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", 
queueName, true);
       
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
@@ -502,11 +478,6 @@ public class FederatedQueueTest extends FederatedTestBase {
    public void testFederatedQueueRemoteBrokerRestart() throws Exception {
       String queueName = getName();
 
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
-
       FederationConfiguration federationConfiguration = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", 
queueName);
       
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
       getServer(0).getFederationManager().deploy();
@@ -538,6 +509,8 @@ public class FederatedQueueTest extends FederatedTestBase {
       assertNull(consumer0.receiveNoWait());
 
       getServer(1).start();
+      Wait.assertTrue(getServer(1)::isActive);
+      createSimpleQueue(getServer(1), getName());
 
       connection1 = cf1.createConnection();
       connection1.start();
@@ -546,22 +519,27 @@ public class FederatedQueueTest extends FederatedTestBase 
{
       producer = session1.createProducer(queue1);
       producer.send(session1.createTextMessage("hello"));
 
-      Wait.assertTrue(() -> 
getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) 
!= null);
-
-      Wait.waitFor(() -> ((QueueBinding) 
getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))).consumerCount()
 == 1);
+      Wait.waitFor(() -> getConsumerCount(getServer(1), queueName, 1));
 
       assertNotNull(consumer0.receive(1000));
    }
 
+   private boolean getConsumerCount(ActiveMQServer server, String queueName, 
int count) {
+      QueueBinding binding = 
(QueueBinding)server.getPostOffice().getBinding(SimpleString.toSimpleString(queueName));
+      if (binding == null) {
+         return false;
+      }
+      if (binding.consumerCount() != count) {
+         return false;
+      }
+
+      return true;
+   }
+
    @Test
    public void testFederatedQueueLocalBrokerRestart() throws Exception {
       String queueName = getName();
 
-      //Set queue up on both brokers
-      for (int i = 0; i < 2; i++) {
-         getServer(i).createQueue(SimpleString.toSimpleString(queueName), 
RoutingType.ANYCAST, SimpleString.toSimpleString(queueName), null, true, false);
-      }
-
       FederationConfiguration federationConfiguration = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", 
queueName);
       
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
       getServer(0).getFederationManager().deploy();
@@ -594,6 +572,7 @@ public class FederatedQueueTest extends FederatedTestBase {
 
       getServer(0).start();
       Wait.waitFor(() -> getServer(0).isActive());
+      createSimpleQueue(getServer(0), getName());
 
       connection0 = getCF(0).createConnection();
       connection0.start();
@@ -608,7 +587,7 @@ public class FederatedQueueTest extends FederatedTestBase {
             .getBinding(SimpleString.toSimpleString(queueName)))
             .consumerCount() == 1);
 
-      assertNotNull(consumer0.receive(1000));
+      assertNotNull(consumer0.receive(5000));
    }
 
    private Message createTextMessage(Session session1, String group) throws 
JMSException {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
index 00f711d..1cb6f19 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
@@ -20,9 +20,13 @@ import java.util.ArrayList;
 import java.util.List;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Before;
 
@@ -47,11 +51,27 @@ public class FederatedTestBase extends ActiveMQTestBase {
             config.addConnectorConfiguration("server" + j, "vm://" + j);
          }
          ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, false));
+
          servers.add(server);
          server.start();
+
+         configureQueues(server);
       }
    }
 
+   protected void configureQueues(ActiveMQServer server) throws Exception {
+   }
+
+   protected void createSimpleQueue(ActiveMQServer server, String queueName) 
throws Exception {
+      SimpleString simpleStringQueueName = 
SimpleString.toSimpleString(queueName);
+      try {
+         server.addAddressInfo(new AddressInfo(simpleStringQueueName, 
RoutingType.ANYCAST));
+         server.createQueue(simpleStringQueueName, RoutingType.ANYCAST, 
simpleStringQueueName, null, true, false);
+      } catch (Exception ignored) {
+      }
+
+   }
+
    protected int numberOfServers() {
       return 3;
    }

Reply via email to