This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new cfffb46149 ARTEMIS-5368 Propagate Federation consumer priority 
unchanged
cfffb46149 is described below

commit cfffb46149a66a5aedeb5edde017472e5e8d6d71
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Mar 27 14:52:36 2025 -0400

    ARTEMIS-5368 Propagate Federation consumer priority unchanged
    
    When passing along a federation queue consumer that matches the includes
    of a federation queue policy, retain the original consumer priority so
    that, if the consumers loop, the propagation will stop at the source that
    started the loop.
---
 .../AMQPFederationAddressPolicyManager.java        |  16 +-
 .../federation/AMQPFederationConsumerManager.java  |  42 +++++-
 .../AMQPFederationQueuePolicyManager.java          | 165 ++++++++++++---------
 .../artemis/core/server/ServerConsumer.java        |  40 +++++
 .../core/server/impl/ServerConsumerImpl.java       |  36 +++++
 .../connect/AMQPFederationQueuePolicyTest.java     | 104 ++++++++++++-
 .../connect/AMQPFederationServerToServerTest.java  |  25 +++-
 .../tests/integration/cli/DummyServerConsumer.java |  22 +++
 8 files changed, 362 insertions(+), 88 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 04540e7d84..10f9bb3fa3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -93,7 +93,7 @@ public final class AMQPFederationAddressPolicyManager extends 
AMQPFederationLoca
    @Override
    public synchronized void afterRemoveAddress(SimpleString address, 
AddressInfo addressInfo) throws ActiveMQException {
       if (isActive()) {
-         final AMQPFederationConsumerManager entry = 
federationConsumers.remove(address.toString());
+         final AMQPFederationAddressConsumerManager entry = 
federationConsumers.remove(address.toString());
 
          if (entry != null) {
             logger.trace("Federated address {} was removed, closing federation 
consumer", address);
@@ -357,7 +357,7 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       // current demand and don't need to re-check the server state before 
trying to create the
       // remote address consumer.
       if (isActive() && testIfAddressMatchesPolicy(addressName, 
RoutingType.MULTICAST)) {
-         final AMQPFederationConsumerManager entry = 
federationConsumers.get(addressName);
+         final AMQPFederationAddressConsumerManager entry = 
federationConsumers.get(addressName);
 
          if (entry != null) {
             entry.recover();
@@ -451,7 +451,7 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       return false;
    }
 
-   private static class AMQPFederationAddressConsumerManager extends 
AMQPFederationConsumerManager {
+   private static class AMQPFederationAddressConsumerManager extends 
AMQPFederationConsumerManager<Binding> {
 
       private final AMQPFederationAddressPolicyManager manager;
       private final AddressInfo addressInfo;
@@ -486,5 +486,15 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       protected boolean isPluginBlockingFederationConsumerCreate() {
          return manager.isPluginBlockingFederationConsumerCreate(addressInfo);
       }
+
+      @Override
+      protected void whenDemandTrackingEntryAdded(Binding entry) {
+         // No current action needed
+      }
+
+      @Override
+      protected void whenDemandTrackingEntryRemoved(Binding entry) {
+         // No current action needed
+      }
    }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
index 6c0577ed84..0c8f9968ed 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerManager.java
@@ -32,8 +32,10 @@ import org.slf4j.LoggerFactory;
  * <p>
  * All interactions with the consumer tracking entry should occur under the 
lock of the parent manager instance and this
  * manager will perform any asynchronous work with a lock held on the parent 
manager instance.
+ *
+ * @param <E> The type used in the demand tracking collection.
  */
-public abstract class AMQPFederationConsumerManager {
+public abstract class AMQPFederationConsumerManager<E> {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -48,7 +50,7 @@ public abstract class AMQPFederationConsumerManager {
 
    private final AMQPFederation federation;
    private final AMQPFederationLocalPolicyManager manager;
-   private final Set<Object> demandTracking = new HashSet<>();
+   private final Set<E> demandTracking = new HashSet<>();
 
    private State state = State.READY;
    private AMQPFederationConsumer consumer;
@@ -94,6 +96,14 @@ public abstract class AMQPFederationConsumerManager {
          } finally {
             consumer = null;
 
+            demandTracking.forEach(entry -> {
+               try {
+                  whenDemandTrackingEntryRemoved(entry);
+               } catch (Exception ignore) {
+               }
+            });
+            demandTracking.clear();
+
             // Stop handler in closed state won't schedule idle timeout, ensure
             // any pending task is canceled just for proper cleanup purposes.
             if (pendingIdleTimeout != null) {
@@ -135,10 +145,12 @@ public abstract class AMQPFederationConsumerManager {
     *
     * @param demand A new unit of demand to add to this consumer manager.
     */
-   public void addDemand(Object demand) {
+   public void addDemand(E demand) {
       checkClosed();
 
-      demandTracking.add(demand);
+      if (demandTracking.add(demand)) {
+         whenDemandTrackingEntryAdded(demand);
+      }
 
       // This will create a new consumer only if there isn't one currently 
assigned to this entry and any configured
       // federation plugins don't block it from doing so. An already stopping 
consumer will check on stop if it should
@@ -159,10 +171,12 @@ public abstract class AMQPFederationConsumerManager {
     *
     * @param demand The element of demand that should be removed from tracking.
     */
-   public void removeDemand(Object demand) {
+   public void removeDemand(E demand) {
       checkClosed();
 
-      demandTracking.remove(demand);
+      if (demandTracking.remove(demand)) {
+         whenDemandTrackingEntryRemoved(demand);
+      }
 
       if (hasDemand() || state == State.READY) {
          return;
@@ -372,6 +386,22 @@ public abstract class AMQPFederationConsumerManager {
       }
    }
 
+   /**
+    * An event point that a subclass can use to perform an initialization 
action whenever an entry is added to
+    * demand tracking.
+    *
+    * @param entry The entry that is being added to demand tracking.
+    */
+   protected abstract void whenDemandTrackingEntryAdded(E entry);
+
+   /**
+    * An event point that a subclass can use to perform a cleanup action 
whenever an entry is removed from
+    * demand tracking.
+    *
+    * @param entry The entry that is being removed from demand tracking.
+    */
+   protected abstract void whenDemandTrackingEntryRemoved(E entry);
+
    /**
     * Creates a new federation consumer that this manager will monitor and 
maintain. The returned consumer should be in
     * an initial state ready for this manager to initialize once it is fully 
configured.
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
index 412004fb89..d6c5bb31ab 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueuePolicyManager.java
@@ -23,15 +23,13 @@ import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.function.Predicate;
+import java.util.UUID;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -52,17 +50,22 @@ public final class AMQPFederationQueuePolicyManager extends 
AMQPFederationLocalP
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   protected final Predicate<ServerConsumer> federationConsumerMatcher;
    protected final FederationReceiveFromQueuePolicy policy;
    protected final Map<FederationConsumerInfo, 
AMQPFederationQueueConsumerManager> federationConsumers = new HashMap<>();
 
+   /*
+    * Unique for the lifetime of this policy manager which is either the 
lifetime of the broker or until the
+    * configuration is reloaded and this federation instance happens to be 
updated but not removed in which
+    * case the full federation instance is shutdown and then removed and 
re-added as if it was new.
+    */
+   private final String CONSUMER_INFO_ATTACHMENT_KEY = 
UUID.randomUUID().toString();
+
    public AMQPFederationQueuePolicyManager(AMQPFederation federation, 
AMQPFederationMetrics metrics, FederationReceiveFromQueuePolicy queuePolicy) 
throws ActiveMQException {
       super(federation, metrics, queuePolicy);
 
       Objects.requireNonNull(queuePolicy, "The Queue match policy cannot be 
null");
 
       this.policy = queuePolicy;
-      this.federationConsumerMatcher = createFederationConsumerMatcher(server, 
queuePolicy);
    }
 
    /**
@@ -100,12 +103,12 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
    @Override
    public synchronized void afterCloseConsumer(ServerConsumer consumer, 
boolean failed) {
       if (isActive()) {
-         final FederationConsumerInfo consumerInfo = 
createConsumerInfo(consumer);
-         final AMQPFederationQueueConsumerManager entry = 
federationConsumers.get(consumerInfo);
+         final FederationConsumerInfo consumerInfo = (FederationConsumerInfo) 
consumer.getAttachment(CONSUMER_INFO_ATTACHMENT_KEY);
 
-         if (entry != null) {
+         if (consumerInfo != null && 
federationConsumers.containsKey(consumerInfo)) {
+            final AMQPFederationQueueConsumerManager entry = 
federationConsumers.get(consumerInfo);
             logger.trace("Reducing demand on federated queue {}", 
entry.getQueueName());
-            entry.removeDemand(identifyConsumer(consumer));
+            entry.removeDemand(consumer);
          }
       }
    }
@@ -156,17 +159,19 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
       final String queueName = consumer.getQueue().getName().toString();
 
       if (testIfQueueMatchesPolicy(consumer.getQueueAddress().toString(), 
queueName)) {
+         final boolean federationConsumer = isFederationConsumer(consumer);
+
          // We should ignore federation consumers from remote peers but 
configuration does allow
          // these to be federated again for some very specific use cases so we 
check before then
          // moving onto any server plugin checks kick in.
-         if (federationConsumerMatcher.test(consumer)) {
+         if (federationConsumer && !policy.isIncludeFederated()) {
             return;
          }
 
          logger.trace("Federation Policy matched on consumer for binding: {}", 
consumer.getBinding());
 
          final AMQPFederationQueueConsumerManager entry;
-         final FederationConsumerInfo consumerInfo = 
createConsumerInfo(consumer);
+         final FederationConsumerInfo consumerInfo = 
createConsumerInfo(consumer, federationConsumer);
 
          // Check for existing consumer add demand from a additional local 
consumer to ensure
          // the remote consumer remains active until all local demand is 
withdrawn.
@@ -174,13 +179,13 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
             logger.trace("Federation Queue Policy manager found existing 
demand for queue: {}, adding demand", queueName);
             entry = federationConsumers.get(consumerInfo);
          } else {
-            federationConsumers.put(consumerInfo, entry = new 
AMQPFederationQueueConsumerManager(this, consumerInfo, consumer.getQueue()));
+            federationConsumers.put(consumerInfo, entry = new 
AMQPFederationQueueConsumerManager(this, CONSUMER_INFO_ATTACHMENT_KEY, 
consumerInfo, consumer.getQueue()));
          }
 
          // Demand passed all binding plugin blocking checks so we track it, 
plugin can still
          // stop federation of the queue based on some external criteria but 
once it does
          // (if ever) allow it we will have tracked all allowed demand.
-         entry.addDemand(identifyConsumer(consumer));
+         entry.addDemand(consumer);
       }
    }
 
@@ -190,6 +195,7 @@ public final class AMQPFederationQueuePolicyManager extends 
AMQPFederationLocalP
     *
     * @param addressName The address that was added on the remote.
     * @param queueName   The queue that was added on the remote.
+    *
     * @throws Exception if an error occurs while processing the queue added 
event.
     */
    public synchronized void afterRemoteQueueAdded(String addressName, String 
queueName) throws Exception {
@@ -219,6 +225,7 @@ public final class AMQPFederationQueuePolicyManager extends 
AMQPFederationLocalP
     *
     * @param address   The address that is being tested for a policy match.
     * @param queueName The name of the queue that is being tested for a policy 
match.
+    *
     * @return {@code true} if the address given is a match against the policy
     */
    private boolean testIfQueueMatchesPolicy(String address, String queueName) {
@@ -231,6 +238,7 @@ public final class AMQPFederationQueuePolicyManager extends 
AMQPFederationLocalP
     * configured matching policy.
     *
     * @param queueName The name of the queue that is being tested for a policy 
match.
+    *
     * @return {@code true} if the address given is a match against the policy
     */
    private boolean testIfQueueMatchesPolicy(String queueName) {
@@ -239,23 +247,20 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
 
    /**
     * Create a new {@link FederationConsumerInfo} based on the given {@link 
ServerConsumer} and the configured
-    * {@link FederationReceiveFromQueuePolicy}. A subclass must override this 
method to return a consumer information
-    * object with additional data used be that implementation.
+    * {@link FederationReceiveFromQueuePolicy}. This should only be called 
once when a consumer is added and
+    * we begin tracking it as demand on a federated queue.
     *
     * @param consumer The {@link ServerConsumer} to use as a basis for the 
consumer information object.
+    * @param federationConsumer Is the consumer one that was created by a 
remote federation controller
+    *
     * @return a new {@link FederationConsumerInfo} instance based on the 
server consumer
     */
-   private FederationConsumerInfo createConsumerInfo(ServerConsumer consumer) {
+   private FederationConsumerInfo createConsumerInfo(ServerConsumer consumer, 
boolean federationConsumer) {
       final Queue queue = consumer.getQueue();
       final String queueName = queue.getName().toString();
       final String address = queue.getAddress().toString();
-
-      final int priority = configuration.isIgnoreSubscriptionPriorities() ?
-         ActiveMQDefaultConfiguration.getDefaultConsumerPriority() + 
policy.getPriorityAjustment() :
-         consumer.getPriority() + policy.getPriorityAjustment();
-
-      final String filterString =
-         selectFilter(queue.getFilter(), 
configuration.isIgnoreSubscriptionFilters() ? null : consumer.getFilter());
+      final int priority = selectPriority(consumer, federationConsumer);
+      final String filterString = selectFilter(consumer);
 
       return new AMQPFederationGenericConsumerInfo(Role.QUEUE_CONSUMER,
                                                    address,
@@ -266,78 +271,84 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
                                                    priority);
    }
 
-   @Override
-   protected AMQPFederationConsumer 
createFederationConsumer(FederationConsumerInfo consumerInfo) {
-      Objects.requireNonNull(consumerInfo, "Federation Queue consumer 
information object was null");
+   private String selectFilter(ServerConsumer consumer) {
+      final Filter consumerFilter;
+      final Filter queueFilter = consumer.getQueue().getFilter();
 
-      if (logger.isTraceEnabled()) {
-         logger.trace("AMQP Federation {} creating queue consumer: {} for 
policy: {}", federation.getName(), consumerInfo, policy.getPolicyName());
+      if (!configuration.isIgnoreSubscriptionFilters()) {
+         consumerFilter = consumer.getFilter();
+      } else {
+         consumerFilter = null;
       }
 
-      // Don't initiate anything yet as the caller might need to register 
error handlers etc
-      // before the attach is sent otherwise they could miss the failure case.
-      return new AMQPFederationQueueConsumer(this, configuration, session, 
consumerInfo, metrics.newConsumerMetrics());
+      if (consumerFilter != null) {
+         return consumerFilter.getFilterString().toString();
+      } else if (queueFilter != null) {
+         return queueFilter.getFilterString().toString();
+      } else {
+         return null;
+      }
    }
 
-   /**
-    * Creates a {@link Predicate} that should return true if the given 
consumer is a federation created consumer which
-    * should not be further federated.
-    *
-    * @param server The server instance for use in creating the filtering 
{@link Predicate}.
-    * @param policy The configured Queue matching policy that can provide 
additional match criteria.
-    * @return a {@link Predicate} that will return true if the consumer should 
be filtered
-    * @throws ActiveMQException if an error occurs while creating the new 
consumer filter.
-    */
-   private Predicate<ServerConsumer> 
createFederationConsumerMatcher(ActiveMQServer server, 
FederationReceiveFromQueuePolicy policy) throws ActiveMQException {
-      if (policy.isIncludeFederated()) {
-         return (consumer) -> false; // Configuration says to federate these
+   private int selectPriority(ServerConsumer consumer, boolean 
federationConsumer) {
+      // Use the priority from the federation consumer as indicated and only 
choose values for
+      // non-federation consumers to help avoid an infinite descending 
priority loop when the
+      // federation consumers are included and the configured brokers loop or 
are bi-directional.
+      if (federationConsumer) {
+         return consumer.getPriority();
+      } else if (configuration.isIgnoreSubscriptionPriorities()) {
+         return ActiveMQDefaultConfiguration.getDefaultConsumerPriority() + 
policy.getPriorityAjustment();
       } else {
-         // This filter matches on the same criteria as the original Core 
client based
-         // Federation code which allows this implementation to see those 
consumers as
-         // well as its own which in this methods implementation must also use 
this same
-         // mechanism to mark federation resources.
-
-         final Filter metaDataMatcher =
-            FilterImpl.createFilter("\"" + FEDERATION_NAME + "\" IS NOT NULL");
-
-         return (consumer) -> {
-            final ServerSession serverSession = 
server.getSessionByID(consumer.getSessionID());
-
-            if (serverSession != null && serverSession.getMetaData() != null) {
-               return metaDataMatcher.match(serverSession.getMetaData());
-            } else {
-               return false;
-            }
-         };
+         return consumer.getPriority() + policy.getPriorityAjustment();
       }
    }
 
-   private static String identifyConsumer(ServerConsumer consumer) {
-      return consumer.getConnectionID().toString() + ":" +
-             consumer.getSessionID() + ":" +
-             consumer.getID();
+   /*
+    * This method matches on the same criteria as the original Core client 
based Federation code which
+    * allows this implementation to see those consumers as well as its own, 
which in this method's
+    * implementation must also use this same mechanism to mark federation 
resources.
+    */
+   private boolean isFederationConsumer(ServerConsumer consumer) {
+      final ServerSession serverSession = 
server.getSessionByID(consumer.getSessionID());
+
+      // Care must be taken to only check this on consumer added and not on 
other consumer removed
+      // events as the session can be removed before those events are 
triggered and this will falsely
+      // indicate that the consumer is not a federation consumer. This check 
works for both AMQP and
+      // Core federation consumers.
+      if (serverSession != null && serverSession.getMetaData() != null) {
+         return serverSession.getMetaData(FEDERATION_NAME) != null;
+      } else {
+         return false;
+      }
    }
 
-   private static String selectFilter(Filter queueFilter, Filter 
consumerFilter) {
-      if (consumerFilter != null) {
-         return consumerFilter.getFilterString().toString();
-      } else {
-         return queueFilter != null ? queueFilter.getFilterString().toString() 
: null;
+   @Override
+   protected AMQPFederationConsumer 
createFederationConsumer(FederationConsumerInfo consumerInfo) {
+      Objects.requireNonNull(consumerInfo, "Federation Queue consumer 
information object was null");
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("AMQP Federation {} creating queue consumer: {} for 
policy: {}", federation.getName(), consumerInfo, policy.getPolicyName());
       }
+
+      // Don't initiate anything yet as the caller might need to register 
error handlers etc
+      // before the attach is sent otherwise they could miss the failure case.
+      return new AMQPFederationQueueConsumer(this, configuration, session, 
consumerInfo, metrics.newConsumerMetrics());
    }
 
-   private static class AMQPFederationQueueConsumerManager extends 
AMQPFederationConsumerManager {
+   private static class AMQPFederationQueueConsumerManager extends 
AMQPFederationConsumerManager<ServerConsumer> {
 
       private final AMQPFederationQueuePolicyManager manager;
       private final Queue queue;
       private final FederationConsumerInfo consumerInfo;
+      private final String policyInfoKey;
 
-      AMQPFederationQueueConsumerManager(AMQPFederationQueuePolicyManager 
manager, FederationConsumerInfo consumerInfo, Queue queue) {
+      AMQPFederationQueueConsumerManager(AMQPFederationQueuePolicyManager 
manager, String policyInfoKey, FederationConsumerInfo consumerInfo, Queue 
queue) {
          super(manager);
 
          this.manager = manager;
          this.queue = queue;
          this.consumerInfo = consumerInfo;
+         this.policyInfoKey = policyInfoKey;
       }
 
       /**
@@ -356,5 +367,17 @@ public final class AMQPFederationQueuePolicyManager 
extends AMQPFederationLocalP
       protected boolean isPluginBlockingFederationConsumerCreate() {
          return manager.isPluginBlockingFederationConsumerCreate(queue);
       }
+
+      @Override
+      protected void whenDemandTrackingEntryRemoved(ServerConsumer consumer) {
+         consumer.removeAttachment(policyInfoKey);
+      }
+
+      @Override
+      protected void whenDemandTrackingEntryAdded(ServerConsumer consumer) {
+         // Attach the consumer info to the server consumer for later use on 
consumer close or other
+         // operations that need to retrieve the data used to create the 
federation consumer identity.
+         consumer.addAttachment(policyInfoKey, consumerInfo);
+      }
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index cc0bc74083..0b02a74f66 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server;
 
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -108,4 +109,43 @@ public interface ServerConsumer extends Consumer, 
ConsumerInfo {
     * @param transaction the tx
     */
    void metricsAcknowledge(MessageReference ref, Transaction transaction);
+
+   /**
+    * Adds the given attachment to the {@link ServerConsumer} which will 
overwrite any previously
+    * assigned value with the same key.
+    *
+    * @param key
+    *    The key used to identify the attachment.
+    * @param attachment
+    *    The actual value to store for the assigned key.
+    */
+   void addAttachment(String key, Object attachment);
+
+   /**
+    * Removes any attachment entry from the {@link ServerConsumer}, clearing 
any previously assigned value.
+    *
+    * @param key
+    *    The key used to identify the attachment.
+    */
+   void removeAttachment(String key);
+
+   /**
+    * Gets any attachment that has been assigned to this {@link 
ServerConsumer} using the provided key.
+    * If no value was assigned a null is returned.
+    *
+    * @param key
+    *    The key identifying the target attachment.
+    *
+    * @return the assigned value associated with the given key or null if 
nothing assigned.
+    */
+   Object getAttachment(String key);
+
+   /**
+    * Provides access to the full {@link Map} of consumer attachments in an 
unmodifiable {@link Map} instance.
+    * If no attachments are assigned to the consumer an empty {@link Map} 
instance is returned, never null.
+    *
+    * @return an unmodifiable {@link Map} that carries all consumer 
attachments.
+    */
+   Map<String, Object> getAttachments();
+
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index d8b76a8b07..6977005735 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -21,9 +21,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -163,6 +165,8 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
 
    private boolean isClosed = false;
 
+   private Map<String, Object> attachments;
+
    @Override
    public boolean isClosed() {
       return isClosed;
@@ -638,6 +642,8 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
          callback = null;
 
          session = null;
+
+         attachments = null;
       });
 
    }
@@ -1609,6 +1615,36 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
       return callback;
    }
 
+   @Override
+   public synchronized void addAttachment(String key, Object attachment) {
+      if (attachments == null) {
+         attachments = new HashMap<>();
+      }
+
+      attachments.put(key, attachment);
+   }
+
+   @Override
+   public synchronized void removeAttachment(String key) {
+      if (attachments != null) {
+         attachments.remove(key);
+      }
+   }
+
+   @Override
+   public synchronized Object getAttachment(String key) {
+      if (attachments != null) {
+         return attachments.get(key);
+      } else {
+         return null;
+      }
+   }
+
+   @Override
+   public synchronized Map<String, Object> getAttachments() {
+      return attachments != null ? Collections.unmodifiableMap(attachments) : 
Collections.emptyMap();
+   }
+
    static class ServerConsumerMetrics extends TransactionOperationAbstract {
 
       /**
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index 146e3288eb..37785e53b8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -2544,14 +2544,14 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
 
          target.waitForScriptToComplete(5, TimeUnit.SECONDS);
          // broker should create a new receiver that extends the federation 
receiver to this "broker"
-         // but because this is a federation of a federation the priority 
should drop by an additional
-         // increment as we apply the adjustment on each step
+         // but because this is a federation of a federation the priority 
should remain unchanged from
+         // what the source broker requested when creating the initial 
consumer.
          
target.expectAttach().ofReceiver().withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
                                            
.withName(allOf(containsString(getTestName()),
                                                            
containsString("test::test"),
                                                            
containsString("queue-receiver"),
                                                            
containsString(server.getNodeID().toString())))
-                                           
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT - 1).respond()
+                                           
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT).respond()
                                            
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
          // Should get a flow but if the link goes away quick enough the 
broker won't get to this before detaching.
          target.expectFlow().withLinkCredit(1000).optional();
@@ -4822,6 +4822,104 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   public void testOverlappingPolicyTargetsRetainDemandTrackingState() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .respond()
+                            
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+         peer.expectAttach().ofReceiver()
+                            .withSenderSettleModeSettled()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         // Ensure that policies that overlap on the same queue don't break the tracking of federation
+         // consumers for the other policy if they store state data in consumer attachments. The two policies
+         // are deliberately configured differently so that their state data does not match; if the tracking
+         // were indeed broken, one policy could then trip up the close event processing of the other.
+
+         final AMQPFederationQueuePolicyElement receiveFromQueue1 = new 
AMQPFederationQueuePolicyElement();
+         receiveFromQueue1.setName("first");
+         receiveFromQueue1.addToIncludes("#", getTestName());
+         receiveFromQueue1.addProperty(IGNORE_QUEUE_CONSUMER_FILTERS, 
Boolean.TRUE.toString());
+         receiveFromQueue1.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 10); // 
Stagger final drain to make frames predictable
+
+         final AMQPFederationQueuePolicyElement receiveFromQueue2 = new 
AMQPFederationQueuePolicyElement();
+         receiveFromQueue2.setName("second");
+         receiveFromQueue2.addToIncludes("#", getTestName());
+         receiveFromQueue2.addProperty(IGNORE_QUEUE_CONSUMER_FILTERS, 
Boolean.FALSE.toString());
+         receiveFromQueue2.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 20); // 
Stagger final drain to make frames predictable
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalQueuePolicy(receiveFromQueue1);
+         element.addLocalQueuePolicy(receiveFromQueue2);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress("test")
+                                                                
.setAutoCreated(false));
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("test::" + 
getTestName()),
+                                            
containsString(server.getNodeID().toString())))
+                            
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+                            .respond()
+                            
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("test::" + 
getTestName()),
+                                            
containsString(server.getNodeID().toString())))
+                            
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+                            .respond()
+                            
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
+         peer.expectFlow().withLinkCredit(1000);
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getTestName()), "color='red' OR 
color = 'blue'");
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true).afterDelay(2);
+            peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true).afterDelay(2);
+
+            peer.expectDetach().respond();
+            peer.expectDetach().respond();
+
+            consumer.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.close();
+         }
+      }
+   }
+
    private static void sendQueueAddedEvent(ProtonTestPeer peer, String 
address, String queue, int handle, int deliveryId) {
       final Map<String, Object> eventMap = new LinkedHashMap<>();
       eventMap.put(REQUESTED_ADDRESS_NAME, address);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
index ab0654e7ff..3e912931b1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.artemis.tests.integration.amqp.connect;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -1493,28 +1494,40 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
    @RepeatedTest(1)
    @Timeout(20)
    public void 
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocal()
 throws Exception {
-      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
 false);
+      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
 false, true);
    }
 
    @RepeatedTest(1)
    @Timeout(20)
    public void 
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemote()
 throws Exception {
-      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
 false);
+      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
 false, true);
    }
 
    @RepeatedTest(1)
    @Timeout(20)
    public void 
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocalIncludeFederated()
 throws Exception {
-      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
 true);
+      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
 true, true);
    }
 
    @RepeatedTest(1)
    @Timeout(20)
    public void 
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemoteIncludeFederated()
 throws Exception {
-      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
 true);
+      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
 true, true);
    }
 
-   public void 
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(boolean
 produceLocal, boolean includeFederated) throws Exception {
+   @RepeatedTest(1)
+   @Timeout(20)
+   public void 
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnLocalIncludeFederatedAmdPriority()
 throws Exception {
+      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(true,
 true, false);
+   }
+
+   @RepeatedTest(1)
+   @Timeout(20)
+   public void 
testTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessageProduceOnRemoteIncludeFederatedAndPriority()
 throws Exception {
+      
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(false,
 true, false);
+   }
+
+   public void 
doTestTwoPullConsumerOnPullingFederationConfigurationEachCanTakeOneMessage(boolean
 produceLocal, boolean includeFederated, boolean ignorePriority) throws 
Exception {
       logger.info("Test started: {}", getTestName());
 
       final AMQPFederationQueuePolicyElement localQueuePolicy = new 
AMQPFederationQueuePolicyElement();
@@ -1523,6 +1536,7 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
       localQueuePolicy.addProperty(RECEIVER_CREDITS, 0);         // Enable 
Pull mode
       localQueuePolicy.addProperty(PULL_RECEIVER_BATCH_SIZE, 1); // Pull mode 
batch is one
       localQueuePolicy.setIncludeFederated(includeFederated);
+      localQueuePolicy.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, 
Boolean.toString(ignorePriority));
 
       final AMQPFederationQueuePolicyElement remoteQueuePolicy = new 
AMQPFederationQueuePolicyElement();
       remoteQueuePolicy.setName("test-policy-2");
@@ -1530,6 +1544,7 @@ public class AMQPFederationServerToServerTest extends 
AmqpClientTestSupport {
       remoteQueuePolicy.addProperty(RECEIVER_CREDITS, 0);         // Enable 
Pull mode
       remoteQueuePolicy.addProperty(PULL_RECEIVER_BATCH_SIZE, 1); // Pull mode 
batch is one
       remoteQueuePolicy.setIncludeFederated(includeFederated);
+      remoteQueuePolicy.addProperty(IGNORE_QUEUE_CONSUMER_PRIORITIES, 
Boolean.toString(ignorePriority));
 
       final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
       element.setName(getTestName());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 8fae3800bf..6483f07ce9 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.artemis.tests.integration.cli;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -328,4 +330,24 @@ public class DummyServerConsumer implements ServerConsumer 
{
    public int getMessagesAcknowledgedAwaitingCommit() {
       return 0;
    }
+
+   @Override
+   public void addAttachment(String key, Object attachment) {
+      // Intentionally empty: this dummy consumer does not store attachments.
+   }
+
+   @Override
+   public Object getAttachment(String key) {
+      return null; // Nothing is ever stored, so no key resolves to a value.
+   }
+
+   @Override
+   public Map<String, Object> getAttachments() {
+      return Collections.emptyMap(); // Always empty since addAttachment is a no-op.
+   }
+
+   @Override
+   public void removeAttachment(String key) {
+      // Intentionally empty: there are no stored attachments to remove.
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to