This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d7a7116a4c ARTEMIS-4754 Structure the names used for federation
internal queues
d7a7116a4c is described below
commit d7a7116a4c31bdd830e66f54240b7204a83101ae
Author: Timothy Bish <[email protected]>
AuthorDate: Wed May 1 16:39:23 2024 -0400
ARTEMIS-4754 Structure the names used for federation internal queues
When creating internal temporary queues for the federation control links and
the events link, we should use a structured naming convention to make it easier
to configure security for the federation user: all internal names fall under a
root prefix that can be used to grant read and write access to the federation
user. This change allows security to be configured on the wildcarded address
"$ACTIVEMQ_ARTEMIS_FEDERATION.#".
This change also adds some further restrictions to federation resources and
adds support for wildcarding '$' prefixed addresses.
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 21 +-
.../amqp/connect/federation/AMQPFederation.java | 48 ++++
.../AMQPFederationCommandDispatcher.java | 25 +-
.../federation/AMQPFederationConstants.java | 37 ++-
.../federation/AMQPFederationEventDispatcher.java | 19 +-
.../amqp/proton/AMQPConnectionContext.java | 4 +-
.../activemq/artemis/core/settings/impl/Match.java | 5 +
.../artemis/core/settings/impl/MatchTest.java | 23 ++
.../amqp/connect/AMQPFederationConnectTest.java | 265 ++++++++++++++++++++-
9 files changed, 412 insertions(+), 35 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c67cac0a1d..4612f7873b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -263,15 +263,32 @@ public class AMQPSessionCallback implements
SessionCallback {
}
public void createTemporaryQueue(SimpleString queueName, RoutingType
routingType) throws Exception {
- createTemporaryQueue(queueName, queueName, routingType, null);
+ createTemporaryQueue(queueName, queueName, routingType, null, null);
+ }
+
+ public void createTemporaryQueue(SimpleString queueName, RoutingType
routingType, Integer maxConsumers) throws Exception {
+ createTemporaryQueue(queueName, queueName, routingType, null,
maxConsumers);
}
public void createTemporaryQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType,
SimpleString filter) throws Exception {
+ createTemporaryQueue(address, queueName, routingType, filter, null);
+ }
+
+ public void createTemporaryQueue(SimpleString address,
+ SimpleString queueName,
+ RoutingType routingType,
+ SimpleString filter,
+ Integer maxConsumers) throws Exception {
try {
- serverSession.createQueue(new
QueueConfiguration(queueName).setAddress(address).setRoutingType(routingType).setFilterString(filter).setTemporary(true).setDurable(false));
+ serverSession.createQueue(new
QueueConfiguration(queueName).setAddress(address)
+
.setRoutingType(routingType)
+
.setFilterString(filter)
+
.setTemporary(true)
+
.setDurable(false)
+
.setMaxConsumers(maxConsumers));
} catch (ActiveMQSecurityException se) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(se.getMessage());
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
index 1f3c818155..891e264111 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
@@ -17,6 +17,10 @@
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
+
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
@@ -170,6 +174,50 @@ public abstract class AMQPFederation implements
FederationInternal {
}
}
+ /**
+ * Performs the prefixing for federation events queues that places the
events queues into
+ * the name-space of federation related internal queues.
+ *
+ * @param suffix
+ * A suffix to append to the federation events link (normally the AMQP
link name).
+ *
+ * @return the full internal queue name to use for the given suffix.
+ */
+ String prefixEventsLinkQueueName(String suffix) {
+ final StringBuilder builder = new StringBuilder();
+ final char delimiter = getWildcardConfiguration().getDelimiter();
+
+ builder.append(FEDERATION_BASE_VALIDATION_ADDRESS)
+ .append(delimiter)
+ .append(FEDERATION_EVENTS_LINK_PREFIX)
+ .append(delimiter)
+ .append(suffix);
+
+ return builder.toString();
+ }
+
+ /**
+ * Performs the prefixing for federation control queue name that places the
queues
+ * into the name-space of federation related internal queues.
+ *
+ * @param suffix
+ * A suffix to append to the federation control link (normally the AMQP
link name).
+ *
+ * @return the full internal queue name to use for the given suffix.
+ */
+ String prefixControlLinkQueueName(String suffix) {
+ final StringBuilder builder = new StringBuilder();
+ final char delimiter = getWildcardConfiguration().getDelimiter();
+
+ builder.append(FEDERATION_BASE_VALIDATION_ADDRESS)
+ .append(delimiter)
+ .append(FEDERATION_CONTROL_LINK_PREFIX)
+ .append(delimiter)
+ .append(suffix);
+
+ return builder.toString();
+ }
+
/**
* Adds a remote linked closed event interceptor that can intercept the
closed event and
* if it returns true indicate that the close has been handled and that no
further action
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
index 33b977c3fd..5e08346db5 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandDispatcher.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation.FEDERATION_INSTANCE_RECORD;
+
import java.util.Objects;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -24,11 +26,13 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import
org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
+import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sender;
/**
@@ -42,6 +46,8 @@ public class AMQPFederationCommandDispatcher implements
SenderController {
private final AMQPSessionCallback session;
private final ActiveMQServer server;
+ private String controlAddress;
+
AMQPFederationCommandDispatcher(Sender sender, ActiveMQServer server,
AMQPSessionCallback session) {
this.session = session;
this.sender = sender;
@@ -105,33 +111,40 @@ public class AMQPFederationCommandDispatcher implements
SenderController {
@Override
public Consumer init(ProtonServerSenderContext senderContext) throws
Exception {
+ final Connection protonConnection =
senderContext.getSender().getSession().getConnection();
+ final org.apache.qpid.proton.engine.Record attachments =
protonConnection.attachments();
+ final AMQPFederation federation =
attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
+
+ if (federation == null) {
+ throw new ActiveMQAMQPIllegalStateException("Cannot create a
federation link from non-federation connection");
+ }
+
// Get the dynamically generated name to use for local creation of a
matching temporary
// queue that we will send control message to and the broker will
dispatch as remote
// credit is made available.
- final SimpleString queueName =
SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
+ controlAddress =
federation.prefixControlLinkQueueName(sender.getRemoteTarget().getAddress());
try {
- session.createTemporaryQueue(queueName, RoutingType.ANYCAST);
+
session.createTemporaryQueue(SimpleString.toSimpleString(getControlLinkAddress()),
RoutingType.ANYCAST, 1);
} catch (Exception e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
- return (Consumer) session.createSender(senderContext, queueName, null,
false);
+ return (Consumer) session.createSender(senderContext,
SimpleString.toSimpleString(getControlLinkAddress()), null, false);
}
@Override
public void close() throws Exception {
// Make a best effort to remove the temporary queue used for control
commands on close.
- final SimpleString queueName =
SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
try {
- session.removeTemporaryQueue(queueName);
+
session.removeTemporaryQueue(SimpleString.toSimpleString(getControlLinkAddress()));
} catch (Exception e) {
// Ignored as the temporary queue should be removed on connection
termination.
}
}
private String getControlLinkAddress() {
- return sender.getRemoteTarget().getAddress();
+ return controlAddress;
}
}
\ No newline at end of file
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
index 85183c121d..b0e75163bb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
@@ -30,10 +30,39 @@ public final class AMQPFederationConstants {
/**
* Address used by a remote broker instance to validate that an incoming
federation connection
- * has access right to perform federation operations. The user that
connects to the AMQP federation
- * endpoint and attempt to create the control link must have write access
to this address.
- */
- public static final String FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS =
"$ACTIVEMQ_ARTEMIS_FEDERATION";
+ * has access rights to perform federation operations. The user that
connects to the AMQP federation
+ * endpoint and attempts to create the control link must have write access
to this address and any
+ * address prefixed by this value.
+ *
+ * When securing a federation user account the user must have read and
write permissions to addresses
+ * under this prefix using the broker defined delimiter, this include the
ability to create non-durable
+ * resources.
+ *
+ * <pre>
+ * $ACTIVEMQ_ARTEMIS_FEDERATION.#;
+ * </pre>
+ */
+ public static final String FEDERATION_BASE_VALIDATION_ADDRESS =
"$ACTIVEMQ_ARTEMIS_FEDERATION";
+
+ /**
+ * The prefix value added when creating a federation control link beyond
the initial portion of the
+ * validation address prefix. Links for command and control of federation
operations follow the form:
+ *
+ * <pre>
+ * $ACTIVEMQ_ARTEMIS_FEDERATION.control.<unique-id>
+ * </pre>
+ */
+ public static final String FEDERATION_CONTROL_LINK_PREFIX = "control";
+
+ /**
+ * The prefix value added when creating a federation events links beyond
the initial portion of the
+ * validation address prefix. Links for federation events follow the form:
+ *
+ * <pre>
+ * $ACTIVEMQ_ARTEMIS_FEDERATION.events.<unique-id>
+ * </pre>
+ */
+ public static final String FEDERATION_EVENTS_LINK_PREFIX = "events";
/**
* A desired capability added to the federation control link that must be
offered
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
index 703b054e1e..b3e69b1eeb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationEventDispatcher.java
@@ -68,6 +68,8 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
private final Set<String> addressWatches = new HashSet<>();
private final Set<String> queueWatches = new HashSet<>();
+ private String eventsAddress;
+
public AMQPFederationEventDispatcher(AMQPFederation federation,
AMQPSessionCallback session, Sender sender) {
this.session = session;
this.sender = sender;
@@ -76,7 +78,7 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
}
private String getEventsLinkAddress() {
- return sender.getName();
+ return eventsAddress;
}
/**
@@ -100,8 +102,7 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
public Consumer init(ProtonServerSenderContext senderContext) throws
Exception {
final Connection protonConnection =
senderContext.getSender().getSession().getConnection();
final org.apache.qpid.proton.engine.Record attachments =
protonConnection.attachments();
-
- AMQPFederation federation = attachments.get(FEDERATION_INSTANCE_RECORD,
AMQPFederation.class);
+ final AMQPFederation federation =
attachments.get(FEDERATION_INSTANCE_RECORD, AMQPFederation.class);
if (federation == null) {
throw new ActiveMQAMQPIllegalStateException("Cannot create a
federation link from non-federation connection");
@@ -115,7 +116,7 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
// Create a temporary queue using the unique link name which is where
events will
// be sent to so that they can be held until credit is granted by the
remote.
- final SimpleString queueName =
SimpleString.toSimpleString(sender.getName());
+ eventsAddress = federation.prefixEventsLinkQueueName(sender.getName());
if (sender.getLocalState() != EndpointState.ACTIVE) {
// Indicate that event link capabilities is supported.
@@ -131,11 +132,11 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
throw new ActiveMQAMQPInternalErrorException("Remote Terminus did
not arrive as dynamic node: " + remoteTerminus);
}
- remoteTerminus.setAddress(queueName.toString());
+ remoteTerminus.setAddress(getEventsLinkAddress());
}
try {
- session.createTemporaryQueue(queueName, RoutingType.ANYCAST);
+
session.createTemporaryQueue(SimpleString.toSimpleString(getEventsLinkAddress()),
RoutingType.ANYCAST, 1);
} catch (Exception e) {
throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
}
@@ -145,18 +146,16 @@ public class AMQPFederationEventDispatcher implements
SenderController, ActiveMQ
server.registerBrokerPlugin(this); // Start listening for bindings and
consumer events.
- return (Consumer) session.createSender(senderContext, queueName, null,
false);
+ return (Consumer) session.createSender(senderContext,
SimpleString.toSimpleString(getEventsLinkAddress()), null, false);
}
@Override
public void close() {
// Make a best effort to remove the temporary queue used for event
messages on close.
- final SimpleString queueName =
SimpleString.toSimpleString(sender.getRemoteTarget().getAddress());
-
server.unRegisterBrokerPlugin(this);
try {
- session.removeTemporaryQueue(queueName);
+
session.removeTemporaryQueue(SimpleString.toSimpleString(getEventsLinkAddress()));
} catch (Exception e) {
// Ignored as the temporary queue should be removed on connection
termination.
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index e1f6fe192c..5e8c76f14a 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -79,7 +79,7 @@ import java.lang.invoke.MethodHandles;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
@@ -472,7 +472,7 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
private void handleFederationControlLinkOpened(AMQPSessionContext
protonSession, Receiver receiver) throws Exception {
try {
try {
-
protonSession.getSessionSPI().check(SimpleString.toSimpleString(FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS),
CheckType.SEND, getSecurityAuth());
+
protonSession.getSessionSPI().check(SimpleString.toSimpleString(FEDERATION_BASE_VALIDATION_ADDRESS),
CheckType.SEND, getSecurityAuth());
} catch (ActiveMQSecurityException e) {
throw new ActiveMQAMQPSecurityException(
"User does not have permission to attach to the federation
control address");
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
index 37a2b41d3c..bddc87dbf0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/Match.java
@@ -36,6 +36,10 @@ public class Match<T> {
private static final String DOT_REPLACEMENT = "\\.";
+ private static final String DOLLAR = "$";
+
+ private static final String DOLLAR_REPLACEMENT = "\\$";
+
private final String match;
private final Pattern pattern;
@@ -75,6 +79,7 @@ public class Match<T> {
actMatch =
actMatch.replace(wildcardConfiguration.getDelimiterString() +
wildcardConfiguration.getAnyWordsString(),
wildcardConfiguration.getAnyWordsString());
}
actMatch = actMatch.replace(Match.DOT, Match.DOT_REPLACEMENT);
+ actMatch = actMatch.replace(Match.DOLLAR, Match.DOLLAR_REPLACEMENT);
actMatch =
actMatch.replace(wildcardConfiguration.getSingleWordString(),
String.format(WORD_WILDCARD_REPLACEMENT_FORMAT,
Pattern.quote(wildcardConfiguration.getDelimiterString())));
if (direct) {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
index 130bbaa8ba..61df153473 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/impl/MatchTest.java
@@ -78,6 +78,29 @@ public class MatchTest {
Assert.assertFalse(predicate.test("testing.A"));
Assert.assertFalse(predicate.test("test"));
Assert.assertFalse(predicate.test("test.A.B"));
+ }
+
+ @Test
+ public void testDollarMatchingDirectTrue() {
+ final Pattern pattern = Match.createPattern("$test.#", new
WildcardConfiguration(), true);
+ final Predicate<String> predicate = pattern.asPredicate();
+
+ Assert.assertTrue(predicate.test("$test.A"));
+ Assert.assertTrue(predicate.test("$test.A.B"));
+
+ Assert.assertFalse(predicate.test("$testing.A"));
+ Assert.assertFalse(predicate.test("$test"));
+ }
+
+ @Test
+ public void testDollarMatchingDirectFalse() {
+ final Pattern pattern = Match.createPattern("$test.#", new
WildcardConfiguration(), false);
+ final Predicate<String> predicate = pattern.asPredicate();
+
+ Assert.assertTrue(predicate.test("$test"));
+ Assert.assertTrue(predicate.test("$test.A"));
+ Assert.assertTrue(predicate.test("$test.A.B"));
+ Assert.assertFalse(predicate.test("$testing.A"));
}
}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index 13b2619ed5..13ddb976be 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -28,7 +28,9 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
-import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK_PREFIX;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_BASE_VALIDATION_ADDRESS;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
@@ -62,10 +64,9 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
-import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
@@ -158,21 +159,23 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
federationConfiguration.put(IGNORE_QUEUE_CONSUMER_PRIORITIES,
AMQP_INGNORE_CONSUMER_PRIORITIES);
federationConfiguration.put(AmqpSupport.TUNNEL_CORE_MESSAGES,
AMQP_TUNNEL_CORE_MESSAGES);
+ final String controlLinkAddress = "test-control-address";
+
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
-
.withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
- .withName(allOf(containsString("Federation"),
containsString("myFederation")))
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withName(allOf(containsString("federation-"),
containsString("myFederation")))
.withProperty(FEDERATION_CONFIGURATION.toString(),
federationConfiguration)
.withTarget().withDynamic(true)
.withCapabilities("temporary-topic")
.and()
.respond()
- .withTarget().withAddress("test-control-address")
+ .withTarget().withAddress(controlLinkAddress)
.and()
-
.withOfferedCapabilities(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.start();
final URI remoteURI = peer.getServerURI();
@@ -193,7 +196,9 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
- Wait.assertTrue(() -> server.locateQueue("test-control-address") !=
null);
+ Wait.assertTrue(() ->
server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." +
FEDERATION_CONTROL_LINK_PREFIX +
+ "." + controlLinkAddress) !=
null);
}
}
@@ -203,7 +208,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
-
peer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond();
+
peer.expectAttach().ofSender().withDesiredCapability(FEDERATION_CONTROL_LINK.toString()).respond();
peer.expectConnectionToDrop();
peer.start();
@@ -645,7 +650,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
@Test(timeout = 20000)
public void testControlLinkPassesConnectAttemptWhenUserHasPrivledges()
throws Exception {
- enableSecurity(server, FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS);
+ enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS);
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
@@ -665,9 +670,31 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
}
}
+ @Test(timeout = 20000)
+ public void
testControlAndEventsLinksPassesConnectAttemptWhenUserHasPrivledges() throws
Exception {
+ enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS + ".#");
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, getTestName(), true, fullUser,
fullPass, true, true);
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+
+ logger.info("Test stopped");
+ }
+ }
+
@Test(timeout = 20000)
public void
testControlLinkRefusesConnectAttemptWhenUseDoesNotHavePrivledgesForControlAddress()
throws Exception {
- enableSecurity(server, FEDERATION_CONTROL_LINK_VALIDATION_ADDRESS);
+ enableSecurity(server, FEDERATION_BASE_VALIDATION_ADDRESS);
server.start();
try (ProtonTestClient peer = new ProtonTestClient()) {
@@ -773,6 +800,222 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
}
}
+ @Test(timeout = 20000)
+ public void testControlLinkSenderQueueCreatedWithMaxConsumersOfOne() throws Exception {
+ final String controlLinkAddress = "test-control-address";
+ final String federationControlSenderAddress = FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." + FEDERATION_CONTROL_LINK_PREFIX +
+ "." + controlLinkAddress;
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+ .withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withName(allOf(containsString("federation-"), containsString("myFederation")))
+ .withTarget().withDynamic(true)
+ .withCapabilities("temporary-topic")
+ .and()
+ .respond()
+ .withTarget().withAddress(controlLinkAddress)
+ .and()
+ .withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
+ getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement("myFederation");
+ amqpConnection.addElement(federation);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(() -> server.locateQueue(federationControlSenderAddress) != null);
+
+ // Try and bind to the control address which should be rejected as the queue
+ // was created with max consumers of one.
+ peer.expectAttach().ofSender()
+ .withName("test-control-link-suspect")
+ .withNullSource();
+ peer.expectDetach().withClosed(true)
+ .withError(AmqpError.INTERNAL_ERROR.toString());
+ peer.remoteAttach().ofReceiver()
+ .withName("test-control-link-suspect")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withTarget().also()
+ .withSource().withAddress(federationControlSenderAddress)
+ .also()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testEventSenderLinkFromTargetUsesNamespacedDynamicQueue() throws Exception {
+ final String federationControlLinkName = "federation-test";
+
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+ peer.remoteAttach().ofSender()
+ .withName(federationControlLinkName)
+ .withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().also()
+ .withTarget().withDynamic(true)
+ .withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withLifetimePolicyOfDeleteOnClose()
+ .withCapabilities("temporary-topic")
+ .also()
+ .queue();
+ peer.expectAttach().ofReceiver()
+ .withName(federationControlLinkName)
+ .withTarget()
+ .withAddress(notNullValue())
+ .also()
+ .withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
+ peer.expectFlow();
+
+ final String federationEventsSenderLinkName = "events-receiver-test";
+
+ peer.remoteAttach().ofReceiver()
+ .withName(federationEventsSenderLinkName)
+ .withDesiredCapabilities(FEDERATION_EVENT_LINK.toString())
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withTarget().also()
+ .withSource().withDynamic(true)
+ .withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withLifetimePolicyOfDeleteOnClose()
+ .withCapabilities("temporary-topic")
+ .also()
+ .queue();
+ peer.remoteFlow().withLinkCredit(10).queue();
+ peer.expectAttach().ofSender()
+ .withName(federationEventsSenderLinkName)
+ .withSource()
+ .withAddress(notNullValue())
+ .also()
+ .withOfferedCapability(FEDERATION_EVENT_LINK.toString());
+
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // The events receiver from the remote should trigger a temporary queue to be created on
+ // the server to allow sends of events beyond currently available credit.
+ Wait.assertTrue(() -> server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." + FEDERATION_EVENTS_LINK_PREFIX +
+ "." + federationEventsSenderLinkName) != null);
+
+ server.stop();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testEventsLinkAtTargetIsCreatedWithMaxConsumersOfOne() throws Exception {
+ final String federationControlLinkName = "federation-test";
+
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.remoteOpen().queue();
+ peer.expectOpen();
+ peer.remoteBegin().queue();
+ peer.expectBegin();
+ peer.remoteAttach().ofSender()
+ .withName(federationControlLinkName)
+ .withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().also()
+ .withTarget().withDynamic(true)
+ .withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withLifetimePolicyOfDeleteOnClose()
+ .withCapabilities("temporary-topic")
+ .also()
+ .queue();
+ peer.expectAttach().ofReceiver()
+ .withName(federationControlLinkName)
+ .withTarget()
+ .withAddress(notNullValue())
+ .also()
+ .withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
+ peer.expectFlow();
+
+ final String federationEventsSenderLinkName = "events-receiver-test";
+ final String federationEventsSenderAddress = FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." + FEDERATION_EVENTS_LINK_PREFIX +
+ "." + federationEventsSenderLinkName;
+
+ peer.remoteAttach().ofReceiver()
+ .withName(federationEventsSenderLinkName)
+ .withDesiredCapabilities(FEDERATION_EVENT_LINK.toString())
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withTarget().also()
+ .withSource().withDynamic(true)
+ .withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withLifetimePolicyOfDeleteOnClose()
+ .withCapabilities("temporary-topic")
+ .also()
+ .queue();
+ peer.remoteFlow().withLinkCredit(10).queue();
+ peer.expectAttach().ofSender()
+ .withName(federationEventsSenderLinkName)
+ .withSource()
+ .withAddress(notNullValue())
+ .also()
+ .withOfferedCapability(FEDERATION_EVENT_LINK.toString());
+
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // The events receiver from the remote should trigger a temporary queue to be created on
+ // the server to allow sends of events beyond currently available credit.
+ Wait.assertTrue(() -> server.locateQueue(federationEventsSenderAddress) != null);
+
+ // Try and bind to the events address which should be rejected as the queue
+ // was created with max consumers of one.
+ peer.expectAttach().ofSender()
+ .withName("test-events-link-suspect")
+ .withNullSource();
+ peer.expectDetach().withClosed(true)
+ .withError(AmqpError.INTERNAL_ERROR.toString());
+ peer.remoteAttach().ofReceiver()
+ .withName("test-events-link-suspect")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withTarget().also()
+ .withSource().withAddress(federationEventsSenderAddress)
+ .also()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ server.stop();
+ }
+ }
+
// Use these methods to script the initial handshake that a broker that is establishing
// a federation connection with a remote broker instance would perform.