This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit eb7b0b09467c0395dbb5d628ccb93734feae1f5e
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Tue Apr 16 14:43:02 2024 -0400

    ARTEMIS-4725 Mirroring tests using multiple versions
---
 tests/compatibility-tests/pom.xml                  |  22 +++
 .../artemis/tests/compatibility/GroovyRun.java     |   1 +
 .../multiVersionMirror/backupServer.groovy         |  19 ++-
 .../resources/multiVersionMirror/mainServer.groovy |  11 +-
 .../tests/compatibility/MirroredVersionTest.java   | 163 +++++++++++++++++++--
 5 files changed, 194 insertions(+), 22 deletions(-)

diff --git a/tests/compatibility-tests/pom.xml 
b/tests/compatibility-tests/pom.xml
index f5b89dac9e..dda16306fe 100644
--- a/tests/compatibility-tests/pom.xml
+++ b/tests/compatibility-tests/pom.xml
@@ -445,6 +445,28 @@
                         </configuration>
                      </execution>
                      <execution>
+                        <phase>compile</phase>
+                        <goals>
+                           <goal>dependency-scan</goal>
+                        </goals>
+                        <id>2_33_0-check</id>
+                        <configuration>
+                           <optional>true</optional>
+                           <libListWithDeps>
+                              
<arg>org.apache.activemq:artemis-jms-server:2.33.0</arg>
+                              <arg>org.apache.activemq:artemis-cli:2.33.0</arg>
+                              
<arg>org.apache.activemq:artemis-jms-client:2.33.0</arg>
+                              
<arg>org.apache.activemq:artemis-hornetq-protocol:2.33.0</arg>
+                              
<arg>org.apache.activemq:artemis-amqp-protocol:2.33.0</arg>
+                              
<arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
+                           </libListWithDeps>
+                           <libList>
+                              
<arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
+                           </libList>
+                           <file>${basedir}/target/ARTEMIS-2_33_0.cp</file>
+                        </configuration>
+                     </execution>
+                      <execution>
                         <phase>compile</phase>
                         <goals>
                            <goal>dependency-scan</goal>
diff --git 
a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
 
b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
index c51e7dff72..3a73aa3b75 100644
--- 
a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
+++ 
b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
@@ -43,6 +43,7 @@ public class GroovyRun {
    public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0";
    public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0";
    public static final String TWO_TWENTYEIGHT_ZERO = "ARTEMIS-2_28_0";
+   public static final String TWO_THIRTYTHREE_ZERO = "ARTEMIS-2_33_0";
    public static final String HORNETQ_235 = "HORNETQ-235";
    public static final String HORNETQ_247 = "HORNETQ-247";
    public static final String AMQ_5_11 = "AMQ_5_11";
diff --git 
a/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy
 
b/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy
index da024c4c89..035d8beb30 100644
--- 
a/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy
+++ 
b/tests/compatibility-tests/src/main/resources/multiVersionMirror/backupServer.groovy
@@ -29,6 +29,9 @@ import 
org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
 
 String folder = arg[0];
 String id = arg[1];
+String queueName = arg[2]
+String topicName = arg[3]
+boolean useDual = Boolean.parseBoolean(arg[4])
 
 configuration = new ConfigurationImpl();
 configuration.setJournalType(JournalType.NIO);
@@ -37,8 +40,20 @@ configuration.addAcceptorConfiguration("artemis", 
"tcp://localhost:61617");
 configuration.setSecurityEnabled(false);
 configuration.setPersistenceEnabled(true);
 
-configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName("TestQueue").addRoutingType(RoutingType.ANYCAST));
-configuration.addQueueConfiguration(new 
QueueConfiguration("TestQueue").setAddress("TestQueue").setRoutingType(RoutingType.ANYCAST));
+configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
+configuration.addQueueConfiguration(new 
QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName(topicName).addRoutingType(RoutingType.MULTICAST));
+
+if (useDual) {
+    try {
+        AMQPBrokerConnectConfiguration connection = new 
AMQPBrokerConnectConfiguration("mirror", 
"tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(100);
+        AMQPMirrorBrokerConnectionElement replication = new 
AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(false).setMessageAcknowledgements(true)
+        connection.addElement(replication);
+        configuration.addAMQPConnection(connection);
+    } catch (Throwable ignored) {
+    }
+}
+
 
 theBackupServer = new EmbeddedActiveMQ();
 theBackupServer.setConfiguration(configuration);
diff --git 
a/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy
 
b/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy
index 921d88006b..70d6ab11cb 100644
--- 
a/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy
+++ 
b/tests/compatibility-tests/src/main/resources/multiVersionMirror/mainServer.groovy
@@ -29,6 +29,8 @@ import 
org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
 
 String folder = arg[0];
 String id = arg[1];
+String queueName = arg[2]
+String topicName = arg[3]
 
 configuration = new ConfigurationImpl();
 configuration.setJournalType(JournalType.NIO);
@@ -37,12 +39,13 @@ configuration.addAcceptorConfiguration("artemis", 
"tcp://localhost:61616");
 configuration.setSecurityEnabled(false);
 configuration.setPersistenceEnabled(true);
 
-configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName("TestQueue").addRoutingType(RoutingType.ANYCAST));
-configuration.addQueueConfiguration(new 
QueueConfiguration("TestQueue").setAddress("TestQueue").setRoutingType(RoutingType.ANYCAST));
+configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
+configuration.addQueueConfiguration(new 
QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName(topicName).addRoutingType(RoutingType.MULTICAST));
 
 try {
     AMQPBrokerConnectConfiguration connection = new 
AMQPBrokerConnectConfiguration("mirror", 
"tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(100);
-    AMQPMirrorBrokerConnectionElement replication = new 
AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(true).setMessageAcknowledgements(true);
+    AMQPMirrorBrokerConnectionElement replication = new 
AMQPMirrorBrokerConnectionElement().setDurable(true).setSync(false).setMessageAcknowledgements(true)
     connection.addElement(replication);
     configuration.addAMQPConnection(connection);
 } catch (Throwable ignored) {
@@ -50,4 +53,4 @@ try {
 
 theMainServer = new EmbeddedActiveMQ();
 theMainServer.setConfiguration(configuration);
-theMainServer.start();
+theMainServer.start();
\ No newline at end of file
diff --git 
a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java
 
b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java
index b5f908c231..b6fb3860a9 100644
--- 
a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java
+++ 
b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MirroredVersionTest.java
@@ -23,6 +23,8 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.File;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.invoke.MethodHandles;
@@ -30,7 +32,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
 import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -41,32 +46,41 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static 
org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+import static 
org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_THIRTYTHREE_ZERO;
 import static 
org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYEIGHT_ZERO;
 
 @RunWith(Parameterized.class)
 public class MirroredVersionTest extends ClasspathBase {
 
+   private static final String QUEUE_NAME = "MirroredQueue";
+   private static final String TOPIC_NAME = "MirroredTopic";
+
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
    private final ClassLoader mainClassloader;
 
    private final ClassLoader backupClassLoader;
 
+   private final boolean useDual;
+
 
-   @Parameterized.Parameters(name = "BrokerA={0}, BrokerB={1}")
+   @Parameterized.Parameters(name = "BrokerA={0}, BrokerB={1}, dualMirror={2}")
    public static Collection getParameters() {
       List<Object[]> combinations = new ArrayList<>();
-      combinations.add(new Object[]{TWO_TWENTYEIGHT_ZERO, SNAPSHOT});
-      combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYEIGHT_ZERO});
-      // The SNAPSHOT/SNAPSHOT is here as a test validation only, like in 
other cases where SNAPSHOT/SNAPSHOT is used.
-      combinations.add(new Object[]{SNAPSHOT, SNAPSHOT});
+      combinations.add(new Object[]{TWO_THIRTYTHREE_ZERO, SNAPSHOT, true});
+      combinations.add(new Object[]{SNAPSHOT, TWO_THIRTYTHREE_ZERO, true});
+      combinations.add(new Object[]{TWO_TWENTYEIGHT_ZERO, SNAPSHOT, false});
+      combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYEIGHT_ZERO, false});
+      combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, true});
       return combinations;
    }
 
-   public MirroredVersionTest(String main, String backup) throws Exception {
+   public MirroredVersionTest(String main, String backup, boolean useDual) 
throws Exception {
       this.mainClassloader = getClasspath(main);
 
       this.backupClassLoader = getClasspath(backup);
+
+      this.useDual = useDual;
    }
 
    @After
@@ -79,6 +93,9 @@ public class MirroredVersionTest extends ClasspathBase {
          evaluate(backupClassLoader, 
"multiVersionMirror/backupServerStop.groovy");
       } catch (Exception ignored) {
       }
+
+      FileUtil.deleteDirectory(new 
File(serverFolder.getRoot().getAbsolutePath(), "1"));
+      FileUtil.deleteDirectory(new 
File(serverFolder.getRoot().getAbsolutePath(), "2"));
    }
 
    private String createBody(int size) {
@@ -93,26 +110,24 @@ public class MirroredVersionTest extends ClasspathBase {
 
    @Test
    public void testMirrorReplica() throws Throwable {
-      testMirrorReplicat(100);
+      testMirrorReplica(100);
    }
 
    @Test
    public void testMirrorReplicaLM() throws Throwable {
-      testMirrorReplicat(300 * 1024);
+      testMirrorReplica(300 * 1024);
    }
 
-   public void testMirrorReplicat(int stringSize) throws Throwable {
+   public void testMirrorReplica(int stringSize) throws Throwable {
       String body = createBody(stringSize);
       logger.debug("Starting live");
-      evaluate(mainClassloader, "multiVersionMirror/mainServer.groovy", 
serverFolder.getRoot().getAbsolutePath(), "1");
-      logger.debug("Starting backup");
-      evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", 
serverFolder.getRoot().getAbsolutePath(), "2");
+      startMainBroker();
 
       ConnectionFactory factoryMain = new 
JmsConnectionFactory("amqp://localhost:61616");
 
       try (Connection connection = factoryMain.createConnection()) {
          Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-         MessageProducer producer = 
session.createProducer(session.createQueue("TestQueue"));
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
          for (int i = 0; i < 10; i++) {
             TextMessage message = session.createTextMessage("hello " + i + 
body);
             message.setIntProperty("count", i);
@@ -121,11 +136,18 @@ public class MirroredVersionTest extends ClasspathBase {
          session.commit();
       }
 
+      logger.debug("restarting main server");
+      evaluate(mainClassloader, "multiVersionMirror/mainServerStop.groovy");
+      startMainBroker();
+
+      logger.debug("starting backup");
+      startBackupBroker();
+
       ConnectionFactory factoryReplica = new 
JmsConnectionFactory("amqp://localhost:61617");
 
       try (Connection connection = factoryReplica.createConnection()) {
          Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-         MessageConsumer consumer = 
session.createConsumer(session.createQueue("TestQueue"));
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
          connection.start();
          for (int i = 0; i < 10; i++) {
             TextMessage message = (TextMessage) consumer.receive(5000);
@@ -139,11 +161,11 @@ public class MirroredVersionTest extends ClasspathBase {
       logger.debug("Restarting backup");
 
       evaluate(backupClassLoader, 
"multiVersionMirror/backupServerStop.groovy");
-      evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", 
serverFolder.getRoot().getAbsolutePath(), "2");
+      startBackupBroker();
 
       try (Connection connection = factoryReplica.createConnection()) {
          Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-         MessageConsumer consumer = 
session.createConsumer(session.createQueue("TestQueue"));
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
          connection.start();
          for (int i = 0; i < 10; i++) {
             TextMessage message = (TextMessage) consumer.receive(5000);
@@ -154,4 +176,113 @@ public class MirroredVersionTest extends ClasspathBase {
          session.commit();
       }
    }
+
+   private void startMainBroker() throws Exception {
+      evaluate(mainClassloader, "multiVersionMirror/mainServer.groovy", 
serverFolder.getRoot().getAbsolutePath(), "1", QUEUE_NAME, TOPIC_NAME);
+   }
+
+   private void startBackupBroker() throws Exception {
+      evaluate(backupClassLoader, "multiVersionMirror/backupServer.groovy", 
serverFolder.getRoot().getAbsolutePath(), "2", QUEUE_NAME, TOPIC_NAME, 
String.valueOf(useDual));
+   }
+
+   @Test
+   public void testTopic() throws Throwable {
+      int stringSize = 100;
+      String body = createBody(stringSize);
+      logger.debug("Starting live");
+      startMainBroker();
+      logger.debug("Starting backup");
+      startBackupBroker();
+
+      String clientID1 = "CONNECTION_1";
+      String clientID2 = "CONNECTION_2";
+
+      String sub1 = "SUB_1";
+      String sub2 = "SUB_2";
+
+      ConnectionFactory factoryMain = new 
JmsConnectionFactory("amqp://localhost:61616");
+
+      try (javax.jms.Connection connection = factoryMain.createConnection()) {
+         connection.setClientID(clientID1);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createDurableConsumer(topic, sub1);
+      }
+      try (javax.jms.Connection connection = factoryMain.createConnection()) {
+         connection.setClientID(clientID2);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createDurableConsumer(topic, sub2);
+      }
+
+      evaluate(backupClassLoader, 
"multiVersionMirror/backupServerStop.groovy");
+
+      try (Connection connection = factoryMain.createConnection()) {
+         Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         MessageProducer producer = session.createProducer(null);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = session.createTextMessage("hello " + i + 
body);
+            message.setIntProperty("count", i);
+            producer.send(topic, message);
+         }
+         session.commit();
+      }
+
+      evaluate(mainClassloader, "multiVersionMirror/mainServerStop.groovy");
+      startBackupBroker();
+      startMainBroker();
+
+      ConnectionFactory factoryReplica = new 
JmsConnectionFactory("amqp://localhost:61617");
+
+      try (Connection connection = factoryReplica.createConnection()) {
+         connection.setClientID(clientID1);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         connection.start();
+         MessageConsumer consumer = session.createDurableConsumer(topic, sub1);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage)consumer.receive(5000);
+            Assert.assertNotNull(message);
+         }
+         session.rollback();
+      }
+
+      logger.debug("Restarting backup");
+      evaluate(backupClassLoader, 
"multiVersionMirror/backupServerStop.groovy");
+      startBackupBroker();
+
+      try (Connection connection = factoryReplica.createConnection()) {
+         connection.setClientID(clientID1);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         connection.start();
+         MessageConsumer consumer = session.createDurableConsumer(topic, sub1);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage)consumer.receive(5000);
+            Assert.assertNotNull(message);
+         }
+         session.commit();
+      }
+
+      try (Connection connection = factoryReplica.createConnection()) {
+         connection.setClientID(clientID2);
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(TOPIC_NAME);
+         connection.start();
+         MessageConsumer consumer = session.createDurableConsumer(topic, sub2);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage)consumer.receive(5000);
+            Assert.assertNotNull(message);
+         }
+         session.commit();
+      }
+
+      if (useDual) {
+         SimpleManagement simpleManagementMainServer = new 
SimpleManagement("tcp://localhost:61616", null, null);
+         Wait.assertEquals(0, () -> 
simpleManagementMainServer.getMessageCountOnQueue(clientID1 + "." + sub1), 
5000);
+         Wait.assertEquals(0, () -> 
simpleManagementMainServer.getMessageCountOnQueue(clientID2 + "." + sub2), 
5000);
+      }
+   }
+
 }
\ No newline at end of file

Reply via email to