This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cc8485  AMQNET-617: Support Message Selectors
     new c70c7fd  Merge pull request #39 from HavretGC/AMQNET-617/support_consumer_selectors
9cc8485 is described below

commit 9cc8485befd11992f88f4492073947019a91dff4
Author: Havret <h4v...@gmail.com>
AuthorDate: Fri Sep 27 15:45:55 2019 +0200

    AMQNET-617: Support Message Selectors
---
 README.md                                          |  2 +-
 src/NMS.AMQP/Meta/ConsumerInfo.cs                  |  2 +-
 src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs         |  8 +-
 .../Provider/Amqp/Filters/AmqpNmsSelectorType.cs   | 32 ++++++++
 .../NmsMessageConsumerTest.cs                      | 87 ++++++++++++++++++++++
 5 files changed, 123 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index 6862b99..4d947ad 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,7 @@ So in general most of the top level classes that implement the Apache.NMS interf
 | IMessageProducer | Y * | Anonymous producers are only supported on connections with the ANONYMOUS-RELAY capability. |
 | MsgDeliveryMode.Persistent | Y | Producers will block on send until an outcome is received or will timeout after waiting the RequestTimeout timespan amount. Exceptions may be throw depending on the outcome or if the producer times out. |
 | MsgDeliveryMode.NonPersistent | Y | Producers will not block on send nor expect to receive an outcome. Should an exception be raised from the outcome the exception will be delivered using the the connection ExceptionListener. |
-| IMessageConsumer | Y * | Message Selectors and noLocal filter are not supported. |
+| IMessageConsumer | Y * | NoLocal filter is not supported. |
 | Durable Consumers | Y | |
 | IQueueBrowser | N | The provider will throw NotImplementedException for the ISession create methods. |
 | Configurable NMSMessageID and amqp serializtion | N | For future consideration. The prodiver will generate a MessageID from a sequence and serialize it as a string. |
diff --git a/src/NMS.AMQP/Meta/ConsumerInfo.cs b/src/NMS.AMQP/Meta/ConsumerInfo.cs
index c6042bc..c75093b 100644
--- a/src/NMS.AMQP/Meta/ConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/ConsumerInfo.cs
@@ -42,7 +42,7 @@ namespace Apache.NMS.AMQP.Meta
         public string SubscriptionName { get; internal set; } = null;
 
         public bool NoLocal { get; internal set; } = false;
-        public bool HasSelector => !string.IsNullOrEmpty(Selector);
+        public bool HasSelector => !string.IsNullOrWhiteSpace(Selector);
         public bool IsDurable { get; set; }
         public bool IsBrowser { get; set; }
         public bool LocalMessageExpiry { get; set; }
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 1481248..a238ace 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -25,6 +25,7 @@ using Amqp.Transactions;
 using Amqp.Types;
 using Apache.NMS.AMQP.Message;
 using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Provider.Amqp.Filters;
 using Apache.NMS.AMQP.Provider.Amqp.Message;
 using Apache.NMS.AMQP.Util;
 
@@ -164,14 +165,9 @@ namespace Apache.NMS.AMQP.Provider.Amqp
                 filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
             }
 
-            // Selector
-            // qpid jms defines a selector filter as an amqp described type 
-            //      AmqpJmsSelectorType where
-            //          Descriptor = 0x0000468C00000004UL
-            //          Described = "<selector_string>" (type string)
             if (info.HasSelector)
             {
-                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, info.Selector);
+                filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, new AmqpNmsSelectorType(info.Selector));
             }
 
             // Assign filters
diff --git a/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs b/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs
new file mode 100644
index 0000000..fb7835d
--- /dev/null
+++ b/src/NMS.AMQP/Provider/Amqp/Filters/AmqpNmsSelectorType.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Provider.Amqp.Filters
+{
+    public class AmqpNmsSelectorType : DescribedValue
+    {
+        private const ulong DESCRIPTOR = 0x0000468C00000004L;
+
+        public AmqpNmsSelectorType(string selector) : base(DESCRIPTOR, selector)
+        {
+        }
+
+        public override string ToString() => $"AmqpNmsSelectorType{{ {this.Value} }}";
+    }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
new file mode 100644
index 0000000..462b4d7
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test
+{
+    [TestFixture]
+    public class NmsMessageConsumerTest : AmqpTestSupport
+    {
+        [Test, Timeout(60_000)]
+        public void TestSelectors()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+            
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+
+            ITextMessage message = session.CreateTextMessage("Hello");
+            producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            string text = "Hello + 9";
+            message = session.CreateTextMessage(text);
+            producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
+            
+            producer.Close();
+
+            IMessageConsumer messageConsumer = session.CreateConsumer(queue, "JMSPriority > 8");
+            IMessage msg = messageConsumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.NotNull(msg, "No message was received");
+            Assert.IsInstanceOf<ITextMessage>(msg);
+            Assert.AreEqual(text, ((ITextMessage) msg).Text);
+            Assert.IsNull(messageConsumer.Receive(TimeSpan.FromSeconds(1)));
+        }
+
+        [Test, Timeout(60_000)]
+        public void TestSelectorsWithJMSType()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+            
+            Connection = CreateAmqpConnection();
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageProducer producer = session.CreateProducer(queue);
+
+            ITextMessage message1 = session.CreateTextMessage("text");
+            producer.Send(message1, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.Zero);
+
+            string type = "myJMSType";
+            string text = "text" + type;
+            ITextMessage message2 = session.CreateTextMessage(text);
+            message2.NMSType = type;
+            producer.Send(message2, MsgDeliveryMode.Persistent, MsgPriority.Highest, TimeSpan.Zero);
+            
+            producer.Close();
+
+            IMessageConsumer messageConsumer = session.CreateConsumer(queue, $"JMSType = '{type}'");
+            IMessage msg = messageConsumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.NotNull(msg, "No message was received");
+            Assert.IsInstanceOf<ITextMessage>(msg);
+            Assert.AreEqual(text, ((ITextMessage) msg).Text);
+            Assert.IsNull(messageConsumer.Receive(TimeSpan.FromSeconds(1)));
+        }
+    }
+}
\ No newline at end of file

Reply via email to