This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b321ab8ff ARTEMIS-3759 Add mirror controller address filter support
0b321ab8ff is described below

commit 0b321ab8ff62268d55df56e504cbfc043e2de8eb
Author: iliya <[email protected]>
AuthorDate: Tue Apr 26 19:36:42 2022 +0300

    ARTEMIS-3759 Add mirror controller address filter support
    
    Allow replication of only certain addresses with the mirror controller.
    The configuration is similar to cluster address configuration.
    
    Co-authored-by: Robbie Gemmell <[email protected]>
---
 .../connect/mirror/AMQPMirrorControllerSource.java | 41 +++++++++++++
 .../amqp/connect/mirror/MirrorAddressFilter.java   | 70 ++++++++++++++++++++++
 .../connect/mirror/MirrorAddressFilterTest.java    | 36 +++++++++++
 .../AMQPMirrorBrokerConnectionElement.java         | 12 ++++
 .../deployers/impl/FileConfigurationParser.java    |  4 +-
 .../resources/schema/artemis-configuration.xsd     |  8 +++
 .../resources/ConfigurationTest-full-config.xml    |  2 +-
 docs/user-manual/en/amqp-broker-connections.md     | 19 ++++++
 .../integration/amqp/connect/AMQPReplicaTest.java  | 64 ++++++++++++++++++++
 9 files changed, 254 insertions(+), 2 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 2d7cd10008..a70773634f 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -82,6 +82,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
    final boolean acks;
    final boolean addQueues;
    final boolean deleteQueues;
+   final MirrorAddressFilter addressFilter;
    private final AMQPBrokerConnection brokerConnection;
 
    final AMQPMirrorBrokerConnectionElement replicaConfig;
@@ -110,6 +111,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
       this.addQueues = replicaConfig.isQueueCreation();
       this.deleteQueues = replicaConfig.isQueueRemoval();
+      this.addressFilter = new 
MirrorAddressFilter(replicaConfig.getAddressFilter());
       this.acks = replicaConfig.isMessageAcknowledgements();
       this.brokerConnection = brokerConnection;
    }
@@ -131,6 +133,11 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       if (getControllerInUse() != null && !addressInfo.isInternal()) {
          return;
       }
+
+      if (ignoreAddress(addressInfo.getName())) {
+         return;
+      }
+
       if (addQueues) {
          Message message = createMessage(addressInfo.getName(), null, 
ADD_ADDRESS, null, addressInfo.toJSON());
          route(server, message);
@@ -145,6 +152,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
          return;
       }
+      if (ignoreAddress(addressInfo.getName())) {
+         return;
+      }
       if (deleteQueues) {
          Message message = createMessage(addressInfo.getName(), null, 
DELETE_ADDRESS, null, addressInfo.toJSON());
          route(server, message);
@@ -162,6 +172,12 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          }
          return;
       }
+      if (ignoreAddress(queueConfiguration.getAddress())) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Skipping create " + queueConfiguration + ", queue 
address " + queueConfiguration.getAddress() + " doesn't match filter");
+         }
+         return;
+      }
       if (addQueues) {
          Message message = createMessage(queueConfiguration.getAddress(), 
queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
          route(server, message);
@@ -178,6 +194,10 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return;
       }
 
+      if (ignoreAddress(address)) {
+         return;
+      }
+
       if (deleteQueues) {
          Message message = createMessage(address, queue, DELETE_QUEUE, null, 
queue.toString());
          route(server, message);
@@ -188,12 +208,18 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       return controller != null && sameNode(getRemoteMirrorId(), 
controller.getRemoteMirrorId());
    }
 
+   private boolean ignoreAddress(SimpleString address) {
+      return !addressFilter.match(address);
+   }
+
    private boolean sameNode(String remoteID, String sourceID) {
       return (remoteID != null && sourceID != null && 
remoteID.equals(sourceID));
    }
 
    @Override
    public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+      SimpleString address = context.getAddress(message);
+
       if (invalidTarget(context.getMirrorSource())) {
          if (logger.isTraceEnabled()) {
             logger.trace("server " + server + " is discarding send to avoid 
infinite loop (reflection with the mirror)");
@@ -208,6 +234,13 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return;
       }
 
+      if (ignoreAddress(address)) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("server " + server + " is discarding send to address 
" + address + ", address doesn't match filter");
+         }
+         return;
+      }
+
       if (logger.isTraceEnabled()) {
          logger.trace(server + " send message " + message);
       }
@@ -301,6 +334,13 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return;
       }
 
+      if (ignoreAddress(ref.getQueue().getAddress())) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(server + " rejecting postAcknowledge queue=" + 
ref.getQueue().getName() + ", ref=" + ref + ", queue address is excluded");
+         }
+         return;
+      }
+
       if (logger.isTraceEnabled()) {
          logger.trace(server + " postAcknowledge " + ref);
       }
@@ -337,4 +377,5 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
          return true;
       }
    }
+
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilter.java
new file mode 100644
index 0000000000..ec4e710edc
--- /dev/null
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+public class MirrorAddressFilter {
+
+   private final SimpleString[] allowList;
+
+   private final SimpleString[] denyList;
+
+   public MirrorAddressFilter(String filter) {
+      Set<SimpleString> allowList = new HashSet<>();
+      Set<SimpleString> denyList = new HashSet<>();
+
+      if (filter != null && !filter.isEmpty()) {
+         String[] parts = filter.split(",");
+         for (String part : parts) {
+            if (!"".equals(part) && !"!".equals(part)) {
+               if (part.startsWith("!")) {
+                  denyList.add(new SimpleString(part.substring(1)));
+               } else {
+                  allowList.add(new SimpleString(part));
+               }
+            }
+         }
+      }
+
+      this.allowList = allowList.toArray(new SimpleString[]{});
+      this.denyList = denyList.toArray(new SimpleString[]{});
+   }
+
+   public boolean match(SimpleString checkAddress) {
+      if (denyList.length > 0) {
+         for (SimpleString pattern : denyList) {
+            if (checkAddress.startsWith(pattern)) {
+               return false;
+            }
+         }
+      }
+
+      if (allowList.length > 0) {
+         for (SimpleString pattern : allowList) {
+            if (checkAddress.startsWith(pattern)) {
+               return true;
+            }
+         }
+         return false;
+      }
+      return true;
+   }
+}
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilterTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilterTest.java
new file mode 100644
index 0000000000..967b8cab3e
--- /dev/null
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilterTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MirrorAddressFilterTest {
+
+   @Test
+   public void testAddressFilter() {
+      Assert.assertTrue(new MirrorAddressFilter("").match(new 
SimpleString("any")));
+      Assert.assertTrue(new MirrorAddressFilter("test").match(new 
SimpleString("test123")));
+      Assert.assertTrue(new MirrorAddressFilter("a,b").match(new 
SimpleString("b")));
+      Assert.assertTrue(new MirrorAddressFilter("!c").match(new 
SimpleString("a")));
+      Assert.assertTrue(new MirrorAddressFilter("!a,!").match(new 
SimpleString("b123")));
+      Assert.assertFalse(new MirrorAddressFilter("a,b,!ab").match(new 
SimpleString("ab")));
+      Assert.assertFalse(new MirrorAddressFilter("!a,!b").match(new 
SimpleString("b123")));
+      Assert.assertFalse(new MirrorAddressFilter("a,").match(new 
SimpleString("b")));
+   }
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
index 46a6610b50..98a130888f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
@@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends 
AMQPBrokerConnectionEleme
 
    SimpleString mirrorSNF;
 
+   String addressFilter;
+
    public SimpleString getMirrorSNF() {
       return mirrorSNF;
    }
@@ -86,4 +88,14 @@ public class AMQPMirrorBrokerConnectionElement extends 
AMQPBrokerConnectionEleme
       this.messageAcknowledgements = messageAcknowledgements;
       return this;
    }
+
+   public String getAddressFilter() {
+      return addressFilter;
+   }
+
+   public AMQPMirrorBrokerConnectionElement setAddressFilter(String 
addressFilter) {
+      this.addressFilter = addressFilter;
+      return this;
+   }
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 413cc5475f..9e839c2f39 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2094,8 +2094,10 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
                boolean queueCreation = 
getBooleanAttribute(e2,"queue-creation", true);
                boolean durable = getBooleanAttribute(e2, "durable", true);
                boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", 
true);
+               String addressFilter = getAttributeValue(e2, "address-filter");
+
                AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = 
new AMQPMirrorBrokerConnectionElement();
-               
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable);
+               
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter);
                connectionElement = amqpMirrorConnectionElement;
                
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
             } else {
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 33fb5a35de..ca9f8140a9 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2442,6 +2442,14 @@
             </xsd:documentation>
          </xsd:annotation>
       </xsd:attribute>
+      <xsd:attribute name="address-filter" type="xsd:string" use="optional">
+         <xsd:annotation>
+            <xsd:documentation>
+               This defines a filter that the mirror will use to determine which 
events will be forwarded toward
+               the target server based on the source address.
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
 
    </xsd:complexType>
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 1adf01baf8..409254647f 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -442,7 +442,7 @@
             <receiver address-match="TEST-RECEIVER" />
             <peer address-match="TEST-PEER"/>
             <receiver queue-name="TEST-WITH-QUEUE-NAME"/>
-            <mirror message-acknowledgements="false" queue-creation="false" 
durable="false" queue-removal="false"/>
+            <mirror message-acknowledgements="false" queue-creation="false" 
durable="false" queue-removal="false" 
address-filter="TEST-QUEUE,!IGNORE-QUEUE"/>
          </amqp-connection>
          <amqp-connection uri="tcp://test2:222" name="test2">
             <mirror durable="false"/>
diff --git a/docs/user-manual/en/amqp-broker-connections.md 
b/docs/user-manual/en/amqp-broker-connections.md
index 0f1704de22..667ab8ceff 100644
--- a/docs/user-manual/en/amqp-broker-connections.md
+++ b/docs/user-manual/en/amqp-broker-connections.md
@@ -103,6 +103,25 @@ The following optional arguments can be utilized:
 * `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
 * `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
 * `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `address-filter`: An optional comma-separated list of inclusion and/or 
exclusion filter entries used to govern which addresses (and related queues) 
mirroring events will be created for on this broker-connection. That is, events 
will only be mirrored to the target broker for addresses that match the filter.
+  An address is matched when it begins with an inclusion entry specified in 
this field, unless the address is also explicitly excluded by another entry. An 
exclusion entry is prefixed with `!` to denote any address beginning with that 
value does not match.
+  If no inclusion entry is specified in the list, all addresses not explicitly 
excluded will match. If the address-filter attribute is not specified, then all 
addresses (and related queues) will match and be mirrored.
+
+  Examples:
+
+  - 'eu'
+    matches all addresses starting with 'eu'
+  - '!eu'
+    matches all addresses except for those starting with 'eu'
+  - 'eu.uk,eu.de'
+    matches all addresses starting with either 'eu.uk' or 'eu.de'
+  - 'eu,!eu.uk'
+    matches all addresses starting with 'eu' but not those starting with 
'eu.uk'
+
+  **Note:**
+
+  - Address exclusion will always take precedence over address inclusion.
+  - Address matching on mirror elements is prefix-based and does not support 
wild-card matching.
 
 An example of a mirror configuration is shown below:
 ```xml
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 12db46f2a1..190456dca0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -473,6 +473,70 @@ public class AMQPReplicaTest extends AmqpClientTestSupport 
{
       }
    }
 
+   @Test
+   public void testAddressFilter() throws Exception {
+      final String REPLICATED = "replicated";
+      final String NON_REPLICATED = "nonReplicated";
+      final String ADDRESS_FILTER = REPLICATED + "," + "!" + NON_REPLICATED;
+      final String MSG = "msg";
+
+      server.start();
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+      server_2.getConfiguration().setName("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("mirror-source", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setDurable(true).setAddressFilter(ADDRESS_FILTER);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      server_2.start();
+
+      try (Connection connection = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT_2).createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         // Send to non replicated address
+         try (MessageProducer producer = 
session.createProducer(session.createQueue(NON_REPLICATED))) {
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < 2; i++) {
+               producer.send(session.createTextMessage("never receive"));
+            }
+         }
+
+         // Check nothing was added to SnF queue
+         Assert.assertEquals(0, 
server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded());
+
+         // Send to replicated address
+         try (MessageProducer producer = 
session.createProducer(session.createQueue(REPLICATED))) {
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            for (int i = 0; i < 2; i++) {
+               producer.send(session.createTextMessage(MSG));
+            }
+         }
+
+         // Check some messages were sent to SnF queue
+         
Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded()
 > 0);
+      }
+
+      try (Connection connection = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT).createConnection()) {
+         connection.start();
+
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         try (MessageConsumer consumer = 
session.createConsumer(session.createQueue(REPLICATED))) {
+            Message message = consumer.receive(3000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(MSG, message.getBody(String.class));
+         }
+
+         try (MessageConsumer consumer = 
session.createConsumer(session.createQueue(NON_REPLICATED))) {
+            Assert.assertNull(consumer.receiveNoWait());
+         }
+      }
+
+   }
+
    @Test
    public void testRouteSurviving() throws Exception {
       testRouteSurvivor(false);

Reply via email to