This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new cea9ff6667 ARTEMIS-4259 JMS consumer + FQQN + selector not working cea9ff6667 is described below commit cea9ff6667cfec360750ffb036fe3ac51e2ffce6 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Tue Jun 6 15:42:02 2023 -0400 ARTEMIS-4259 JMS consumer + FQQN + selector not working co-authored with Justin Bertram --- .../artemis/jms/client/ActiveMQSession.java | 53 ++- .../protocol/amqp/broker/AMQPSessionCallback.java | 8 +- .../amqp/proton/ProtonServerSenderContext.java | 26 +- .../core/protocol/openwire/OpenWireConnection.java | 4 +- .../core/protocol/openwire/amq/AMQConsumer.java | 2 +- .../client/AutoCreateJmsDestinationTest.java | 2 + .../jms/multiprotocol/JMSFQQNConsumerTest.java | 456 +++++++++++++++++++++ 7 files changed, 524 insertions(+), 27 deletions(-) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index 2a726fa1fb..b8aa15c481 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -840,26 +840,15 @@ public class ActiveMQSession implements QueueSession, TopicSession { throw new RuntimeException("Subscription name cannot be null for durable topic consumer"); // Non durable sub - queueName = new SimpleString(UUID.randomUUID().toString()); - if (!CompositeAddress.isFullyQualified(dest.getAddress())) { - createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response); + if (CompositeAddress.isFullyQualified(dest.getAddress())) { + queueName = createFQQNSubscription(dest, coreFilterString, response); } else { - if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) { - if (response.isAutoCreateQueues()) { - try { - createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response); - } catch (ActiveMQQueueExistsException e) { - // The queue was created by another client/admin between the query check and send create queue packet - } - } else { - throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); - } - } - queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress()); + queueName = new SimpleString(UUID.randomUUID().toString()); + createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response); } - consumer = createClientConsumer(dest, queueName, null); + consumer = createClientConsumer(dest, queueName, coreFilterString); autoDeleteQueueName = queueName; } else { // Durable sub @@ -928,6 +917,38 @@ public class ActiveMQSession implements QueueSession, TopicSession { } } + // This method is for the actual queue creation on the Multicast queue / subscription + private SimpleString createFQQNSubscription(ActiveMQDestination dest, + SimpleString coreFilterString, + AddressQuery response) throws ActiveMQException, JMSException { + SimpleString queueName; + queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress()); + if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) { + if (response.isAutoCreateQueues()) { + try { + createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), coreFilterString, true, true, response); + return queueName; + } catch (ActiveMQQueueExistsException e) { + // The queue was created by another client/admin between the query check and send create queue packet + // on this case we will switch to the regular verification to validate the coreFilterString + } + } else { + throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist"); + } + } + + QueueQuery queueQuery = session.queueQuery(queueName); + + if (!queueQuery.isExists()) { + throw new InvalidDestinationException("Destination " + queueName + " does not exist"); + } + + if (coreFilterString != null && queueQuery.getFilterString() != null && !coreFilterString.equals(queueQuery.getFilterString())) { + throw new JMSException(queueName + " filter mismatch [" + coreFilterString + "] is different than existing filter [" + queueQuery.getFilterString() + "]"); + } + return queueName; + } + private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException { QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes(); int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index f30c09b3dd..5a1b23ea6e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -306,11 +306,15 @@ public class AMQPSessionCallback implements SessionCallback { } public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception { + return queueQuery(queueName, routingType, autoCreate, null); + } + + public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate, SimpleString filter) throws Exception { QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName); if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { - serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setAutoCreated(true)); + serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setFilterString(filter).setAutoCreated(true)); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } @@ -321,7 +325,7 @@ public class AMQPSessionCallback implements SessionCallback { if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) { //if routingType is null we bypass the check if (routingType != null && queueQueryResult.getRoutingType() != routingType) { - throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); + throw new IllegalStateException("Incorrect Routing Type for queue " + queueName + ", expecting: " + routingType + " while it had " + queueQueryResult.getRoutingType()); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ccdff0fca4..8641fbc843 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -29,6 +29,7 @@ import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.Message; @@ -1083,11 +1084,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr shared = hasCapabilities(SHARED, source); global = hasCapabilities(GLOBAL, source); + final boolean isFQQN; + //find out if we have an address made up of the address and queue name, if yes then set queue name if (CompositeAddress.isFullyQualified(source.getAddress())) { + isFQQN = true; addressToUse = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress())); queueNameToUse = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress())); } else { + isFQQN = false; addressToUse = SimpleString.toSimpleString(source.getAddress()); } //check to see if the client has defined how we act @@ -1169,8 +1174,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr supportedFilters.put(filter.getKey(), filter.getValue()); } - queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST); SimpleString simpleStringSelector = SimpleString.toSimpleString(selector); + queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST, simpleStringSelector, isFQQN); //if the address specifies a broker configured queue then we always use this, treat it as a queue if (queue != null) { @@ -1234,10 +1239,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { if (queueNameToUse != null) { - //a queue consumer can receive from a multicast queue if it uses a fully qualified name - //setting routingType to null means do not check the routingType against the Queue's routing type. - routingTypeToUse = null; - SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null); + SimpleString matchingAnycastQueue; + QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(addressToUse, queueNameToUse), null, false, null); + if (result.isExists()) { + // if the queue exists and we're using FQQN then just ignore the routing-type + routingTypeToUse = null; + } + matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, routingTypeToUse, null, false); if (matchingAnycastQueue != null) { queue = matchingAnycastQueue; } else { @@ -1284,15 +1292,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } - private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { + private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType, SimpleString filter, boolean matchFilter) throws Exception { if (queueName != null) { - QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true); + QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true, filter); if (!result.isExists()) { throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist"); } else { if (!result.getAddress().equals(address)) { throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'"); } + if (matchFilter && filter != null && result.getFilterString() != null && !filter.equals(result.getFilterString())) { + throw new ActiveMQIllegalStateException("Queue: " + queueName + " filter mismatch [" + filter + "] is different than existing filter [" + result.getFilterString() + "]"); + + } return sessionSPI.getMatchingQueue(address, queueName, routingType); } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 4e1934b659..3b3b336476 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1786,7 +1786,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void clearupOperationContext() { - server.getStorageManager().clearContext(); + if (server != null && server.getStorageManager() != null) { + server.getStorageManager().clearContext(); + } } private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception { diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 8397e5f84e..a98369f48e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -160,7 +160,7 @@ public class AMQConsumer { if (openwireDestination.isTopic()) { SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName); - serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1); + serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, CompositeAddress.isFullyQualified(destinationName.toString()) ? selector : null, info.getPriority(), info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); //only advisory topic consumers need this. ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java index 50c092e960..8016ff57b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java @@ -361,6 +361,8 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { @Test public void testAutoCreateOnReconnect() throws Exception { Connection connection = cf.createConnection(); + runAfter(() -> ((ActiveMQConnectionFactory)cf).close()); + runAfter(connection::close); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java new file mode 100644 index 0000000000..25444abade --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSFQQNConsumerTest extends MultiprotocolJMSClientTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + public void testFQQNTopicConsumerWithSelectorAMQP() throws Exception { + testFQQNTopicConsumerWithSelector("AMQP", true); + } + + @Test + public void testFQQNTopicConsumerWithSelectorOpenWire() throws Exception { + testFQQNTopicConsumerWithSelector("OPENWIRE", false); + } + + @Test + public void testFQQNTopicConsumerWithSelectorCore() throws Exception { + testFQQNTopicConsumerWithSelector("CORE", true); + } + + private void testFQQNTopicConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + final String queue = "queue"; + final String address = "address"; + final String filter = "prop='match'"; + try (Connection c = factory.createConnection()) { + c.start(); + + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + + MessageConsumer mc = s.createConsumer(t, filter); + + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue)); + + Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType()); + Assert.assertNotNull(serverQueue.getFilter()); + Assert.assertEquals(filter, serverQueue.getFilter().getFilterString().toString()); + assertEquals(filter, server.locateQueue(queue).getFilter().getFilterString().toString()); + + MessageProducer producer = s.createProducer(s.createTopic("address")); + + Message message = s.createTextMessage("hello"); + message.setStringProperty("prop", "match"); + producer.send(message); + + Assert.assertNotNull(mc.receive(5000)); + + message = s.createTextMessage("hello"); + message.setStringProperty("prop", "nomatch"); + producer.send(message); + + if (protocol.equals("OPENWIRE")) { + Assert.assertNull(mc.receive(500)); // false negatives in openwire + } else { + Assert.assertNull(mc.receiveNoWait()); + } + } + + if (validateFilterChange) { + boolean thrownException = false; + try (Connection c = factory.createConnection()) { + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + MessageConsumer mc = s.createConsumer(t, "shouldThrowException=true"); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + thrownException = true; + } + Assert.assertTrue(thrownException); + + // validating the case where I am adding a consumer without a filter + // on this case the consumer will have no filter, but the filter on the queue should take care of things + try (Connection c = factory.createConnection()) { + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + MessageConsumer mc = s.createConsumer(t); + + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue)); + + Wait.assertEquals(1, () -> serverQueue.getConsumers().size()); + serverQueue.getConsumers().forEach(serverConsumer -> { + Assert.assertNull(serverConsumer.getFilter()); + }); + + + MessageProducer producer = s.createProducer(s.createTopic("address")); + + Message message = s.createTextMessage("hello"); + message.setStringProperty("prop", "match"); + producer.send(message); + + Assert.assertNotNull(mc.receive(5000)); + + message = s.createTextMessage("hello"); + message.setStringProperty("prop", "nomatch"); + producer.send(message); + + if (protocol.equals("OPENWIRE")) { + Assert.assertNull(mc.receive(500)); // false negatives in openwire + } else { + Assert.assertNull(mc.receiveNoWait()); + } + + } + } + } + + + @Test + public void testFQQNTopicFilterConsumerOnlyAMQP() throws Exception { + testFQQNTopicFilterConsumerOnly("AMQP"); + } + + @Test + public void testFQQNTopicFilterConsumerOnlyOPENWIRE() throws Exception { + testFQQNTopicFilterConsumerOnly("OPENWIRE"); + } + + @Test + public void testFQQNTopicFilterConsumerOnlyCORE() throws Exception { + testFQQNTopicFilterConsumerOnly("CORE"); + } + + private void testFQQNTopicFilterConsumerOnly(String protocol) throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + final String queue = "queue"; + final String address = "address"; + final String filter = "prop='match'"; + + // predefining the queue without a filter + // so consumers will filter out messages + server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration().setAddress(address).setName(queue).setRoutingType(RoutingType.MULTICAST)); + + try (Connection c = factory.createConnection()) { + c.start(); + + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + + MessageConsumer mc = s.createConsumer(t, filter); + + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue)); + Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType()); + Assert.assertNull(serverQueue.getFilter()); // it was pre-created without a filter, so we will just filter on the consumer + + Wait.assertEquals(1, () -> serverQueue.getConsumers().size()); + serverQueue.getConsumers().forEach(consumer -> { + Assert.assertNotNull(consumer.getFilter()); + Assert.assertEquals(filter, consumer.getFilter().getFilterString().toString()); + }); + + MessageProducer producer = s.createProducer(s.createTopic("address")); + + Message message = s.createTextMessage("hello"); + message.setStringProperty("prop", "match"); + producer.send(message); + + Assert.assertNotNull(mc.receive(5000)); + + message = s.createTextMessage("hello"); + message.setStringProperty("prop", "nomatch"); + producer.send(message); + + if (protocol.equals("OPENWIRE")) { + assertNull(mc.receive(100)); // i have had false negatives with openwire, hence this + } else { + assertNull(mc.receiveNoWait()); + } + } + } + + @Test + public void testFQQNTopicConsumerDontExistAMQP() throws Exception { + testFQQNTopicConsumerDontExist("AMQP"); + } + + /* this commented out code is just to make a point that this test would not be valid in openwire. + As openwire is calling the method createSubscription from its 1.1 implementation. + Hence there's no need to test this over JMS1.1 with openWire + @Test + public void testFQQNTopicConsumerDontExistOPENWIRE() throws Exception { + testFQQNTopicConsumerDontExist("OPENWIRE"); + } */ + + @Test + public void testFQQNTopicConsumerDontExistCORE() throws Exception { + testFQQNTopicConsumerDontExist("CORE"); + } + + private void testFQQNTopicConsumerDontExist(String protocol) throws Exception { + AddressSettings settings = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false); + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", settings); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + + final String queue = "queue"; + final String address = "address"; + + boolean thrownException = false; + try (Connection c = factory.createConnection()) { + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + MessageConsumer mc = s.createConsumer(t); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + thrownException = true; + } + + Assert.assertTrue(thrownException); + } + + @Test + public void testFQQNQueueConsumerWithSelectorAMQP() throws Exception { + testFQQNQueueConsumerWithSelector("AMQP"); + } + + @Test + public void testFQQNQueueConsumerWithSelectorOpenWire() throws Exception { + testFQQNQueueConsumerWithSelector("OPENWIRE"); + } + + @Test + public void testFQQNQueueConsumerWithSelectorCore() throws Exception { + testFQQNQueueConsumerWithSelector("CORE"); + } + + private void testFQQNQueueConsumerWithSelector(String protocol) throws Exception { + AddressSettings settings = new AddressSettings().setDefaultQueueRoutingType(RoutingType.ANYCAST).setDefaultAddressRoutingType(RoutingType.ANYCAST); + server.getAddressSettingsRepository().addMatch("#", settings); + + final String queue = "myQueue"; + final String address = "address"; + final String filter = "prop='match'"; + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + + try (Connection c = factory.createConnection()) { + c.start(); + + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String queueQuery = CompositeAddress.toFullyQualified(address, queue) + (protocol.equals("OPENWIRE") ? "?selectorAware=true" : ""); + + Queue q = s.createQueue(queueQuery); + + MessageConsumer mc = s.createConsumer(q, filter); + + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue)); + + Assert.assertEquals(RoutingType.ANYCAST, serverQueue.getRoutingType()); + + Assert.assertNull(serverQueue.getFilter()); + + MessageProducer p = s.createProducer(q); + + Message m = s.createMessage(); + m.setStringProperty("prop", "match"); + p.send(m); + + assertNotNull(mc.receive(1000)); + + m = s.createMessage(); + m.setStringProperty("prop", "no-match"); + p.send(m); + + if (protocol.equals("OPENWIRE")) { + assertNull(mc.receive(100)); // i have had false negatives with openwire, hence this + } else { + assertNull(mc.receiveNoWait()); + } + + Wait.assertEquals(1, () -> serverQueue.getConsumers().size()); + + serverQueue.getConsumers().forEach(queueConsumer -> { + Assert.assertNotNull(queueConsumer.getFilter()); + Assert.assertEquals(filter, queueConsumer.getFilter().getFilterString().toString()); + }); + + mc.close(); + + Wait.assertEquals(0, () -> serverQueue.getConsumers().size()); + + + String invalidFilter = "notHappening=true"; + + mc = s.createConsumer(q, invalidFilter); + + Wait.assertEquals(1, () -> serverQueue.getConsumers().size()); + serverQueue.getConsumers().forEach(queueConsumer -> { + Assert.assertNotNull(queueConsumer.getFilter()); + Assert.assertEquals(invalidFilter, queueConsumer.getFilter().getFilterString().toString()); + }); + + } + } + + + + @Test + public void testFQQNTopicMultiConsumerWithSelectorAMQP() throws Exception { + testFQQNTopicMultiConsumerWithSelector("AMQP", true); + } + + @Test + public void testFQQNTopicMultiConsumerWithSelectorOpenWire() throws Exception { + testFQQNTopicMultiConsumerWithSelector("OPENWIRE", false); + } + + @Test + public void testFQQNTopicMultiConsumerWithSelectorCORE() throws Exception { + testFQQNTopicMultiConsumerWithSelector("CORE", true); + } + + private void testFQQNTopicMultiConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception { + + class RunnableConsumer implements Runnable { + int errors = 0; + final int expected; + final Connection c; + final Session session; + final Topic topic; + final MessageConsumer consumer; + final String queueName; + final String filter; + final CountDownLatch done; + + + RunnableConsumer(Connection c, String queueName, int expected, String filter, CountDownLatch done) throws Exception { + this.c = c; + this.session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.queueName = queueName; + this.expected = expected; + this.topic = session.createTopic(queueName); + this.consumer = session.createConsumer(topic, filter); + this.done = done; + this.filter = filter; + } + + @Override + public void run() { + try { + for (int i = 0; i < expected; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + logger.debug("Queue {} received message {}", queueName, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + Assert.assertNotNull(message); + } + if (protocol.equals("OPENWIRE")) { + Assert.assertNull(consumer.receive(500)); // false negatives in openwire + } else { + Assert.assertNull(consumer.receiveNoWait()); + } + } catch (Throwable e) { + errors++; + logger.warn(e.getMessage(), e); + } finally { + done.countDown(); + } + } + } + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + final String address = "address"; + int threads = 10; + + ExecutorService executor = Executors.newFixedThreadPool(threads); + runAfter(executor::shutdownNow); + try (Connection c = factory.createConnection()) { + c.start(); + + CountDownLatch doneLatch = new CountDownLatch(threads); + + RunnableConsumer[] consumers = new RunnableConsumer[threads]; + for (int i = 0; i < threads; i++) { + consumers[i] = new RunnableConsumer(c, address + "::" + "queue" + i, i * 10, "i < " + (i * 10), doneLatch); + } + + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = s.createProducer(s.createTopic(address)); + + for (int i = 0; i < threads * 10; i++) { + Message message = s.createTextMessage("i=" + i); + message.setIntProperty("i", i); + p.send(message); + } + + for (RunnableConsumer consumer : consumers) { + executor.execute(consumer); + } + + Assert.assertTrue(doneLatch.await(10, TimeUnit.SECONDS)); + + for (RunnableConsumer consumer : consumers) { + Assert.assertEquals("Error on consumer for queue " + consumer.queueName, 0, consumer.errors); + } + } + } + + +} \ No newline at end of file