This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new cfffb46149 ARTEMIS-5368 Propagate Federation consumer priority
unchanged
cfffb46149 is described below
commit cfffb46149a66a5aedeb5edde017472e5e8d6d71
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Mar 27 14:52:36 2025 -0400
ARTEMIS-5368 Propagate Federation consumer priority unchanged
When passing along a federation queue consumer that matches the includes
of a federation queue policy retain the original consumer priority so
that if the consumers loop the propagation will stop at the source that
started the loop.
---
.../AMQPFederationAddressPolicyManager.java | 16 +-
.../federation/AMQPFederationConsumerManager.java | 42 +++++-
.../AMQPFederationQueuePolicyManager.java | 165 ++++++++++++---------
.../artemis/core/server/ServerConsumer.java | 40 +++++
.../core/server/impl/ServerConsumerImpl.java | 36 +++++
.../connect/AMQPFederationQueuePolicyTest.java | 104 ++++++++++++-
.../connect/AMQPFederationServerToServerTest.java | 25 +++-
.../tests/integration/cli/DummyServerConsumer.java | 22 +++
8 files changed, 362 insertions(+), 88 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 04540e7d84..10f9bb3fa3 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -93,7 +93,7 @@ public final class AMQPFederationAddressPolicyManager extends
AMQPFederationLoca
@Override
public synchronized void afterRemoveAddress(SimpleString address,
AddressInfo addressInfo) throws ActiveMQException {
if (isActive()) {
- final AMQPFederationConsumerManager entry =
federationConsumers.remove(address.toString());
+ final AMQPFederationAddressConsumerManager entry =
federationConsumers.remove(address.toString());
if (entry != null) {
logger.trace("Federated address {} was removed, closing federation
consumer", address);
@@ -357,7 +357,7 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
// current demand and don't need to re-check the server state before
trying to create the
// remote address consumer.
if (isActive() && testIfAddressMatchesPolicy(addressName,
RoutingType.MULTICAST)) {
- final AMQPFederationConsumerManager entry =
federationConsumers.get(addressName);
+ final AMQPFederationAddressConsumerManager entry =
federationConsumers.get(addressName);
if (entry != null) {
entry.recover();
@@ -451,7 +451,7 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
return false;
}
- private static class AMQPFederationAddressConsumerManager extends
AMQPFederationConsumerManager {
+ private static class AMQPFederationAddressConsumerManager extends
AMQPFederationConsumerManager<Binding> {
private final AMQPFederationAddressPolicyManager manager;
private final AddressInfo addressInfo;
@@ -486,5 +486,15 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
protected boolean isPluginBlockingFederationConsumerCreate() {
return manager.isPluginBlockingFederationConsumerCreate(addressInfo);
}
+
+ @Override
+ protected void whenDemandTrackingEntryAdded(Binding entry) {
+ // No current action needed
+ }
+
+ @Override
+ protected void whenDemandTrackingEntryRemoved(Binding entry) {
+ // No current action needed
+ }
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
index 6c0577ed84..0c8f9968ed 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
@@ -32,8 +32,10 @@ import org.slf4j.LoggerFactory;
* <p>
* All interactions with the consumer tracking entry should occur under the
lock of the parent manager instance and this
* manager will perform any asynchronous work with a lock held on the parent
manager instance.
+ *
+ * @param <E> The type used in the demand tracking collection.
*/
-public abstract class AMQPFederationConsumerManager {
+public abstract class AMQPFederationConsumerManager<E> {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -48,7 +50,7 @@ public abstract class AMQPFederationConsumerManager {
private final AMQPFederation federation;
private final AMQPFederationLocalPolicyManager manager;
- private final Set<Object> demandTracking = new HashSet<>();
+ private final Set<E> demandTracking = new HashSet<>();
private State state = State.READY;
private AMQPFederationConsumer consumer;
@@ -94,6 +96,14 @@ public abstract class AMQPFederationConsumerManager {
} finally {
consumer = null;
+ demandTracking.forEach(entry -> {
+ try {
+ whenDemandTrackingEntryRemoved(entry);
+ } catch (Exception ignore) {
+ }
+ });
+ demandTracking.clear();
+
// Stop handler in closed state won't schedule idle timeout, ensure
// any pending task is canceled just for proper cleanup purposes.
if (pendingIdleTimeout != null) {
@@ -135,10 +145,12 @@ public abstract class AMQPFederationConsumerManager {
*
* @param demand A new unit of demand to add to this consumer manager.
*/
- public void addDemand(Object demand) {
+ public void addDemand(E demand) {
checkClosed();
- demandTracking.add(demand);
+ if (demandTracking.add(demand)) {
+ whenDemandTrackingEntryAdded(demand);
+ }
// This will create a new consumer only if there isn't one currently
assigned to this entry and any configured
// federation plugins don't block it from doing so. An already stopping
consumer will check on stop if it should
@@ -159,10 +171,12 @@ public abstract class AMQPFederationConsumerManager {
*
* @param demand The element of demand that should be removed from tracking.
*/
- public void removeDemand(Object demand) {
+ public void removeDemand(E demand) {
checkClosed();
- demandTracking.remove(demand);
+ if (demandTracking.remove(demand)) {
+ whenDemandTrackingEntryRemoved(demand);
+ }
if (hasDemand() || state == State.READY) {
return;
@@ -372,6 +386,22 @@ public abstract class AMQPFederationConsumerManager {
}
}
+ /**
+ * An event point that a subclass can use to perform an initialization
action whenever an entry is added to
+ * demand tracking.
+ *
+ * @param entry The entry that is being added to demand tracking.
+ */
+ protected abstract void whenDemandTrackingEntryAdded(E entry);
+
+ /**
+ * An event point that a subclass can use to perform a cleanup action
whenever an entry is removed from
+ * demand tracking.
+ *
+ * @param entry The entry that is being removed from demand tracking.
+ */
+ protected abstract void whenDemandTrackingEntryRemoved(E entry);
+
/**
* Creates a new federation consumer that this manager will monitor and
maintain. The returned consumer should be in
* an initial state ready for this manager to initialize once it is fully
configured.
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
index 412004fb89..d6c5bb31ab 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
@@ -23,15 +23,13 @@ import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.function.Predicate;
+import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
@@ -52,17 +50,22 @@ public final class AMQPFederationQueuePolicyManager extends
AMQPFederationLocalP
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected final Predicate<ServerConsumer> federationConsumerMatcher;
protected final FederationReceiveFromQueuePolicy policy;
protected final Map<FederationConsumerInfo,
AMQPFederationQueueConsumerManager> federationConsumers = new HashMap<>();
+ /*
+ * Unique for the lifetime of this policy manager which is either the
lifetime of the broker or until the
+ * configuration is reloaded and this federation instance happens to be
updated but not removed in which
+ * case the full federation instance is shutdown and then removed and
re-added as if it was new.
+ */
+ private final String CONSUMER_INFO_ATTACHMENT_KEY =
UUID.randomUUID().toString();
+
public AMQPFederationQueuePolicyManager(AMQPFederation federation,
AMQPFederationMetrics metrics, FederationReceiveFromQueuePolicy queuePolicy)
throws ActiveMQException {
super(federation, metrics, queuePolicy);
Objects.requireNonNull(queuePolicy, "The Queue match policy cannot be
null");
this.policy = queuePolicy;
- this.federationConsumerMatcher = createFederationConsumerMatcher(server,
queuePolicy);
}
/**
@@ -100,12 +103,12 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
@Override
public synchronized void afterCloseConsumer(ServerConsumer consumer,
boolean failed) {
if (isActive()) {
- final FederationConsumerInfo consumerInfo =
createConsumerInfo(consumer);
- final AMQPFederationQueueConsumerManager entry =
federationConsumers.get(consumerInfo);
+ final FederationConsumerInfo consumerInfo = (FederationConsumerInfo)
consumer.getAttachment(CONSUMER_INFO_ATTACHMENT_KEY);
- if (entry != null) {
+ if (consumerInfo != null &&
federationConsumers.containsKey(consumerInfo)) {
+ final AMQPFederationQueueConsumerManager entry =
federationConsumers.get(consumerInfo);
logger.trace("Reducing demand on federated queue {}",
entry.getQueueName());
- entry.removeDemand(identifyConsumer(consumer));
+ entry.removeDemand(consumer);
}
}
}
@@ -156,17 +159,19 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
final String queueName = consumer.getQueue().getName().toString();
if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(),
queueName)) {
+ final boolean federationConsumer = isFederationConsumer(consumer);
+
// We should ignore federation consumers from remote peers but
configuration does allow
// these to be federated again for some very specific use cases so we
check before then
// moving onto any server plugin checks kick in.
- if (federationConsumerMatcher.test(consumer)) {
+ if (federationConsumer && !policy.isIncludeFederated()) {
return;
}
logger.trace("Federation Policy matched on consumer for binding: {}",
consumer.getBinding());
final AMQPFederationQueueConsumerManager entry;
- final FederationConsumerInfo consumerInfo =
createConsumerInfo(consumer);
+ final FederationConsumerInfo consumerInfo =
createConsumerInfo(consumer, federationConsumer);
// Check for existing consumer add demand from a additional local
consumer to ensure
// the remote consumer remains active until all local demand is
withdrawn.
@@ -174,13 +179,13 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
logger.trace("Federation Queue Policy manager found existing
demand for queue: {}, adding demand", queueName);
entry = federationConsumers.get(consumerInfo);
} else {
- federationConsumers.put(consumerInfo, entry = new
AMQPFederationQueueConsumerManager(this, consumerInfo, consumer.getQueue()));
+ federationConsumers.put(consumerInfo, entry = new
AMQPFederationQueueConsumerManager(this, CONSUMER_INFO_ATTACHMENT_KEY,
consumerInfo, consumer.getQueue()));
}
// Demand passed all binding plugin blocking checks so we track it,
plugin can still
// stop federation of the queue based on some external criteria but
once it does
// (if ever) allow it we will have tracked all allowed demand.
- entry.addDemand(identifyConsumer(consumer));
+ entry.addDemand(consumer);
}
}
@@ -190,6 +195,7 @@ public final class AMQPFederationQueuePolicyManager extends
AMQPFederationLocalP
*
* @param addressName The address that was added on the remote.
* @param queueName The queue that was added on the remote.
+ *
* @throws Exception if an error occurs while processing the queue added
event.
*/
public synchronized void afterRemoteQueueAdded(String addressName, String
queueName) throws Exception {
@@ -219,6 +225,7 @@ public final class AMQPFederationQueuePolicyManager extends
AMQPFederationLocalP
*
* @param address The address that is being tested for a policy match.
* @param queueName The name of the queue that is being tested for a policy
match.
+ *
* @return {@code true} if the address given is a match against the policy
*/
private boolean testIfQueueMatchesPolicy(String address, String queueName) {
@@ -231,6 +238,7 @@ public final class AMQPFederationQueuePolicyManager extends
AMQPFederationLocalP
* configured matching policy.
*
* @param queueName The name of the queue that is being tested for a policy
match.
+ *
* @return {@code true} if the address given is a match against the policy
*/
private boolean testIfQueueMatchesPolicy(String queueName) {
@@ -239,23 +247,20 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
/**
* Create a new {@link FederationConsumerInfo} based on the given {@link
ServerConsumer} and the configured
- * {@link FederationReceiveFromQueuePolicy}. A subclass must override this
method to return a consumer information
- * object with additional data used be that implementation.
+ * {@link FederationReceiveFromQueuePolicy}. This should only be called
once when a consumer is added and
+ * we begin tracking it as demand on a federated queue.
*
* @param consumer The {@link ServerConsumer} to use as a basis for the
consumer information object.
+ * @param federationConsumer Is the consumer one that was created by a
remote federation controller
+ *
* @return a new {@link FederationConsumerInfo} instance based on the
server consumer
*/
- private FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) {
+ private FederationConsumerInfo createConsumerInfo(ServerConsumer consumer,
boolean federationConsumer) {
final Queue queue = consumer.getQueue();
final String queueName = queue.getName().toString();
final String address = queue.getAddress().toString();
-
- final int priority = configuration.isIgnoreSubscriptionPriorities() ?
- ActiveMQDefaultConfiguration.getDefaultConsumerPriority() +
policy.getPriorityAjustment() :
- consumer.getPriority() + policy.getPriorityAjustment();
-
- final String filterString =
- selectFilter(queue.getFilter(),
configuration.isIgnoreSubscriptionFilters() ? null : consumer.getFilter());
+ final int priority = selectPriority(consumer, federationConsumer);
+ final String filterString = selectFilter(consumer);
return new AMQPFederationGenericConsumerInfo(Role.QUEUE_CONSUMER,
address,
@@ -266,78 +271,84 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
priority);
}
- @Override
- protected AMQPFederationConsumer
createFederationConsumer(FederationConsumerInfo consumerInfo) {
- Objects.requireNonNull(consumerInfo, "Federation Queue consumer
information object was null");
+ private String selectFilter(ServerConsumer consumer) {
+ final Filter consumerFilter;
+ final Filter queueFilter = consumer.getQueue().getFilter();
- if (logger.isTraceEnabled()) {
- logger.trace("AMQP Federation {} creating queue consumer: {} for
policy: {}", federation.getName(), consumerInfo, policy.getPolicyName());
+ if (!configuration.isIgnoreSubscriptionFilters()) {
+ consumerFilter = consumer.getFilter();
+ } else {
+ consumerFilter = null;
}
- // Don't initiate anything yet as the caller might need to register
error handlers etc
- // before the attach is sent otherwise they could miss the failure case.
- return new AMQPFederationQueueConsumer(this, configuration, session,
consumerInfo, metrics.newConsumerMetrics());
+ if (consumerFilter != null) {
+ return consumerFilter.getFilterString().toString();
+ } else if (queueFilter != null) {
+ return queueFilter.getFilterString().toString();
+ } else {
+ return null;
+ }
}
- /**
- * Creates a {@link Predicate} that should return true if the given
consumer is a federation created consumer which
- * should not be further federated.
- *
- * @param server The server instance for use in creating the filtering
{@link Predicate}.
- * @param policy The configured Queue matching policy that can provide
additional match criteria.
- * @return a {@link Predicate} that will return true if the consumer should
be filtered
- * @throws ActiveMQException if an error occurs while creating the new
consumer filter.
- */
- private Predicate<ServerConsumer>
createFederationConsumerMatcher(ActiveMQServer server,
FederationReceiveFromQueuePolicy policy) throws ActiveMQException {
- if (policy.isIncludeFederated()) {
- return (consumer) -> false; // Configuration says to federate these
+ private int selectPriority(ServerConsumer consumer, boolean
federationConsumer) {
+ // Use the priority from the federation consumer as indicated and only
choose values for
+ // non-federation consumers to help avoid an infinite descending
priority loop when the
+ // federation consumers are included and the configured brokers loop or
are bi-directional.
+ if (federationConsumer) {
+ return consumer.getPriority();
+ } else if (configuration.isIgnoreSubscriptionPriorities()) {
+ return ActiveMQDefaultConfiguration.getDefaultConsumerPriority() +
policy.getPriorityAjustment();
} else {
- // This filter matches on the same criteria as the original Core
client based
- // Federation code which allows this implementation to see those
consumers as
- // well as its own which in this methods implementation must also use
this same
- // mechanism to mark federation resources.
-
- final Filter metaDataMatcher =
- FilterImpl.createFilter("\"" + FEDERATION_NAME + "\" IS NOT NULL");
-
- return (consumer) -> {
- final ServerSession serverSession =
server.getSessionByID(consumer.getSessionID());
-
- if (serverSession != null && serverSession.getMetaData() != null) {
- return metaDataMatcher.match(serverSession.getMetaData());
- } else {
- return false;
- }
- };
+ return consumer.getPriority() + policy.getPriorityAjustment();
}
}
- private static String identifyConsumer(ServerConsumer consumer) {
- return consumer.getConnectionID().toString() + ":" +
- consumer.getSessionID() + ":" +
- consumer.getID();
+ /*
+ * This method matches on the same criteria as the original Core client
based Federation code which
+ * allows this implementation to see those consumers as well as its own
which in this methods
+ * implementation must also use this same mechanism to mark federation
resources.
+ */
+ private boolean isFederationConsumer(ServerConsumer consumer) {
+ final ServerSession serverSession =
server.getSessionByID(consumer.getSessionID());
+
+ // Care must be taken to only check this on consumer added and not on
other consumer removed
+ // events as the session can be removed before those events are
triggered and this will falsely
+ // indicate that the consumer is not a federation consumer. This check
works for both AMQP and
+ // Core federation consumers.
+ if (serverSession != null && serverSession.getMetaData() != null) {
+ return serverSession.getMetaData(FEDERATION_NAME) != null;
+ } else {
+ return false;
+ }
}
- private static String selectFilter(Filter queueFilter, Filter
consumerFilter) {
- if (consumerFilter != null) {
- return consumerFilter.getFilterString().toString();
- } else {
- return queueFilter != null ? queueFilter.getFilterString().toString()
: null;
+ @Override
+ protected AMQPFederationConsumer
createFederationConsumer(FederationConsumerInfo consumerInfo) {
+ Objects.requireNonNull(consumerInfo, "Federation Queue consumer
information object was null");
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("AMQP Federation {} creating queue consumer: {} for
policy: {}", federation.getName(), consumerInfo, policy.getPolicyName());
}
+
+ // Don't initiate anything yet as the caller might need to register
error handlers etc
+ // before the attach is sent otherwise they could miss the failure case.
+ return new AMQPFederationQueueConsumer(this, configuration, session,
consumerInfo, metrics.newConsumerMetrics());
}
- private static class AMQPFederationQueueConsumerManager extends
AMQPFederationConsumerManager {
+ private static class AMQPFederationQueueConsumerManager extends
AMQPFederationConsumerManager<ServerConsumer> {
private final AMQPFederationQueuePolicyManager manager;
private final Queue queue;
private final FederationConsumerInfo consumerInfo;
+ private final String policyInfoKey;
- AMQPFederationQueueConsumerManager(AMQPFederationQueuePolicyManager
manager, FederationConsumerInfo consumerInfo, Queue queue) {
+ AMQPFederationQueueConsumerManager(AMQPFederationQueuePolicyManager
manager, String policyInfoKey, FederationConsumerInfo consumerInfo, Queue
queue) {
super(manager);
this.manager = manager;
this.queue = queue;
this.consumerInfo = consumerInfo;
+ this.policyInfoKey = policyInfoKey;
}
/**
@@ -356,5 +367,17 @@ public final class AMQPFederationQueuePolicyManager
extends AMQPFederationLocalP
protected boolean isPluginBlockingFederationConsumerCreate() {
return manager.isPluginBlockingFederationConsumerCreate(queue);
}
+
+ @Override
+ protected void whenDemandTrackingEntryRemoved(ServerConsumer consumer) {
+ consumer.removeAttachment(policyInfoKey);
+ }
+
+ @Override
+ protected void whenDemandTrackingEntryAdded(ServerConsumer consumer) {
+ // Attach the consumer info to the server consumer for later use on
consumer close or other
+ // operations that need to retrieve the data used to create the
federation consumer identity.
+ consumer.addAttachment(policyInfoKey, consumerInfo);
+ }
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index cc0bc74083..0b02a74f66 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -108,4 +109,43 @@ public interface ServerConsumer extends Consumer,
ConsumerInfo {
* @param transaction the tx
*/
void metricsAcknowledge(MessageReference ref, Transaction transaction);
+
+ /**
+ * Adds the given attachment to the {@link ServerConsumer} which will
overwrite any previously
+ * assigned value with the same key.
+ *
+ * @param key
+ * The key used to identify the attachment.
+ * @param attachment
+ * The actual value to store for the assigned key.
+ */
+ void addAttachment(String key, Object attachment);
+
+ /**
+ * Remove the any attachment entry from the {@link ServerConsumer} clearing
any previously assigned value
+ *
+ * @param key
+ * The key used to identify the attachment.
+ */
+ void removeAttachment(String key);
+
+ /**
+ * Gets any attachment that has been assigned to this {@link
ServerConsumer} using the provided key.
+ * If no value was assigned a null is returned.
+ *
+ * @param key
+ * The key identifying the target attachment.
+ *
+ * @return the assigned value associated with the given key or null if
nothing assigned.
+ */
+ Object getAttachment(String key);
+
+ /**
+ * Provides access to the full {@link Map} of consumer attachments in an
unmodifiable {@link Map} instance.
+ * If no attachments are assigned to the consumer an empty {@link Map}
instance is returned, never null.
+ *
+ * @return an unmodifiable {@link Map} that carries all consumer
attachments.
+ */
+ Map<String, Object> getAttachments();
+
}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index d8b76a8b07..6977005735 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -21,9 +21,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -163,6 +165,8 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
private boolean isClosed = false;
+ private Map<String, Object> attachments;
+
@Override
public boolean isClosed() {
return isClosed;
@@ -638,6 +642,8 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
callback = null;
session = null;
+
+ attachments = null;
});
}
@@ -1609,6 +1615,36 @@ public class ServerConsumerImpl implements
ServerConsumer, ReadyListener {
return callback;
}
+ @Override
+ public synchronized void addAttachment(String key, Object attachment) {
+ if (attachments == null) {
+ attachments = new HashMap<>();
+ }
+
+ attachments.put(key, attachment);
+ }
+
+ @Override
+ public synchronized void removeAttachment(String key) {
+ if (attachments != null) {
+ attachments.remove(key);
+ }
+ }
+
+ @Override
+ public synchronized Object getAttachment(String key) {
+ if (attachments != null) {
+ return attachments.get(key);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public synchronized Map<String, Object> getAttachments() {
+ return attachments != null ? Collections.unmodifiableMap(attachments) :
Collections.emptyMap();
+ }
+
static class ServerConsumerMetrics extends TransactionOperationAbstract {
/**
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index 146e3288eb..37785e53b8 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -2544,14 +2544,14 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
target.waitForScriptToComplete(5, TimeUnit.SECONDS);
// broker should create a new receiver that extends the federation
receiver to this "broker"
- // but because this is a federation of a federation the priority
should drop by an additional
- // increment as we apply the adjustment on each step
+ // but because this is a federation of a federation the priority
should remain unchanged from
+ // what the source broker requested when creating the initial
consumer.
target.expectAttach().ofReceiver().withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("test::test"),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
-
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT - 1).respond()
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT).respond()
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
// Should get a flow but if the link goes away quick enough the
broker won't get to this before detaching.
target.expectFlow().withLinkCredit(1000).optional();
@@ -4822,6 +4822,104 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ public void testOverlappingPolicyTargetsRetainDemandTrackingState() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+ .withSenderSettleModeSettled()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ // Ensure that policies that overlap on the same queue don't break
the tracking of federation
+ // consumers for the other policy if they store state data in
consumer attachments. We want the
+ // state data to not match between these two so that one could trip
up the close event processing
+ // of the other if that was indeed broken.
+
+ final AMQPFederationQueuePolicyElement receiveFromQueue1 = new
AMQPFederationQueuePolicyElement();
+ receiveFromQueue1.setName("first");
+ receiveFromQueue1.addToIncludes("#", getTestName());
+ receiveFromQueue1.addProperty(IGNORE_QUEUE_CONSUMER_FILTERS,
Boolean.TRUE.toString());
+ receiveFromQueue1.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 10); //
Stagger final drain to make frames predictable
+
+ final AMQPFederationQueuePolicyElement receiveFromQueue2 = new
AMQPFederationQueuePolicyElement();
+ receiveFromQueue2.setName("second");
+ receiveFromQueue2.addToIncludes("#", getTestName());
+ receiveFromQueue2.addProperty(IGNORE_QUEUE_CONSUMER_FILTERS,
Boolean.FALSE.toString());
+ receiveFromQueue2.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 20); //
Stagger final drain to make frames predictable
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalQueuePolicy(receiveFromQueue1);
+ element.addLocalQueuePolicy(receiveFromQueue2);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress("test")
+
.setAutoCreated(false));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("test::" +
getTestName()),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("test::" +
getTestName()),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createQueue(getTestName()), "color='red' OR
color = 'blue'");
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true).afterDelay(2);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true).afterDelay(2);
+
+ peer.expectDetach().respond();
+ peer.expectDetach().respond();
+
+ consumer.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+ }
+
private static void sendQueueAddedEvent(ProtonTestPeer peer, String
address, String queue, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index ab0654e7ff..3e912931b1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.amqp.connect;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -1493,28 +1494,40 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
@RepeatedTest(1)
@Timeout(20)
public void
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocal()
throws Exception {
-
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
false);
+
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
false, true);
}
@RepeatedTest(1)
@Timeout(20)
public void
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemote()
throws Exception {
-
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
false);
+
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
false, true);
}
@RepeatedTest(1)
@Timeout(20)
public void
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocalIncludeFederated()
throws Exception {
-
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
true);
+
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
true, true);
}
@RepeatedTest(1)
@Timeout(20)
public void
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemoteIncludeFederated()
throws Exception {
-
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
true);
+
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
true, true);
}
- public void
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(boolean
produceLocal, boolean includeFederated) throws Exception {
+ @RepeatedTest(1)
+ @Timeout(20)
+ public void
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocalIncludeFederatedAmdPriority()
throws Exception {
+
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
true, false);
+ }
+
+ @RepeatedTest(1)
+ @Timeout(20)
+ public void
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemoteIncludeFederatedAndPriority()
throws Exception {
+
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
true, false);
+ }
+
+ public void
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(boolean
produceLocal, boolean includeFederated, boolean ignorePriority) throws
Exception {
logger.info("Test started: {}", getTestName());
final AMQPFederationQueuePolicyElement localQueuePolicy = new
AMQPFederationQueuePolicyElement();
@@ -1523,6 +1536,7 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
localQueuePolicy.addProperty(RECEIVER_CREDITS, 0); // Enable
Pull mode
localQueuePolicy.addProperty(PULL_RECEIVER_BATCH_SIZE, 1); // Pull mode
batch is one
localQueuePolicy.setIncludeFederated(includeFederated);
+ localQueuePolicy.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES,
Boolean.toString(ignorePriority));
final AMQPFederationQueuePolicyElement remoteQueuePolicy = new
AMQPFederationQueuePolicyElement();
remoteQueuePolicy.setName("test-policy-2");
@@ -1530,6 +1544,7 @@ public class AMQPFederationServerToServerTest extends
AmqpClientTestSupport {
remoteQueuePolicy.addProperty(RECEIVER_CREDITS, 0); // Enable
Pull mode
remoteQueuePolicy.addProperty(PULL_RECEIVER_BATCH_SIZE, 1); // Pull mode
batch is one
remoteQueuePolicy.setIncludeFederated(includeFederated);
+ remoteQueuePolicy.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES,
Boolean.toString(ignorePriority));
final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 8fae3800bf..6483f07ce9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.cli;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -328,4 +330,24 @@ public class DummyServerConsumer implements ServerConsumer
{
public int getMessagesAcknowledgedAwaitingCommit() {
return 0;
}
+
+ @Override
+ public void addAttachment(String key, Object attachment) {
+
+ }
+
+ @Override
+ public Object getAttachment(String key) {
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> getAttachments() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void removeAttachment(String key) {
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact