This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new cea9ff6667 ARTEMIS-4259 JMS consumer + FQQN + selector not working
cea9ff6667 is described below

commit cea9ff6667cfec360750ffb036fe3ac51e2ffce6
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Tue Jun 6 15:42:02 2023 -0400

    ARTEMIS-4259 JMS consumer + FQQN + selector not working
    
    co-authored with Justin Bertram
---
 .../artemis/jms/client/ActiveMQSession.java        |  53 ++-
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   8 +-
 .../amqp/proton/ProtonServerSenderContext.java     |  26 +-
 .../core/protocol/openwire/OpenWireConnection.java |   4 +-
 .../core/protocol/openwire/amq/AMQConsumer.java    |   2 +-
 .../client/AutoCreateJmsDestinationTest.java       |   2 +
 .../jms/multiprotocol/JMSFQQNConsumerTest.java     | 456 +++++++++++++++++++++
 7 files changed, 524 insertions(+), 27 deletions(-)

diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 2a726fa1fb..b8aa15c481 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -840,26 +840,15 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                   throw new RuntimeException("Subscription name cannot be null 
for durable topic consumer");
                // Non durable sub
 
-               queueName = new SimpleString(UUID.randomUUID().toString());
 
-               if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
-                  createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, 
coreFilterString, response);
+               if (CompositeAddress.isFullyQualified(dest.getAddress())) {
+                  queueName = createFQQNSubscription(dest, coreFilterString, 
response);
                } else {
-                  if (!response.isExists() || 
!response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, 
dest.getSimpleAddress()))) {
-                     if (response.isAutoCreateQueues()) {
-                        try {
-                           createQueue(dest, RoutingType.MULTICAST, 
dest.getSimpleAddress(), null, true, true, response);
-                        } catch (ActiveMQQueueExistsException e) {
-                           // The queue was created by another client/admin 
between the query check and send create queue packet
-                        }
-                     } else {
-                        throw new InvalidDestinationException("Destination " + 
dest.getName() + " does not exist");
-                     }
-                  }
-                  queueName = 
CompositeAddress.extractQueueName(dest.getSimpleAddress());
+                  queueName = new SimpleString(UUID.randomUUID().toString());
+                  createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, 
coreFilterString, response);
                }
 
-               consumer = createClientConsumer(dest, queueName, null);
+               consumer = createClientConsumer(dest, queueName, 
coreFilterString);
                autoDeleteQueueName = queueName;
             } else {
                // Durable sub
@@ -928,6 +917,38 @@ public class ActiveMQSession implements QueueSession, TopicSession {
       }
    }
 
+   // This method is for the actual queue creation on the Multicast queue / 
subscription
+   private SimpleString createFQQNSubscription(ActiveMQDestination dest,
+                                        SimpleString coreFilterString,
+                                        AddressQuery response) throws 
ActiveMQException, JMSException {
+      SimpleString queueName;
+      queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
+      if (!response.isExists() || 
!response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, 
dest.getSimpleAddress()))) {
+         if (response.isAutoCreateQueues()) {
+            try {
+               createQueue(dest, RoutingType.MULTICAST, 
dest.getSimpleAddress(), coreFilterString, true, true, response);
+               return queueName;
+            } catch (ActiveMQQueueExistsException e) {
+               // The queue was created by another client/admin between the 
query check and send create queue packet
+               // on this case we will switch to the regular verification to 
validate the coreFilterString
+            }
+         } else {
+            throw new InvalidDestinationException("Destination " + 
dest.getName() + " does not exist");
+         }
+      }
+
+      QueueQuery queueQuery = session.queueQuery(queueName);
+
+      if (!queueQuery.isExists()) {
+         throw new InvalidDestinationException("Destination " + queueName + " 
does not exist");
+      }
+
+      if (coreFilterString != null && queueQuery.getFilterString() != null && 
!coreFilterString.equals(queueQuery.getFilterString())) {
+         throw new JMSException(queueName + " filter mismatch [" + 
coreFilterString + "] is different than existing filter [" + 
queueQuery.getFilterString() + "]");
+      }
+      return queueName;
+   }
+
    private ClientConsumer createClientConsumer(ActiveMQDestination 
destination, SimpleString queueName, SimpleString coreFilterString) throws 
ActiveMQException {
       QueueAttributes queueAttributes = destination.getQueueAttributes() == 
null ? new QueueAttributes() : destination.getQueueAttributes();
       int priority = queueAttributes.getConsumerPriority() == null ? 
ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : 
queueAttributes.getConsumerPriority();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f30c09b3dd..5a1b23ea6e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -306,11 +306,15 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
    public QueueQueryResult queueQuery(SimpleString queueName, RoutingType 
routingType, boolean autoCreate) throws Exception {
+      return queueQuery(queueName, routingType, autoCreate, null);
+   }
+
+   public QueueQueryResult queueQuery(SimpleString queueName, RoutingType 
routingType, boolean autoCreate, SimpleString filter) throws Exception {
       QueueQueryResult queueQueryResult = 
serverSession.executeQueueQuery(queueName);
 
       if (!queueQueryResult.isExists() && 
queueQueryResult.isAutoCreateQueues() && autoCreate) {
          try {
-            serverSession.createQueue(new 
QueueConfiguration(queueName).setRoutingType(routingType).setAutoCreated(true));
+            serverSession.createQueue(new 
QueueConfiguration(queueName).setRoutingType(routingType).setFilterString(filter).setAutoCreated(true));
          } catch (ActiveMQQueueExistsException e) {
             // The queue may have been created by another thread in the mean 
time.  Catch and do nothing.
          }
@@ -321,7 +325,7 @@ public class AMQPSessionCallback implements SessionCallback {
       if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
          //if routingType is null we bypass the check
          if (routingType != null && queueQueryResult.getRoutingType() != 
routingType) {
-            throw new IllegalStateException("Incorrect Routing Type for queue, 
expecting: " + routingType);
+            throw new IllegalStateException("Incorrect Routing Type for queue 
" + queueName + ", expecting: " + routingType + " while it had " + 
queueQueryResult.getRoutingType());
          }
       }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ccdff0fca4..8641fbc843 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -29,6 +29,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -1083,11 +1084,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             shared = hasCapabilities(SHARED, source);
             global = hasCapabilities(GLOBAL, source);
 
+            final boolean isFQQN;
+
             //find out if we have an address made up of the address and queue 
name, if yes then set queue name
             if (CompositeAddress.isFullyQualified(source.getAddress())) {
+               isFQQN = true;
                addressToUse = 
SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
                queueNameToUse = 
SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
             } else {
+               isFQQN = false;
                addressToUse = SimpleString.toSimpleString(source.getAddress());
             }
             //check to see if the client has defined how we act
@@ -1169,8 +1174,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   supportedFilters.put(filter.getKey(), filter.getValue());
                }
 
-               queue = getMatchingQueue(queueNameToUse, addressToUse, 
RoutingType.MULTICAST);
                SimpleString simpleStringSelector = 
SimpleString.toSimpleString(selector);
+               queue = getMatchingQueue(queueNameToUse, addressToUse, 
RoutingType.MULTICAST, simpleStringSelector, isFQQN);
 
                //if the address specifies a broker configured queue then we 
always use this, treat it as a queue
                if (queue != null) {
@@ -1234,10 +1239,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                }
             } else {
                if (queueNameToUse != null) {
-                  //a queue consumer can receive from a multicast queue if it 
uses a fully qualified name
-                  //setting routingType to null means do not check the 
routingType against the Queue's routing type.
-                  routingTypeToUse = null;
-                  SimpleString matchingAnycastQueue = 
getMatchingQueue(queueNameToUse, addressToUse, null);
+                  SimpleString matchingAnycastQueue;
+                  QueueQueryResult result = 
sessionSPI.queueQuery(CompositeAddress.toFullyQualified(addressToUse, 
queueNameToUse), null, false, null);
+                  if (result.isExists()) {
+                     // if the queue exists and we're using FQQN then just 
ignore the routing-type
+                     routingTypeToUse = null;
+                  }
+                  matchingAnycastQueue = getMatchingQueue(queueNameToUse, 
addressToUse, routingTypeToUse, null, false);
                   if (matchingAnycastQueue != null) {
                      queue = matchingAnycastQueue;
                   } else {
@@ -1284,15 +1292,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
 
 
-      private SimpleString getMatchingQueue(SimpleString queueName, 
SimpleString address, RoutingType routingType) throws Exception {
+      private SimpleString getMatchingQueue(SimpleString queueName, 
SimpleString address, RoutingType routingType, SimpleString filter, boolean 
matchFilter) throws Exception {
          if (queueName != null) {
-            QueueQueryResult result = 
sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), 
routingType, true);
+            QueueQueryResult result = 
sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), 
routingType, true, filter);
             if (!result.isExists()) {
                throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName 
+ "' does not exist");
             } else {
                if (!result.getAddress().equals(address)) {
                   throw new ActiveMQAMQPNotFoundException("Queue: '" + 
queueName + "' does not exist for address '" + address + "'");
                }
+               if (matchFilter && filter != null && result.getFilterString() 
!= null && !filter.equals(result.getFilterString())) {
+                  throw new ActiveMQIllegalStateException("Queue: " + 
queueName + " filter mismatch [" + filter + "] is different than existing 
filter [" + result.getFilterString() + "]");
+
+               }
                return sessionSPI.getMatchingQueue(address, queueName, 
routingType);
             }
          }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 4e1934b659..3b3b336476 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1786,7 +1786,9 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    }
 
    private void clearupOperationContext() {
-      server.getStorageManager().clearContext();
+      if (server != null && server.getStorageManager() != null) {
+         server.getStorageManager().clearContext();
+      }
    }
 
    private Transaction lookupTX(TransactionId txID, AMQSession session) throws 
Exception {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 8397e5f84e..a98369f48e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -160,7 +160,7 @@ public class AMQConsumer {
       if (openwireDestination.isTopic()) {
          SimpleString queueName = createTopicSubscription(info.isDurable(), 
info.getClientId(), destinationName.toString(), info.getSubscriptionName(), 
selector, destinationName);
 
-         serverConsumer = session.getCoreSession().createConsumer(nativeId, 
queueName, null, info.getPriority(), info.isBrowser(), false, -1);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, 
queueName, CompositeAddress.isFullyQualified(destinationName.toString()) ? 
selector : null, info.getPriority(), info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          //only advisory topic consumers need this.
          ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java
index 50c092e960..8016ff57b4 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java
@@ -361,6 +361,8 @@ public class AutoCreateJmsDestinationTest extends 
JMSTestBase {
    @Test
    public void testAutoCreateOnReconnect() throws Exception {
       Connection connection = cf.createConnection();
+      runAfter(() -> ((ActiveMQConnectionFactory)cf).close());
+      runAfter(connection::close);
       connection.start();
       Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java
new file mode 100644
index 0000000000..25444abade
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSFQQNConsumerTest extends MultiprotocolJMSClientTestSupport {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Test
+   public void testFQQNTopicConsumerWithSelectorAMQP() throws Exception {
+      testFQQNTopicConsumerWithSelector("AMQP", true);
+   }
+
+   @Test
+   public void testFQQNTopicConsumerWithSelectorOpenWire() throws Exception {
+      testFQQNTopicConsumerWithSelector("OPENWIRE", false);
+   }
+
+   @Test
+   public void testFQQNTopicConsumerWithSelectorCore() throws Exception {
+      testFQQNTopicConsumerWithSelector("CORE", true);
+   }
+
+   private void testFQQNTopicConsumerWithSelector(String protocol, boolean 
validateFilterChange) throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      final String queue = "queue";
+      final String address = "address";
+      final String filter = "prop='match'";
+      try (Connection c = factory.createConnection()) {
+         c.start();
+
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+
+         MessageConsumer mc = s.createConsumer(t, filter);
+
+         Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue));
+
+         Assert.assertEquals(RoutingType.MULTICAST, 
serverQueue.getRoutingType());
+         Assert.assertNotNull(serverQueue.getFilter());
+         Assert.assertEquals(filter, 
serverQueue.getFilter().getFilterString().toString());
+         assertEquals(filter, 
server.locateQueue(queue).getFilter().getFilterString().toString());
+
+         MessageProducer producer = s.createProducer(s.createTopic("address"));
+
+         Message message = s.createTextMessage("hello");
+         message.setStringProperty("prop", "match");
+         producer.send(message);
+
+         Assert.assertNotNull(mc.receive(5000));
+
+         message = s.createTextMessage("hello");
+         message.setStringProperty("prop", "nomatch");
+         producer.send(message);
+
+         if (protocol.equals("OPENWIRE")) {
+            Assert.assertNull(mc.receive(500)); // false negatives in openwire
+         } else {
+            Assert.assertNull(mc.receiveNoWait());
+         }
+      }
+
+      if (validateFilterChange) {
+         boolean thrownException = false;
+         try (Connection c = factory.createConnection()) {
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+            MessageConsumer mc = s.createConsumer(t, 
"shouldThrowException=true");
+         } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+            thrownException = true;
+         }
+         Assert.assertTrue(thrownException);
+
+         // validating the case where I am adding a consumer without a filter
+         // on this case the consumer will have no filter, but the filter on 
the queue should take care of things
+         try (Connection c = factory.createConnection()) {
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+            MessageConsumer mc = s.createConsumer(t);
+
+            Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 
100);
+            org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue));
+
+            Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
+            serverQueue.getConsumers().forEach(serverConsumer -> {
+               Assert.assertNull(serverConsumer.getFilter());
+            });
+
+
+            MessageProducer producer = 
s.createProducer(s.createTopic("address"));
+
+            Message message = s.createTextMessage("hello");
+            message.setStringProperty("prop", "match");
+            producer.send(message);
+
+            Assert.assertNotNull(mc.receive(5000));
+
+            message = s.createTextMessage("hello");
+            message.setStringProperty("prop", "nomatch");
+            producer.send(message);
+
+            if (protocol.equals("OPENWIRE")) {
+               Assert.assertNull(mc.receive(500)); // false negatives in 
openwire
+            } else {
+               Assert.assertNull(mc.receiveNoWait());
+            }
+
+         }
+      }
+   }
+
+
+   @Test
+   public void testFQQNTopicFilterConsumerOnlyAMQP() throws Exception {
+      testFQQNTopicFilterConsumerOnly("AMQP");
+   }
+
+   @Test
+   public void testFQQNTopicFilterConsumerOnlyOPENWIRE() throws Exception {
+      testFQQNTopicFilterConsumerOnly("OPENWIRE");
+   }
+
+   @Test
+   public void testFQQNTopicFilterConsumerOnlyCORE() throws Exception {
+      testFQQNTopicFilterConsumerOnly("CORE");
+   }
+
+   private void testFQQNTopicFilterConsumerOnly(String protocol) throws 
Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      final String queue = "queue";
+      final String address = "address";
+      final String filter = "prop='match'";
+
+      // predefining the queue without a filter
+      // so consumers will filter out messages
+      server.addAddressInfo(new 
AddressInfo(address).addRoutingType(RoutingType.MULTICAST));
+      server.createQueue(new 
QueueConfiguration().setAddress(address).setName(queue).setRoutingType(RoutingType.MULTICAST));
+
+      try (Connection c = factory.createConnection()) {
+         c.start();
+
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+
+         MessageConsumer mc = s.createConsumer(t, filter);
+
+         Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue));
+         Assert.assertEquals(RoutingType.MULTICAST, 
serverQueue.getRoutingType());
+         Assert.assertNull(serverQueue.getFilter()); // it was pre-created 
without a filter, so we will just filter on the consumer
+
+         Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
+         serverQueue.getConsumers().forEach(consumer -> {
+            Assert.assertNotNull(consumer.getFilter());
+            Assert.assertEquals(filter, 
consumer.getFilter().getFilterString().toString());
+         });
+
+         MessageProducer producer = s.createProducer(s.createTopic("address"));
+
+         Message message = s.createTextMessage("hello");
+         message.setStringProperty("prop", "match");
+         producer.send(message);
+
+         Assert.assertNotNull(mc.receive(5000));
+
+         message = s.createTextMessage("hello");
+         message.setStringProperty("prop", "nomatch");
+         producer.send(message);
+
+         if (protocol.equals("OPENWIRE")) {
+            assertNull(mc.receive(100)); // i have had false negatives with 
openwire, hence this
+         } else {
+            assertNull(mc.receiveNoWait());
+         }
+      }
+   }
+
+   @Test
+   public void testFQQNTopicConsumerDontExistAMQP() throws Exception {
+      testFQQNTopicConsumerDontExist("AMQP");
+   }
+
+   /* this commented out code is just to make a point that this test would not 
be valid in openwire.
+      As openwire is calling the method createSubscription from its 1.1 
implementation.
+     Hence there's no need to test this over JMS1.1 with openWire
+   @Test
+   public void testFQQNTopicConsumerDontExistOPENWIRE() throws Exception {
+      testFQQNTopicConsumerDontExist("OPENWIRE");
+   } */
+
+   @Test
+   public void testFQQNTopicConsumerDontExistCORE() throws Exception {
+      testFQQNTopicConsumerDontExist("CORE");
+   }
+
+   private void testFQQNTopicConsumerDontExist(String protocol) throws 
Exception {
+      AddressSettings settings = new 
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+
+      final String queue = "queue";
+      final String address = "address";
+
+      boolean thrownException = false;
+      try (Connection c = factory.createConnection()) {
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+         MessageConsumer mc = s.createConsumer(t);
+      } catch (Exception e) {
+         logger.debug(e.getMessage(), e);
+         thrownException = true;
+      }
+
+      Assert.assertTrue(thrownException);
+   }
+
+   @Test
+   public void testFQQNQueueConsumerWithSelectorAMQP() throws Exception {
+      testFQQNQueueConsumerWithSelector("AMQP");
+   }
+
+   @Test
+   public void testFQQNQueueConsumerWithSelectorOpenWire() throws Exception {
+      testFQQNQueueConsumerWithSelector("OPENWIRE");
+   }
+
+   @Test
+   public void testFQQNQueueConsumerWithSelectorCore() throws Exception {
+      testFQQNQueueConsumerWithSelector("CORE");
+   }
+
+   private void testFQQNQueueConsumerWithSelector(String protocol) throws 
Exception {
+      AddressSettings settings = new 
AddressSettings().setDefaultQueueRoutingType(RoutingType.ANYCAST).setDefaultAddressRoutingType(RoutingType.ANYCAST);
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+      final String queue = "myQueue";
+      final String address = "address";
+      final String filter = "prop='match'";
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+
+      try (Connection c = factory.createConnection()) {
+         c.start();
+
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         String queueQuery = CompositeAddress.toFullyQualified(address, queue) 
+ (protocol.equals("OPENWIRE") ? "?selectorAware=true" : "");
+
+         Queue q = s.createQueue(queueQuery);
+
+         MessageConsumer mc = s.createConsumer(q, filter);
+
+         Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue));
+
+         Assert.assertEquals(RoutingType.ANYCAST, 
serverQueue.getRoutingType());
+
+         Assert.assertNull(serverQueue.getFilter());
+
+         MessageProducer p = s.createProducer(q);
+
+         Message m = s.createMessage();
+         m.setStringProperty("prop", "match");
+         p.send(m);
+
+         assertNotNull(mc.receive(1000));
+
+         m = s.createMessage();
+         m.setStringProperty("prop", "no-match");
+         p.send(m);
+
+         if (protocol.equals("OPENWIRE")) {
+            assertNull(mc.receive(100)); // i have had false negatives with 
openwire, hence this
+         } else {
+            assertNull(mc.receiveNoWait());
+         }
+
+         Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
+
+         serverQueue.getConsumers().forEach(queueConsumer -> {
+            Assert.assertNotNull(queueConsumer.getFilter());
+            Assert.assertEquals(filter, 
queueConsumer.getFilter().getFilterString().toString());
+         });
+
+         mc.close();
+
+         Wait.assertEquals(0, () -> serverQueue.getConsumers().size());
+
+
+         String invalidFilter = "notHappening=true";
+
+         mc = s.createConsumer(q, invalidFilter);
+
+         Wait.assertEquals(1, () -> serverQueue.getConsumers().size());
+         serverQueue.getConsumers().forEach(queueConsumer -> {
+            Assert.assertNotNull(queueConsumer.getFilter());
+            Assert.assertEquals(invalidFilter, 
queueConsumer.getFilter().getFilterString().toString());
+         });
+
+      }
+   }
+
+
+
+   @Test
+   public void testFQQNTopicMultiConsumerWithSelectorAMQP() throws Exception {
+      testFQQNTopicMultiConsumerWithSelector("AMQP", true);
+   }
+
+   @Test
+   public void testFQQNTopicMultiConsumerWithSelectorOpenWire() throws 
Exception {
+      testFQQNTopicMultiConsumerWithSelector("OPENWIRE", false);
+   }
+
+   @Test
+   public void testFQQNTopicMultiConsumerWithSelectorCORE() throws Exception {
+      testFQQNTopicMultiConsumerWithSelector("CORE", true);
+   }
+
+   private void testFQQNTopicMultiConsumerWithSelector(String protocol, 
boolean validateFilterChange) throws Exception {
+
+      class RunnableConsumer implements Runnable {
+         int errors = 0;
+         final int expected;
+         final Connection c;
+         final Session session;
+         final Topic topic;
+         final MessageConsumer consumer;
+         final String queueName;
+         final String filter;
+         final CountDownLatch done;
+
+
+         RunnableConsumer(Connection c, String queueName, int expected, String 
filter, CountDownLatch done) throws Exception {
+            this.c = c;
+            this.session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this.queueName = queueName;
+            this.expected = expected;
+            this.topic = session.createTopic(queueName);
+            this.consumer = session.createConsumer(topic, filter);
+            this.done = done;
+            this.filter = filter;
+         }
+
+         @Override
+         public void run() {
+            try {
+               for (int i = 0; i < expected; i++) {
+                  TextMessage message = (TextMessage) consumer.receive(5000);
+                  logger.debug("Queue {} received message {}", queueName, 
message.getText());
+                  Assert.assertEquals(i, message.getIntProperty("i"));
+                  Assert.assertNotNull(message);
+               }
+               if (protocol.equals("OPENWIRE")) {
+                  Assert.assertNull(consumer.receive(500)); // false negatives 
in openwire
+               } else {
+                  Assert.assertNull(consumer.receiveNoWait());
+               }
+            } catch (Throwable e) {
+               errors++;
+               logger.warn(e.getMessage(), e);
+            } finally {
+               done.countDown();
+            }
+         }
+      }
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      final String address = "address";
+      int threads = 10;
+
+      ExecutorService executor = Executors.newFixedThreadPool(threads);
+      runAfter(executor::shutdownNow);
+      try (Connection c = factory.createConnection()) {
+         c.start();
+
+         CountDownLatch doneLatch = new CountDownLatch(threads);
+
+         RunnableConsumer[] consumers = new RunnableConsumer[threads];
+         for (int i = 0; i < threads; i++) {
+            consumers[i] = new RunnableConsumer(c, address + "::" + "queue" + 
i, i * 10, "i < " + (i * 10), doneLatch);
+         }
+
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer p = s.createProducer(s.createTopic(address));
+
+         for (int i = 0; i < threads * 10; i++) {
+            Message message = s.createTextMessage("i=" + i);
+            message.setIntProperty("i", i);
+            p.send(message);
+         }
+
+         for (RunnableConsumer consumer : consumers) {
+            executor.execute(consumer);
+         }
+
+         Assert.assertTrue(doneLatch.await(10, TimeUnit.SECONDS));
+
+         for (RunnableConsumer consumer : consumers) {
+            Assert.assertEquals("Error on consumer for queue " + 
consumer.queueName,  0, consumer.errors);
+         }
+      }
+   }
+
+
+}
\ No newline at end of file


Reply via email to