This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d7a7116a4c ARTEMIS-4754 Structure the names used for federation 
internal queues
d7a7116a4c is described below

commit d7a7116a4c31bdd830e66f54240b7204a83101ae
Author: Timothy Bish <[email protected]>
AuthorDate: Wed May 1 16:39:23 2024 -0400

    ARTEMIS-4754 Structure the names used for federation internal queues
    
    When creating internal temporary queues for the federation control links
    and the events link we should use a structured naming convention that
    eases configuring security for the federation user: all internal names
    fall under a root prefix which can be used to grant read and write access
    for the federation user. This change allows security on the wildcarded
    address "$ACTIVEMQ_ARTEMIS_FEDERATION.#".
    This change also includes some further restrictions added to federation
    resources and adds support for wildcarding '$' prefixed addresses.
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  21 +-
 .../amqp/connect/federation/AMQPFederation.java    |  48 ++++
 .../AMQPFederationCommandDispatcher.java           |  25 +-
 .../federation/AMQPFederationConstants.java        |  37 ++-
 .../federation/AMQPFederationEventDispatcher.java  |  19 +-
 .../amqp/proton/AMQPConnectionContext.java         |   4 +-
 .../activemq/artemis/core/settings/impl/Match.java |   5 +
 .../artemis/core/settings/impl/MatchTest.java      |  23 ++
 .../amqp/connect/AMQPFederationConnectTest.java    | 265 ++++++++++++++++++++-
 9 files changed, 412 insertions(+), 35 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c67cac0a1d..4612f7873b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -263,15 +263,32 @@ public class AMQPSessionCallback implements 
SessionCallback {
    }
 
    public void createTemporaryQueue(SimpleString queueName, RoutingType 
routingType) throws Exception {
-      createTemporaryQueue(queueName, queueName, routingType, null);
+      createTemporaryQueue(queueName, queueName, routingType, null, null);
+   }
+
+   public void createTemporaryQueue(SimpleString queueName, RoutingType 
routingType, Integer maxConsumers) throws Exception {
+      createTemporaryQueue(queueName, queueName, routingType, null, 
maxConsumers);
    }
 
    public void createTemporaryQueue(SimpleString address,
                                     SimpleString queueName,
                                     RoutingType routingType,
                                     SimpleString filter) throws Exception {
+      createTemporaryQueue(address, queueName, routingType, filter, null);
+   }
+
+   public void createTemporaryQueue(SimpleString address,
+                                    SimpleString queueName,
+                                    RoutingType routingType,
+                                    SimpleString filter,
+                                    Integer maxConsumers) throws Exception {
       try {
-         serverSession.createQueue(new 
QueueConfiguration(queueName).setAddress(address).setRoutingType(routingType).setFilterString(filter).setTemporary(true).setDurable(false));
+         serverSession.createQueue(new 
QueueConfiguration(queueName).setAddress(address)
+                                                                    
.setRoutingType(routingType)
+                                                                    
.setFilterString(filter)
+                                                                    
.setTemporary(true)
+                                                                    
.setDurable(false)
+                                                                    
.setMaxConsumers(maxConsumers));
       } catch (ActiveMQSecurityException se) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage());
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
index 1f3c818155..891e264111 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
@@ -17,6 +17,10 @@
 
 package org.apache.activemq.artemis.protocol.amqp.connect.federation;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
+
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.Objects;
@@ -170,6 +174,50 @@ public abstract class AMQPFederation implements 
FederationInternal {
       }
    }
 
+   /**
+    * Performs the prefixing for federation events queues that places the 
events queues into
+    * the name-space of federation related internal queues.
+    *
+    * @param suffix
+    *    A suffix to append to the federation events link (normally the AMQP 
link name).
+    *
+    * @return the full internal queue name to use for the given suffix.
+    */
+   String prefixEventsLinkQueueName(String suffix) {
+      final StringBuilder builder = new StringBuilder();
+      final char delimiter = getWildcardConfiguration().getDelimiter();
+
+      builder.append(FEDERATION_BASE_VALIDATION_ADDRESS)
+             .append(delimiter)
+             .append(FEDERATION_EVENTS_LINK_PREFIX)
+             .append(delimiter)
+             .append(suffix);
+
+      return builder.toString();
+   }
+
+   /**
+    * Performs the prefixing for federation control queue names that places the 
queues
+    * into the name-space of federation related internal queues.
+    *
+    * @param suffix
+    *    A suffix to append to the federation control link (normally the AMQP 
link name).
+    *
+    * @return the full internal queue name to use for the given suffix.
+    */
+   String prefixControlLinkQueueName(String suffix) {
+      final StringBuilder builder = new StringBuilder();
+      final char delimiter = getWildcardConfiguration().getDelimiter();
+
+      builder.append(FEDERATION_BASE_VALIDATION_ADDRESS)
+             .append(delimiter)
+             .append(FEDERATION_CONTROL_LINK_PREFIX)
+             .append(delimiter)
+             .append(suffix);
+
+      return builder.toString();
+   }
+
    /**
     * Adds a remote linked closed event interceptor that can intercept the 
closed event and
     * if it returns true indicate that the close has been handled and that no 
further action
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
index 33b977c3fd..5e08346db5 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.connect.federation;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
+
 import java.util.Objects;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -24,11 +26,13 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
 import 
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
+import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Sender;
 
 /**
@@ -42,6 +46,8 @@ public class AMQPFederationCommandDispatcher implements 
SenderController {
    private final AMQPSessionCallback session;
    private final ActiveMQServer server;
 
+   private String controlAddress;
+
    AMQPFederationCommandDispatcher(Sender sender, ActiveMQServer server, 
AMQPSessionCallback session) {
       this.session = session;
       this.sender = sender;
@@ -105,33 +111,40 @@ public class AMQPFederationCommandDispatcher implements 
SenderController {
 
    @Override
    public Consumer init(ProtonServerSenderContext senderContext) throws 
Exception {
+      final Connection protonConnection = 
senderContext.getSender().getSession().getConnection();
+      final org.apache.qpid.proton.engine.Record attachments = 
protonConnection.attachments();
+      final AMQPFederation federation = 
attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
+
+      if (federation == null) {
+         throw new ActiveMQAMQPIllegalStateException("Cannot create a 
federation link from non-federation connection");
+      }
+
       // Get the dynamically generated name to use for local creation of a 
matching temporary
       // queue that we will send control message to and the broker will 
dispatch as remote
       // credit is made available.
-      final SimpleString queueName = 
SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
+      controlAddress = 
federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress());
 
       try {
-         session.createTemporaryQueue(queueName, RoutingType.ANYCAST);
+         
session.createTemporaryQueue(SimpleString.toSimpleString(getControlLinkAddress()),
 RoutingType.ANYCAST, 1);
       } catch (Exception e) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
       }
 
-      return (Consumer) session.createSender(senderContext, queueName, null, 
false);
+      return (Consumer) session.createSender(senderContext, 
SimpleString.toSimpleString(getControlLinkAddress()), null, false);
    }
 
    @Override
    public void close() throws Exception {
       // Make a best effort to remove the temporary queue used for control 
commands on close.
-      final SimpleString queueName = 
SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
 
       try {
-         session.removeTemporaryQueue(queueName);
+         
session.removeTemporaryQueue(SimpleString.toSimpleString(getControlLinkAddress()));
       } catch (Exception e) {
          // Ignored as the temporary queue should be removed on connection 
termination.
       }
    }
 
    private String getControlLinkAddress() {
-      return sender.getRemoteTarget().getAddress();
+      return controlAddress;
    }
 }
\ No newline at end of file
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
index 85183c121d..b0e75163bb 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
@@ -30,10 +30,39 @@ public final class AMQPFederationConstants {
 
    /**
     * Address used by a remote broker instance to validate that an incoming 
federation connection
-    * has access right to perform federation operations. The user that 
connects to the AMQP federation
-    * endpoint and attempt to create the control link must have write access 
to this address.
-    */
-   public static final String FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS = 
"$ACTIVEMQ_ARTEMIS_FEDERATION";
+    * has access rights to perform federation operations. The user that 
connects to the AMQP federation
+    * endpoint and attempts to create the control link must have write access 
to this address and any
+    * address prefixed by this value.
+    *
+    * When securing a federation user account the user must have read and 
write permissions to addresses
+    * under this prefix using the broker defined delimiter, this includes the 
ability to create non-durable
+    * resources.
+    *
+    * <pre>
+    *    $ACTIVEMQ_ARTEMIS_FEDERATION.#;
+    * </pre>
+    */
+   public static final String FEDERATION_BASE_VALIDATION_ADDRESS = 
"$ACTIVEMQ_ARTEMIS_FEDERATION";
+
+   /**
+    * The prefix value added when creating a federation control link beyond 
the initial portion of the
+    * validation address prefix. Links for command and control of federation 
operations follow the form:
+    *
+    * <pre>
+    *    $ACTIVEMQ_ARTEMIS_FEDERATION.control.&lt;unique-id&gt;
+    * </pre>
+    */
+   public static final String FEDERATION_CONTROL_LINK_PREFIX = "control";
+
+   /**
+    * The prefix value added when creating a federation events link beyond 
the initial portion of the
+    * validation address prefix. Links for federation events follow the form:
+    *
+    * <pre>
+    *    $ACTIVEMQ_ARTEMIS_FEDERATION.events.&lt;unique-id&gt;
+    * </pre>
+    */
+   public static final String FEDERATION_EVENTS_LINK_PREFIX = "events";
 
    /**
     * A desired capability added to the federation control link that must be 
offered
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
index 703b054e1e..b3e69b1eeb 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
@@ -68,6 +68,8 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
    private final Set<String> addressWatches = new HashSet<>();
    private final Set<String> queueWatches = new HashSet<>();
 
+   private String eventsAddress;
+
    public AMQPFederationEventDispatcher(AMQPFederation federation, 
AMQPSessionCallback session, Sender sender) {
       this.session = session;
       this.sender = sender;
@@ -76,7 +78,7 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
    }
 
    private String getEventsLinkAddress() {
-      return sender.getName();
+      return eventsAddress;
    }
 
    /**
@@ -100,8 +102,7 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
    public Consumer init(ProtonServerSenderContext senderContext) throws 
Exception {
       final Connection protonConnection = 
senderContext.getSender().getSession().getConnection();
       final org.apache.qpid.proton.engine.Record attachments = 
protonConnection.attachments();
-
-      AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD, 
AMQPFederation.class);
+      final AMQPFederation federation = 
attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
 
       if (federation == null) {
          throw new ActiveMQAMQPIllegalStateException("Cannot create a 
federation link from non-federation connection");
@@ -115,7 +116,7 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
 
       // Create a temporary queue using the unique link name which is where 
events will
       // be sent to so that they can be held until credit is granted by the 
remote.
-      final SimpleString queueName = 
SimpleString.toSimpleString(sender.getName());
+      eventsAddress = federation.prefixEventsLinkQueueName(sender.getName());
 
       if (sender.getLocalState() != EndpointState.ACTIVE) {
          // Indicate that event link capabilities is supported.
@@ -131,11 +132,11 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
             throw new ActiveMQAMQPInternalErrorException("Remote Terminus did 
not arrive as dynamic node: " + remoteTerminus);
          }
 
-         remoteTerminus.setAddress(queueName.toString());
+         remoteTerminus.setAddress(getEventsLinkAddress());
       }
 
       try {
-         session.createTemporaryQueue(queueName, RoutingType.ANYCAST);
+         
session.createTemporaryQueue(SimpleString.toSimpleString(getEventsLinkAddress()),
 RoutingType.ANYCAST, 1);
       } catch (Exception e) {
          throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
       }
@@ -145,18 +146,16 @@ public class AMQPFederationEventDispatcher implements 
SenderController, ActiveMQ
 
       server.registerBrokerPlugin(this); // Start listening for bindings and 
consumer events.
 
-      return (Consumer) session.createSender(senderContext, queueName, null, 
false);
+      return (Consumer) session.createSender(senderContext, 
SimpleString.toSimpleString(getEventsLinkAddress()), null, false);
    }
 
    @Override
    public void close() {
       // Make a best effort to remove the temporary queue used for event 
messages on close.
-      final SimpleString queueName = 
SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
-
       server.unRegisterBrokerPlugin(this);
 
       try {
-         session.removeTemporaryQueue(queueName);
+         
session.removeTemporaryQueue(SimpleString.toSimpleString(getEventsLinkAddress()));
       } catch (Exception e) {
          // Ignored as the temporary queue should be removed on connection 
termination.
       }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index e1f6fe192c..5e8c76f14a 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -79,7 +79,7 @@ import java.lang.invoke.MethodHandles;
 
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
@@ -472,7 +472,7 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
    private void handleFederationControlLinkOpened(AMQPSessionContext 
protonSession, Receiver receiver) throws Exception {
       try {
          try {
-            
protonSession.getSessionSPI().check(SimpleString.toSimpleString(FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS),
 CheckType.SEND, getSecurityAuth());
+            
protonSession.getSessionSPI().check(SimpleString.toSimpleString(FEDERATION_BASE_VALIDATION_ADDRESS),
 CheckType.SEND, getSecurityAuth());
          } catch (ActiveMQSecurityException e) {
             throw new ActiveMQAMQPSecurityException(
                "User does not have permission to attach to the federation 
control address");
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
index 37a2b41d3c..bddc87dbf0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
@@ -36,6 +36,10 @@ public class Match<T> {
 
    private static final String DOT_REPLACEMENT = "\\.";
 
+   private static final String DOLLAR = "$";
+
+   private static final String DOLLAR_REPLACEMENT = "\\$";
+
    private final String match;
 
    private final Pattern pattern;
@@ -75,6 +79,7 @@ public class Match<T> {
             actMatch = 
actMatch.replace(wildcardConfiguration.getDelimiterString() + 
wildcardConfiguration.getAnyWordsString(), 
wildcardConfiguration.getAnyWordsString());
          }
          actMatch = actMatch.replace(Match.DOT, Match.DOT_REPLACEMENT);
+         actMatch = actMatch.replace(Match.DOLLAR, Match.DOLLAR_REPLACEMENT);
          actMatch = 
actMatch.replace(wildcardConfiguration.getSingleWordString(), 
String.format(WORD_WILDCARD_REPLACEMENT_FORMAT, 
Pattern.quote(wildcardConfiguration.getDelimiterString())));
 
          if (direct) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
index 130bbaa8ba..61df153473 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
@@ -78,6 +78,29 @@ public class MatchTest {
       Assert.assertFalse(predicate.test("testing.A"));
       Assert.assertFalse(predicate.test("test"));
       Assert.assertFalse(predicate.test("test.A.B"));
+   }
+
+   @Test
+   public void testDollarMatchingDirectTrue() {
+      final Pattern pattern = Match.createPattern("$test.#", new 
WildcardConfiguration(), true);
+      final Predicate<String> predicate = pattern.asPredicate();
+
+      Assert.assertTrue(predicate.test("$test.A"));
+      Assert.assertTrue(predicate.test("$test.A.B"));
+
+      Assert.assertFalse(predicate.test("$testing.A"));
+      Assert.assertFalse(predicate.test("$test"));
+   }
+
+   @Test
+   public void testDollarMatchingDirectFalse() {
+      final Pattern pattern = Match.createPattern("$test.#", new 
WildcardConfiguration(), false);
+      final Predicate<String> predicate = pattern.asPredicate();
+
+      Assert.assertTrue(predicate.test("$test"));
+      Assert.assertTrue(predicate.test("$test.A"));
+      Assert.assertTrue(predicate.test("$test.A.B"));
 
+      Assert.assertFalse(predicate.test("$testing.A"));
    }
 }
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index 13b2619ed5..13ddb976be 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -28,7 +28,9 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
@@ -62,10 +64,9 @@ import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
 import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
-import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
@@ -158,21 +159,23 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       federationConfiguration.put(IGNORE_QUEUE_CONSUMER_PRIORITIES, 
AMQP_INGNORE_CONSUMER_PRIORITIES);
       federationConfiguration.put(AmqpSupport.TUNNEL_CORE_MESSAGES, 
AMQP_TUNNEL_CORE_MESSAGES);
 
+      final String controlLinkAddress = "test-control-address";
+
       try (ProtonTestServer peer = new ProtonTestServer()) {
          peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
          peer.expectOpen().respond();
          peer.expectBegin().respond();
          peer.expectAttach().ofSender()
-                            
.withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
-                            .withName(allOf(containsString("Federation"), 
containsString("myFederation")))
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withName(allOf(containsString("federation-"), 
containsString("myFederation")))
                             .withProperty(FEDERATION_CONFIGURATION.toString(), 
federationConfiguration)
                             .withTarget().withDynamic(true)
                                          .withCapabilities("temporary-topic")
                             .and()
                             .respond()
-                            .withTarget().withAddress("test-control-address")
+                            .withTarget().withAddress(controlLinkAddress)
                             .and()
-                            
.withOfferedCapabilities(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
          peer.start();
 
          final URI remoteURI = peer.getServerURI();
@@ -193,7 +196,9 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
 
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
-         Wait.assertTrue(() -> server.locateQueue("test-control-address") != 
null);
+         Wait.assertTrue(() -> 
server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
+                                                  "." + 
FEDERATION_CONTROL_LINK_PREFIX +
+                                                  "." + controlLinkAddress) != 
null);
       }
    }
 
@@ -203,7 +208,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
          peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
          peer.expectOpen().respond();
          peer.expectBegin().respond();
-         
peer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond();
+         
peer.expectAttach().ofSender().withDesiredCapability(FEDERATION_CONTROL_LINK.toString()).respond();
          peer.expectConnectionToDrop();
          peer.start();
 
@@ -645,7 +650,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
 
    @Test(timeout = 20000)
    public void testControlLinkPassesConnectAttemptWhenUserHasPrivledges() 
throws Exception {
-      enableSecurity(server, FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS);
+      enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS);
       server.start();
 
       try (ProtonTestClient peer = new ProtonTestClient()) {
@@ -665,9 +670,31 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test(timeout = 20000)
+   public void 
testControlAndEventsLinksPassesConnectAttemptWhenUserHasPrivledges() throws 
Exception {
+      enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS + ".#");
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, getTestName(), true, fullUser, 
fullPass, true, true);
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+
+         logger.info("Test stopped");
+      }
+   }
+
    @Test(timeout = 20000)
    public void 
testControlLinkRefusesConnectAttemptWhenUseDoesNotHavePrivledgesForControlAddress()
 throws Exception {
-      enableSecurity(server, FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS);
+      enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS);
       server.start();
 
       try (ProtonTestClient peer = new ProtonTestClient()) {
@@ -773,6 +800,222 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test(timeout = 20000)
+   public void testControlLinkSenderQueueCreatedWithMaxConsumersOfOne() throws 
Exception {
+      final String controlLinkAddress = "test-control-address";
+      final String federationControlSenderAddress = 
FEDERATION_BASE_VALIDATION_ADDRESS +
+                                                    "." + 
FEDERATION_CONTROL_LINK_PREFIX +
+                                                    "." + controlLinkAddress;
+
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withName(allOf(containsString("federation-"), 
containsString("myFederation")))
+                            .withTarget().withDynamic(true)
+                                         .withCapabilities("temporary-topic")
+                            .and()
+                            .respond()
+                            .withTarget().withAddress(controlLinkAddress)
+                            .and()
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration(
+            getTestName(), "tcp://" + remoteURI.getHost() + ":" + 
remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         final AMQPFederatedBrokerConnectionElement federation = new 
AMQPFederatedBrokerConnectionElement("myFederation");
+         amqpConnection.addElement(federation);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         Wait.assertTrue(() -> 
server.locateQueue(federationControlSenderAddress) != null);
+
+         // Try to bind to the control address, which should be rejected as 
the queue
+         // was created with max consumers of one.
+         peer.expectAttach().ofSender()
+                            .withName("test-control-link-suspect")
+                            .withNullSource();
+         peer.expectDetach().withClosed(true)
+                            .withError(AmqpError.INTERNAL_ERROR.toString());
+         peer.remoteAttach().ofReceiver()
+                            .withName("test-control-link-suspect")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withTarget().also()
+                            
.withSource().withAddress(federationControlSenderAddress)
+                                         .also()
+                                         .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test(timeout = 20000)
+   public void testEventSenderLinkFromTargetUsesNamespacedDynamicQueue() 
throws Exception {
+      final String federationControlLinkName = "federation-test";
+
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.remoteOpen().queue();
+         peer.expectOpen();
+         peer.remoteBegin().queue();
+         peer.expectBegin();
+         peer.remoteAttach().ofSender()
+                            .withName(federationControlLinkName)
+                            
.withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().also()
+                            .withTarget().withDynamic(true)
+                                         .withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withLifetimePolicyOfDeleteOnClose()
+                                         .withCapabilities("temporary-topic")
+                                         .also()
+                            .queue();
+         peer.expectAttach().ofReceiver()
+                            .withName(federationControlLinkName)
+                            .withTarget()
+                               .withAddress(notNullValue())
+                            .also()
+                            
.withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
+         peer.expectFlow();
+
+         final String federationEventsSenderLinkName = "events-receiver-test";
+
+         peer.remoteAttach().ofReceiver()
+                            .withName(federationEventsSenderLinkName)
+                            
.withDesiredCapabilities(FEDERATION_EVENT_LINK.toString())
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withTarget().also()
+                            .withSource().withDynamic(true)
+                                         .withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withLifetimePolicyOfDeleteOnClose()
+                                         .withCapabilities("temporary-topic")
+                                         .also()
+                                         .queue();
+         peer.remoteFlow().withLinkCredit(10).queue();
+         peer.expectAttach().ofSender()
+                            .withName(federationEventsSenderLinkName)
+                            .withSource()
+                               .withAddress(notNullValue())
+                            .also()
+                            
.withOfferedCapability(FEDERATION_EVENT_LINK.toString());
+
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // The events receiver from the remote should trigger a temporary 
queue to be created on
+         // the server to allow sends of events beyond currently available 
credit.
+         Wait.assertTrue(() -> 
server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
+                                                  "." + 
FEDERATION_EVENTS_LINK_PREFIX +
+                                                  "." + 
federationEventsSenderLinkName) != null);
+
+         server.stop();
+      }
+   }
+
+   @Test(timeout = 20000)
+   public void testEventsLinkAtTargetIsCreatedWithMaxConsumersOfOne() throws 
Exception {
+      final String federationControlLinkName = "federation-test";
+
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.remoteOpen().queue();
+         peer.expectOpen();
+         peer.remoteBegin().queue();
+         peer.expectBegin();
+         peer.remoteAttach().ofSender()
+                            .withName(federationControlLinkName)
+                            
.withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().also()
+                            .withTarget().withDynamic(true)
+                                         .withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withLifetimePolicyOfDeleteOnClose()
+                                         .withCapabilities("temporary-topic")
+                                         .also()
+                            .queue();
+         peer.expectAttach().ofReceiver()
+                            .withName(federationControlLinkName)
+                            .withTarget()
+                               .withAddress(notNullValue())
+                            .also()
+                            
.withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
+         peer.expectFlow();
+
+         final String federationEventsSenderLinkName = "events-receiver-test";
+         final String federationEventsSenderAddress = 
FEDERATION_BASE_VALIDATION_ADDRESS +
+                                                      "." + 
FEDERATION_EVENTS_LINK_PREFIX +
+                                                      "." + 
federationEventsSenderLinkName;
+
+         peer.remoteAttach().ofReceiver()
+                            .withName(federationEventsSenderLinkName)
+                            
.withDesiredCapabilities(FEDERATION_EVENT_LINK.toString())
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withTarget().also()
+                            .withSource().withDynamic(true)
+                                         .withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withLifetimePolicyOfDeleteOnClose()
+                                         .withCapabilities("temporary-topic")
+                                         .also()
+                                         .queue();
+         peer.remoteFlow().withLinkCredit(10).queue();
+         peer.expectAttach().ofSender()
+                            .withName(federationEventsSenderLinkName)
+                            .withSource()
+                               .withAddress(notNullValue())
+                            .also()
+                            
.withOfferedCapability(FEDERATION_EVENT_LINK.toString());
+
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // The events receiver from the remote should trigger a temporary 
queue to be created on
+         // the server to allow sends of events beyond currently available 
credit.
+         Wait.assertTrue(() -> 
server.locateQueue(federationEventsSenderAddress) != null);
+
+         // Try to bind to the events address, which should be rejected as the 
queue
+         // was created with max consumers of one.
+         peer.expectAttach().ofSender()
+                            .withName("test-events-link-suspect")
+                            .withNullSource();
+         peer.expectDetach().withClosed(true)
+                            .withError(AmqpError.INTERNAL_ERROR.toString());
+         peer.remoteAttach().ofReceiver()
+                            .withName("test-events-link-suspect")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withTarget().also()
+                            
.withSource().withAddress(federationEventsSenderAddress)
+                                         .also()
+                                         .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         server.stop();
+      }
+   }
+
    // Use these methods to script the initial handshake that a broker that is 
establishing
    // a federation connection with a remote broker instance would perform.
 


Reply via email to