This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new eb26f67 ARTEMIS-3137 support XPath filters
new 8fdb0d1 This closes #3466
eb26f67 is described below
commit eb26f67ab6913a490f92c426efa76e35252dceb1
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Feb 23 13:19:23 2021 -0600
ARTEMIS-3137 support XPath filters
Change summary:
- Remove the existing Xalan-based XPath evaluator since Xalan appears
to be no longer maintained.
- Implement a JAXP XPath evaluator (from the ActiveMQ 5.x code-base).
- Pull in the changes from https://issues.apache.org/jira/browse/AMQ-5333
to enable configurable XML parser features.
- Add a method to the base Message interface to make it easier to get
the message body as a string. This relieves the filter from having
to deal with message implementation details.
- Update the Qpid JMS client to get the jms.validateSelector parameter.
---
.../apache/activemq/artemis/api/core/Message.java | 4 +
.../artemis/core/message/impl/CoreMessage.java | 18 +++
.../protocol/amqp/broker/AMQPStandardMessage.java | 11 ++
artemis-selector/pom.xml | 10 --
.../selector/filter/JAXPXPathEvaluator.java | 69 +++++++++
.../artemis/selector/filter/XPathExpression.java | 54 +++++--
.../selector/filter/XalanXPathEvaluator.java | 84 -----------
.../artemis/core/filter/impl/FilterImpl.java | 14 +-
docs/user-manual/en/filter-expressions.md | 52 ++++++-
pom.xml | 2 +-
tests/integration-tests/pom.xml | 8 +
.../integration/amqp/JMSXPathSelectorTest.java | 162 +++++++++++++++++++++
.../tests/integration/client/ConsumerTest.java | 26 ++++
.../artemis/tests/integration/stomp/StompTest.java | 15 ++
14 files changed, 411 insertions(+), 118 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 3d65ab7..e20a660 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -761,4 +761,8 @@ public interface Message {
Object getOwner();
void setOwner(Object object);
+
+ default String getStringBody() {
+ return null;
+ }
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 8ae9543..ee5eb60 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -1271,4 +1271,22 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
public void setOwner(Object object) {
this.owner = object;
}
+
+ @Override
+ public String getStringBody() {
+ String body = null;
+
+ if (type == TEXT_TYPE) {
+ try {
+ SimpleString simpleBody =
getDataBuffer().readNullableSimpleString();
+ if (simpleBody != null) {
+ body = simpleBody.toString();
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ return body;
+ }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index 80358aa..7e490f3 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -31,6 +31,7 @@ import
org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
@@ -323,5 +324,15 @@ public class AMQPStandardMessage extends AMQPMessage {
ensureScanning();
return super.toString();
}
+
+ @Override
+ public String getStringBody() {
+ final Section body = getBody();
+ if (body instanceof AmqpValue && ((AmqpValue) body).getValue()
instanceof String) {
+ return (String) ((AmqpValue) body).getValue();
+ } else {
+ return null;
+ }
+ }
}
diff --git a/artemis-selector/pom.xml b/artemis-selector/pom.xml
index 96bb1ce..ca28c77 100644
--- a/artemis-selector/pom.xml
+++ b/artemis-selector/pom.xml
@@ -42,16 +42,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>xml-apis</groupId>
- <artifactId>xml-apis</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>xalan</groupId>
- <artifactId>xalan</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git
a/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/JAXPXPathEvaluator.java
b/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/JAXPXPathEvaluator.java
new file mode 100644
index 0000000..90349d1
--- /dev/null
+++
b/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/JAXPXPathEvaluator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.selector.filter;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathFactory;
+import java.io.StringReader;
+
+import org.xml.sax.InputSource;
+
+public class JAXPXPathEvaluator implements XPathExpression.XPathEvaluator {
+
+ // this is not thread-safe
https://docs.oracle.com/javase/8/docs/api/javax/xml/xpath/XPathFactory.html
+ private static final XPathFactory FACTORY = XPathFactory.newInstance();
+
+ private final String xpathExpression;
+ private final XPath xpath;
+ private final DocumentBuilder builder;
+
+ public JAXPXPathEvaluator(String xpathExpression, DocumentBuilder builder) {
+ this.xpathExpression = xpathExpression;
+ this.builder = builder;
+ synchronized (FACTORY) {
+ this.xpath = FACTORY.newXPath();
+ }
+ }
+
+ @Override
+ public boolean evaluate(Filterable m) throws FilterException {
+ String stringBody = m.getBodyAs(String.class);
+ if (stringBody != null) {
+ return evaluate(stringBody);
+ }
+ return false;
+ }
+
+ protected boolean evaluate(String text) {
+ return evaluate(new InputSource(new StringReader(text)));
+ }
+
+ protected boolean evaluate(InputSource inputSource) {
+ try {
+ return ((Boolean)xpath.evaluate(xpathExpression,
builder.parse(inputSource), XPathConstants.BOOLEAN)).booleanValue();
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return xpathExpression;
+ }
+}
diff --git
a/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XPathExpression.java
b/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XPathExpression.java
index 4f0f8e6..d0fe836 100755
---
a/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XPathExpression.java
+++
b/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XPathExpression.java
@@ -16,37 +16,55 @@
*/
package org.apache.activemq.artemis.selector.filter;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.util.Map;
+import java.util.Properties;
+
/**
* Used to evaluate an XPath Expression in a JMS selector.
*/
public final class XPathExpression implements BooleanExpression {
+ private final String xpath;
+ private final XPathEvaluator evaluator;
+
+ private static DocumentBuilder builder;
+
public static XPathEvaluatorFactory XPATH_EVALUATOR_FACTORY = null;
+ public static final String DOCUMENT_BUILDER_FACTORY_FEATURE_PREFIX =
"org.apache.activemq.documentBuilderFactory.feature:";
static {
- // Install the xalan xpath evaluator if it available.
- new XalanXPathEvaluator("//root").evaluate("<root></root>");
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ factory.setIgnoringElementContentWhitespace(true);
+ factory.setIgnoringComments(true);
+
try {
- XPATH_EVALUATOR_FACTORY = new XPathExpression.XPathEvaluatorFactory()
{
- @Override
- public XPathExpression.XPathEvaluator create(String xpath) {
- return new XalanXPathEvaluator(xpath);
- }
- };
+
factory.setFeature("http://xml.org/sax/features/external-general-entities",
false);
+
factory.setFeature("http://xml.org/sax/features/external-parameter-entities",
false);
+
factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl",
true);
+
+ // setup features from system properties (if any)
+ setupFeatures(factory);
+ builder = factory.newDocumentBuilder();
+ } catch (ParserConfigurationException e) {
+ throw new RuntimeException(e);
+ }
+
+ new JAXPXPathEvaluator("//root", builder).evaluate("<root></root>");
+ try {
+ XPATH_EVALUATOR_FACTORY = xpath -> new JAXPXPathEvaluator(xpath,
builder);
} catch (Throwable e) {
}
}
- private final String xpath;
- private final XPathEvaluator evaluator;
-
public interface XPathEvaluatorFactory {
-
XPathEvaluator create(String xpath);
}
public interface XPathEvaluator {
-
boolean evaluate(Filterable message) throws FilterException;
}
@@ -79,4 +97,14 @@ public final class XPathExpression implements
BooleanExpression {
return object == Boolean.TRUE;
}
+ protected static void setupFeatures(DocumentBuilderFactory factory) throws
ParserConfigurationException {
+ Properties properties = System.getProperties();
+ for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+ String key = (String) prop.getKey();
+ if (key.startsWith(DOCUMENT_BUILDER_FACTORY_FEATURE_PREFIX)) {
+ Boolean value = Boolean.valueOf((String)prop.getValue());
+
factory.setFeature(key.substring(DOCUMENT_BUILDER_FACTORY_FEATURE_PREFIX.length()),
value);
+ }
+ }
+ }
}
diff --git
a/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XalanXPathEvaluator.java
b/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XalanXPathEvaluator.java
deleted file mode 100644
index 3a1714c..0000000
---
a/artemis-selector/src/main/java/org/apache/activemq/artemis/selector/filter/XalanXPathEvaluator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.selector.filter;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import java.io.StringReader;
-
-import org.apache.xpath.CachedXPathAPI;
-import org.apache.xpath.objects.XObject;
-import org.w3c.dom.Document;
-import org.w3c.dom.traversal.NodeIterator;
-import org.xml.sax.InputSource;
-
-public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
-
- private final String xpath;
-
- public XalanXPathEvaluator(String xpath) {
- this.xpath = xpath;
- }
-
- @Override
- public boolean evaluate(Filterable m) throws FilterException {
- String stringBody = m.getBodyAs(String.class);
- if (stringBody != null) {
- return evaluate(stringBody);
- }
- return false;
- }
-
- protected boolean evaluate(String text) {
- return evaluate(new InputSource(new StringReader(text)));
- }
-
- protected boolean evaluate(InputSource inputSource) {
- try {
- DocumentBuilder dbuilder = createDocumentBuilder();
- Document doc = dbuilder.parse(inputSource);
-
- //An XPath expression could return a true or false value instead of a
node.
- //eval() is a better way to determine the boolean value of the exp.
- //For compliance with legacy behavior where selecting an empty node
returns true,
- //selectNodeIterator is attempted in case of a failure.
-
- CachedXPathAPI cachedXPathAPI = new CachedXPathAPI();
- XObject result = cachedXPathAPI.eval(doc, xpath);
- if (result.bool())
- return true;
- else {
- NodeIterator iterator = cachedXPathAPI.selectNodeIterator(doc,
xpath);
- return (iterator.nextNode() != null);
- }
- } catch (Throwable e) {
- return false;
- }
- }
-
- private DocumentBuilder createDocumentBuilder() throws
ParserConfigurationException {
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- factory.setNamespaceAware(true);
-
-
factory.setFeature("http://xml.org/sax/features/external-general-entities",
false);
-
factory.setFeature("http://xml.org/sax/features/external-parameter-entities",
false);
-
factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl",
true);
-
- return factory.newDocumentBuilder();
- }
-}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 288a91c..2a8aa8c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.filter.impl;
import java.util.Map;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.Message;
@@ -29,9 +30,9 @@ import
org.apache.activemq.artemis.selector.filter.BooleanExpression;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
+import org.apache.activemq.artemis.utils.ByteUtil;
import static
org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
-import org.apache.activemq.artemis.utils.ByteUtil;
/**
* This class implements an ActiveMQ Artemis filter
@@ -251,8 +252,15 @@ public class FilterImpl implements Filter {
@Override
public <T> T getBodyAs(Class<T> type) throws FilterException {
- // TODO: implement to support content based selection
- return null;
+ T body = null;
+ if (!message.isLargeMessage() && type == String.class) {
+ try {
+ body = type.cast(message.getStringBody());
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ return body;
}
@Override
diff --git a/docs/user-manual/en/filter-expressions.md
b/docs/user-manual/en/filter-expressions.md
index 40eb225..13b3185 100644
--- a/docs/user-manual/en/filter-expressions.md
+++ b/docs/user-manual/en/filter-expressions.md
@@ -62,12 +62,50 @@ previous example to be `convert_string_expressions:age >
18`, then it would
match the aforementioned message.
The JMS spec also states that property identifiers (and therefore the
-identifiers which are valid for use in a filter expression) are an,
-"unlimited-length sequence of letters and digits, the first of which must be
-a letter. A letter is any character for which the method
-`Character.isJavaLetter` returns `true`. This includes `_` and `$`. A letter
-or digit is any character for which the method `Character.isJavaLetterOrDigit`
-returns `true`." This constraint means that hyphens (i.e. `-`) cannot be used.
+identifiers which are valid for use in a filter expression) are an:
+
+> unlimited-length sequence of letters and digits, the first of which must be
+> a letter. A letter is any character for which the method
+> `Character.isJavaLetter` returns `true`. This includes `_` and `$`. A letter
+> or digit is any character for which the method
`Character.isJavaLetterOrDigit`
+> returns `true`.
+
+This constraint means that hyphens (i.e. `-`) cannot be used.
However, this constraint can be overcome by using the `hyphenated_props:`
prefix. For example, if a message had the `foo-bar` property set to `0` then
-the filter expression `hyphenated_props:foo-bar = 0` would match it.
\ No newline at end of file
+the filter expression `hyphenated_props:foo-bar = 0` would match it.
+
+## XPath
+
+Apache ActiveMQ Artemis also supports special
[XPath](https://en.wikipedia.org/wiki/XPath)
+filters which operate on the *body* of a message. The body must be XML. To
+use an XPath filter use this syntax:
+```
+XPATH '<xpath-expression>'
+```
+
+XPath filters are supported with and between producers and consumers using
+the following protocols:
+
+ - OpenWire JMS
+ - Core (and Core JMS)
+ - STOMP
+ - AMQP
+
+Since XPath applies to the body of the message and requires parsing of XML
+**it may be significantly slower** than normal filters.
+
+Large messages are **not** supported.
+
+The XML parser used for XPath is configured with these default "features":
+
+ - `http://xml.org/sax/features/external-general-entities`: `false`
+ - `http://xml.org/sax/features/external-parameter-entities`: `false`
+ - `http://apache.org/xml/features/disallow-doctype-decl`: `true`
+
+However, in order to deal with any implementation-specific issues the features
+can be customized by using system properties starting with the
+`org.apache.activemq.documentBuilderFactory.feature:` prefix, e.g.:
+```
+-Dorg.apache.activemq.documentBuilderFactory.feature:http://xml.org/sax/features/external-general-entities=true
+```
diff --git a/pom.xml b/pom.xml
index f77badb..8134726 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@
<proton.version>0.33.8</proton.version>
<resteasy.version>3.15.0.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
- <qpid.jms.version>0.55.0</qpid.jms.version>
+ <qpid.jms.version>0.56.0</qpid.jms.version>
<johnzon.version>0.9.5</johnzon.version>
<json-p.spec.version>1.0-alpha-1</json-p.spec.version>
<javax.inject.version>1</javax.inject.version>
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 6f496ad..0eb1041 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -424,6 +424,14 @@
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- need this for org.apache.activemq.filter.XalanXPathEvaluator for
Xpath selectors with OpenWire JMS clients-->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${activemq5-version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSXPathSelectorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSXPathSelectorTest.java
new file mode 100644
index 0000000..b8455d2
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSXPathSelectorTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.net.URI;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Test;
+
+public class JMSXPathSelectorTest extends JMSClientTestSupport {
+
+ private static final String NORMAL_QUEUE_NAME = "NORMAL";
+
+ private ConnectionSupplier AMQPConnection = () -> createConnection();
+ private ConnectionSupplier CoreConnection = () -> createCoreConnection();
+ private ConnectionSupplier OpenWireConnection = () ->
createOpenWireConnection();
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Override
+ protected URI getBrokerQpidJMSConnectionURI() {
+ try {
+ return new URI(getBrokerQpidJMSConnectionString() +
"?jms.validateSelector=false");
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ }
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ server.getConfiguration().setPersistenceEnabled(false);
+ server.getAddressSettingsRepository().addMatch(NORMAL_QUEUE_NAME, new
AddressSettings());
+ }
+
+ @Override
+ protected void createAddressAndQueues(ActiveMQServer server) throws
Exception {
+ super.createAddressAndQueues(server);
+
+ //Add Standard Queue
+ server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString(NORMAL_QUEUE_NAME),
RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(NORMAL_QUEUE_NAME).setRoutingType(RoutingType.ANYCAST));
+ }
+
+ @Test
+ public void testJMSSelectorsAMQPProducerAMQPConsumer() throws Exception {
+ testJMSSelectors(AMQPConnection, AMQPConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsCoreProducerCoreConsumer() throws Exception {
+ testJMSSelectors(CoreConnection, CoreConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsCoreProducerAMQPConsumer() throws Exception {
+ testJMSSelectors(CoreConnection, AMQPConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsAMQPProducerCoreConsumer() throws Exception {
+ testJMSSelectors(AMQPConnection, CoreConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsOpenWireProducerOpenWireConsumer() throws
Exception {
+ testJMSSelectors(OpenWireConnection, OpenWireConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsCoreProducerOpenWireConsumer() throws Exception
{
+ testJMSSelectors(CoreConnection, OpenWireConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsOpenWireProducerCoreConsumer() throws Exception
{
+ testJMSSelectors(OpenWireConnection, CoreConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsAMQPProducerOpenWireConsumer() throws Exception
{
+ testJMSSelectors(AMQPConnection, OpenWireConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsOpenWireProducerAMQPConsumer() throws Exception
{
+ testJMSSelectors(OpenWireConnection, AMQPConnection);
+ }
+
+ public void testJMSSelectors(ConnectionSupplier producerConnectionSupplier,
ConnectionSupplier consumerConnectionSupplier) throws Exception {
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier,
NORMAL_QUEUE_NAME, "<root><a key='first' num='1'/><b key='second'
num='2'>b</b></root>", "<root><b key='first' num='1'/><c key='second'
num='2'>c</c></root>", "XPATH 'root/a'");
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier,
NORMAL_QUEUE_NAME, "<root><a key='first' num='1'/><b key='second'
num='2'>b</b></root>", "<root><z key='first' num='1'/></root>", "XPATH
'root/a'");
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier,
NORMAL_QUEUE_NAME, "<root><a key='first' num='1'/><b key='second'
num='2'>b</b></root>", "foo", "XPATH 'root/a'");
+ }
+
+ public void testJMSSelector(ConnectionSupplier producerConnectionSupplier,
ConnectionSupplier consumerConnectionSupplier, String queueName, String
goodBody, String badBody, String selector) throws Exception {
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier,
queueName, goodBody, badBody, selector, Message.DEFAULT_DELIVERY_MODE,
Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ public void testJMSSelector(ConnectionSupplier producerConnectionSupplier,
ConnectionSupplier consumerConnectionSupplier, String queueName, String
goodBody, String badBody, String selector, int deliveryMode, int priority, long
timeToLive) throws Exception {
+ sendMessage(producerConnectionSupplier, queueName, goodBody, badBody,
deliveryMode, priority, timeToLive);
+ receive(consumerConnectionSupplier, queueName, goodBody, selector);
+ }
+
+ private void receive(ConnectionSupplier consumerConnectionSupplier, String
queueName, String body, String selector) throws JMSException {
+ try (Connection consumerConnection =
consumerConnectionSupplier.createConnection()) {
+ Session consumerSession = consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = consumerSession.createQueue(queueName);
+ MessageConsumer consumer =
consumerSession.createConsumer(consumerQueue, selector);
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(body, msg.getText());
+ assertNull(consumer.receiveNoWait());
+ consumer.close();
+ }
+ }
+
+ private void sendMessage(ConnectionSupplier producerConnectionSupplier,
String queueName, String goodBody, String badBody, int deliveryMode, int
priority, long timeToLive) throws JMSException {
+ try (Connection producerConnection =
producerConnectionSupplier.createConnection()) {
+ Session producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue1 = producerSession.createQueue(queueName);
+ MessageProducer p = producerSession.createProducer(null);
+
+ TextMessage message1 = producerSession.createTextMessage();
+ message1.setText(badBody);
+ p.send(queue1, message1);
+
+ TextMessage message2 = producerSession.createTextMessage();
+ message2.setText(goodBody);
+ p.send(queue1, message2, deliveryMode, priority, timeToLive);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 12559da..482eeab 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -1275,4 +1275,30 @@ public class ConsumerTest extends ActiveMQTestBase {
Assert.assertEquals("The consumer " + i + " must receive all the
messages sent.", messages * runs, receivedMessages.get(i));
}
}
+
+ @Test
+ public void testConsumerXpathSelector() throws Exception {
+ final SimpleString BODY = SimpleString.toSimpleString("<root><a
key='first' num='1'/><b key='second' num='2'>b</b></root>");
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, false, true);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ ClientMessage message = session.createMessage(false);
+
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString("wrong"));
+ producer.send(message);
+ message = session.createMessage(false);
+ message.getBodyBuffer().writeNullableSimpleString(BODY);
+ producer.send(message);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE.toString(),
"XPATH 'root/a'");
+ session.start();
+ ClientMessage message2 = consumer.receive(1000);
+
+ Assert.assertEquals(BODY,
message2.getBodyBuffer().readNullableSimpleString());
+ Assert.assertEquals(1, getMessageCount(((Queue)
server.getPostOffice().getBinding(QUEUE).getBindable())));
+
+ session.close();
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index e4f2492..24fc087 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -844,6 +844,21 @@ public class StompTest extends StompTestBase {
}
@Test
+ public void testSubscribeWithAutoAckAndXpathSelector() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null,
"XPATH 'root/a'");
+
+ sendJmsMessage("<root><b key='first' num='1'/><c key='second'
num='2'>c</c></root>");
+ sendJmsMessage("<root><a key='first' num='1'/><b key='second'
num='2'>b</b></root>");
+
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertTrue("Should have received the real message but got: " +
frame, frame.getBody().equals("<root><a key='first' num='1'/><b key='second'
num='2'>b</b></root>"));
+
+ conn.disconnect();
+ }
+
+ @Test
public void testSubscribeWithClientAck() throws Exception {
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);