This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 7fef992b1f ARTEMIS-5381 Use a stable FQQN for federation address
receivers
7fef992b1f is described below
commit 7fef992b1fa17b479ed954145a03bd811a89f396
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Apr 10 11:18:13 2025 -0400
ARTEMIS-5381 Use a stable FQQN for federation address receivers
When subscribing a federation address consumer a stable subscription queue
name must be used to ensure that reconnections recover past subscriptions
and consume messages sent while the receiver was offline. This also supports
connections to previous versions where this was not done to ensure full
backwards and forwards compatibility on federation configurations.
---
.../amqp/connect/federation/AMQPFederation.java | 5 +
.../federation/AMQPFederationAddressConsumer.java | 31 +-
.../AMQPFederationAddressPolicyManager.java | 5 +-
.../AMQPFederationAddressSenderController.java | 16 +-
.../federation/AMQPFederationCapabilities.java | 108 ++++
.../federation/AMQPFederationCommandProcessor.java | 25 +-
.../federation/AMQPFederationConstants.java | 26 +
.../federation/AMQPFederationQueueConsumer.java | 1 +
.../connect/federation/AMQPFederationSource.java | 27 +-
.../connect/federation/AMQPFederationTarget.java | 9 +-
.../protocol/amqp/proton/AMQPSessionContext.java | 4 +-
.../connect/AMQPFederationAddressPolicyTest.java | 549 ++++++++++++++++++++-
.../amqp/connect/AMQPFederationConnectTest.java | 54 +-
.../connect/AMQPFederationServerToServerTest.java | 187 +++++++
14 files changed, 1016 insertions(+), 31 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
index c3023e134c..770b5e15fa 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
@@ -164,6 +164,11 @@ public abstract class AMQPFederation implements Federation
{
*/
public abstract AMQPFederationConfiguration getConfiguration();
+ /**
+ * {@return the federation capabilities that is in effect following
negotiation}
+ */
+ public abstract AMQPFederationCapabilities getCapabilities();
+
/**
* Initialize this federation instance if not already initialized.
*
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index a39df03890..6f3066f7ed 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -95,10 +95,23 @@ public final class AMQPFederationAddressConsumer extends
AMQPFederationConsumer
}
private String generateLinkName() {
- return "federation-" + federation.getName() +
- "-address-receiver-" + consumerInfo.getAddress() +
- "-" + federation.getServer().getNodeID() +
- "-" + LINK_SEQUENCE_ID.incrementAndGet();
+ if (federation.getCapabilities().isUseFQQNAddressSubscriptions()) {
+ return "federation-" + federation.getName() +
+ "-policy-" + policy.getPolicyName() +
+ "-address-receiver-" + consumerInfo.getAddress() +
+ "-" + federation.getServer().getNodeID() +
+ "-" + LINK_SEQUENCE_ID.incrementAndGet();
+ } else {
+ // Stable legacy link naming that is used as the address
subscription. This uses the
+ // non-sequence ID version which can allow link stealing to grab a
closing link which
+ // will eventually result in the broker connection being closed and
rebuilt as we see
+ // it as an unexpected link detach but this allows for stable names
and eventually
+ // connection recovery which is arguably less broken than the
sequence ID variant which
+ // creates unstable subscription queues that can be orphaned or
consumed out of order.
+ return "federation-" + federation.getName() +
+ "-address-receiver-" + consumerInfo.getAddress() +
+ "-" + federation.getServer().getNodeID();
+ }
}
@Override
@@ -112,7 +125,6 @@ public final class AMQPFederationAddressConsumer extends
AMQPFederationConsumer
final Receiver protonReceiver =
session.getSession().receiver(generateLinkName());
final Target target = new Target();
final Source source = new Source();
- final String address = consumerInfo.getAddress();
if (RoutingType.ANYCAST.equals(consumerInfo.getRoutingType())) {
source.setCapabilities(AmqpSupport.QUEUE_CAPABILITY);
@@ -133,10 +145,15 @@ public final class AMQPFederationAddressConsumer extends
AMQPFederationConsumer
source.setDefaultOutcome(DEFAULT_OUTCOME);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- source.setAddress(address);
source.setFilter(filtersMap);
- target.setAddress(address);
+ if (federation.getCapabilities().isUseFQQNAddressSubscriptions()) {
+ source.setAddress(consumerInfo.getFqqn());
+ } else {
+ source.setAddress(consumerInfo.getAddress()); // Legacy behavior
+ }
+
+ target.setAddress(consumerInfo.getAddress());
final Map<String, Object> addressSourceProperties = new HashMap<>();
// If the remote needs to create the address then it should apply
these
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 10f9bb3fa3..3c3e2402bb 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -436,7 +436,10 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
}
private String generateQueueName(AddressInfo address) {
- return "federation." + federation.getName() + ".address." +
address.getName() + ".node." + server.getNodeID();
+ return "federation." + federation.getName() +
+ ".policy." + getPolicyName() +
+ ".address." + address.getName() +
+ ".node." + server.getNodeID();
}
private static boolean isAddressInDivertForwards(final SimpleString
targetAddress, final SimpleString forwardAddress) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
index 11c59c550a..acc5006e84 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
@@ -47,6 +47,7 @@ import
org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
@@ -82,8 +83,8 @@ public final class AMQPFederationAddressSenderController
extends AMQPFederationS
public ServerConsumer createServerConsumer(ProtonServerSenderContext
senderContext) throws Exception {
final Sender sender = senderContext.getSender();
final Source source = (Source) sender.getRemoteSource();
+ final SimpleString sourceAddress = SimpleString.of(source.getAddress());
final String selector;
- final SimpleString queueName = SimpleString.of(sender.getName());
final Connection protonConnection = session.getSession().getConnection();
// Match the settlement mode of the remote instead of relying on the
default of MIXED.
@@ -126,7 +127,18 @@ public final class AMQPFederationAddressSenderController
extends AMQPFederationS
selector = jmsSelector;
}
- final SimpleString address = SimpleString.of(source.getAddress());
+ final SimpleString address;
+ final SimpleString queueName;
+
+ // Either we have negotiated subscriptions using FQQN or we default to
older behavior based on link names
+ if (CompositeAddress.isFullyQualified(sourceAddress)) {
+ address = CompositeAddress.extractAddressName(sourceAddress);
+ queueName = CompositeAddress.extractQueueName(sourceAddress);
+ } else {
+ address = sourceAddress;
+ queueName = SimpleString.of(sender.getName());
+ }
+
final AddressQueryResult addressQueryResult;
try {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCapabilities.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCapabilities.java
new file mode 100644
index 0000000000..82b5fd297a
--- /dev/null
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCapabilities.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.federation;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V1;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_VERSION;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Link;
+
+/**
+ * Capabilities class that provides a reconciliation between what the remote
offered as compared to what
+ * this federation instance desired in order to determine what features can
and cannot be used.
+ */
+public final class AMQPFederationCapabilities {
+
+ private boolean initialized;
+ private int localVersion = FEDERATION_V1;
+ private int remoteVersion = FEDERATION_V1;
+ private boolean fqqnAddressSubscriptions;
+
+ /**
+ * Initialize the federation versions all federation capabilities using the
state of the opened control
+ * link to match on locally set desired capabilities sent to the remote and
remotely offered capabilities.
+ * <p>
+ * We cannot use any feature that was not indicated as locally desired when
offered by the remote.
+ *
+ * @param controlLink The federation control link on the source or target
side of the connection.
+ *
+ * @return this federation capabilities instance fully initialized.
+ */
+ public AMQPFederationCapabilities initialize(Link controlLink) {
+ if (!initialized) {
+ initialized = true;
+
+ final Map<Symbol, Object> localProperties =
controlLink.getProperties() != null ? controlLink.getProperties() :
Collections.emptyMap();
+ final Map<Symbol, Object> remoteProperties =
controlLink.getRemoteProperties() != null ? controlLink.getRemoteProperties() :
Collections.emptyMap();
+
+ final Object local = localProperties.getOrDefault(FEDERATION_VERSION,
FEDERATION_V1);
+ final Object remote =
remoteProperties.getOrDefault(FEDERATION_VERSION, FEDERATION_V1);
+
+ if (!(local instanceof Integer localVersionNo)) {
+ throw new IllegalArgumentException("Invalid value set on
federation local version number: " + local);
+ } else {
+ this.localVersion = localVersionNo.intValue();
+ }
+
+ if (!(remote instanceof Integer remoteVersionNo)) {
+ throw new IllegalArgumentException("Invalid value sent in
federation remote version number: " + local);
+ } else {
+ this.remoteVersion = remoteVersionNo.intValue();
+ }
+
+ // Remote must be using V2 otherwise it cannot handle FQQN
subscriptions and we need to fall back.
+ if (remoteVersion >= 2) {
+ fqqnAddressSubscriptions = true;
+ }
+ }
+
+ return this;
+ }
+
+ /**
+ * {@return the federation version in use on the local side of the
connection.}
+ */
+ public int getLocalVersion() {
+ return localVersion;
+ }
+
+ /**
+ * {@return the federation version in use on the remote side of the
connection.}
+ */
+ public int getRemoteVersion() {
+ return remoteVersion;
+ }
+
+ /**
+ * {@return <code>true</code> if federation address receivers can use FQQN
source addresses or only legacy style.}
+ */
+ public boolean isUseFQQNAddressSubscriptions() {
+ checkIsInitialized();
+
+ return fqqnAddressSubscriptions;
+ }
+
+ private void checkIsInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("Cannot check capabilities until this
instance is initialized");
+ }
+ }
+}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandProcessor.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandProcessor.java
index 9f4d38f44b..34c9d26bdf 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandProcessor.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationCommandProcessor.java
@@ -18,6 +18,8 @@
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -44,6 +46,8 @@ import org.slf4j.LoggerFactory;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V2;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_VERSION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_ADDRESS_POLICY;
/**
@@ -55,7 +59,7 @@ public class AMQPFederationCommandProcessor extends
ProtonAbstractReceiver {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Capabilities that are offered to the remote sender that indicate this
receiver supports the
- // control link functions which allows the link open to complete.
+ // control link functions and any other federation specific feature that
are offered to the remote.
private static final Symbol[] OFFERED_LINK_CAPABILITIES = new Symbol[]
{FEDERATION_CONTROL_LINK};
private static final int PROCESSOR_RECEIVER_CREDITS = 10;
@@ -87,15 +91,22 @@ public class AMQPFederationCommandProcessor extends
ProtonAbstractReceiver {
final Target target = (Target) receiver.getRemoteTarget();
+ if (target == null || !target.getDynamic()) {
+ throw new ActiveMQAMQPInternalErrorException("Remote Target did not
arrive as dynamic node: " + target);
+ }
+
// Match the settlement mode of the remote instead of relying on the
default of MIXED.
receiver.setSenderSettleMode(receiver.getRemoteSenderSettleMode());
// We don't currently support SECOND so enforce that the answer is
always FIRST
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- if (target == null || !target.getDynamic()) {
- throw new ActiveMQAMQPInternalErrorException("Remote Target did not
arrive as dynamic node: " + target);
- }
+ // Send the remote the version of AMQP Federation this target broker
implements so it
+ // can match features with us.
+ final Map<Symbol, Object> receiverProperties = new HashMap<>();
+ receiverProperties.put(FEDERATION_VERSION, FEDERATION_V2);
+
+ receiver.setProperties(receiverProperties);
// The target needs a unique address for the remote to send commands to
which will get
// deleted on connection close so no state is retained between
connections, we know our
@@ -103,10 +114,12 @@ public class AMQPFederationCommandProcessor extends
ProtonAbstractReceiver {
// as the address for the dynamic node.
target.setAddress(receiver.getName());
- // We need to offer back that we support control link instructions for
the remote to succeed in
- // opening its sender link.
+ // Send back offered capabilities to advertise this is a control link
command processor.
receiver.setOfferedCapabilities(OFFERED_LINK_CAPABILITIES);
+ // Once we have configured this end of the control link we must
initialize the federation capabilities
+ federation.getCapabilities().initialize(receiver);
+
topUpCreditIfNeeded();
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
index b40788b048..5d4cfe0d04 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
@@ -28,6 +28,32 @@ import org.apache.qpid.proton.amqp.Symbol;
*/
public final class AMQPFederationConstants {
+ /**
+ * Property added on the AMQP federation control link that carries a
version associated with the
+ * side of the link the attach frame that carries it came from. A control
link will provide two
+ * versions to the AMQP federation instance, the local side and the remote
side once the link has
+ * fully opened.
+ */
+ public static final Symbol FEDERATION_VERSION =
Symbol.getSymbol("federation_version");
+
+ /**
+ * Default AMQP federation version used when the version is not set on the
control link as that
+ * would indicate all versions prior to the addition of versions being
added on the control link.
+ */
+ public static final int FEDERATION_V1 = 1;
+
+ /**
+ * Version 2 marker for AMQP federation control links. Version 2.0 bump
primarily adjusts the address federation
+ * link names to avoid potential link stealing when demand is quickly added
and removed and as a result the AMQP
+ * source address value uses an FQQN that provides a stable address and
queue binding name for the remote federation
+ * subscription which is recoverable on reconnects or on broker restart.
+ * <p>
+ * Also in this version AMQP federation queue consumers that are included
in federation demand tracking are forwarded
+ * to the remote with their original priority values to avoid a case of
infinite decreasing priority loops that can
+ * occur in some configurations.
+ */
+ public static final int FEDERATION_V2 = 2;
+
/**
* Address used by a remote broker instance to validate that an incoming
federation connection has access rights to
* perform federation operations. The user that connects to the AMQP
federation endpoint and attempts to create the
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index 4332bd50ec..3181ff1843 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -89,6 +89,7 @@ public final class AMQPFederationQueueConsumer extends
AMQPFederationConsumer {
private String generateLinkName() {
return "federation-" + federation.getName() +
+ "-policy-" + policy.getPolicyName() +
"-queue-receiver-" + consumerInfo.getFqqn() +
"-" + federation.getServer().getNodeID() + ":" +
LINK_SEQUENCE_ID.getAndIncrement();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
index a396b8e80c..c48e0afd1e 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
@@ -20,11 +20,12 @@ package
org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V2;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_VERSION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -73,9 +74,9 @@ public class AMQPFederationSource extends AMQPFederation {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- // Capabilities set on the sender link used to send policies or other
control messages to
- // the remote federation target.
- private static final Symbol[] CONTROL_LINK_CAPABILITIES = new Symbol[]
{FEDERATION_CONTROL_LINK};
+ // Capabilities we desire from the target federation instance, some can be
required and others can
+ // be optional, checks must be performed once the remote attaches.
+ private static final Symbol[] CONTROL_LINK_DESIRED_CAPABILITIES = new
Symbol[] {FEDERATION_CONTROL_LINK};
// Capabilities set on the events links used to react to federation
resources updates
private static final Symbol[] EVENT_LINK_CAPABILITIES = new Symbol[]
{FEDERATION_EVENT_LINK};
@@ -89,6 +90,7 @@ public class AMQPFederationSource extends AMQPFederation {
private final Map<String, Object> properties;
private volatile AMQPFederationConfiguration configuration;
+ private volatile AMQPFederationCapabilities capabilities;
/**
* Creates a new AMQP Federation instance that will manage the state of a
single AMQP broker federation instance
@@ -145,6 +147,15 @@ public class AMQPFederationSource extends AMQPFederation {
return configuration;
}
+ @Override
+ public synchronized AMQPFederationCapabilities getCapabilities() {
+ if (!connected) {
+ throw new IllegalStateException("Cannot access connection while
federation is not connected");
+ }
+
+ return capabilities;
+ }
+
/**
* Adds a new {@link FederationReceiveFromQueuePolicy} entry to the set of
policies that the remote end of this
* federation will use to create demand on the this server when local
demand is present.
@@ -595,12 +606,13 @@ public class AMQPFederationSource extends AMQPFederation {
// Send our local configuration data to the remote side of the
control link
// for use when creating remote federation resources.
final Map<Symbol, Object> senderProperties = new HashMap<>();
+ senderProperties.put(FEDERATION_VERSION, FEDERATION_V2);
senderProperties.put(FEDERATION_CONFIGURATION,
configuration.toConfigurationMap());
senderProperties.put(FEDERATION_NAME, getName());
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- sender.setDesiredCapabilities(CONTROL_LINK_CAPABILITIES);
+ sender.setDesiredCapabilities(CONTROL_LINK_DESIRED_CAPABILITIES);
sender.setProperties(senderProperties);
sender.setTarget(target);
sender.setSource(new Source());
@@ -635,9 +647,9 @@ public class AMQPFederationSource extends AMQPFederation {
return;
}
- if (!AmqpSupport.verifyOfferedCapabilities(sender)) {
+ if (!AmqpSupport.verifyOfferedCapabilities(sender,
FEDERATION_CONTROL_LINK)) {
brokerConnection.connectError(
-
ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(Arrays.toString(CONTROL_LINK_CAPABILITIES)));
+
ActiveMQAMQPProtocolMessageBundle.BUNDLE.missingOfferedCapability(FEDERATION_CONTROL_LINK.toString()));
return;
}
@@ -667,6 +679,7 @@ public class AMQPFederationSource extends AMQPFederation {
session.addSender(sender, senderContext);
+ capabilities = new
AMQPFederationCapabilities().initialize(sender);
connected = true;
// Setup events sender link to the target if there are any
remote policies and
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
index 9c3bb68201..aa0a69ebf1 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
@@ -43,8 +43,9 @@ public class AMQPFederationTarget extends AMQPFederation {
private final AMQPRemoteBrokerConnection brokerConnection;
private final AMQPConnectionContext connection;
private final AMQPFederationConfiguration configuration;
+ private final AMQPFederationCapabilities capabilities;
- public AMQPFederationTarget(AMQPRemoteBrokerConnection brokerConnection,
String name, AMQPFederationConfiguration configuration, AMQPSessionContext
session) {
+ public AMQPFederationTarget(AMQPRemoteBrokerConnection brokerConnection,
String name, AMQPFederationConfiguration configuration,
AMQPFederationCapabilities capabilities, AMQPSessionContext session) {
super(name, brokerConnection.getServer());
Objects.requireNonNull(session, "Provided session instance cannot be
null");
@@ -54,6 +55,7 @@ public class AMQPFederationTarget extends AMQPFederation {
this.connection = session.getAMQPConnectionContext();
this.connection.addLinkRemoteCloseListener(getName(),
this::handleLinkRemoteClose);
this.configuration = configuration;
+ this.capabilities = capabilities;
this.connected = true;
}
@@ -62,6 +64,11 @@ public class AMQPFederationTarget extends AMQPFederation {
return connection;
}
+ @Override
+ public AMQPFederationCapabilities getCapabilities() {
+ return capabilities;
+ }
+
@Override
public AMQPSessionContext getSessionContext() {
return session;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index fdfd3c9ae3..d6bdcaaad3 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -33,6 +33,7 @@ import
org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import
org.apache.activemq.artemis.protocol.amqp.connect.AMQPRemoteBrokerConnection;
import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederation;
+import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationCapabilities;
import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationCommandProcessor;
import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration;
import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationEventDispatcher;
@@ -451,7 +452,8 @@ public class AMQPSessionContext extends ProtonInitializable
{
final AMQPRemoteBrokerConnection brokerConnection =
AMQPRemoteBrokerConnection.getOrCreateRemoteBrokerConnection(server,
connection, protonConnection);
final AMQPFederationConfiguration configuration = new
AMQPFederationConfiguration(connection, federationConfigurationMap);
- final AMQPFederationTarget federation = new
AMQPFederationTarget(brokerConnection, remoteFederationName, configuration,
this);
+ final AMQPFederationCapabilities capabilities = new
AMQPFederationCapabilities();
+ final AMQPFederationTarget federation = new
AMQPFederationTarget(brokerConnection, remoteFederationName, configuration,
capabilities, this);
federation.initialize();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 57deec7d04..1764761dc9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -32,6 +32,9 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_POLICY_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V1;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V2;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_VERSION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
@@ -49,6 +52,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -58,6 +62,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.startsWith;
import java.lang.invoke.MethodHandles;
import java.net.URI;
@@ -166,8 +171,10 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
@@ -217,6 +224,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
containsString(server.getNodeID().toString())))
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
.withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+
.withSource().withAddress(startsWith("test::federation." + getTestName() +
".policy.address-policy.")).and()
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
@@ -260,6 +268,347 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testFederationCreatesReceiverWithoutFQQNWhenLocalQueueIsStaticlyDefinedAndFQQNNotSupportedOnRemote()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString()); // No FQQN for
address subscriptions
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+ receiveFromAddress.setAutoDelete(false);
+ receiveFromAddress.setAutoDeleteDelay(-1L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final Map<String, Object> expectedSourceProperties = new HashMap<>();
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+
not(containsString("address-policy")),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSource().withAddress(getTestName()).and() //
Legacy address only
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFederationCreatesReceiverWithStbleFQQNWhenLocalDemandIsAppliedRepeatedly()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+ receiveFromAddress.setAutoDelete(false);
+ receiveFromAddress.setAutoDeleteDelay(-1L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final Map<String, Object> expectedSourceProperties = new HashMap<>();
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ final AtomicReference<String> capturedSourceAddress1 = new
AtomicReference<>();
+ final AtomicReference<String> capturedSourceAddress2 = new
AtomicReference<>();
+
+ peer.expectAttach().ofReceiver()
+ .withCapture((attach) ->
capturedSourceAddress1.set(attach.getSource().getAddress()))
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-policy"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSource().withAddress(startsWith(getTestName()
+ "::federation." + getTestName())).and()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists());
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ // Should be no frames generated as we already federated the address
and the statically added
+ // queue should retain demand when this consumer leaves.
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ session.createConsumer(session.createTopic(getTestName()));
+ session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+
+ // This should trigger the federation consumer to be shutdown as the
statically defined queue
+ // should be the only remaining demand on the address.
+ logger.info("Removing Queues from federated address to eliminate
demand");
+ server.destroyQueue(SimpleString.of(getTestName()));
+ Wait.assertFalse(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture((attach) ->
capturedSourceAddress2.set(attach.getSource().getAddress()))
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-policy"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSource().withAddress(startsWith(getTestName()
+ "::federation." + getTestName())).and()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ // Create new demand and ensure the attach carries a stable FQQN
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ session.createConsumer(session.createTopic(getTestName()));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+
+ connection.start();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ // Each connect to the remote should use a stable source address
+ assertNotNull(capturedSourceAddress1.get());
+ assertNotNull(capturedSourceAddress2.get());
+ assertEquals(capturedSourceAddress1.get(),
capturedSourceAddress2.get());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFederationUsesStableFQQNForAddressConsumersAcrossConnections() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+ receiveFromAddress.setAutoDelete(false);
+ receiveFromAddress.setAutoDeleteDelay(-1L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(10);
+ amqpConnection.setRetryInterval(10);
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AtomicReference<String> capturedSourceAddress1 = new
AtomicReference<>();
+ final AtomicReference<String> capturedSourceAddress2 = new
AtomicReference<>();
+
+ peer.expectAttach().ofReceiver()
+ .withCapture((attach) ->
capturedSourceAddress1.set(attach.getSource().getAddress()))
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-policy"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSource().withAddress(startsWith(getTestName()
+ "::federation." + getTestName())).and()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose().optional();
+ peer.expectConnectionToDrop();
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver()
+ .withCapture((attach) ->
capturedSourceAddress2.set(attach.getSource().getAddress()))
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-policy"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSource().withAddress(startsWith(getTestName()
+ "::federation." + getTestName())).and()
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Each connect to the remote should use a stable source address
+ assertNotNull(capturedSourceAddress1.get());
+ assertNotNull(capturedSourceAddress2.get());
+ assertEquals(capturedSourceAddress1.get(),
capturedSourceAddress2.get());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
@Test
@Timeout(20)
public void testFederationCreatesAddressReceiverLinkForAddressMatch()
throws Exception {
@@ -1812,6 +2161,78 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteFederatesAddressWhenDemandIsAppliedUsingFQQNWhenSupported() throws
Exception {
+ server.start();
+
+ final List<String> includes = new ArrayList<>();
+ includes.add("address1");
+
+ final Map<String, Object> properties = new HashMap<>();
+ properties.put(ADDRESS_RECEIVER_IDLE_TIMEOUT, 5);
+
+ final FederationReceiveFromAddressPolicy policy =
+ new FederationReceiveFromAddressPolicy("test-address-policy",
+ true, 30_000L, 1000L, 1, true,
+ includes, null, properties,
null,
+
DEFAULT_WILDCARD_CONFIGURATION);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test", true);
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withSettled(true).withState().accepted();
+
+ sendAddresPolicyToRemote(peer, policy);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(startsWith("address1::federation.")) // FQQN prefix
+ .and()
+ .respondInKind(); // Server detected demand
+ peer.expectFlow().withLinkCredit(1000);
+ peer.remoteTransfer().withBody().withString("test-message")
+ .also()
+ .withDeliveryId(1)
+ .queue();
+ peer.expectDisposition().withSettled(true).withState().accepted();
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic("address1"));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final Message message = consumer.receive(5_000);
+ assertNotNull(message);
+ assertInstanceOf(TextMessage.class, message);
+ assertEquals("test-message", ((TextMessage) message).getText());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(999).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach(); // demand will be gone and receiver link
should close.
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
@Test
@Timeout(20)
public void
testRemoteFederatesAddressWhenDemandIsAppliedUsingControllerDefinedLinkCredit()
throws Exception {
@@ -4304,7 +4725,9 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
.respond()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
@@ -4589,7 +5012,9 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
- .respondInKind();
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respondInKind()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
@@ -4890,6 +5315,117 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testLegacyStolenLinkHandlesEventualDetachResponseByTerminatingBrokerConnection()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(getTestName());
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(1);
+ amqpConnection.setRetryInterval(20);
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ // Create address and binding which creates demand.
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach(); // No response yet as we want to evaluate
eventual response.
+
+ // Remove demand which should create a detach cycle.
+ server.destroyQueue(SimpleString.of(getTestName()));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().optional(); // Proton sends flow on detached link
that was stolen.
+ peer.expectConnectionToDrop();
+
+ // Create queue again which creates demand again rapidly and steals
the link
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ peer.remoteDetach().withHandle(2).later(20);
+
+ // Eventually when the detach arrives the link detach is treated as
unexpected because it
+ // is now attached to a new federation address consumer instance and
the broker connection
+ // is terminated and rebuilt.
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+ .withSource().withDynamic(true)
+ .and()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind()
+ .withTarget().withAddress("test-dynamic-events");
+ peer.expectFlow().withLinkCredit(10);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
private static void sendAddressAddedEvent(ProtonTestPeer peer, String
address, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);
@@ -4975,15 +5511,19 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
scriptFederationConnectToRemote(peer, federationName,
AmqpSupport.AMQP_CREDITS_DEFAULT, AmqpSupport.AMQP_LOW_CREDITS_DEFAULT);
}
+ private static void scriptFederationConnectToRemote(ProtonTestClient peer,
String federationName, boolean fqqnSubscriptions) {
+ scriptFederationConnectToRemote(peer, federationName,
AmqpSupport.AMQP_CREDITS_DEFAULT, AmqpSupport.AMQP_LOW_CREDITS_DEFAULT, false,
false, fqqnSubscriptions);
+ }
+
private static void scriptFederationConnectToRemote(ProtonTestClient peer,
String federationName, int amqpCredits, int amqpLowCredits) {
- scriptFederationConnectToRemote(peer, federationName, amqpCredits,
amqpLowCredits, false, false);
+ scriptFederationConnectToRemote(peer, federationName, amqpCredits,
amqpLowCredits, false, false, false);
}
private static void scriptFederationConnectToRemote(ProtonTestClient peer,
String federationName, boolean eventsSender, boolean eventsReceiver) {
- scriptFederationConnectToRemote(peer, federationName,
AmqpSupport.AMQP_CREDITS_DEFAULT, AmqpSupport.AMQP_LOW_CREDITS_DEFAULT,
eventsSender, eventsReceiver);
+ scriptFederationConnectToRemote(peer, federationName,
AmqpSupport.AMQP_CREDITS_DEFAULT, AmqpSupport.AMQP_LOW_CREDITS_DEFAULT,
eventsSender, eventsReceiver, false);
}
- private static void scriptFederationConnectToRemote(ProtonTestClient peer,
String federationName, int amqpCredits, int amqpLowCredits, boolean
eventsSender, boolean eventsReceiver) {
+ private static void scriptFederationConnectToRemote(ProtonTestClient peer,
String federationName, int amqpCredits, int amqpLowCredits, boolean
eventsSender, boolean eventsReceiver, boolean fqqnAddressSubs) {
final String federationControlLinkName = "Federation:control:" +
UUID.randomUUID().toString();
final Map<String, Object> federationConfiguration = new HashMap<>();
@@ -4992,6 +5532,7 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
final Map<String, Object> senderProperties = new HashMap<>();
senderProperties.put(FEDERATION_CONFIGURATION.toString(),
federationConfiguration);
+ senderProperties.put(FEDERATION_VERSION.toString(), fqqnAddressSubs ?
FEDERATION_V2 : FEDERATION_V1);
peer.queueClientSaslAnonymousConnect();
peer.remoteOpen().queue();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index 40dc05f7eb..a2b7b53920 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -37,6 +37,9 @@ import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENTS_LINK_PREFIX;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_NAME;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V1;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_V2;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_VERSION;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
@@ -273,6 +276,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.withName(allOf(containsString("federation-"),
containsString("myFederation")))
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
.withProperty(FEDERATION_NAME.toString(),
"myFederation")
.withProperty(FEDERATION_CONFIGURATION.toString(),
federationConfiguration)
.withTarget().withDynamic(true)
@@ -281,7 +285,8 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
.respond()
.withTarget().withAddress(controlLinkAddress)
.and()
-
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2);
peer.start();
final URI remoteURI = peer.getServerURI();
@@ -314,6 +319,45 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testFederationCreatesControlLinkAndContinuesIfConnectedToOldVersion() throws
Exception {
+ final String controlLinkAddress = "test-control-address";
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respond()
+ .withTarget().withAddress(controlLinkAddress)
+ .and()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V1)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(new
AMQPFederatedBrokerConnectionElement(getTestName()));
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ Wait.assertTrue(() ->
server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." +
FEDERATION_CONTROL_LINK_PREFIX +
+ "." + controlLinkAddress) !=
null);
+
+ peer.close();
+ }
+ }
+
@Test
@Timeout(20)
public void
testFederationCreatesControlLinkAndClosesConnectionIfCapabilityIsAbsent()
throws Exception {
@@ -381,6 +425,10 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
peer.waitForScriptToComplete(10, TimeUnit.SECONDS);
+ Wait.assertTrue(() ->
server.locateQueue(FEDERATION_BASE_VALIDATION_ADDRESS +
+ "." +
FEDERATION_CONTROL_LINK_PREFIX +
+ ".dynamic-name") != null);
+
server.stop();
}
}
@@ -1325,6 +1373,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
peer.remoteAttach().ofSender()
.withName(federationControlLinkName)
.withDesiredCapabilities(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
.withSenderSettleModeUnsettled()
.withReceivervSettlesFirst()
.withSource().also()
@@ -1340,7 +1389,8 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
.withTarget()
.withAddress(notNullValue())
.also()
-
.withOfferedCapability(FEDERATION_CONTROL_LINK.toString());
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectFlow();
if (eventsSender) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index 3e912931b1..f9cea5cdca 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -56,6 +56,7 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFedera
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConsumerControlType;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
@@ -1620,4 +1621,190 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
Wait.assertTrue(() ->
remoteServer.queueQuery(SimpleString.of(getTestName())).getConsumerCount() ==
2, 10_000);
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederationWithRestartReceivesMessagesSentToAddressWhileOffline()
throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy.setName("test-policy");
+ localAddressPolicy.addToIncludes(getTestName());
+ localAddressPolicy.setAutoDelete(false);
+ localAddressPolicy.setAutoDeleteDelay(-1L);
+ localAddressPolicy.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(localAddressPolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "failover:(amqp://localhost:" +
SERVER_PORT + ")");
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ final Connection connectionL = factoryLocal.createConnection();
+ final Connection connectionR = factoryRemote.createConnection();
+
+ connectionL.setClientID("durableClientId");
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionL.createTopic(getTestName());
+
+ final MessageConsumer consumerL = sessionL.createDurableConsumer(topic,
"durable");
+
+ connectionL.start();
+ connectionR.start();
+
+ // Demand on local address should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.of(getTestName())).isExists());
+ Wait.assertTrue(() ->
remoteServer.addressQuery(SimpleString.of(getTestName())).isExists());
+
+ // Captures state of JMS consumers and federation consumers attached on
each node
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+ Wait.assertTrue(() ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+
+ final MessageProducer producerR = sessionR.createProducer(topic);
+ final TextMessage message = sessionR.createTextMessage("Hello World");
+
+ message.setStringProperty("testProperty", "testValue");
+ producerR.send(message);
+
+ final Message received1 = consumerL.receive(5_000);
+
+ assertNotNull(received1);
+ assertInstanceOf(TextMessage.class, received1);
+ assertEquals("Hello World", ((TextMessage) received1).getText());
+ assertTrue(message.propertyExists("testProperty"));
+ assertEquals("testValue", received1.getStringProperty("testProperty"));
+
+ server.stop();
+
+ // Send message to federated address while server 1 is offline
+ message.setStringProperty("testProperty", "testValueTwo");
+ producerR.send(message);
+
+ server.start();
+
+ // check federation has reconnected
+ Wait.assertTrue(() ->
server.getManagementService().getResources(AMQPFederationConsumerControlType.class).length
== 1);
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+ Wait.assertTrue(() ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+
+ final Message received2 = consumerL.receive(5_000);
+
+ assertNotNull(received2);
+ assertInstanceOf(TextMessage.class, received2);
+ assertEquals("Hello World", ((TextMessage) received2).getText());
+ assertTrue(message.propertyExists("testProperty"));
+ assertEquals("testValueTwo",
received2.getStringProperty("testProperty"));
+
+ connectionL.close();
+ connectionR.close();
+
+ server.stop();
+ remoteServer.stop();
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteAddressFederationWithRestartReceivesMessagesSentToAddressWhileOffline()
throws Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement remoteAddressPolicy = new
AMQPFederationAddressPolicyElement();
+ remoteAddressPolicy.setName("test-policy");
+ remoteAddressPolicy.addToIncludes(getTestName());
+ remoteAddressPolicy.setAutoDelete(false);
+ remoteAddressPolicy.setAutoDeleteDelay(-1L);
+ remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addRemoteAddressPolicy(remoteAddressPolicy);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "failover:(amqp://localhost:" +
SERVER_PORT_REMOTE + ")");
+
+ final Connection connectionL = factoryLocal.createConnection();
+ final Connection connectionR = factoryRemote.createConnection();
+
+ connectionR.setClientID("durableClientId");
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionR.createTopic(getTestName());
+
+ final MessageConsumer consumerR = sessionR.createDurableConsumer(topic,
"durable");
+
+ connectionL.start();
+ connectionR.start();
+
+ // Demand on local address should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.of(getTestName())).isExists());
+ Wait.assertTrue(() ->
remoteServer.addressQuery(SimpleString.of(getTestName())).isExists());
+
+ // Captures state of JMS consumers and federation consumers attached on
each node
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+ Wait.assertTrue(() ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+
+ final MessageProducer producerL = sessionL.createProducer(topic);
+ final TextMessage message = sessionL.createTextMessage("Hello World");
+
+ message.setStringProperty("testProperty", "testValue");
+ producerL.send(message);
+
+ final Message received1 = consumerR.receive(5_000);
+
+ assertNotNull(received1);
+ assertInstanceOf(TextMessage.class, received1);
+ assertEquals("Hello World", ((TextMessage) received1).getText());
+ assertTrue(message.propertyExists("testProperty"));
+ assertEquals("testValue", received1.getStringProperty("testProperty"));
+
+ remoteServer.stop();
+
+ // Send message to federated address while server 1 is offline
+ message.setStringProperty("testProperty", "testValueTwo");
+ producerL.send(message);
+
+ remoteServer.start();
+
+ // check federation has reconnected
+ Wait.assertTrue(() ->
remoteServer.getManagementService().getResources(AMQPFederationConsumerControlType.class).length
== 1);
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+ Wait.assertTrue(() ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size() >= 1);
+
+ final Message received2 = consumerR.receive(5_000);
+
+ assertNotNull(received2);
+ assertInstanceOf(TextMessage.class, received2);
+ assertEquals("Hello World", ((TextMessage) received2).getText());
+ assertTrue(message.propertyExists("testProperty"));
+ assertEquals("testValueTwo",
received2.getStringProperty("testProperty"));
+
+ connectionL.close();
+ connectionR.close();
+
+ server.stop();
+ remoteServer.stop();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact