This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c3f772645 ARTEMIS-4666 Correctly set queue match when parsing XML 
confing
2c3f772645 is described below

commit 2c3f77264594f11f2bf0047bc16f2b1cb26d577e
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Apr 5 11:40:43 2024 -0400

    ARTEMIS-4666 Correctly set queue match when parsing XML confing
    
    Correct the XML parser for core federation queue match policy loading
    to call the setQueueMatch instead of setAddressMatch when reading the
    queue match element.
---
 .../deployers/impl/FileConfigurationParser.java    |   2 +-
 .../config/impl/FileConfigurationParserTest.java   |  46 +++++
 .../FederationQueueMatchXMLConfigParsingTest.java  | 209 +++++++++++++++++++++
 .../core-federated-queue-match-server1.xml         |  95 ++++++++++
 .../core-federated-queue-match-server2.xml         |  58 ++++++
 5 files changed, 409 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 09653e5d6d..6b885e75d1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2632,7 +2632,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
    private FederationQueuePolicyConfiguration.Matcher getQueueMatcher(Element 
child) {
       FederationQueuePolicyConfiguration.Matcher matcher = new 
FederationQueuePolicyConfiguration.Matcher();
-      matcher.setAddressMatch(child.getAttribute("queue-match"));
+      matcher.setQueueMatch(child.getAttribute("queue-match"));
       matcher.setAddressMatch(child.getAttribute("address-match"));
       return matcher;
    }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index a0f74aa992..aefb6a53eb 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -22,16 +22,19 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import 
org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.ha.PrimaryOnlyPolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.ha.SharedStorePrimaryPolicyConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
@@ -802,4 +805,47 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
       "</address-settings>" + "\n";
 
    private static String lastPart = "</core>";
+
+   @Test
+   public void testParseQueueMatchInFederationConfiguration() throws Exception 
{
+      String configStr = firstPart +
+                         "<federations>" +
+                          "<federation name=\"server-1-federation\">" +
+                           "<upstream name=\"upstream\">" +
+                            "<static-connectors>" +
+                             "<connector-ref>server-connector</connector-ref>" 
+
+                            "</static-connectors>" +
+                            "<policy ref=\"queue-federation\"/>" +
+                           "</upstream>" +
+                           "" +
+                           "<queue-policy name=\"queue-federation\">" +
+                            "<include queue-match=\"myQueue\" 
address-match=\"#\"/>" +
+                           "</queue-policy>" +
+                          "</federation>" +
+                         "</federations>" +
+                         lastPart;
+
+      final FileConfigurationParser parser = new FileConfigurationParser();
+      final ByteArrayInputStream input = new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      final Configuration configuration = parser.parseMainConfig(input);
+      final List<FederationConfiguration> federations = 
configuration.getFederationConfigurations();
+
+      assertEquals(1, federations.size());
+
+      final FederationConfiguration federation = federations.get(0);
+      final FederationQueuePolicyConfiguration policy =
+         (FederationQueuePolicyConfiguration) 
federation.getQueuePolicies().get("queue-federation");
+
+      assertNotNull(policy);
+
+      final Set<FederationQueuePolicyConfiguration.Matcher> matches = 
policy.getIncludes();
+
+      assertEquals(1, matches.size());
+
+      final FederationQueuePolicyConfiguration.Matcher match = 
matches.iterator().next();
+
+      assertEquals("#", match.getAddressMatch());
+      assertEquals("myQueue", match.getQueueMatch());
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationQueueMatchXMLConfigParsingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationQueueMatchXMLConfigParsingTest.java
new file mode 100644
index 0000000000..16ceecd4ac
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederationQueueMatchXMLConfigParsingTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import java.lang.invoke.MethodHandles;
+import java.net.URL;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.tests.integration.jms.RedeployTest;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FederationQueueMatchXMLConfigParsingTest extends ActiveMQTestBase 
{
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Test(timeout = 20000)
+   public void testOpenWireOverCoreFederationDownstream() throws Exception {
+      doTestSimpleQueueFederationOverCoreFederationDoownstream("OPENWIRE");
+   }
+
+   @Test(timeout = 20000)
+   public void testCoreOverCoreFederationDownstream() throws Exception {
+      doTestSimpleQueueFederationOverCoreFederationDoownstream("CORE");
+   }
+
+   @Test(timeout = 20000)
+   public void testAMQPOverCoreFederationDownstream() throws Exception {
+      doTestSimpleQueueFederationOverCoreFederationDoownstream("AMQP");
+   }
+
+   private void 
doTestSimpleQueueFederationOverCoreFederationDoownstream(String clientProtocol) 
throws Exception {
+      final URL urlServer1 = 
RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server1.xml");
+      final URL urlServer2 = 
RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server2.xml");
+
+      final int MESSAGE_COUNT = 5;
+
+      final CountDownLatch receivedAllLatch = new 
CountDownLatch(MESSAGE_COUNT);
+
+      final EmbeddedActiveMQ embeddedActiveMQ1 = new EmbeddedActiveMQ();
+      embeddedActiveMQ1.setConfigResourcePath(urlServer1.toURI().toString());
+      embeddedActiveMQ1.start();
+
+      final EmbeddedActiveMQ embeddedActiveMQ2 = new EmbeddedActiveMQ();
+      embeddedActiveMQ2.setConfigResourcePath(urlServer2.toURI().toString());
+      embeddedActiveMQ2.start();
+
+      final ConnectionFactory consumerConnectionFactory = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:61618");
+      final ConnectionFactory producerConnectionFactory = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:61616");
+
+      try (Connection consumerConnection = 
consumerConnectionFactory.createConnection()) {
+
+         final Session consumerSession = 
consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final Destination consumerDestination = 
consumerSession.createQueue("exampleQueueTwo");
+         final MessageConsumer consumer = 
consumerSession.createConsumer(consumerDestination);
+
+         consumerConnection.start();
+
+         consumer.setMessageListener((message) -> {
+            logger.info("Received message: {} ", message);
+            receivedAllLatch.countDown();
+         });
+
+         try (Connection producerConnection = 
producerConnectionFactory.createConnection()) {
+            producerConnection.start();
+
+            final Session producerSession = 
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination producerQueue = 
producerSession.createQueue("exampleQueueTwo");
+            final MessageProducer producer = 
producerSession.createProducer(producerQueue);
+            final UUID uuid = UUID.randomUUID();
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+               producer.send(producerSession.createTextMessage("Test message:" 
+ uuid));
+               logger.trace("Sent message: {}", uuid);
+            }
+
+            logger.info("Sent {} messages to queue for federation dispatch.", 
MESSAGE_COUNT);
+         }
+
+         assertTrue(receivedAllLatch.await(10, TimeUnit.SECONDS));
+      } finally {
+         try {
+            embeddedActiveMQ1.stop();
+         } catch (Exception ex) {
+         }
+         try {
+            embeddedActiveMQ2.stop();
+         } catch (Exception ex) {
+         }
+      }
+   }
+
+   @Test(timeout = 20000)
+   public void testQueuePolicyMatchesOnlyIndicatedQueueOpenwire() throws 
Exception {
+      doTestQueueMatchPolicyOnlyMatchesIndicatedQueue("OPENWIRE");
+   }
+
+   @Test(timeout = 20000)
+   public void testQueuePolicyMatchesOnlyIndicatedQueueCore() throws Exception 
{
+      doTestQueueMatchPolicyOnlyMatchesIndicatedQueue("CORE");
+   }
+
+   @Test(timeout = 20000)
+   public void testQueuePolicyMatchesOnlyIndicatedQueueAMQP() throws Exception 
{
+      doTestQueueMatchPolicyOnlyMatchesIndicatedQueue("AMQP");
+   }
+
+   private void doTestQueueMatchPolicyOnlyMatchesIndicatedQueue(String 
clientProtocol) throws Exception {
+      final URL urlServer1 = 
RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server1.xml");
+      final URL urlServer2 = 
RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server2.xml");
+
+      final int MESSAGE_COUNT = 5;
+
+      final AtomicInteger receivedOnQ1Count = new AtomicInteger();
+      final CountDownLatch receivedAllOnQ2Latch = new 
CountDownLatch(MESSAGE_COUNT);
+
+      final EmbeddedActiveMQ embeddedActiveMQ1 = new EmbeddedActiveMQ();
+      embeddedActiveMQ1.setConfigResourcePath(urlServer1.toURI().toString());
+      embeddedActiveMQ1.start();
+
+      final EmbeddedActiveMQ embeddedActiveMQ2 = new EmbeddedActiveMQ();
+      embeddedActiveMQ2.setConfigResourcePath(urlServer2.toURI().toString());
+      embeddedActiveMQ2.start();
+
+      final ConnectionFactory consumerConnectionFactory = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:61616");
+      final ConnectionFactory producerConnectionFactory = 
CFUtil.createConnectionFactory(clientProtocol, "tcp://localhost:61618");
+
+      try (Connection consumerConnection = 
consumerConnectionFactory.createConnection()) {
+
+         final Session consumerSession = 
consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final Destination consumerDestination1 = 
consumerSession.createQueue("exampleQueueOne");
+         final Destination consumerDestination2 = 
consumerSession.createQueue("exampleQueueTwo");
+         final MessageConsumer consumer1 = 
consumerSession.createConsumer(consumerDestination1);
+         final MessageConsumer consumer2 = 
consumerSession.createConsumer(consumerDestination2);
+
+         consumerConnection.start();
+
+         consumer1.setMessageListener((message) -> {
+            logger.info("Consumer #1 Received message: {} ", message);
+            receivedOnQ1Count.incrementAndGet();
+         });
+
+         consumer2.setMessageListener((message) -> {
+            logger.info("Consumer #2 Received message: {} ", message);
+            receivedAllOnQ2Latch.countDown();
+         });
+
+         try (Connection producerConnection = 
producerConnectionFactory.createConnection()) {
+            producerConnection.start();
+
+            final Session producerSession = 
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination producerQueue1 = 
producerSession.createQueue("exampleQueueOne");
+            final Destination producerQueue2 = 
producerSession.createQueue("exampleQueueTwo");
+            final MessageProducer producer1 = 
producerSession.createProducer(producerQueue1);
+            final MessageProducer producer2 = 
producerSession.createProducer(producerQueue2);
+            final UUID uuid = UUID.randomUUID();
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+               producer1.send(producerSession.createTextMessage("Test 
message:" + uuid));
+               producer2.send(producerSession.createTextMessage("Test 
message:" + uuid));
+               logger.trace("Sent message: {}", uuid);
+            }
+
+            logger.info("Sent {} messages to queues for federation dispatch.", 
MESSAGE_COUNT);
+         }
+
+         assertTrue(receivedAllOnQ2Latch.await(10, TimeUnit.SECONDS));
+         assertEquals(0, receivedOnQ1Count.get());
+      } finally {
+         try {
+            embeddedActiveMQ1.stop();
+         } catch (Exception ex) {
+         }
+         try {
+            embeddedActiveMQ2.stop();
+         } catch (Exception ex) {
+         }
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/resources/core-federated-queue-match-server1.xml
 
b/tests/integration-tests/src/test/resources/core-federated-queue-match-server1.xml
new file mode 100644
index 0000000000..0abd14e44b
--- /dev/null
+++ 
b/tests/integration-tests/src/test/resources/core-federated-queue-match-server1.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+    <core xmlns="urn:activemq:core">
+
+        <name>server1</name>
+
+        <persistence-enabled>false</persistence-enabled>
+
+        <security-enabled>false</security-enabled>
+
+        <connectors>
+            <connector name="netty-connector">tcp://localhost:61616</connector>
+            <connector 
name="server-2-connector">tcp://localhost:61618</connector>
+        </connectors>
+
+        <!-- Acceptors -->
+        <acceptors>
+            <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
+        </acceptors>
+
+        <!-- Federation -->
+
+        <addresses>
+            <address name="exampleQueueOne">
+                <anycast>
+                    <queue name="exampleQueueOne"/>
+                </anycast>
+            </address>
+            <address name="exampleQueueTwo">
+                <anycast>
+                    <queue name="exampleQueueTwo"/>
+                </anycast>
+            </address>
+        </addresses>
+
+        <federations>
+            <federation name="server-1-federation">
+                <upstream name="server-1-upstream">
+                    <circuit-breaker-timeout>1000</circuit-breaker-timeout>
+                    <share-connection>true</share-connection>
+                    <static-connectors>
+                        <connector-ref>server-2-connector</connector-ref>
+                    </static-connectors>
+                    <policy ref="policySetA"/>
+                </upstream>
+                <downstream name="server-1-downstream" 
priority-adjustment="-1">
+                    <circuit-breaker-timeout>1000</circuit-breaker-timeout>
+                    <share-connection>true</share-connection>
+                    <static-connectors>
+                        <connector-ref>server-2-connector</connector-ref>
+                    </static-connectors>
+                    <policy ref="policySetA"/>
+                    
<upstream-connector-ref>netty-connector</upstream-connector-ref>
+                </downstream>
+
+                <policy-set name="policySetA">
+                    <policy ref="queue-federation"/>
+                </policy-set>
+
+                <queue-policy name="queue-federation">
+                    <include queue-match="exampleQueueTwo" address-match="#"/>
+                </queue-policy>
+            </federation>
+        </federations>
+
+        <address-settings>
+            <address-setting match="#">
+                <dead-letter-address>deadLetterQueue</dead-letter-address>
+                <expiry-address>expiryQueue</expiry-address>
+                <auto-delete-addresses>false</auto-delete-addresses>
+                <max-delivery-attempts>3</max-delivery-attempts>
+            </address-setting>
+        </address-settings>
+
+    </core>
+</configuration>
diff --git 
a/tests/integration-tests/src/test/resources/core-federated-queue-match-server2.xml
 
b/tests/integration-tests/src/test/resources/core-federated-queue-match-server2.xml
new file mode 100644
index 0000000000..e49e163ceb
--- /dev/null
+++ 
b/tests/integration-tests/src/test/resources/core-federated-queue-match-server2.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<configuration xmlns="urn:activemq" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+    <core xmlns="urn:activemq:core">
+
+        <name>server-2</name>
+
+        <persistence-enabled>false</persistence-enabled>
+
+        <security-enabled>false</security-enabled>
+
+        <!-- Acceptors -->
+        <acceptors>
+            <acceptor name="netty-acceptor">tcp://localhost:61618</acceptor>
+        </acceptors>
+
+        <addresses>
+            <address name="exampleQueueOne">
+                <anycast>
+                    <queue name="exampleQueueOne"/>
+                </anycast>
+            </address>
+            <address name="exampleQueueTwo">
+                <anycast>
+                    <queue name="exampleQueueTwo"/>
+                </anycast>
+            </address>
+        </addresses>
+
+        <address-settings>
+            <address-setting match="#">
+                <dead-letter-address>deadLetterQueue</dead-letter-address>
+                <expiry-address>expiryQueue</expiry-address>
+                <auto-delete-addresses>false</auto-delete-addresses>
+                <max-delivery-attempts>3</max-delivery-attempts>
+            </address-setting>
+        </address-settings>
+
+    </core>
+</configuration>

Reply via email to