ARTEMIS-2151 JMS Selectors broken in some cases. Create Test Case; Fix OpenWire so selectors are translated; Fix GroupID to call groupId method.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/faa6ffa3 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/faa6ffa3 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/faa6ffa3 Branch: refs/heads/master Commit: faa6ffa3b4fbaef4e57f9dfc53db0a9fb6843b88 Parents: ad7e7c7 Author: Michael André Pearce <michael.andre.pea...@me.com> Authored: Mon Oct 29 05:46:55 2018 +0000 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Oct 31 11:55:22 2018 -0400 ---------------------------------------------------------------------- .../artemis/utils/SelectorTranslator.java | 1 + .../artemis/api/core/FilterConstants.java | 5 + .../core/protocol/openwire/amq/AMQConsumer.java | 3 +- .../artemis/core/filter/impl/FilterImpl.java | 2 + .../tests/integration/amqp/JMSSelectorTest.java | 162 +++++++++++++++++++ 5 files changed, 172 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faa6ffa3/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java index 637cdff..dd391a9 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java @@ -48,6 +48,7 @@ public class SelectorTranslator { filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "AMQTimestamp"); filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "AMQUserID"); filterString = SelectorTranslator.parse(filterString, "JMSExpiration", 
"AMQExpiration"); + filterString = SelectorTranslator.parse(filterString, "JMSXGroupID", "AMQGroupID"); return filterString; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faa6ffa3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java index 37b221c..3803a54 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java @@ -68,6 +68,11 @@ public final class FilterConstants { public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress"); /** + * Name of the ActiveMQ Artemis Message group id header. + */ + public static final SimpleString ACTIVEMQ_GROUP_ID = new SimpleString("AMQGroupID"); + + /** * All ActiveMQ Artemis headers are prepended by this prefix. 
*/ public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faa6ffa3/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index e522f37..45d9fa1 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; @@ -117,7 +118,7 @@ public class AMQConsumer { public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception { - SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector()); + SimpleString selector = info.getSelector() == null ? 
null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector())); boolean preAck = false; if (info.isNoLocal()) { if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faa6ffa3/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index 7ee7b6b..560e53a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -176,6 +176,8 @@ public class FilterImpl implements Filter { return msg.getEncodeSize(); } else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) { return msg.getAddress(); + } else if (FilterConstants.ACTIVEMQ_GROUP_ID.equals(fieldName)) { + return msg.getGroupID(); } else { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faa6ffa3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java new file mode 100644 index 0000000..c61898f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.junit.Test; + +public class JMSSelectorTest extends JMSClientTestSupport { + + private static final String NORMAL_QUEUE_NAME = "NORMAL"; + + private ConnectionSupplier AMQPConnection = () -> createConnection(); + private ConnectionSupplier CoreConnection = () -> createCoreConnection(); + private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection(); + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + @Override + protected void addConfiguration(ActiveMQServer server) { + server.getConfiguration().setPersistenceEnabled(false); + 
server.getAddressSettingsRepository().addMatch(NORMAL_QUEUE_NAME, new AddressSettings()); + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + super.createAddressAndQueues(server); + + //Add Standard Queue + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST)); + server.createQueue(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString(NORMAL_QUEUE_NAME), null, true, false, -1, false, true); + } + + @Test + public void testJMSSelectorsAMQPProducerAMQPConsumer() throws Exception { + testJMSSelectors(AMQPConnection, AMQPConnection); + } + + @Test + public void testJMSSelectorsCoreProducerCoreConsumer() throws Exception { + testJMSSelectors(CoreConnection, CoreConnection); + } + + @Test + public void testJMSSelectorsCoreProducerAMQPConsumer() throws Exception { + testJMSSelectors(CoreConnection, AMQPConnection); + } + + @Test + public void testJMSSelectorsAMQPProducerCoreConsumer() throws Exception { + testJMSSelectors(AMQPConnection, CoreConnection); + } + + @Test + public void testJMSSelectorsOpenWireProducerOpenWireConsumer() throws Exception { + testJMSSelectors(OpenWireConnection, OpenWireConnection); + } + + @Test + public void testJMSSelectorsCoreProducerOpenWireConsumer() throws Exception { + testJMSSelectors(CoreConnection, OpenWireConnection); + } + + @Test + public void testJMSSelectorsOpenWireProducerCoreConsumer() throws Exception { + testJMSSelectors(OpenWireConnection, CoreConnection); + } + + @Test + public void testJMSSelectorsAMQPProducerOpenWireConsumer() throws Exception { + testJMSSelectors(AMQPConnection, OpenWireConnection); + } + + @Test + public void testJMSSelectorsOpenWireProducerAMQPConsumer() throws Exception { + testJMSSelectors(OpenWireConnection, AMQPConnection); + } + + public void testJMSSelectors(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) 
throws Exception { + testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("color", "blue"), "color = 'blue'"); + testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setJMSCorrelationID("correlation"), "JMSCorrelationID = 'correlation'"); + testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, null, "JMSPriority = 1", Message.DEFAULT_DELIVERY_MODE, 1, Message.DEFAULT_TIME_TO_LIVE); + testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("JMSXGroupID", "groupA"), "JMSXGroupID = 'groupA'"); + } + + public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector) throws Exception { + testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, queueName, setValue, selector, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + + public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector, int deliveryMode, int priority, long timeToLive) throws Exception { + + sendMessage(producerConnectionSupplier, queueName, setValue, deliveryMode, priority, timeToLive); + + receiveLVQ(consumerConnectionSupplier, queueName, selector); + } + + private void receiveLVQ(ConnectionSupplier consumerConnectionSupplier, String queueName, String selector) throws JMSException { + try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) { + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = consumerSession.createQueue(queueName); + MessageConsumer consumer = 
consumerSession.createConsumer(consumerQueue, selector); + TextMessage msg = (TextMessage) consumer.receive(1000); + assertNotNull(msg); + assertEquals("how are you", msg.getText()); + assertNull(consumer.receive(1000)); + consumer.close(); + } + } + + private void sendMessage(ConnectionSupplier producerConnectionSupplier, String queueName, MessageSetter setValue, int deliveryMode, int priority, long timeToLive) throws JMSException { + try (Connection producerConnection = producerConnectionSupplier.createConnection()) { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue1 = producerSession.createQueue(queueName); + MessageProducer p = producerSession.createProducer(null); + + TextMessage message1 = producerSession.createTextMessage(); + message1.setText("hello"); + p.send(queue1, message1); + + TextMessage message2 = producerSession.createTextMessage(); + if (setValue != null) { + setValue.accept(message2); + } + message2.setText("how are you"); + p.send(queue1, message2, deliveryMode, priority, timeToLive); + } + } + + public interface MessageSetter { + + void accept(javax.jms.Message message) throws JMSException; + } +} \ No newline at end of file