This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 224b898  ARTEMIS-2007 - allow redistribution if there are unmatched 
messages pending on a queue and there is new remote demand
224b898 is described below

commit 224b89810d023e3ee73b014a48585da8f6048237
Author: gtully <[email protected]>
AuthorDate: Thu Jun 24 16:24:48 2021 +0100

    ARTEMIS-2007 - allow redistribution if there are unmatched messages pending 
on a queue and there is new remote demand
---
 .../protocol/mqtt/MQTTSubscriptionManager.java     |   6 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../apache/activemq/artemis/core/server/Queue.java |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 118 +++---
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   2 +-
 docs/user-manual/en/clusters.md                    |  71 +---
 .../cluster/SelectorRedistributionClusterTest.java | 406 +++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   2 +-
 .../tests/unit/core/server/impl/QueueImplTest.java |  98 +++++
 .../unit/core/server/impl/fakes/FakeConsumer.java  |   7 +
 10 files changed, 593 insertions(+), 121 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index c23f994..b6db148 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -211,8 +211,10 @@ public class MQTTSubscriptionManager {
          Set<Consumer> queueConsumers;
          if (queue != null && (queueConsumers = (Set<Consumer>) 
queue.getConsumers()) != null) {
             for (Consumer consumer : queueConsumers) {
-               ((ServerConsumer) consumer).close(false);
-               consumerQoSLevels.remove(((ServerConsumer) consumer).getID());
+               if (consumer instanceof ServerConsumer) {
+                  ((ServerConsumer) consumer).close(false);
+                  consumerQoSLevels.remove(((ServerConsumer) 
consumer).getID());
+               }
             }
          }
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index a056c84..6bae295 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1481,7 +1481,7 @@ public interface ActiveMQServerLogger extends BasicLogger 
{
    void unableToFlushDeliveries(@Cause Exception e);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222237, value = "Unable to flush deliveries", format = 
Message.Format.MESSAGE_FORMAT)
+   @Message(id = 222237, value = "Unable to stop redistributor", format = 
Message.Format.MESSAGE_FORMAT)
    void unableToCancelRedistributor(@Cause Exception e);
 
    @LogMessage(level = Logger.Level.WARN)
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index dedf190..aa43170 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -380,7 +380,7 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void addRedistributor(long delay);
 
-   void cancelRedistributor() throws Exception;
+   void cancelRedistributor();
 
    boolean hasMatchingConsumer(Message message);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e0d8126..6909cf6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -113,7 +113,6 @@ import 
org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
-import org.apache.activemq.artemis.utils.collections.SingletonIterator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@@ -188,6 +187,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private boolean mirrorController;
 
+   private volatile boolean hasUnMatchedPending = false;
+
    // Messages will first enter intermediateMessageReferences
    // Before they are added to messageReferences
    // This is to avoid locking the queue on the producer
@@ -260,7 +261,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private final SimpleString address;
 
-   private ConsumerHolder<Redistributor> redistributor;
+   // redistributor goes in the consumers list, this signals its presence and 
allows for easy comparison/check
+   private volatile ConsumerHolder<Redistributor> redistributor;
 
    private ScheduledFuture<?> redistributorFuture;
 
@@ -1326,12 +1328,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       getExecutor().execute(new Runnable() {
          @Override
          public void run() {
-            try {
-               cancelRedistributor();
-            } catch (Exception e) {
-               // nothing that could be done anyway.. just logging
-               ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e);
-            }
+            cancelRedistributor();
          }
       });
 
@@ -1391,7 +1388,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             return false;
          }
 
-         if (consumers.size() >= consumersBeforeDispatch) {
+         if (getConsumerCount() >= consumersBeforeDispatch) {
             if (dispatchingUpdater.compareAndSet(this, 
BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
                dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
             }
@@ -1419,7 +1416,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
          synchronized (this) {
-            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= 
maxConsumers) {
+            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && getConsumerCount() 
>= maxConsumers) {
                throw 
ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
             }
 
@@ -1488,7 +1485,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
             if (consumerRemoved) {
                consumerRemovedTimestampUpdater.set(this, 
System.currentTimeMillis());
-               if (consumers.size() == 0) {
+               if (getConsumerCount() == 0) {
                   stopDispatch();
                }
             }
@@ -1525,11 +1522,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             supports = false;
          }
       }
-      if (redistributor != null) {
-         if (!redistributor.consumer.supportsDirectDelivery()) {
-            supports = false;
-         }
-      }
       return supports;
    }
 
@@ -1540,10 +1532,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       if (redistributor != null) {
          // Just prompt delivery
          deliverAsync();
+         return;
       }
 
       if (delay > 0) {
-         if (consumers.isEmpty()) {
+         if (consumers.isEmpty() || hasUnMatchedPending) {
             DelayedAddRedistributor dar = new 
DelayedAddRedistributor(executor);
 
             redistributorFuture = scheduledExecutor.schedule(dar, delay, 
TimeUnit.MILLISECONDS);
@@ -1562,17 +1555,34 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized void cancelRedistributor() throws Exception {
+   public synchronized void cancelRedistributor() {
+      clearRedistributorFuture();
+
       if (redistributor != null) {
-         redistributor.consumer.stop();
-         redistributor = null;
+         try {
+            redistributor.consumer.stop();
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e);
+         } finally {
+            consumers.remove(redistributor);
+            redistributor = null;
+         }
       }
-
-      clearRedistributorFuture();
    }
 
    @Override
    public int getConsumerCount() {
+      // we don't want to count the redistributor, it is an internal transient 
entry in the consumer list
+      if (redistributor != null) {
+         synchronized (this) {
+            final int size = consumers.size();
+            if (size > 0 && redistributor != null) {
+               return size - 1;
+            } else {
+               return size;
+            }
+         }
+      }
       return consumers.size();
    }
 
@@ -1775,7 +1785,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    public Map<String, List<MessageReference>> getDeliveringMessages() {
       final Iterator<ConsumerHolder<? extends Consumer>> 
consumerHolderIterator;
       synchronized (this) {
-         consumerHolderIterator = redistributor == null ? consumers.iterator() 
: SingletonIterator.newInstance(redistributor);
+         consumerHolderIterator = consumers.iterator();
       }
 
       Map<String, List<MessageReference>> mapReturn = new HashMap<>();
@@ -2767,9 +2777,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       for (ConsumerHolder holder : this.consumers) {
          holder.resetIterator();
       }
-      if (redistributor != null) {
-         redistributor.resetIterator();
-      }
    }
 
    @Override
@@ -2972,6 +2979,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       // Either the iterator is empty or the consumer is busy
       int noDelivery = 0;
 
+      // track filters not matching, used to track when all consumers can't 
match, redistribution is then an option
+      int numNoMatch = 0;
+      int numAttempts = 0;
+
       int handled = 0;
 
       long timeout = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
@@ -2998,9 +3009,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
          MessageReference ref;
 
-         // filter evaluation or transformation may cause properties to be 
lazyDecoded, we need to reflect that
-         int existingMemoryEstimate = 0;
-
          Consumer handledconsumer = null;
 
          synchronized (this) {
@@ -3015,14 +3023,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             }
 
             ConsumerHolder<? extends Consumer> holder;
-            if (redistributor == null) {
-               if (consumers.hasNext()) {
-                  holder = consumers.next();
-               } else {
-                  break;
-               }
+            if (consumers.hasNext()) {
+               holder = consumers.next();
             } else {
-               holder = redistributor;
+               break;
             }
 
             Consumer consumer = holder.consumer;
@@ -3032,6 +3036,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                holder.iter = messageReferences.iterator();
             }
 
+            // LVQ support
             ref = nextDelivery();
             boolean nextDelivery = false;
             if (ref != null) {
@@ -3059,7 +3064,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
                }
 
-               existingMemoryEstimate = ref.getMessageMemoryEstimate();
                final SimpleString groupID = extractGroupID(ref);
                groupConsumer = getGroupConsumer(groupID);
 
@@ -3067,15 +3071,18 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   consumer = groupConsumer;
                }
 
+               numAttempts++;
                HandleStatus status = handle(ref, consumer);
 
                if (status == HandleStatus.HANDLED) {
 
-                  // if a message was delivered, any previous negative attemps 
need to be cleared
+                  // if a message was delivered, any previous negative 
attempts need to be cleared
                   // this is to avoid breaks on the loop when checking for any 
other factors.
                   noDelivery = 0;
+                  numNoMatch = 0;
+                  numAttempts = 0;
 
-                  if (redistributor == null) {
+                  if (consumer != redistributor) {
                      ref = handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
                   }
 
@@ -3104,13 +3111,21 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   }
 
                   noDelivery++;
+                  numNoMatch = 0;
+                  numAttempts = 0;
+                  // no consumers.reset() b/c we skip this consumer
                } else if (status == HandleStatus.NO_MATCH) {
-                  // nothing to be done on this case, the iterators will just 
jump next
                   consumers.reset();
+                  numNoMatch++;
+                  // every attempt resulted in noMatch for number of consumers 
means we tried all consumers for a single message
+                  if (numNoMatch == numAttempts && numAttempts == 
consumers.size()) {
+                     hasUnMatchedPending = true;
+                     // one hit of unmatched message is enough, no need to 
reset counters
+                  }
                }
             }
 
-            if (redistributor != null || groupConsumer != null) {
+            if (groupConsumer != null) {
                if (noDelivery > 0) {
                   break;
                }
@@ -3303,15 +3318,14 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    private void internalAddRedistributor(final ArtemisExecutor executor) {
-      // create the redistributor only once if there are no local consumers
-      if (consumers.isEmpty() && redistributor == null) {
+      if (redistributor == null && (consumers.isEmpty() || 
hasUnMatchedPending)) {
          if (logger.isTraceEnabled()) {
             logger.trace("QueueImpl::Adding redistributor on queue " + 
this.toString());
          }
-
-         redistributor = (new ConsumerHolder(new Redistributor(this, 
storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
-
+         redistributor = new ConsumerHolder(new Redistributor(this, 
storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE));
          redistributor.consumer.start();
+         consumers.add(redistributor);
+         hasUnMatchedPending = false;
 
          deliverAsync();
       }
@@ -3749,9 +3763,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
          consumers.reset();
 
-         while (consumers.hasNext() || redistributor != null) {
+         while (consumers.hasNext()) {
 
-            ConsumerHolder<? extends Consumer> holder = redistributor == null 
? consumers.next() : redistributor;
+            ConsumerHolder<? extends Consumer> holder = consumers.next();
             Consumer consumer = holder.consumer;
 
             final SimpleString groupID = extractGroupID(ref);
@@ -3764,7 +3778,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             HandleStatus status = handle(ref, consumer);
             if (status == HandleStatus.HANDLED) {
                final MessageReference reference;
-               if (redistributor == null) {
+               if (consumer != redistributor) {
                   reference = handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
                } else {
                   reference = ref;
@@ -4527,12 +4541,12 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             logger.debug(getAddress() + ":" + getName() + " has " + 
queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is 
receiving messages at a rate of " + queueRate + " msgs/second.");
          }
 
-
-         if (consumers.size() == 0) {
+         final int consumerCount = getConsumerCount();
+         if (consumerCount == 0) {
             logger.debug("There are no consumers, no need to check slow 
consumer's rate");
             return;
          } else {
-            float queueThreshold = thresholdInMsgPerSecond * consumers.size();
+            float queueThreshold = thresholdInMsgPerSecond * consumerCount;
 
             if (queueRate < queueThreshold && queueMessages < queueThreshold) {
                if (logger.isDebugEnabled()) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 068a442..f3b215f 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1475,7 +1475,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void cancelRedistributor() throws Exception {
+      public void cancelRedistributor() {
 
       }
 
diff --git a/docs/user-manual/en/clusters.md b/docs/user-manual/en/clusters.md
index 3c039c2..d88a7c7 100644
--- a/docs/user-manual/en/clusters.md
+++ b/docs/user-manual/en/clusters.md
@@ -634,9 +634,7 @@ specified. The following shows all the available 
configuration options
   
   Keep in mind that this message forwarding/balancing is what we call
   "initial distribution." It is different than *redistribution* which
-  is [discussed below](#message-redistribution). This distinction is 
-  important because redistribution is configured differently and has 
-  unique semantics (e.g. it *does not* support filters (selectors)).
+  is [discussed below](#message-redistribution).
 
   Default is `ON_DEMAND`.
 
@@ -823,14 +821,14 @@ consumers on the queue the message won't get consumed and 
we have a
 
 This is where message redistribution comes in. With message
 redistribution Apache ActiveMQ Artemis can be configured to automatically
-*redistribute* messages from queues which have no consumers back to
+*redistribute* messages from queues which have no consumers or consumers
+with filters that don't match messages. The messages are re-routed to
 other nodes in the cluster which do have matching consumers. To enable
 this functionality `message-load-balancing` must be `ON_DEMAND`.
 
 Message redistribution can be configured to kick in immediately after
-the last consumer on a queue is closed, or to wait a configurable delay
-after the last consumer on a queue is closed before redistributing. By
-default message redistribution is disabled.
+the need to redistribute is detected, or to wait a configurable delay before 
redistributing.
+By default, message redistribution is disabled.
 
 Message redistribution can be configured on a per address basis, by
 specifying the redistribution delay in the address settings. For more
@@ -855,68 +853,15 @@ The attribute `match` can be an exact match or it can be 
a string that
 conforms to the Apache ActiveMQ Artemis wildcard syntax (described in 
[Wildcard Syntax](wildcard-syntax.md)).
 
 The element `redistribution-delay` defines the delay in milliseconds
-after the last consumer is closed on a queue before redistributing
-messages from that queue to other nodes of the cluster which do have
-matching consumers. A delay of zero means the messages will be
-immediately redistributed. A value of `-1` signifies that messages will
-never be redistributed. The default value is `-1`.
+between detecting the need for redistribution and actually attempting 
redistribution.
+A delay of zero means the messages will be immediately redistributed. 
+A value of `-1` signifies that messages will never be redistributed. The 
default value is `-1`.
 
 It often makes sense to introduce a delay before redistributing as it's
 a common case that a consumer closes but another one quickly is created
 on the same queue, in such a case you probably don't want to
 redistribute immediately since the new consumer will arrive shortly.
 
-#### Redistribution and filters (selectors) 
-
-Although "initial distribution" (described above) does support filters
-(selectors), redistribution does *not* support filters. Consider this
-scenario:
-
- 1. A cluster of 2 nodes - `A` and `B` - using a `redistribution-delay` of
-   `0` and a `message-load-balancing` of `ON_DEMAND`.
- 1. `A` and `B` each has the queue `foo`.
- 1. A producer sends a message which is routed to queue `foo` on node `A`. 
-   The message has property named `myProperty` with a value of `10`.
- 1. A consumer connects to queue `foo` on node `A` with the filter 
-   `myProperty=5`. This filter doesn't match the message.
- 1. A consumer connects to queue `foo` on node `B` with the filter 
-   `myProperty=10`. This filter *does* match the message .
-
-Despite the fact that the filter of the consumer on queue `foo` on node `B`
-matches the message, the message will *not* be redistributed from node `A` to
-node `B` because a consumer for the queue exists on node `A`.
-
-Not supporting redistribution based on filters was an explicit design decision
-in order to avoid two main problems - queue scanning and unnecessary 
-redistribution.
-
-From a performance perspective a consumer with a filter on a queue is already
-costly due to the scanning that the broker must do on the queue to find 
-matching messages. In general, this is a bit of an anti-pattern as it turns
-the broker into something akin to a database where you can "select" the data 
-you want using a filter. If brokers are configured in a cluster and a consumer 
-with a filter connects and no matches are found after scanning the local queue
-then potentially every instance of that queue in the cluster would need to be 
-scanned. This turns into a bit of a scalability nightmare with lots of 
consumers 
-(especially short-lived consumers) with filters connecting & disconnecting 
-frequently. The time & computing resources used for queue scanning would go 
-through the roof.
-
-It is also possible to get into a pathological situation where short-lived 
-consumers with filters connect to nodes around the cluster and messages get 
-redistributed back and forth between nodes without ever actually being 
consumed.
-
-One common use-case for consumers with filters (selectors) on queues is
-request/reply using a correlation ID. Following the standard pattern can be
-problematic in a cluster due to the lack of redistribution based on filters
-already described. However, there is a simple way to ensure an application
-using this request/reply pattern gets its reply even when using a correlation
-ID filter in a cluster - create the consumer before the request is sent. This
-will ensure that when the reply is sent it will be routed the proper cluster
-node since "*initial* distribution" (described above) does support filters.
-For example, in the scenario outlined above if steps 3 and 5 were switched
-(i.e. if the consumers were created before the message was sent) then the 
-consumer on node `B` would in fact receive the message.
 
 ## Cluster topologies
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java
new file mode 100644
index 0000000..38cd430
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.cluster;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SelectorRedistributionClusterTest extends JMSClusteredTestBase {
+
+   private final String myQueue = "myQueue";
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      jmsServer1.getActiveMQServer().setIdentity("Server 1");
+      jmsServer2.getActiveMQServer().setIdentity("Server 2");
+   }
+
+   @Override
+   protected boolean enablePersistence() {
+      return true;
+   }
+
+   @Test
+   public void testSelectorRoutingReDistributionOnNoConsumer() throws 
Exception {
+      
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      Connection conn1 = cf1.createConnection();
+      Connection conn2 = cf2.createConnection();
+      conn1.start();
+      conn2.start();
+
+      try {
+         Session session1 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         Session session2 = conn2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+         MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage textMessage = session1.createTextMessage("m1");
+         textMessage.setIntProperty("N", 10);
+
+
+         // remote demand with a filter in advance of send, so routing sees 
remote filter match and can ignore the local consumer
+         MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
+         // local consumer that does not match message
+         MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
+
+         // verify cluster notifications have completed before send
+         waitForBindings(server1, myQueue, false, 1, 1, 4000);
+
+         prod1.send(textMessage);
+
+         TextMessage received = (TextMessage) cons2.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // lets check some redistribution back by close with no acknowledge
+         session2.close();
+
+         // consumer on server 1 does not match, redistribution not done yet, 
message still available to local consumer
+         session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         cons2 = session2.createConsumer(jmsQueue, "N = 10");
+         received = (TextMessage) cons2.receive(4000);
+         assertNotNull(received);
+
+         // have to create consumer matching filter on server1 in advance such 
that redistribution happens fast
+         MessageConsumer cons11 = session1.createConsumer(jmsQueue, "N = 10");
+
+         // now expect redistribution
+         session2.close();
+
+         // get it from server1
+         received = (TextMessage) cons11.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // done
+         received.acknowledge();
+
+      } finally {
+         conn1.close();
+         conn2.close();
+      }
+   }
+
+   @Test
+   public void 
testSelectorRoutingNoReDistributionNewMessageSkipsTillLocalClose() throws 
Exception {
+      
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      Connection conn1 = cf1.createConnection();
+      Connection conn2 = cf2.createConnection();
+      conn1.start();
+      conn2.start();
+
+      try {
+         Session session1 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Session session11 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         Session session2 = conn2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+         MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage textMessage = session1.createTextMessage("m1");
+         textMessage.setIntProperty("N", 10);
+
+
+         // remote demand with a filter in advance of send
+         MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
+         // local consumer that does not match message
+         MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
+         // local consumer that matches message
+         MessageConsumer cons11 = session11.createConsumer(jmsQueue, "N = 10");
+
+         // verify cluster notifications have completed before send
+         waitForBindings(server1, myQueue, false, 1, 1, 4000);
+
+         prod1.send(textMessage);
+
+         TextMessage received = (TextMessage) cons11.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // lets check some redistribution by close with no acknowledge so 
session rolls back delivery
+         session11.close();
+
+         // nothing for the existing remote binding
+         received = (TextMessage) cons2.receiveNoWait();
+         assertNull(received);
+
+         // send a second message, it will get routed to the remote binding
+         textMessage = session1.createTextMessage("m2");
+         textMessage.setIntProperty("N", 10);
+
+         prod1.send(textMessage);
+
+         received = (TextMessage) cons2.receive(4000);
+         assertNotNull(received);
+         assertEquals("m2", received.getText());
+         received.acknowledge();
+
+         // release the local consumer such that redistribution kicks in
+         session1.close();
+
+         received = (TextMessage) cons2.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // done
+         received.acknowledge();
+
+      } finally {
+         conn1.close();
+         conn2.close();
+      }
+   }
+
+
+   @Test
+   public void testSelectorRoutingReDistributionDoesNotBlockLocalConsumer() 
throws Exception {
+      
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      Connection conn1 = cf1.createConnection();
+      Connection conn2 = cf2.createConnection();
+      conn1.start();
+      conn2.start();
+
+      try {
+         Session session1 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Session session11 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         Session session2 = conn2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+         MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage textMessage = session1.createTextMessage("m1");
+         textMessage.setIntProperty("N", 10);
+
+
+         // local consumers that does not match message
+         MessageConsumer cons1_0 = session1.createConsumer(jmsQueue, "N = 0");
+         MessageConsumer cons1_1 = session1.createConsumer(jmsQueue, "N = 1");
+
+         // local consumer that matches message
+         MessageConsumer cons1_10 = session11.createConsumer(jmsQueue, "N = 
10");
+
+         prod1.send(textMessage);
+
+         TextMessage received = (TextMessage) cons1_10.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // lets check some redistribution by close with no acknowledge so 
session rolls back delivery
+         session11.close();
+
+         // remote demand with a filter, consumer moved triggers 
redistribution event b/c all local consumers don't match
+         MessageConsumer cons2_10 = session2.createConsumer(jmsQueue, "N = 
10");
+
+         received = (TextMessage) cons2_10.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+         received.acknowledge();
+
+         // check local consumers can still get dispatched
+         textMessage = session1.createTextMessage("m2");
+         textMessage.setIntProperty("N", 0);
+         prod1.send(textMessage);
+
+         textMessage = session1.createTextMessage("m3");
+         textMessage.setIntProperty("N", 1);
+         prod1.send(textMessage);
+
+
+         received = (TextMessage) cons1_0.receive(4000);
+         assertNotNull(received);
+         assertEquals("m2", received.getText());
+         received.acknowledge();
+
+         received = (TextMessage) cons1_1.receive(4000);
+         assertNotNull(received);
+         assertEquals("m3", received.getText());
+         received.acknowledge();
+
+         // verify redistributor still kicks in too
+         textMessage = session1.createTextMessage("m4");
+         textMessage.setIntProperty("N", 10);
+         prod1.send(textMessage);
+
+         received = (TextMessage) cons2_10.receive(4000);
+         assertNotNull(received);
+         assertEquals("m4", received.getText());
+         received.acknowledge();
+
+      } finally {
+         conn1.close();
+         conn2.close();
+      }
+   }
+
+
+   @Test
+   public void testSelectorRoutingReDistributionOnConsumerMove() throws 
Exception {
+      
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      Connection conn1 = cf1.createConnection();
+      Connection conn2 = cf2.createConnection();
+      conn1.start();
+      conn2.start();
+
+      try {
+         Session session1 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Session session11 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         Session session2 = conn2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+         MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage textMessage = session1.createTextMessage("m1");
+         textMessage.setIntProperty("N", 10);
+
+
+         // local consumers that does not match message
+         MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
+         MessageConsumer cons12 = session1.createConsumer(jmsQueue, "N = 1");
+
+         // local consumer that matches message
+         MessageConsumer cons111 = session11.createConsumer(jmsQueue, "N = 
10");
+
+         prod1.send(textMessage);
+
+         TextMessage received = (TextMessage) cons111.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // lets check some redistribution by close with no acknowledge so 
session rolls back delivery
+         session11.close();
+
+         // remote demand with a filter, consumer moved triggers 
redistribution event b/c all local consumers don't match
+         MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
+
+         received = (TextMessage) cons2.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         received.acknowledge();
+
+      } finally {
+         conn1.close();
+         conn2.close();
+      }
+   }
+
+   @Test
+   public void 
testSelectorRoutingReDistributionOnLocalNoMatchConsumerCloseNeedsNewRemoteDemand()
 throws Exception {
+      
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+      Connection conn1 = cf1.createConnection();
+      Connection conn2 = cf2.createConnection();
+      conn1.start();
+      conn2.start();
+
+      try {
+         Session session1 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Session session1_n_10 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Session session1_n_1 = conn1.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         Session session2 = conn2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+
+         MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         TextMessage textMessage = session1.createTextMessage("m1");
+         textMessage.setIntProperty("N", 10);
+
+
+         // remote demand with a filter
+         MessageConsumer consumer2_n_10 = session2.createConsumer(jmsQueue, "N 
= 10");
+
+         // local consumers that does not match message
+         MessageConsumer consumer1_n_0 = session1.createConsumer(jmsQueue, "N 
= 0");
+         MessageConsumer consumer1_n_1 = session1_n_1.createConsumer(jmsQueue, 
"N = 1");
+
+         // local consumer that matches message
+         MessageConsumer consumer1_n_10 = 
session1_n_10.createConsumer(jmsQueue, "N = 10");
+
+         // verify cluster notifications have completed before send
+         waitForBindings(server1, myQueue, false, 1, 1, 4000);
+         waitForBindings(server1, myQueue, true, 1, 3, 4000);
+
+         prod1.send(textMessage);
+
+         TextMessage received = (TextMessage) consumer1_n_10.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         // lets prepare some non matching message for redistribution by close 
with no acknowledge so session rolls back delivery
+         session1_n_10.close();
+
+         // verify no redistribution event yet
+         assertNull(consumer2_n_10.receiveNoWait());
+
+         // local remove consumer event will not trigger redistribution
+         session1_n_1.close();
+
+         // verify no redistribution event yet
+         assertNull(consumer2_n_10.receiveNoWait());
+
+         // force a redistribution event on new remote consumer creation (that 
won't match in this case), trigger redistribution
+         MessageConsumer consumer2_n_0 = session2.createConsumer(jmsQueue, "N 
= 0");
+
+         // verify redistribution to remote
+         received = (TextMessage) consumer2_n_10.receive(4000);
+         assertNotNull(received);
+         assertEquals("m1", received.getText());
+
+         received.acknowledge();
+
+      } finally {
+         conn1.close();
+         conn2.close();
+      }
+   }
+}
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 06f78b2..fb2fdc5 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -439,7 +439,7 @@ public class FakeQueue extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public void cancelRedistributor() throws Exception {
+   public void cancelRedistributor() {
       // no-op
 
    }
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 47555df..0493c8b 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -967,6 +967,104 @@ public class QueueImplTest extends ActiveMQTestBase {
       Assert.assertEquals(20, queue.getDeliveringCount());
    }
 
+
+   @Test
+   public void testNoMatchConsumersAllowsRedistribution() throws Exception {
+      QueueImpl queue = getTemporaryQueue();
+
+      final int numMessages = 2;
+      List<MessageReference> refs = new ArrayList<>();
+
+      for (int i = 0; i < numMessages; i++) {
+         MessageReference ref = generateReference(queue, i);
+         ref.getMessage().putStringProperty("color", "red");
+         refs.add(ref);
+
+         queue.addTail(ref);
+      }
+
+      Assert.assertEquals(numMessages, getMessageCount(queue));
+      queue.deliverNow();
+
+      Assert.assertEquals(numMessages, getMessageCount(queue));
+
+      FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color 
= 'green'"));
+      queue.addConsumer(consumer);
+
+      FakeConsumer consumer2 = new FakeConsumer(FilterImpl.createFilter("color 
= 'orange'"));
+      queue.addConsumer(consumer2);
+
+      queue.deliverNow();
+      Assert.assertEquals(0, consumer.getReferences().size());
+      Assert.assertEquals(0, consumer2.getReferences().size());
+
+      // verify redistributor is doing some work....
+      try {
+         // should attempt to add due to unmatched
+         queue.addRedistributor(0);
+         fail("expect error on attempt to add addRedistributor - npe b/c no 
storage etc");
+      } catch (NullPointerException expected) {
+      }
+
+      // verify with odd number as check depends on order/reset/wrap of 
consumers
+      FakeConsumer consumer3 = new FakeConsumer(FilterImpl.createFilter("color 
= 'blue'"));
+      queue.addConsumer(consumer3);
+
+      queue.deliverNow();
+
+      Assert.assertEquals(0, consumer.getReferences().size());
+      Assert.assertEquals(0, consumer2.getReferences().size());
+      Assert.assertEquals(0, consumer3.getReferences().size());
+
+      // verify redistributor is doing some work....
+      try {
+         // should attempt to add due to unmatched
+         queue.addRedistributor(0);
+         fail("expect error on attempt to add addRedistributor - npe b/c no 
storage etc");
+      } catch (NullPointerException expected) {
+      }
+
+      Assert.assertEquals(numMessages, getMessageCount(queue));
+   }
+
+   @Test
+   public void testNoMatchOn3AllowsRedistribution() throws Exception {
+      QueueImpl queue = getTemporaryQueue();
+
+      int i = 0;
+      MessageReference ref = generateReference(queue, i++);
+      ref.getMessage().putStringProperty("color", "red");
+      queue.addTail(ref);
+
+      ref = generateReference(queue, i++);
+      ref.getMessage().putStringProperty("color", "red");
+      queue.addTail(ref);
+
+      ref = generateReference(queue, i++);
+      ref.getMessage().putStringProperty("color", "blue");
+      queue.addTail(ref);
+
+      Assert.assertEquals(3, getMessageCount(queue));
+
+      FakeConsumer consumerRed = new 
FakeConsumer(FilterImpl.createFilter("color = 'red'"));
+      queue.addConsumer(consumerRed);
+
+      FakeConsumer consumerOrange = new 
FakeConsumer(FilterImpl.createFilter("color = 'orange'"));
+      queue.addConsumer(consumerOrange);
+
+      queue.deliverNow();
+      Assert.assertEquals(2, consumerRed.getReferences().size());
+      Assert.assertEquals(0, consumerOrange.getReferences().size());
+
+      // verify redistributor is doing some work....
+      try {
+         // should attempt to add due to unmatched
+         queue.addRedistributor(0);
+         fail("expect error on attempt to add addRedistributor - npe b/c no 
storage etc");
+      } catch (NullPointerException expected) {
+      }
+   }
+
    // Private 
------------------------------------------------------------------------------
 
    private void testConsumerWithFilters(final boolean direct) throws Exception 
{
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 2a5a330..fe31cfa 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -157,4 +157,11 @@ public class FakeConsumer implements Consumer {
       return Collections.emptyList();
    }
 
+   @Override
+   public String toString() {
+      if (filter != null) {
+         return filter + ", " + super.toString();
+      }
+      return super.toString();
+   }
 }

Reply via email to