Repository: qpid-broker-j
Updated Branches:
  refs/heads/master dc2e9d425 -> db971ea2b


QPID-8139: [Broker-J][AMQP 1.0] Make sure that selector filter can handle 
JMSMessageID and JMSCorrelationID values with prefixes as defined in AMQP JMS 
mapping specification


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f1ce8666
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f1ce8666
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f1ce8666

Branch: refs/heads/master
Commit: f1ce8666b80aa1ddd30445a6b2ce47db6b6943e9
Parents: dc2e9d4
Author: Alex Rudyy <[email protected]>
Authored: Fri Apr 20 17:46:46 2018 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Thu May 24 17:15:19 2018 +0100

----------------------------------------------------------------------
 .../apache/qpid/server/filter/Filterable.java   |   5 +-
 .../qpid/server/filter/FilterableMessage.java   |   4 +-
 .../filter/JMSMessagePropertyExpression.java    |   8 +-
 .../qpid/server/filter/JMSSelectorFilter.java   |  11 +-
 .../qpid/server/message/AMQMessageHeader.java   |   4 +-
 .../qpid/server/message/MessageInfoImpl.java    |   5 +-
 .../message/internal/InternalMessageHeader.java |   8 +-
 .../server/exchange/HeadersBindingTest.java     |   4 +-
 .../protocol/v0_10/MessageConverter_v0_10.java  |   5 +-
 .../MessageConverter_v0_10_to_Internal.java     |   4 +-
 .../protocol/v0_10/MessageTransferHeader.java   |   2 +-
 .../v0_8/MessageConverter_v0_8_to_Internal.java |   4 +-
 .../v1_0/MessageConverter_v1_0_to_Internal.java |   2 -
 .../protocol/v1_0/MessageMetaData_1_0.java      |   8 +-
 .../protocol/v1_0/SendingLinkEndpoint.java      |   5 +-
 .../v1_0/selector/AmqpMessageIdHelper.java      | 391 +++++++++++++++++++
 .../selector/JMSMessagePropertyExpression.java  | 119 ++++++
 .../JMSMessagePropertyExpressionTest.java       | 345 ++++++++++++++++
 .../qpid/tests/protocol/v1_0/Interaction.java   |   8 +
 .../tests/protocol/v1_0/MessageDecoder.java     |  11 +
 .../JMSMessagePropertiesFilterTest.java         | 356 +++++++++++++++++
 21 files changed, 1273 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java 
b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index a15bdc1..589238c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -20,7 +20,6 @@
 */
 package org.apache.qpid.server.filter;
 
-import org.apache.qpid.server.filter.FilterableMessage;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
@@ -104,7 +103,7 @@ public interface Filterable extends FilterableMessage
                 }
 
                 @Override
-                public String getMessageId()
+                public Object getMessageId()
                 {
                     return message.getMessageHeader().getMessageId();
                 }
@@ -116,7 +115,7 @@ public interface Filterable extends FilterableMessage
                 }
 
                 @Override
-                public String getCorrelationId()
+                public Object getCorrelationId()
                 {
                     return message.getMessageHeader().getCorrelationId();
                 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java
 
b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java
index d41c33b..05ac9ee 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/filter/FilterableMessage.java
@@ -31,11 +31,11 @@ public interface FilterableMessage
 
     byte getPriority();
 
-    String getMessageId();
+    Object getMessageId();
 
     long getTimestamp();
 
-    String getCorrelationId();
+    Object getCorrelationId();
 
     long getExpiration();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java
 
b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java
index c94dd0d..b5cf4eb 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSMessagePropertyExpression.java
@@ -213,9 +213,9 @@ public class JMSMessagePropertyExpression implements 
PropertyExpression<Filterab
         public Object evaluate(FilterableMessage message)
         {
 
-            String messageId = message.getMessageId();
+            Object messageId = message.getMessageId();
 
-            return messageId;
+            return messageId == null ? null : String.valueOf(messageId);
 
         }
     }
@@ -236,9 +236,9 @@ public class JMSMessagePropertyExpression implements 
PropertyExpression<Filterab
         public Object evaluate(FilterableMessage message)
         {
 
-            String correlationId = message.getCorrelationId();
+            Object correlationId = message.getCorrelationId();
 
-            return correlationId;
+            return correlationId == null ? null : 
String.valueOf(correlationId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
 
b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 3e85861..5ba3da8 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -39,9 +39,16 @@ public class JMSSelectorFilter implements MessageFilter
 
     public JMSSelectorFilter(String selector) throws ParseException, 
TokenMgrError, SelectorParsingException
     {
+        this(selector, JMSMessagePropertyExpression.FACTORY);
+    }
+
+    public JMSSelectorFilter(String selector,
+                             PropertyExpressionFactory<? extends 
FilterableMessage> propertyExpressionFactory)
+            throws ParseException, TokenMgrError, SelectorParsingException
+    {
         _selector = selector;
-        SelectorParser<FilterableMessage> selectorParser = new 
SelectorParser<>();
-        
selectorParser.setPropertyExpressionFactory(JMSMessagePropertyExpression.FACTORY);
+        SelectorParser selectorParser = new SelectorParser<>();
+        selectorParser.setPropertyExpressionFactory(propertyExpressionFactory);
         _matcher = selectorParser.parse(selector);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
 
b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
index c7d28d1..4b159a7 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
@@ -25,7 +25,7 @@ import java.util.Set;
 
 public interface AMQMessageHeader
 {
-    String getCorrelationId();
+    Object getCorrelationId();
 
     long getExpiration();
 
@@ -35,7 +35,7 @@ public interface AMQMessageHeader
 
     String getGroupId();
 
-    String getMessageId();
+    Object getMessageId();
 
     String getMimeType();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java 
b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
index 7626dc6..cda04d8 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
@@ -62,10 +62,11 @@ public class MessageInfoImpl implements MessageInfo
         _arrivalTime = message.getArrivalTime() == 0L ? null : new 
Date(message.getArrivalTime());
         _messageType = message.getMessageType();
         _persistent = message.isPersistent();
-        _messageId = messageHeader.getMessageId();
+        _messageId = messageHeader.getMessageId() == null ? null : 
String.valueOf(messageHeader.getMessageId());
         _expirationTime = messageHeader.getExpiration() == 0L ? null : new 
Date(messageHeader.getExpiration());
         _applicationId = messageHeader.getAppId();
-        _correlationId = messageHeader.getCorrelationId();
+        _correlationId =
+                messageHeader.getCorrelationId() == null ? null : 
String.valueOf(messageHeader.getCorrelationId());
         _encoding = messageHeader.getEncoding();
         _mimeType = messageHeader.getMimeType();
         _priority = messageHeader.getPriority();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
 
b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
index 46f4a2e..0a85e3b 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
@@ -49,11 +49,11 @@ public final class InternalMessageHeader implements 
AMQMessageHeader, Serializab
     private long _arrivalTime;
 
     public InternalMessageHeader(final Map<String, Object> headers,
-                                 final String correlationId,
+                                 final Object correlationId,
                                  final long expiration,
                                  final String userId,
                                  final String appId,
-                                 final String messageId,
+                                 final Object messageId,
                                  final String mimeType,
                                  final String encoding,
                                  final byte priority,
@@ -65,11 +65,11 @@ public final class InternalMessageHeader implements 
AMQMessageHeader, Serializab
     {
         _headers = headers == null ? new LinkedHashMap<>() : new 
LinkedHashMap<>(headers);
 
-        _correlationId = correlationId;
+        _correlationId = correlationId == null ? null : 
String.valueOf(correlationId);
         _expiration = expiration;
         _userId = userId;
         _appId = appId;
-        _messageId = messageId;
+        _messageId = messageId == null ? null : String.valueOf(messageId);
         _mimeType = mimeType;
         _encoding = encoding;
         _priority = priority;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 126c07d..05796df 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -56,7 +56,7 @@ public class HeadersBindingTest extends UnitTestBase
         private final Map<String, Object> _headers = new HashMap<String, 
Object>();
 
         @Override
-        public String getCorrelationId()
+        public Object getCorrelationId()
         {
             return null;
         }
@@ -86,7 +86,7 @@ public class HeadersBindingTest extends UnitTestBase
         }
 
         @Override
-        public String getMessageId()
+        public Object getMessageId()
         {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 8c9db87..42afc4f 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -139,9 +139,10 @@ public class MessageConverter_v0_10 implements 
MessageConverter<ServerMessage, M
         
messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
         messageProps.setContentLength(size);
         
messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
-        if(serverMsg.getMessageHeader().getCorrelationId() != null)
+        if (serverMsg.getMessageHeader().getCorrelationId() != null)
         {
-            
messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes(UTF_8));
+            
messageProps.setCorrelationId(String.valueOf(serverMsg.getMessageHeader().getCorrelationId())
+                                                .getBytes(UTF_8));
         }
 
         Header header = new Header(deliveryProps, messageProps, null);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index 8121dd6..ce8bf62 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -157,7 +157,7 @@ public class MessageConverter_v0_10_to_Internal implements 
MessageConverter<Mess
         }
 
         @Override
-        public String getCorrelationId()
+        public Object getCorrelationId()
         {
             return _delegate.getCorrelationId();
         }
@@ -187,7 +187,7 @@ public class MessageConverter_v0_10_to_Internal implements 
MessageConverter<Mess
         }
 
         @Override
-        public String getMessageId()
+        public Object getMessageId()
         {
             return _delegate.getMessageId();
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
index 6156781..ff92fd9 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
@@ -110,7 +110,7 @@ class MessageTransferHeader implements AMQMessageHeader
     {
         UUID id = _messageProps == null ? null : _messageProps.getMessageId();
 
-        return id == null ? null : "ID:"+String.valueOf(id);
+        return id == null ? null : "ID:" + String.valueOf(id);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 28efa43..5ad2ebe 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -203,7 +203,7 @@ public class MessageConverter_v0_8_to_Internal implements 
MessageConverter<AMQMe
         }
 
         @Override
-        public String getCorrelationId()
+        public Object getCorrelationId()
         {
             return _delegate.getCorrelationId();
         }
@@ -233,7 +233,7 @@ public class MessageConverter_v0_8_to_Internal implements 
MessageConverter<AMQMe
         }
 
         @Override
-        public String getMessageId()
+        public Object getMessageId()
         {
             return _delegate.getMessageId();
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index 15039d6..c486358 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -33,8 +33,6 @@ import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import 
org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 @PluggableService
 public class MessageConverter_v1_0_to_Internal implements 
MessageConverter<Message_1_0, InternalMessage>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index d1ea39a..9f3d83d 100755
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -485,7 +485,7 @@ public class MessageMetaData_1_0 implements 
StorableMessageMetaData
         private final AtomicReference<String> _decodedUserId = new 
AtomicReference<>();
 
         @Override
-        public String getCorrelationId()
+        public Object getCorrelationId()
         {
             if (_propertiesSection == null || 
_propertiesSection.getValue().getCorrelationId() == null)
             {
@@ -493,7 +493,7 @@ public class MessageMetaData_1_0 implements 
StorableMessageMetaData
             }
             else
             {
-                return 
_propertiesSection.getValue().getCorrelationId().toString();
+                return _propertiesSection.getValue().getCorrelationId();
             }
         }
 
@@ -505,7 +505,7 @@ public class MessageMetaData_1_0 implements 
StorableMessageMetaData
         }
 
         @Override
-        public String getMessageId()
+        public Object getMessageId()
         {
             if (_propertiesSection == null || 
_propertiesSection.getValue().getMessageId() == null)
             {
@@ -513,7 +513,7 @@ public class MessageMetaData_1_0 implements 
StorableMessageMetaData
             }
             else
             {
-                return _propertiesSection.getValue().getMessageId().toString();
+                return _propertiesSection.getValue().getMessageId();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index a69b96a..0bd9b4b 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -51,6 +51,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.LinkModel;
+import 
org.apache.qpid.server.protocol.v1_0.selector.JMSMessagePropertyExpression;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -170,8 +171,8 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
                         
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter 
selectorFilter = 
(org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) 
entry.getValue();
                         try
                         {
-                            messageFilter = new 
JMSSelectorFilter(selectorFilter.getValue());
-
+                            messageFilter = new 
JMSSelectorFilter(selectorFilter.getValue(),
+                                                                  
JMSMessagePropertyExpression.FACTORY);
                             actualFilters.put(entry.getKey(), 
entry.getValue());
                         }
                         catch (ParseException | SelectorParsingException | 
TokenMgrError e)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
new file mode 100644
index 0000000..75f05ad
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
@@ -0,0 +1,391 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.selector;
+
+//
+// Based on like named file from 3c8d67a6eee38934d233f39b2d614ac4c28b39be of 
the Apache Qpid JMS <https://git-wip-us.apache.org/repos/asf/qpid-jms.git>
+//
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id 
values between
+ * the AMQP types and the String values used by JMS.
+ * <p>
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: 
message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string 
representation of these
+ * for interoperability with other AMQP clients, the following encoding can be 
used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string 
values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST 
NOT be used otherwise.
+ * <p>
+ * <p>When provided a string for conversion which attempts to identify itself 
as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will 
be thrown.
+ */
+public class AmqpMessageIdHelper
+{
+    public static final AmqpMessageIdHelper INSTANCE = new 
AmqpMessageIdHelper();
+
+    public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+    public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+    public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+    public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+    public static final String AMQP_NO_PREFIX = "AMQP_NO_PREFIX:";
+    public static final String JMS_ID_PREFIX = "ID:";
+
+    private static final String AMQP_PREFIX = "AMQP_";
+    private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length();
+    private static final int AMQP_UUID_PREFIX_LENGTH = 
AMQP_UUID_PREFIX.length();
+    private static final int AMQP_ULONG_PREFIX_LENGTH = 
AMQP_ULONG_PREFIX.length();
+    private static final int AMQP_STRING_PREFIX_LENGTH = 
AMQP_STRING_PREFIX.length();
+    private static final int AMQP_BINARY_PREFIX_LENGTH = 
AMQP_BINARY_PREFIX.length();
+    private static final int AMQP_NO_PREFIX_LENGTH = AMQP_NO_PREFIX.length();
+    private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Checks whether the given string begins with "ID:" prefix used to denote 
a JMSMessageID
+     *
+     * @param string the string to check
+     * @return true if and only if the string begins with "ID:"
+     */
+    public boolean hasMessageIdPrefix(String string)
+    {
+        if (string == null)
+        {
+            return false;
+        }
+
+        return string.startsWith(JMS_ID_PREFIX);
+    }
+
+    public String toMessageIdString(Object idObject)
+    {
+        if (idObject instanceof String)
+        {
+            final String stringId = (String) idObject;
+
+            boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
+            if (!hasMessageIdPrefix)
+            {
+                // The JMSMessageID value has no "ID:" prefix; we need to record
+                // that for later use as a JMSCorrelationID.
+                return JMS_ID_PREFIX + AMQP_NO_PREFIX + stringId;
+            }
+            else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH))
+            {
+                // This is a JMSMessageID value, but it has 'ID:' followed by
+                // one of the encoding prefixes. Need to escape the entire 
string
+                // to preserve for later re-use as a JMSCorrelationID.
+                return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
+            }
+            else
+            {
+                // It has "ID:" prefix and doesn't have encoding prefix, use 
it as-is.
+                return stringId;
+            }
+        }
+        else
+        {
+            // Not a string, convert it
+            return convertToIdString(idObject);
+        }
+    }
+
+    public String toCorrelationIdString(Object idObject)
+    {
+
+        if (idObject instanceof String)
+        {
+            final String stringId = (String) idObject;
+
+            boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
+            if (!hasMessageIdPrefix)
+            {
+                // The JMSCorrelationID value has no "ID:" prefix; use it as-is.
+                return stringId;
+            }
+            else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH))
+            {
+                // This is a JMSCorrelationID value, but it has 'ID:' 
followed by
+                // one of the encoding prefixes. Need to escape the entire 
string
+                // to preserve for later re-use as a JMSCorrelationID.
+                return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
+            }
+            else
+            {
+                // It has "ID:" prefix and doesn't have encoding prefix, use 
it as-is.
+                return stringId;
+            }
+        }
+        else
+        {
+            // Not a string, convert it
+            return convertToIdString(idObject);
+        }
+    }
+
+    /**
+     * Takes the provided non-String AMQP message-id/correlation-id object, and
+     * converts it to a String usable as either a JMSMessageID or 
JMSCorrelationID
+     * value, encoding the type information as a prefix to convey for later use
+     * in reversing the process if used to set JMSCorrelationID on a message.
+     *
+     * @param idObject the object to process
+     * @return string to be used for the actual JMS ID.
+     */
+    private String convertToIdString(Object idObject)
+    {
+        if (idObject == null)
+        {
+            return null;
+        }
+
+        if (idObject instanceof UUID)
+        {
+            return JMS_ID_PREFIX + AMQP_UUID_PREFIX + idObject.toString();
+        }
+        else if (idObject instanceof UnsignedLong)
+        {
+            return JMS_ID_PREFIX + AMQP_ULONG_PREFIX + idObject.toString();
+        }
+        else if (idObject instanceof Binary)
+        {
+            ByteBuffer dup = ((Binary) idObject).asByteBuffer();
+
+            byte[] bytes = new byte[dup.remaining()];
+            dup.get(bytes);
+
+            String hex = convertBinaryToHexString(bytes);
+
+            return JMS_ID_PREFIX + AMQP_BINARY_PREFIX + hex;
+        }
+        else
+        {
+            throw new IllegalArgumentException("Unsupported type provided: " + 
idObject.getClass());
+        }
+    }
+
+    private boolean hasTypeEncodingPrefix(String stringId, int offset)
+    {
+        if (!stringId.startsWith(AMQP_PREFIX, offset))
+        {
+            return false;
+        }
+
+        return hasAmqpBinaryPrefix(stringId, offset) ||
+               hasAmqpUuidPrefix(stringId, offset) ||
+               hasAmqpUlongPrefix(stringId, offset) ||
+               hasAmqpStringPrefix(stringId, offset) ||
+               hasAmqpNoPrefix(stringId, offset);
+    }
+
+    private boolean hasAmqpStringPrefix(String stringId, int offset)
+    {
+        return stringId.startsWith(AMQP_STRING_PREFIX, offset);
+    }
+
+    private boolean hasAmqpUlongPrefix(String stringId, int offset)
+    {
+        return stringId.startsWith(AMQP_ULONG_PREFIX, offset);
+    }
+
+    private boolean hasAmqpUuidPrefix(String stringId, int offset)
+    {
+        return stringId.startsWith(AMQP_UUID_PREFIX, offset);
+    }
+
+    private boolean hasAmqpBinaryPrefix(String stringId, int offset)
+    {
+        return stringId.startsWith(AMQP_BINARY_PREFIX, offset);
+    }
+
+    private boolean hasAmqpNoPrefix(String stringId, int offset)
+    {
+        return stringId.startsWith(AMQP_NO_PREFIX, offset);
+    }
+
+    /**
+     * Takes the provided id string and returns the appropriate amqp messageId style object.
+     * Converts the type based on any relevant encoding information found as a prefix.
+     *
+     * @param origId the id string to be converted
+     * @return the amqp messageId style object
+     * @throws MessageConversionException
+     *                if the provided origId String indicates an encoded type but can't be converted to that type.
+     */
+    public Object toIdObject(final String origId) throws 
MessageConversionException
+    {
+        if (origId == null)
+        {
+            return null;
+        }
+
+        if (!AmqpMessageIdHelper.INSTANCE.hasMessageIdPrefix(origId))
+        {
+            // We have a string without any "ID:" prefix, it is an
+            // application-specific String, use it as-is.
+            return origId;
+        }
+
+        try
+        {
+            if (hasAmqpNoPrefix(origId, JMS_ID_PREFIX_LENGTH))
+            {
+                // Prefix telling us there was originally no "ID:" prefix,
+                // strip it and return the remainder
+                return origId.substring(JMS_ID_PREFIX_LENGTH + 
AMQP_NO_PREFIX_LENGTH);
+            }
+            else if (hasAmqpUuidPrefix(origId, JMS_ID_PREFIX_LENGTH))
+            {
+                String uuidString = origId.substring(JMS_ID_PREFIX_LENGTH + 
AMQP_UUID_PREFIX_LENGTH);
+                return UUID.fromString(uuidString);
+            }
+            else if (hasAmqpUlongPrefix(origId, JMS_ID_PREFIX_LENGTH))
+            {
+                String ulongString = origId.substring(JMS_ID_PREFIX_LENGTH + 
AMQP_ULONG_PREFIX_LENGTH);
+                return UnsignedLong.valueOf(ulongString);
+            }
+            else if (hasAmqpStringPrefix(origId, JMS_ID_PREFIX_LENGTH))
+            {
+                return origId.substring(JMS_ID_PREFIX_LENGTH + 
AMQP_STRING_PREFIX_LENGTH);
+            }
+            else if (hasAmqpBinaryPrefix(origId, JMS_ID_PREFIX_LENGTH))
+            {
+                String hexString = origId.substring(JMS_ID_PREFIX_LENGTH + 
AMQP_BINARY_PREFIX_LENGTH);
+                byte[] bytes = convertHexStringToBinary(hexString);
+                return new Binary(bytes);
+            }
+            else
+            {
+                // We have a string without any encoding prefix that needs processing,
+                // so transmit it as-is, including the "ID:"
+                return origId;
+            }
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new MessageConversionException("Unable to convert ID value", 
e);
+        }
+    }
+
+    /**
+     * Convert the provided hex-string into a binary representation where each 
byte represents
+     * two characters of the hex string.
+     * <p>
+     * The hex characters may be upper or lower case.
+     *
+     * @param hexString string to convert
+     * @return a byte array containing the binary representation
+     * @throws IllegalArgumentException if the provided String has an odd length or contains non-hex characters
+     */
+    public byte[] convertHexStringToBinary(String hexString) throws 
IllegalArgumentException
+    {
+        int length = hexString.length();
+
+        // As each byte needs two characters in the hex encoding, the string 
must be an even length.
+        if (length % 2 != 0)
+        {
+            throw new IllegalArgumentException("The provided hex String must 
be an even length, but was of length "
+                                               + length
+                                               + ": "
+                                               + hexString);
+        }
+
+        byte[] binary = new byte[length / 2];
+
+        for (int i = 0; i < length; i += 2)
+        {
+            char highBitsChar = hexString.charAt(i);
+            char lowBitsChar = hexString.charAt(i + 1);
+
+            int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+            int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+            binary[i / 2] = (byte) (highBits + lowBits);
+        }
+
+        return binary;
+    }
+
+    private int hexCharToInt(char ch, String orig) throws 
IllegalArgumentException
+    {
+        if (ch >= '0' && ch <= '9')
+        {
+            // subtract '0' to get difference in position as an int
+            return ch - '0';
+        }
+        else if (ch >= 'A' && ch <= 'F')
+        {
+            // subtract 'A' to get difference in position as an int
+            // and then add 10 for the offset of 'A'
+            return ch - 'A' + 10;
+        }
+        else if (ch >= 'a' && ch <= 'f')
+        {
+            // subtract 'a' to get difference in position as an int
+            // and then add 10 for the offset of 'a'
+            return ch - 'a' + 10;
+        }
+
+        throw new IllegalArgumentException("The provided hex string contains 
non-hex character '" + ch + "': " + orig);
+    }
+
+    /**
+     * Convert the provided binary into a hex-string representation where each character
+     * represents 4 bits of the provided binary, i.e. each byte requires two characters.
+     * <p>
+     * The returned hex characters are upper-case.
+     *
+     * @param bytes binary to convert
+     * @return a String containing a hex representation of the bytes
+     */
+    public String convertBinaryToHexString(byte[] bytes)
+    {
+        // Each byte is represented as 2 chars
+        StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+        for (byte b : bytes)
+        {
+            // The byte will be expanded to int before shifting, replicating 
the
+            // sign bit, so mask everything beyond the first 4 bits afterwards
+            int highBitsInt = (b >> 4) & 0xF;
+            // We only want the first 4 bits
+            int lowBitsInt = b & 0xF;
+
+            builder.append(HEX_CHARS[highBitsInt]);
+            builder.append(HEX_CHARS[lowBitsInt]);
+        }
+
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
new file mode 100644
index 0000000..46c0ca0
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.selector;
+
+import java.util.HashMap;
+
+import org.apache.qpid.server.filter.Expression;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.PropertyExpression;
+import org.apache.qpid.server.filter.PropertyExpressionFactory;
+
+public class JMSMessagePropertyExpression implements 
PropertyExpression<Filterable>
+{
+    public static final PropertyExpressionFactory<Filterable> FACTORY = 
JMSMessagePropertyExpression::new;
+
+    static final String JMS_MESSAGE_ID = "JMSMessageID";
+    static final String JMS_CORRELATION_ID = "JMSCorrelationID";
+    static final String JMS_REPLY_TO = "JMSReplyTo";
+    static final String JMS_TYPE = "JMSType";
+    static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
+    static final String JMS_PRIORITY = "JMSPriority";
+    static final String JMS_TIMESTAMP = "JMSTimestamp";
+    static final String JMS_EXPIRATION = "JMSExpiration";
+    static final String JMS_REDELIVERED = "JMSRedelivered";
+
+    private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS 
= new HashMap<>();
+
+    static
+    {
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_REPLY_TO, (Expression<Filterable>) 
Filterable::getReplyTo);
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_TYPE, (Expression<Filterable>) 
Filterable::getType);
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_DELIVERY_MODE,
+                                     (Expression<Filterable>) message -> 
(message.isPersistent()
+                                             ? JMSDeliveryMode.PERSISTENT
+                                             : 
JMSDeliveryMode.NON_PERSISTENT).toString());
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_PRIORITY, (Expression<Filterable>) 
message -> (int) message.getPriority());
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_MESSAGE_ID,
+                                     (Expression<Filterable>) message -> 
AmqpMessageIdHelper.INSTANCE.toMessageIdString(
+                                             message.getMessageId()));
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_TIMESTAMP, (Expression<Filterable>) 
Filterable::getTimestamp);
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_CORRELATION_ID,
+                                     (Expression<Filterable>) message -> 
AmqpMessageIdHelper.INSTANCE.toCorrelationIdString(
+                                             message.getCorrelationId()));
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_EXPIRATION, (Expression<Filterable>) 
Filterable::getExpiration);
+        JMS_PROPERTY_EXPRESSIONS.put(JMS_REDELIVERED, (Expression<Filterable>) 
Filterable::isRedelivered);
+    }
+
+    private final String name;
+    private final Expression jmsPropertyExpression;
+
+    private JMSMessagePropertyExpression(String name)
+    {
+        this.name = name;
+        jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object evaluate(Filterable message)
+    {
+        if (jmsPropertyExpression != null)
+        {
+            return jmsPropertyExpression.evaluate(message);
+        }
+        else
+        {
+            return message.getHeader(name);
+        }
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    @Override
+    public String toString()
+    {
+        return name;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return (o != null)
+               && this.getClass().equals(o.getClass())
+               && name.equals(((JMSMessagePropertyExpression) o).name);
+    }
+
+    // Constants - defined the same as JMS
+    enum JMSDeliveryMode
+    {
+        NON_PERSISTENT, PERSISTENT
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
new file mode 100644
index 0000000..2eb578f
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
@@ -0,0 +1,345 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.selector;
+
+import static 
org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_BINARY_PREFIX;
+import static 
org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_NO_PREFIX;
+import static 
org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_STRING_PREFIX;
+import static 
org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_ULONG_PREFIX;
+import static 
org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_UUID_PREFIX;
+import static 
org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.JMS_ID_PREFIX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.PropertyExpression;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+
+public class JMSMessagePropertyExpressionTest
+{
+
+    @Test
+    public void evaluateJMSMessageIDForStringWithIDPrefix()
+    {
+        final String id = "ID:testID";
+        final Filterable message = createFilterableWithMessageId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(id)));
+    }
+
+    @Test
+    public void evaluateJMSMessageIDForStringWithoutIDPrefix()
+    {
+        final String id = "testID";
+        final Filterable message = createFilterableWithMessageId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_NO_PREFIX + id)));
+    }
+
+    @Test
+    public void evaluateJMSMessageIDForUUID()
+    {
+        final UUID id = UUID.randomUUID();
+        final Filterable message = createFilterableWithMessageId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_UUID_PREFIX + id)));
+    }
+
+    @Test
+    public void evaluateJMSMessageIDForUnsignedLong()
+    {
+        final UnsignedLong id = UnsignedLong.valueOf(42);
+        final Filterable message = createFilterableWithMessageId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_ULONG_PREFIX + id)));
+    }
+
+    @Test
+    public void evaluateJMSMessageIDForBinary()
+    {
+        byte[] data = {1, 2, 3};
+        final Binary id = new Binary(data);
+        final Filterable message = createFilterableWithMessageId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_BINARY_PREFIX + 
AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data))));
+    }
+
+    @Test
+    public void evaluateJMSMessageIDForStringWithAMQPBindingPrefixes()
+    {
+        final String id = "ID:AMQP_ULONG:string-id";
+        final Filterable message = createFilterableWithMessageId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_STRING_PREFIX + 
id)));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForStringWithIDPrefix()
+    {
+        final String id = "ID:testID";
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(id)));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForStringWithoutIDPrefix()
+    {
+        final String id = "testID";
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(id)));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForUUID()
+    {
+        final UUID id = UUID.randomUUID();
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_UUID_PREFIX + id)));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForUnsignedLong()
+    {
+        final UnsignedLong id = UnsignedLong.valueOf(42);
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_ULONG_PREFIX + id)));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForBinary()
+    {
+        byte[] data = {1, 2, 3};
+        final Binary id = new Binary(data);
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value,
+                   is(equalTo(JMS_ID_PREFIX
+                              + AMQP_BINARY_PREFIX
+                              + 
AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data))));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForStringWithAMQPBindingPrefixes()
+    {
+        final String id = "ID:AMQP_ULONG:string-id";
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_STRING_PREFIX + 
id)));
+    }
+
+    @Test
+    public void evaluateJMSCorrelationIDForStringWithAMQPTypePrefixes()
+    {
+        final String id = "AMQP_ULONG:foo";
+        final Filterable message = createFilterableWithCorrelationId(id);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(id)));
+    }
+
+    @Test
+    public void evaluateJMSReplyTo()
+    {
+        final String replyTo = "testReplyTo";
+        final Filterable message = mock(Filterable.class);
+        when(message.getReplyTo()).thenReturn(replyTo);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_REPLY_TO);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(replyTo)));
+    }
+
+    @Test
+    public void evaluateJMSType()
+    {
+        final String type = "testType";
+        final Filterable message = mock(Filterable.class);
+        when(message.getType()).thenReturn(type);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_TYPE);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(type)));
+    }
+
+    @Test
+    public void evaluateJMSDeliveryModeForPersistentMessage()
+    {
+        final Filterable message = mock(Filterable.class);
+        when(message.isPersistent()).thenReturn(true);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DELIVERY_MODE);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, 
is(equalTo(JMSMessagePropertyExpression.JMSDeliveryMode.PERSISTENT.toString())));
+    }
+
+    @Test
+    public void evaluateJMSDeliveryModeForNonPersistentMessage()
+    {
+        final Filterable message = mock(Filterable.class);
+        when(message.isPersistent()).thenReturn(false);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DELIVERY_MODE);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, 
is(equalTo(JMSMessagePropertyExpression.JMSDeliveryMode.NON_PERSISTENT.toString())));
+    }
+
+    @Test
+    public void evaluateJMSPriority()
+    {
+        int priority = 5;
+        final Filterable message = mock(Filterable.class);
+        when(message.getPriority()).thenReturn((byte) priority);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_PRIORITY);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(priority)));
+    }
+
+    @Test
+    public void evaluateJMSTimestamp()
+    {
+        long timestamp = System.currentTimeMillis();
+        final Filterable message = mock(Filterable.class);
+        when(message.getTimestamp()).thenReturn(timestamp);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_TIMESTAMP);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(timestamp)));
+    }
+
+    @Test
+    public void evaluateJMSExpiration()
+    {
+        long expiration = System.currentTimeMillis() + 10000;
+        final Filterable message = mock(Filterable.class);
+        when(message.getExpiration()).thenReturn(expiration);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_EXPIRATION);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(expiration)));
+    }
+
+    @Test
+    public void evaluateJMSRedelivered()
+    {
+        boolean redelivered = true;
+        final Filterable message = mock(Filterable.class);
+        when(message.isRedelivered()).thenReturn(redelivered);
+
+        PropertyExpression<Filterable> expression =
+                
JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_REDELIVERED);
+        Object value = expression.evaluate(message);
+
+        assertThat(value, is(equalTo(redelivered)));
+    }
+
+    private Filterable createFilterableWithCorrelationId(final Object id)
+    {
+        final Filterable message = mock(Filterable.class);
+        when(message.getCorrelationId()).thenReturn(id);
+        return message;
+    }
+
+    private Filterable createFilterableWithMessageId(final Object id)
+    {
+        final Filterable message = mock(Filterable.class);
+        when(message.getMessageId()).thenReturn(id);
+        return message;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 77f20d0..0919edc 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -58,6 +58,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -113,6 +114,7 @@ public class Interaction extends 
AbstractInteraction<Interaction>
     private Object _decodedLatestDelivery;
     private UnsignedInteger _latestDeliveryId;
     private Map<String, Object> _latestDeliveryApplicationProperties;
+    private Properties _latestDeliveryProperties;
 
     Interaction(final FrameTransport frameTransport)
     {
@@ -1083,6 +1085,7 @@ public class Interaction extends 
AbstractInteraction<Interaction>
                                 });
         _decodedLatestDelivery = messageDecoder.getData();
         _latestDeliveryApplicationProperties = 
messageDecoder.getApplicationProperties();
+        _latestDeliveryProperties = messageDecoder.getProperties();
         _latestDelivery = null;
         return this;
     }
@@ -1102,6 +1105,11 @@ public class Interaction extends 
AbstractInteraction<Interaction>
         return _latestDeliveryApplicationProperties;
     }
 
+    public Properties getLatestDeliveryProperties()
+    {
+        return _latestDeliveryProperties;
+    }
+
     private List<Transfer> receiveAllTransfers(final Class<?>... ignore) 
throws Exception
     {
         List<Transfer> transfers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
index 0df1abd..8af6f3e 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
@@ -45,6 +45,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSect
 import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
 import 
org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
@@ -213,4 +214,14 @@ public class MessageDecoder
         }
         return Collections.emptyMap();
     }
+
+    public Properties getProperties() throws AmqpErrorException // returns the value of the decoded properties section, or null when there is none
+    {
+        parse(); // NOTE(review): presumably populates _propertiesSection from the received payload - confirm in parse()
+        if (_propertiesSection != null)
+        {
+            return _propertiesSection.getValue();
+        }
+        return null; // absence is reported as null, whereas the preceding accessor falls back to Collections.emptyMap()
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f1ce8666/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
new file mode 100644
index 0000000..49a2e37
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
@@ -0,0 +1,356 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assume.assumeThat;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class JMSMessagePropertiesFilterTest extends BrokerAdminUsingTestBase
+{
+    private static final String TEST_MESSAGE_CONTENT = "testContent";
+    private InetSocketAddress _brokerAddress;
+
+    @Before // runs before every test: creates the test queue and resolves the broker endpoint
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); // the queue that perform() publishes to and consumes from
+        _brokerAddress = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test // a UUID message-id must be matched by a selector written in the ID:AMQP_UUID: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSMessageID representation for UUID values : "
+                          + "'ID:AMQP_UUID:<string representation of uuid>'")
+    public void selectorWithJMSMessageIDAsUUID() throws Exception
+    {
+        final UUID messageId = UUID.randomUUID(); // fresh value per run; the selector below is built from the same UUID
+        final Properties properties = new Properties();
+        properties.setMessageId(messageId);
+        perform(properties, String.format("JMSMessageID='ID:AMQP_UUID:%s'", 
messageId), "messageId", messageId);
+    }
+
+    @Test // an unsigned-long message-id must be matched by a selector written in the ID:AMQP_ULONG: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSMessageID representation for unsigned long 
values : "
+                          + "'ID:AMQP_ULONG:<string representation of ulong>'")
+    public void selectorWithJMSMessageIDAsUnsignedLong() throws Exception
+    {
+        final UnsignedLong messageId = UnsignedLong.ONE; // small value, so formatting the signed longValue() below is unambiguous
+        final Properties properties = new Properties();
+        properties.setMessageId(messageId);
+        perform(properties,
+                String.format("JMSMessageID='ID:AMQP_ULONG:%d'", 
messageId.longValue()),
+                "messageId",
+                messageId);
+    }
+
+    @Test // a binary message-id must be matched by a selector written in the ID:AMQP_BINARY: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSMessageID representation for Binary values : "
+                          + "'ID:AMQP_BINARY:<hex representation of bytes>'")
+    public void selectorWithJMSMessageIDAsBinary() throws Exception
+    {
+        byte[] data = {1, 2, 3}; // raw bytes used both for the message-id and for the hex text in the selector
+        final Binary messageId = new Binary(data);
+        final Properties properties = new Properties();
+        properties.setMessageId(messageId);
+        perform(properties,
+                String.format("JMSMessageID='ID:AMQP_BINARY:%s'",
+                              
AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)),
+                "messageId",
+                messageId);
+    }
+
+    @Test // a string message-id lacking the ID: prefix must be matched through the ID:AMQP_NO_PREFIX: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSMessageID representation for String values 
without prefix 'ID:' : "
+                          + "'ID:AMQP_NO_PREFIX:<original-string>'")
+    public void selectorWithJMSMessageIDAsStringWithoutPrefix() throws 
Exception
+    {
+        final String messageId = "testId"; // deliberately has no ID: prefix
+        final Properties properties = new Properties();
+        properties.setMessageId(messageId);
+        perform(properties, 
String.format("JMSMessageID='ID:AMQP_NO_PREFIX:%s'", messageId), "messageId", 
messageId);
+    }
+
+    @Test // a string message-id that already starts with ID: must be matched verbatim by the selector
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSMessageID representation for String values 
with prefix 'ID:' : "
+                          + "'ID:<original-string>'")
+    public void selectorWithJMSMessageIDAsStringWithPrefix() throws Exception
+    {
+        final String messageId = "ID:testId"; // already prefixed, so the selector uses the value unchanged
+        final Properties properties = new Properties();
+        properties.setMessageId(messageId);
+        perform(properties, String.format("JMSMessageID='%s'", messageId), 
"messageId", messageId);
+    }
+
+
+    @Test // a UUID correlation-id must be matched by a selector written in the ID:AMQP_UUID: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSCorrelationID representation for UUID values : 
"
+                          + "'ID:AMQP_UUID:<string representation of uuid>'")
+    public void selectorWithJMSCorrelationIDAsUUID() throws Exception
+    {
+        final UUID correlationId = UUID.randomUUID(); // fresh value per run; the selector below is built from the same UUID
+        final Properties properties = new Properties();
+        properties.setCorrelationId(correlationId);
+        perform(properties,
+                String.format("JMSCorrelationID='ID:AMQP_UUID:%s'", 
correlationId),
+                "correlationId",
+                correlationId);
+    }
+
+    @Test // an unsigned-long correlation-id must be matched by a selector written in the ID:AMQP_ULONG: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSCorrelationID representation for unsigned long 
values : "
+                          + "'ID:AMQP_ULONG:<string representation of ulong>'")
+    public void selectorWithJMSCorrelationIDAsUnsignedLong() throws Exception
+    {
+        final UnsignedLong correlationId = UnsignedLong.ONE; // small value, so formatting the signed longValue() below is unambiguous
+        final Properties properties = new Properties();
+        properties.setCorrelationId(correlationId);
+        perform(properties,
+                String.format("JMSCorrelationID='ID:AMQP_ULONG:%d'", 
correlationId.longValue()),
+                "correlationId",
+                correlationId);
+    }
+
+    @Test // a binary correlation-id must be matched by a selector written in the ID:AMQP_BINARY: form
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSCorrelationID representation for Binary values 
: "
+                          + "'ID:AMQP_BINARY:<hex representation of bytes>'")
+    public void selectorWithJMSCorrelationIDAsBinary() throws Exception
+    {
+        byte[] data = {1, 2, 3}; // raw bytes used both for the correlation-id and for the hex text in the selector
+        final Binary correlationId = new Binary(data);
+        final Properties properties = new Properties();
+        properties.setCorrelationId(correlationId);
+        perform(properties,
+                String.format("JMSCorrelationID='ID:AMQP_BINARY:%s'",
+                              
AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)),
+                "correlationId",
+                correlationId);
+    }
+
+    @Test // a plain string correlation-id (no ID: prefix) must be matched verbatim; no AMQP_NO_PREFIX marker is used here
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSCorrelationID representation for String values 
without prefix 'ID:' : "
+                          + "'<original-string>'")
+    public void selectorWithJMSCorrelationIDAsStringWithoutPrefix() throws 
Exception
+    {
+        final String correlationId = "testId"; // deliberately has no ID: prefix
+        final Properties properties = new Properties();
+        properties.setCorrelationId(correlationId);
+        perform(properties,
+                String.format("JMSCorrelationID='%s'", correlationId),
+                "correlationId",
+                correlationId);
+    }
+
+    @Test // a string correlation-id that already starts with ID: must be matched verbatim by the selector
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    @SpecificationTest(section = "3.5.1",
+            description = "A source can restrict the messages transferred from 
a source by specifying a filter.\n"
+                          + ""
+                          + "AMQP JMS Mapping\n"
+                          + ""
+                          + "3.2.1.1 JMSMessageID And JMSCorrelationID 
Handling\n"
+                          + "JMSCorrelationID representation for String values 
with prefix 'ID:' : "
+                          + "'ID:<original-string>'")
+    public void selectorWithJMSCorrelationIDAsStringWithPrefix() throws 
Exception
+    {
+        final String correlationId = "ID:testId"; // already prefixed, so the selector uses the value unchanged
+        final Properties properties = new Properties();
+        properties.setCorrelationId(correlationId);
+        perform(properties, String.format("JMSCorrelationID='%s'", 
correlationId), "correlationId", correlationId);
+    }
+
+    private void perform(final Properties properties, // message properties stamped on every published message
+                         final String selector, // JMS selector text installed as the filter on the receiving link
+                         final String propertyName, // bean-style name (messageId or correlationId) used to locate the Properties getter
+                         final Object propertyValue) throws Exception // value the received message must carry for that property
+    { // publishes two messages with the given properties, then consumes one through a selector-filtered link and verifies it
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+            Flow flow = interaction.getLatestResponse(Flow.class); // the flow carries the credit granted to this sender
+            assumeThat("insufficient credit for the test", 
flow.getLinkCredit().intValue(), is(greaterThan(1)));
+
+            for (int i = 0; i < 2; i++) // two identical messages are published; only one is consumed further down
+            {
+
+                QpidByteBuffer payload =
+                        generateMessagePayloadWithMessageProperties(properties,
+                                                                    
TEST_MESSAGE_CONTENT);
+                interaction.transferPayload(payload)
+                           .transferSettled(true) // sent pre-settled; no disposition is consumed for these transfers
+                           .transfer();
+            }
+            interaction.detachClose(true).detach().close().sync();
+        }
+
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       .attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       
.attachSourceFilter(Collections.singletonMap(Symbol.valueOf("selector-filter"),
+                                                                    new 
JMSSelectorFilter(selector)))
+                       .attach().consumeResponse()
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE) // credit for a single delivery only
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            Object data = 
interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT))); // the body must round-trip unchanged
+
+            Properties deliveryProperties = 
interaction.getLatestDeliveryProperties();
+            assertThat(deliveryProperties, is(notNullValue()));
+
+            Method getter = Properties.class.getMethod("get" // reflective lookup of getMessageId / getCorrelationId derived from propertyName
+                                                       + 
Character.toUpperCase(propertyName.charAt(0))
+                                                       + 
propertyName.substring(1));
+            assertThat(getter.invoke(deliveryProperties), 
is(equalTo(propertyValue)));
+
+            interaction.dispositionSettled(true) // accept and settle the single received delivery before closing
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Accepted())
+                       .disposition();
+            interaction.close().sync();
+        }
+    }
+
+    private QpidByteBuffer generateMessagePayloadWithMessageProperties(final 
Properties properties, String content)
+    { // encodes one message from the supplied properties plus the content added as data, and returns the encoder payload
+        MessageEncoder messageEncoder = new MessageEncoder();
+        messageEncoder.setProperties(properties);
+        messageEncoder.addData(content);
+        return messageEncoder.getPayload(); // NOTE(review): buffer ownership passes to the caller; confirm whether it needs explicit disposal
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to