This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/artemis.git
commit 7331ccef8c7b7148fb73378f8193a41d2d13194b Author: Clebert Suconic <[email protected]> AuthorDate: Thu May 7 11:18:33 2026 -0400 ARTEMIS-6050 Compatibility tests using Clustering --- .../resources/multiVersionCluster/broker1.groovy | 71 +++++++++ .../multiVersionCluster/broker1Stop.groovy | 19 +++ .../broker1WaitForTopology.groovy | 42 +++++ .../resources/multiVersionCluster/broker2.groovy | 71 +++++++++ .../multiVersionCluster/broker2Stop.groovy | 19 +++ .../broker2WaitForTopology.groovy | 42 +++++ .../compatibility/MultiVersionClusterTest.java | 173 +++++++++++++++++++++ 7 files changed, 437 insertions(+) diff --git a/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1.groovy b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1.groovy new file mode 100644 index 0000000000..2a6487182a --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1.groovy @@ -0,0 +1,71 @@ +package multiVersionCluster +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.activemq.artemis.api.core.QueueConfiguration +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.server.JournalType +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy +import org.apache.activemq.artemis.core.settings.impl.AddressSettings +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType + +String folder = arg[0]; +String id = arg[1]; +String port = arg[2]; +String otherPort = arg[3] + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/" + id)); +configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port); +configuration.addConnectorConfiguration("local", "tcp://localhost:" + port); +configuration.addConnectorConfiguration("other", "tcp://localhost:" + otherPort); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(true); + +if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) { + configuration.globalMaxMessages = 10 +} else { + configuration.globalMaxSize = 10 * 1024 +} + +configuration.addAddressSetting("#", new AddressSettings() + .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE) + .setRedistributionDelay(0).setMaxSizeBytes(100 * 1024 * 1024).setMaxSizeMessages(10_000)); + +// Configure cluster connection +ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration() + .setName("my-cluster") + .setConnectorName("local") + .setRetryInterval(500) + .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND) + .setStaticConnectors(["other"]) + +configuration.addClusterConfiguration(clusterConfiguration) + +configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("MultiVersionClusterTestQueue")); +configuration.addQueueConfiguration(new QueueConfiguration("MultiVersionClusterTestQueue") + .setAddress("MultiVersionClusterTestQueue") + .setRoutingType(RoutingType.ANYCAST)); + +theBroker1 = new EmbeddedActiveMQ(); +theBroker1.setConfiguration(configuration); +theBroker1.start(); diff --git a/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1Stop.groovy b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1Stop.groovy new file mode 100644 index 0000000000..c418b1b4e9 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1Stop.groovy @@ -0,0 +1,19 @@ +package multiVersionCluster +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +theBroker1.stop(); diff --git a/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1WaitForTopology.groovy b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1WaitForTopology.groovy new file mode 100644 index 0000000000..5378d95b52 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker1WaitForTopology.groovy @@ -0,0 +1,42 @@ +package multiVersionCluster +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Wait for cluster to form - should see 2 members +def timeout = System.currentTimeMillis() + 10000 +def formed = false + +while (System.currentTimeMillis() < timeout && !formed) { + def clusterManager = theBroker1.getActiveMQServer().getClusterManager() + def clusterConnection = clusterManager.getClusterConnection("my-cluster") + if (clusterConnection != null) { + def topology = clusterConnection.getTopology() + if (topology != null && topology.getMembers().size() == 2) { + formed = true + break + } + } + Thread.sleep(100) +} + +if (!formed) { + throw new RuntimeException("Broker1: Cluster topology did not form in time") +} + +println("Broker1: Cluster topology formed with " + + theBroker1.getActiveMQServer().getClusterManager().getClusterConnection("my-cluster").getTopology().getMembers().size() + + " members") diff --git a/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2.groovy b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2.groovy new file mode 100644 index 0000000000..6bda892b81 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2.groovy @@ -0,0 +1,71 @@ +package multiVersionCluster +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.activemq.artemis.api.core.QueueConfiguration +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.server.JournalType +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy +import org.apache.activemq.artemis.core.settings.impl.AddressSettings +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType + +String folder = arg[0]; +String id = arg[1]; +String port = arg[2]; +String otherPort = arg[3] + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/" + id)); +configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port); +configuration.addConnectorConfiguration("local", "tcp://localhost:" + port); +configuration.addConnectorConfiguration("other", "tcp://localhost:" + otherPort); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(true); + +if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) { + configuration.globalMaxMessages = 10 +} else { + configuration.globalMaxSize = 10 * 1024 +} + +configuration.addAddressSetting("#", new AddressSettings() + .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE) + .setRedistributionDelay(0).setMaxSizeBytes(100 * 1024 * 1024).setMaxSizeMessages(10_000)); + +// Configure cluster connection +ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration() + .setName("my-cluster") + .setConnectorName("local") + .setRetryInterval(500) + .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND) + .setStaticConnectors(["other"]) + +configuration.addClusterConfiguration(clusterConfiguration) + +configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("MultiVersionClusterTestQueue")); +configuration.addQueueConfiguration(new QueueConfiguration("MultiVersionClusterTestQueue") + .setAddress("MultiVersionClusterTestQueue") + .setRoutingType(RoutingType.ANYCAST)); + +theBroker2 = new EmbeddedActiveMQ(); +theBroker2.setConfiguration(configuration); +theBroker2.start(); diff --git a/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2Stop.groovy b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2Stop.groovy new file mode 100644 index 0000000000..0c014a81e4 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2Stop.groovy @@ -0,0 +1,19 @@ +package multiVersionCluster +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +theBroker2.stop(); diff --git a/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2WaitForTopology.groovy b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2WaitForTopology.groovy new file mode 100644 index 0000000000..ef794ac8c7 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/multiVersionCluster/broker2WaitForTopology.groovy @@ -0,0 +1,42 @@ +package multiVersionCluster +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Wait for cluster to form - should see 2 members +def timeout = System.currentTimeMillis() + 10000 +def formed = false + +while (System.currentTimeMillis() < timeout && !formed) { + def clusterManager = theBroker2.getActiveMQServer().getClusterManager() + def clusterConnection = clusterManager.getClusterConnection("my-cluster") + if (clusterConnection != null) { + def topology = clusterConnection.getTopology() + if (topology != null && topology.getMembers().size() == 2) { + formed = true + break + } + } + Thread.sleep(100) +} + +if (!formed) { + throw new RuntimeException("Broker2: Cluster topology did not form in time") +} + +println("Broker2: Cluster topology formed with " + + theBroker2.getActiveMQServer().getClusterManager().getClusterConnection("my-cluster").getTopology().getMembers().size() + + " members") diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionClusterTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionClusterTest.java new file mode 100644 index 0000000000..d4f034e4fd --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionClusterTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.compatibility; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ARTEMIS_2_44_0; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.compatibility.base.ClasspathBase; +import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ExtendWith(ParameterizedTestExtension.class) +public class MultiVersionClusterTest extends ClasspathBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String QUEUE_NAME = "MultiVersionClusterTestQueue"; + + private final String broker1Version; + private final ClassLoader broker1Classloader; + + private final String broker2Version; + private final ClassLoader broker2Classloader; + + @Parameters(name = "broker1={0}, broker2={1}") + public static Collection getParameters() { + List<Object[]> combinations = new ArrayList<>(); + + // Test clustering with mixed versions + combinations.add(new Object[]{ARTEMIS_2_44_0, SNAPSHOT}); + combinations.add(new Object[]{SNAPSHOT, ARTEMIS_2_44_0}); + + // The SNAPSHOT/SNAPSHOT is here as a test validation only + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT}); + + return combinations; + } + + public MultiVersionClusterTest(String broker1Version, String broker2Version) throws Exception { + this.broker1Version = broker1Version; + this.broker1Classloader = getClasspath(broker1Version); + + this.broker2Version = broker2Version; + this.broker2Classloader = getClasspath(broker2Version); + } + + @AfterEach + public void cleanupServers() { + try { + evaluate(broker1Classloader, "multiVersionCluster/broker1Stop.groovy"); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + try { + evaluate(broker2Classloader, "multiVersionCluster/broker2Stop.groovy"); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + @TestTemplate + public void testCluster() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getAbsoluteFile()); + System.out.println("Starting broker1 with version " + broker1Version); + evaluate(broker1Classloader, "multiVersionCluster/broker1.groovy", serverFolder.getAbsolutePath(), "broker1", "61000", "61001"); + + System.out.println("Starting broker2 with version " + broker2Version); + evaluate(broker2Classloader, "multiVersionCluster/broker2.groovy", serverFolder.getAbsolutePath(), "broker2", "61001", "61000"); + + // Wait for cluster to form + evaluate(broker1Classloader, "multiVersionCluster/broker1WaitForTopology.groovy"); + evaluate(broker2Classloader, "multiVersionCluster/broker2WaitForTopology.groovy"); + + // Send messages on broker0 + send(new ActiveMQConnectionFactory("tcp://localhost:61000"), 100, 1024); + // Receive messages on broker1 + receive(new ActiveMQConnectionFactory("tcp://localhost:61001"), 100, 1024); + + // Send large messages on broker0 + send(new ActiveMQConnectionFactory("tcp://localhost:61000"), 100, 1024); + // Receive large messages on broker1 + receive(new ActiveMQConnectionFactory("tcp://localhost:61001"), 100, 1024); + + // send amqp messages on broker 0 + send(new JmsConnectionFactory("amqp://localhost:61000"), 100, 1024); + // receive amqp messages on broker 1 + receive(new JmsConnectionFactory("amqp://localhost:61001"), 100, 1024); + + // send amqp large messages on broker 0 + send(new JmsConnectionFactory("amqp://localhost:61000"), 10, 300 * 1024); + // receive amqp large messages on broker 1 + receive(new JmsConnectionFactory("amqp://localhost:61001"), 10, 300 * 1024); + + evaluate(broker1Classloader, "multiVersionCluster/broker1Stop.groovy"); + evaluate(broker2Classloader, "multiVersionCluster/broker2Stop.groovy"); + } + + private void send(ConnectionFactory factory, int numberOfMessages, int textSize) throws Throwable { + try (Connection connection = factory.createConnection()) { + Queue queue; + + { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + queue = session.createQueue(QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + boolean pending = false; + for (int i = 0; i < numberOfMessages; i++) { + producer.send(session.createTextMessage("A".repeat(textSize))); + pending = true; + if (i > 0 && i % 100 == 0) { + session.commit(); + pending = false; + } + } + if (pending) { + session.commit(); + } + session.close(); + } + } + } + + private void receive(ConnectionFactory factory, int numberOfMessages, int textSize) throws Throwable { + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < numberOfMessages; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + assertNotNull(message, "Message " + i + " was not received"); + assertEquals("A".repeat(textSize), message.getText()); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
