Apply patch for AMQNET-554. Suport for message properties, and selectors. 
Thanks Stephane Ramet!


Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/16d8f06d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/16d8f06d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/16d8f06d

Branch: refs/heads/master
Commit: 16d8f06dd0b178c5030c0da7345aa16e2d51a761
Parents: 51ec9a2
Author: Jim Gomes <[email protected]>
Authored: Thu Jul 7 20:47:10 2016 +0000
Committer: Jim Gomes <[email protected]>
Committed: Thu Jul 7 20:47:10 2016 +0000

----------------------------------------------------------------------
 src/main/csharp/BaseMessage.cs                  |  154 ++-
 src/main/csharp/DefaultMessageConverter.cs      |  500 +++++++-
 src/main/csharp/IMessageConverter.cs            |   28 +-
 src/main/csharp/IMessageConverterEx.cs          |   44 +
 src/main/csharp/MessageConsumer.cs              |   93 +-
 src/main/csharp/QueueBrowser.cs                 |   31 +-
 .../csharp/Readers/AbstractMessageReader.cs     |  126 ++
 .../Readers/ByCorrelationIdMessageReader.cs     |  139 +++
 src/main/csharp/Readers/ByIdMessageReader.cs    |  136 ++
 .../csharp/Readers/ByLookupIdMessageReader.cs   |  145 +++
 .../csharp/Readers/BySelectorMessageReader.cs   |  290 +++++
 src/main/csharp/Readers/IMessageReader.cs       |   93 ++
 src/main/csharp/Readers/MessageReaderUtil.cs    |   91 ++
 .../csharp/Readers/NonFilteringMessageReader.cs |  128 ++
 src/main/csharp/Selector/ANDExpression.cs       |   47 +
 .../csharp/Selector/AlignedNumericValues.cs     |  175 +++
 .../csharp/Selector/ArithmeticExpression.cs     |   57 +
 src/main/csharp/Selector/BinaryExpression.cs    |   59 +
 .../csharp/Selector/BooleanCastExpression.cs    |   45 +
 .../Selector/BooleanConstantExpression.cs       |   38 +
 .../csharp/Selector/BooleanUnaryExpression.cs   |   39 +
 .../csharp/Selector/ComparisonExpression.cs     |  162 +++
 src/main/csharp/Selector/ConstantExpression.cs  |  157 +++
 src/main/csharp/Selector/DivideExpression.cs    |   67 +
 src/main/csharp/Selector/EqualExpression.cs     |   47 +
 src/main/csharp/Selector/GreaterExpression.cs   |   42 +
 .../csharp/Selector/GreaterOrEqualExpression.cs |   43 +
 src/main/csharp/Selector/IBooleanExpression.cs  |   35 +
 src/main/csharp/Selector/IExpression.cs         |   35 +
 src/main/csharp/Selector/InExpression.cs        |   98 ++
 src/main/csharp/Selector/IsNullExpression.cs    |   59 +
 src/main/csharp/Selector/LesserExpression.cs    |   42 +
 .../csharp/Selector/LesserOrEqualExpression.cs  |   43 +
 src/main/csharp/Selector/LikeExpression.cs      |  124 ++
 src/main/csharp/Selector/LogicExpression.cs     |   48 +
 .../csharp/Selector/MessageEvaluationContext.cs |   78 ++
 src/main/csharp/Selector/MinusExpression.cs     |   67 +
 src/main/csharp/Selector/ModExpression.cs       |   67 +
 src/main/csharp/Selector/MultiplyExpression.cs  |   67 +
 src/main/csharp/Selector/NOTExpression.cs       |   45 +
 src/main/csharp/Selector/NegateExpression.cs    |   51 +
 src/main/csharp/Selector/ORExpression.cs        |   46 +
 src/main/csharp/Selector/ParseException.cs      |  197 +++
 src/main/csharp/Selector/PlusExpression.cs      |   68 +
 src/main/csharp/Selector/PropertyExpression.cs  |   53 +
 src/main/csharp/Selector/SelectorParser.cs      | 1172 ++++++++++++++++++
 src/main/csharp/Selector/SelectorParser.csc     |  589 +++++++++
 .../csharp/Selector/SelectorParserConstants.cs  |   75 ++
 .../Selector/SelectorParserTokenManager.cs      | 1042 ++++++++++++++++
 src/main/csharp/Selector/SimpleCharStream.cs    |  366 ++++++
 src/main/csharp/Selector/Token.cs               |   78 ++
 src/main/csharp/Selector/TokenMgrError.cs       |  130 ++
 src/main/csharp/Selector/UnaryExpression.cs     |   66 +
 src/main/csharp/Session.cs                      |   12 +-
 vs2008-msmq.csproj                              |   47 +
 55 files changed, 7585 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/BaseMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs
index db73f4c..8ea7e31 100644
--- a/src/main/csharp/BaseMessage.cs
+++ b/src/main/csharp/BaseMessage.cs
@@ -1,4 +1,4 @@
-/*
+ /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -24,28 +24,9 @@ namespace Apache.NMS.MSMQ
 
        public class BaseMessage : IMessage
        {
-               private PrimitiveMap propertiesMap = new PrimitiveMap();
-               private IDestination destination;
-               private string correlationId;
-               private TimeSpan timeToLive;
-               private string messageId;
-               private MsgDeliveryMode deliveryMode;
-               private MsgPriority priority;
-               private Destination replyTo;
-               private byte[] content;
-               private string type;
-               private event AcknowledgeHandler Acknowledger;
-               private DateTime timestamp = new DateTime();
-               private bool readOnlyMsgBody = false;
-
-               public bool ReadOnlyBody
-               {
-                       get { return readOnlyMsgBody; }
-                       set { readOnlyMsgBody = value; }
-               }
-
-               // IMessage interface
+               #region Acknowledgement
 
+               private event AcknowledgeHandler Acknowledger;
                public void Acknowledge()
                {
                        if(null != Acknowledger)
@@ -54,6 +35,27 @@ namespace Apache.NMS.MSMQ
                        }
                }
 
+               #endregion
+
+               #region Message body
+
+               private byte[] content;
+               public byte[] Content
+               {
+                       get { return content; }
+                       set { this.content = value; }
+               }
+
+               private bool readOnlyMsgBody = false;
+               /// <summary>
+               /// Whether the message body is read-only.
+               /// </summary>
+               public bool ReadOnlyBody
+               {
+                       get { return readOnlyMsgBody; }
+                       set { readOnlyMsgBody = value; }
+               }
+
                /// <summary>
                /// Clears out the message body. Clearing a message's body does 
not clear its header
                /// values or property entries.
@@ -67,26 +69,82 @@ namespace Apache.NMS.MSMQ
                        this.readOnlyMsgBody = false;
                }
 
+               #endregion
+
+               #region Message properties
+
+               private PrimitiveMap propertiesMap = new PrimitiveMap();
+               private MessagePropertyIntercepter propertyHelper;
+               /// <summary>
+               /// Provides access to the message properties (headers)
+               /// </summary>
+               public Apache.NMS.IPrimitiveMap Properties
+               {
+                       get
+                       {
+                if(propertyHelper == null)
+                {
+                                   propertyHelper = new 
Apache.NMS.Util.MessagePropertyIntercepter(
+                                           this, propertiesMap, 
this.ReadOnlyProperties);
+                               }
+
+                return propertyHelper;
+                       }
+               }
+
+               private bool readOnlyMsgProperties = false;
+               /// <summary>
+               /// Whether the message properties is read-only.
+               /// </summary>
+               public virtual bool ReadOnlyProperties
+               {
+                       get { return this.readOnlyMsgProperties; }
+
+                       set
+                       {
+                               if(this.propertyHelper != null)
+                               {
+                                       this.propertyHelper.ReadOnly = value;
+                               }
+                               this.readOnlyMsgProperties = value;
+                       }
+               }
+
                /// <summary>
                /// Clears a message's properties.
-               ///
                /// The message's header fields and body are not cleared.
                /// </summary>
-               public virtual void ClearProperties()
+               public void ClearProperties()
                {
-                       propertiesMap.Clear();
+            this.ReadOnlyProperties = false;
+            this.propertiesMap.Clear();
                }
 
-               // Properties
+               public object GetObjectProperty(string name)
+               {
+                       return Properties[name];
+               }
 
-               public IPrimitiveMap Properties
+               public void SetObjectProperty(string name, object value)
                {
-                       get { return propertiesMap; }
+            Properties[name] = value;
                }
 
+               #endregion
 
-               // NMS headers
+               #region Message header fields
 
+               private string messageId;
+               /// <summary>
+               /// The message ID which is set by the provider
+               /// </summary>
+               public string NMSMessageId
+               {
+                       get { return messageId; }
+                       set { messageId = value; }
+               }
+
+               private string correlationId;
                /// <summary>
                /// The correlation ID used to correlate messages with 
conversations or long running business processes
                /// </summary>
@@ -96,6 +154,7 @@ namespace Apache.NMS.MSMQ
                        set { correlationId = value; }
                }
 
+               private IDestination destination;
                /// <summary>
                /// The destination of the message
                /// </summary>
@@ -105,6 +164,7 @@ namespace Apache.NMS.MSMQ
                        set { destination = value; }
                }
 
+               private TimeSpan timeToLive;
                /// <summary>
                /// The time in milliseconds that this message should expire in
                /// </summary>
@@ -114,15 +174,7 @@ namespace Apache.NMS.MSMQ
                        set { timeToLive = value; }
                }
 
-               /// <summary>
-               /// The message ID which is set by the provider
-               /// </summary>
-               public string NMSMessageId
-               {
-                       get { return messageId; }
-                       set { messageId = value; }
-               }
-
+               private MsgDeliveryMode deliveryMode;
                /// <summary>
                /// Whether or not this message is persistent
                /// </summary>
@@ -132,6 +184,7 @@ namespace Apache.NMS.MSMQ
                        set { deliveryMode = value; }
                }
 
+               private MsgPriority priority;
                /// <summary>
                /// The Priority on this message
                /// </summary>
@@ -150,7 +203,7 @@ namespace Apache.NMS.MSMQ
             set { }
                }
 
-
+               private Destination replyTo;
                /// <summary>
                /// The destination that the consumer of this message should 
send replies to
                /// </summary>
@@ -160,7 +213,7 @@ namespace Apache.NMS.MSMQ
                        set { replyTo = (Destination) value; }
                }
 
-
+               private DateTime timestamp = new DateTime();
                /// <summary>
                /// The timestamp the broker added to the message
                /// </summary>
@@ -170,12 +223,7 @@ namespace Apache.NMS.MSMQ
                        set { timestamp = value; }
                }
 
-               public byte[] Content
-               {
-                       get { return content; }
-                       set { this.content = value; }
-               }
-
+               private string type;
                /// <summary>
                /// The type name of this message
                /// </summary>
@@ -185,15 +233,9 @@ namespace Apache.NMS.MSMQ
                        set { type = value; }
                }
 
+        #endregion
 
-               public object GetObjectProperty(string name)
-               {
-                       return null;
-               }
-
-               public void SetObjectProperty(string name, object value)
-               {
-               }
+        #region Check access mode
 
                protected void FailIfReadOnlyBody()
                {
@@ -205,11 +247,13 @@ namespace Apache.NMS.MSMQ
 
                protected void FailIfWriteOnlyBody()
                {
-                       if( ReadOnlyBody == false )
+                       if(ReadOnlyBody == false)
                        {
                                throw new MessageNotReadableException("Message 
is in Write-Only mode.");
                        }
                }
+
+        #endregion
        }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/DefaultMessageConverter.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/DefaultMessageConverter.cs 
b/src/main/csharp/DefaultMessageConverter.cs
index 2aa3438..83097fc 100644
--- a/src/main/csharp/DefaultMessageConverter.cs
+++ b/src/main/csharp/DefaultMessageConverter.cs
@@ -32,12 +32,61 @@ namespace Apache.NMS.MSMQ
                StreamMessage
        }
 
-       public class DefaultMessageConverter : IMessageConverter
+    /// <summary>
+    /// This class provides default rules for converting MSMQ to and from
+    /// NMS messages, when the peer system expects or produces compatible
+    /// mappings, typically when the peer system is also implemented on
+    /// Apache.NMS.
+    /// Default mappings are as follows :
+    /// <ul>
+    /// <li>
+    ///   the MSMQ Message.AppSetting field is used for specifying the NMS
+    ///   message type, as specified by the <c>NMSMessageType</c> enumeration.
+    /// </li>
+    /// <li>
+    ///   the MSMQ Message.Extension field is populated with a map
+    ///   (a marshalled <c>PrimitiveMap</c>) of message properties.
+    /// </li>
+    /// <li>
+    ///   in earlier versions of Apache.NMS.MSMQ, the MSMQ Message.Label
+    ///   field was populated with the value of the NMSType field. Setting
+    ///   <c>SetLabelAsNMSType</c> to true (the default value) applies that
+    ///   same rule, which makes it compatible with existing NMS peers. If
+    ///   set to false, the Message.Label field is populated with the value
+    ///   of a "Label" property, if it exists, thus making it readable by
+    ///   standard management or monitoring tools. The NMSType value is then
+    ///   transmitted as a field in the Message.Extension map.
+    /// </li>
+    /// </ul>
+    /// Please note that in earlier versions of Apache.NMS, only one property
+    /// was set in the Message.Extension field : the NMSCorrelationID.
+    /// The native Message.CorrelationId field is not settable, except for
+    /// reply messages explicitely created as such through the MSMQ API.
+    /// Transmission of the correlation id. through a mapped property called
+    /// NMSCorrelationID is therefore maintained.
+    /// When exchanging messages with a non compatible peer, a specific
+    /// message converter must be provided, which should at least be able to
+    /// map message types and define the encoding used for text messages.
+    /// </summary>
+       public class DefaultMessageConverter : IMessageConverterEx
        {
+        private bool setLabelAsNMSType = true;
+        public bool SetLabelAsNMSType
+        {
+            get { return setLabelAsNMSType; }
+            set { setLabelAsNMSType = value; }
+        }
+
+        #region Messages
+        /// <summary>
+        /// Converts the specified NMS message to an equivalent MSMQ message.
+        /// </summary>
+        /// <param name="message">NMS message to be converted.</param>
+        /// <result>Converted MSMQ message.</result>
                public virtual Message ToMsmqMessage(IMessage message)
                {
                        Message msmqMessage = new Message();
-                       PrimitiveMap metaData = new PrimitiveMap();
+                       PrimitiveMap propertyData = new PrimitiveMap();
 
                        ConvertMessageBodyToMSMQ(message, msmqMessage);
 
@@ -48,32 +97,72 @@ namespace Apache.NMS.MSMQ
 
                        if(message.NMSCorrelationID != null)
                        {
-                               metaData.SetString("NMSCorrelationID", 
message.NMSCorrelationID);
+                               propertyData.SetString("NMSCorrelationID", 
message.NMSCorrelationID);
                        }
 
                        msmqMessage.Recoverable = (message.NMSDeliveryMode == 
MsgDeliveryMode.Persistent);
-                       msmqMessage.Priority = 
ToMessagePriority(message.NMSPriority);
+                       msmqMessage.Priority = 
ToMsmqMessagePriority(message.NMSPriority);
                        msmqMessage.ResponseQueue = 
ToMsmqDestination(message.NMSReplyTo);
                        if(message.NMSType != null)
                        {
-                               msmqMessage.Label = message.NMSType;
+                if(SetLabelAsNMSType)
+                {
+                                   propertyData.SetString("NMSType", 
message.NMSType);
+                }
+                else
+                {
+                    msmqMessage.Label = message.NMSType;
+                }
                        }
 
-                       // Store the NMS meta data in the extension area
-                       msmqMessage.Extension = metaData.Marshal();
+            // Populate property data
+            foreach(object keyObject in message.Properties.Keys)
+            {
+              string key = (keyObject as string);
+              object val = message.Properties.GetString(key);
+              if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 
&& val != null)
+              {
+                               msmqMessage.Label = val.ToString();
+              }
+              else
+              {
+                               propertyData[key] = val;
+              }
+            }
+
+                       // Store the NMS property data in the extension area
+                       msmqMessage.Extension = propertyData.Marshal();
                        return msmqMessage;
                }
 
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message
+        /// (including its message body).
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <result>Converted NMS message.</result>
                public virtual IMessage ToNmsMessage(Message message)
                {
-                       BaseMessage answer = CreateNmsMessage(message);
-                       // Get the NMS meta data from the extension area
-                       PrimitiveMap metaData = 
PrimitiveMap.Unmarshal(message.Extension);
+            return ToNmsMessage(message, true);
+        }
+
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message.
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <param name="convertBody">true if message body should be 
converted.</param>
+        /// <result>Converted NMS message.</result>
+               public virtual IMessage ToNmsMessage(Message message, bool 
convertBody)
+               {
+                       BaseMessage answer = CreateNmsMessage(message, 
convertBody);
+
+                       // Get the NMS property data from the extension area
+                       PrimitiveMap propertyData = 
PrimitiveMap.Unmarshal(message.Extension);
 
                        try
                        {
                                answer.NMSMessageId = message.Id;
-                               answer.NMSCorrelationID = 
metaData.GetString("NMSCorrelationID");
+                               answer.NMSCorrelationID = 
propertyData.GetString("NMSCorrelationID");
                                answer.NMSDeliveryMode = (message.Recoverable ? 
MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
                                answer.NMSDestination = 
ToNmsDestination(message.DestinationQueue);
                        }
@@ -83,18 +172,85 @@ namespace Apache.NMS.MSMQ
 
                        try
                        {
-                               answer.NMSType = message.Label;
                                answer.NMSReplyTo = 
ToNmsDestination(message.ResponseQueue);
                                answer.NMSTimeToLive = message.TimeToBeReceived;
+                           answer.NMSPriority = 
ToNmsMsgPriority(message.Priority);
+                       }
+                       catch(InvalidOperationException)
+                       {
+                       }
+
+                       try
+                       {
+                if(message.Label != null)
+                {
+                    if(SetLabelAsNMSType)
+                    {
+                        answer.NMSType = message.Label;
+                    }
+                    else
+                    {
+                        answer.Properties["Label"] = message.Label;
+                    }
+                }
+                answer.Properties["LookupId"] = message.LookupId;
                        }
                        catch(InvalidOperationException)
                        {
                        }
 
+            foreach(object keyObject in propertyData.Keys)
+            {
+                           try
+                           {
+                    string key = (keyObject as string);
+                    if(string.Compare(key, "NMSType", true) == 0)
+                    {
+                                   answer.NMSType = 
propertyData.GetString(key);
+                    }
+                    else if(string.Compare(key, "NMSCorrelationID", true) == 0)
+                    {
+                                   answer.NMSCorrelationID = 
propertyData.GetString("NMSCorrelationID");
+                    }
+                    else
+                    {
+                                   answer.Properties[key] = propertyData[key];
+                    }
+                           }
+                           catch(InvalidOperationException)
+                           {
+                           }
+            }
                        return answer;
                }
 
-               private static MessagePriority ToMessagePriority(MsgPriority 
msgPriority)
+        #endregion
+
+        #region Message priority
+
+        // Message priorities are defined as follows :
+        // | MSMQ               | NMS                |
+        // | MessagePriority   | MsgPriority        |
+        // +--------------------+--------------------+
+        // | Lowest             | Lowest             |
+        // | VeryLow            | VeryLow            |
+        // | Low                | Low                |
+        // |                \-> | AboveLow           |
+        // |                /-> | BelowNormal        |
+        // | Normal             | Normal             |
+        // | AboveNormal        | AboveNormal        |
+        // | High               | High               |
+        // | VeryHigh           | VeryHigh           |
+        // | Highest            | Highest            |
+        // +--------------------+--------------------+
+
+        /// <summary>
+        /// Converts the specified NMS message priority to an equivalent MSMQ
+        /// message priority.
+        /// </summary>
+        /// <param name="msgPriority">NMS message priority to be 
converted.</param>
+        /// <result>Converted MSMQ message priority.</result>
+               private static MessagePriority 
ToMsmqMessagePriority(MsgPriority msgPriority)
                {
                        switch(msgPriority)
                        {
@@ -127,6 +283,153 @@ namespace Apache.NMS.MSMQ
                        }
                }
 
+        /// <summary>
+        /// Converts the specified MSMQ message priority to an equivalent NMS
+        /// message priority.
+        /// </summary>
+        /// <param name="messagePriority">MSMQ message priority to be 
converted.</param>
+        /// <result>Converted NMS message priority.</result>
+               private static MsgPriority ToNmsMsgPriority(MessagePriority 
messagePriority)
+               {
+                       switch(messagePriority)
+                       {
+                       case MessagePriority.Lowest:
+                               return MsgPriority.Lowest;
+
+                       case MessagePriority.VeryLow:
+                               return MsgPriority.VeryLow;
+
+                       case MessagePriority.Low:
+                               return MsgPriority.Low;
+
+                       default:
+                       case MessagePriority.Normal:
+                               return MsgPriority.Normal;
+
+                       case MessagePriority.AboveNormal:
+                               return MsgPriority.AboveNormal;
+
+                       case MessagePriority.High:
+                               return MsgPriority.High;
+
+                       case MessagePriority.VeryHigh:
+                               return MsgPriority.VeryHigh;
+
+                       case MessagePriority.Highest:
+                               return MsgPriority.Highest;
+                       }
+               }
+
+        #endregion
+
+        #region Message creation
+
+        // Conversion of the message body has been separated from the creation
+        // of the NMS message object for performance reasons when using
+        // selectors (selectors handle only message attributes, not message
+        // bodies).
+        // CreateNmsMessage(Message) is maintained for compatibility reasons
+        // with existing clients that may have implemented derived classes,
+        // instead of completely removing the body conversion part from the
+        // method.
+
+        /// <summary>
+        /// Creates an NMS message of appropriate type for the specified MSMQ
+        /// message, and convert the message body.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <result>NMS message created for retrieving the MSMQ 
message.</result>
+               protected virtual BaseMessage CreateNmsMessage(Message message)
+               {
+            return CreateNmsMessage(message, true);
+        }
+
+        /// <summary>
+        /// Creates an NMS message of appropriate type for the specified MSMQ
+        /// message, and convert the message body if specified.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <param name="convertBody">true if the message body must be
+        /// converted.</param>
+        /// <result>NMS message created for retrieving the MSMQ 
message.</result>
+               protected virtual BaseMessage CreateNmsMessage(Message message,
+            bool convertBody)
+               {
+                       BaseMessage result = null;
+
+                       if((int) NMSMessageType.TextMessage == 
message.AppSpecific)
+                       {
+                               TextMessage textMessage = new TextMessage();
+
+                if(convertBody)
+                {
+                    ConvertTextMessageBodyToNMS(message, textMessage);
+                }
+
+                               result = textMessage;
+                       }
+                       else if((int) NMSMessageType.BytesMessage == 
message.AppSpecific)
+                       {
+                               BytesMessage bytesMessage = new BytesMessage();
+
+                if(convertBody)
+                {
+                    ConvertBytesMessageBodyToNMS(message, bytesMessage);
+                }
+
+                               result = bytesMessage;
+                       }
+                       else if((int) NMSMessageType.ObjectMessage == 
message.AppSpecific)
+                       {
+                               ObjectMessage objectMessage = new 
ObjectMessage();
+
+                if(convertBody)
+                {
+                    ConvertObjectMessageBodyToNMS(message, objectMessage);
+                }
+
+                               result = objectMessage;
+                       }
+                       else if((int) NMSMessageType.MapMessage == 
message.AppSpecific)
+                       {
+                               MapMessage mapMessage = new MapMessage();
+
+                if(convertBody)
+                {
+                    ConvertMapMessageBodyToNMS(message, mapMessage);
+                }
+
+                               result = mapMessage;
+                       }
+                       else if((int) NMSMessageType.StreamMessage == 
message.AppSpecific)
+                       {
+                               StreamMessage streamMessage = new 
StreamMessage();
+
+                if(convertBody)
+                {
+                    ConvertStreamMessageBodyToNMS(message, streamMessage);
+                }
+
+                               result = streamMessage;
+                       }
+                       else
+                       {
+                               BaseMessage baseMessage = new BaseMessage();
+                               result = baseMessage;
+                       }
+
+                       return result;
+               }
+
+        #endregion
+
+        #region Message body
+
+        /// <summary>
+        /// Converts an NMS message body to the equivalent MSMQ message body.
+        /// </summary>
+        /// <param name="message">Source NMS message.</param>
+        /// <param name="answer">Target MSMQ message.</param>
                protected virtual void ConvertMessageBodyToMSMQ(IMessage 
message, Message answer)
                {
                        if(message is TextMessage)
@@ -172,78 +475,133 @@ namespace Apache.NMS.MSMQ
                        }
                }
 
-               protected virtual BaseMessage CreateNmsMessage(Message message)
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS message body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS message.</param>
+               public virtual void ConvertMessageBodyToNMS(Message message, 
IMessage answer)
                {
-                       BaseMessage result = null;
-
-                       if((int) NMSMessageType.TextMessage == 
message.AppSpecific)
+                       if(answer is TextMessage)
                        {
-                               TextMessage textMessage = new TextMessage();
-                               string content = String.Empty;
-
-                               if(message.BodyStream != null && 
message.BodyStream.Length > 0)
-                               {
-                                       byte[] buf = null;
-                                       buf = new 
byte[message.BodyStream.Length];
-                                       message.BodyStream.Read(buf, 0, 
buf.Length);
-                                       content = Encoding.UTF32.GetString(buf);
-                               }
-
-                               textMessage.Text = content;
-                               result = textMessage;
+                               ConvertTextMessageBodyToNMS(message, 
(TextMessage)answer);
                        }
-                       else if((int) NMSMessageType.BytesMessage == 
message.AppSpecific)
+                       else if(answer is BytesMessage)
                        {
-                               byte[] buf = null;
-
-                               if(message.BodyStream != null && 
message.BodyStream.Length > 0)
-                               {
-                                       buf = new 
byte[message.BodyStream.Length];
-                                       message.BodyStream.Read(buf, 0, 
buf.Length);
-                               }
-
-                               BytesMessage bytesMessage = new BytesMessage();
-                               bytesMessage.Content = buf;
-                               result = bytesMessage;
+                               ConvertBytesMessageBodyToNMS(message, 
(BytesMessage)answer);
                        }
-                       else if((int) NMSMessageType.ObjectMessage == 
message.AppSpecific)
+                       else if(answer is ObjectMessage)
                        {
-                               ObjectMessage objectMessage = new 
ObjectMessage();
-
-                               objectMessage.Body = message.Body;
-                               result = objectMessage;
+                               ConvertObjectMessageBodyToNMS(message, 
(ObjectMessage)answer);
                        }
-                       else if((int) NMSMessageType.MapMessage == 
message.AppSpecific)
+                       else if(answer is MapMessage)
                        {
-                               byte[] buf = null;
-
-                               if(message.BodyStream != null && 
message.BodyStream.Length > 0)
-                               {
-                                       buf = new 
byte[message.BodyStream.Length];
-                                       message.BodyStream.Read(buf, 0, 
buf.Length);
-                               }
-
-                               MapMessage mapMessage = new MapMessage();
-                               mapMessage.Body = PrimitiveMap.Unmarshal(buf);
-                               result = mapMessage;
+                               ConvertMapMessageBodyToNMS(message, 
(MapMessage)answer);
                        }
-                       else if((int) NMSMessageType.StreamMessage == 
message.AppSpecific)
+                       else if(answer is StreamMessage)
                        {
-                               StreamMessage streamMessage = new 
StreamMessage();
+                               ConvertStreamMessageBodyToNMS(message, 
(StreamMessage)answer);
+                       }
 
-                               // TODO: Implement
-                               result = streamMessage;
+                       return;
+               }
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS text message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS text message.</param>
+               public virtual void ConvertTextMessageBodyToNMS(Message message,
+            TextMessage answer)
+               {
+                       string content = String.Empty;
+
+                       if(message.BodyStream != null && 
message.BodyStream.Length > 0)
+                       {
+                               byte[] buf = new 
byte[message.BodyStream.Length];
+                               message.BodyStream.Read(buf, 0, buf.Length);
+                               content = Encoding.UTF32.GetString(buf);
                        }
-                       else
+
+                       answer.Text = content;
+               }
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS bytes message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS bytes message.</param>
+               public virtual void ConvertBytesMessageBodyToNMS(Message 
message,
+            BytesMessage answer)
+               {
+                       byte[] buf = null;
+
+                       if(message.BodyStream != null && 
message.BodyStream.Length > 0)
                        {
-                               BaseMessage baseMessage = new BaseMessage();
+                               buf = new byte[message.BodyStream.Length];
+                               message.BodyStream.Read(buf, 0, buf.Length);
+                       }
 
-                               result = baseMessage;
+                       answer.Content = buf;
+               }
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS object message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS object message.</param>
+               public virtual void ConvertObjectMessageBodyToNMS(Message 
message,
+            ObjectMessage answer)
+               {
+                       answer.Body = message.Body;
+               }
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS map message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS map message.</param>
+               public virtual void ConvertMapMessageBodyToNMS(Message message,
+            MapMessage answer)
+               {
+                       byte[] buf = null;
+
+                       if(message.BodyStream != null && 
message.BodyStream.Length > 0)
+                       {
+                               buf = new byte[message.BodyStream.Length];
+                               message.BodyStream.Read(buf, 0, buf.Length);
                        }
 
-                       return result;
+                       answer.Body = PrimitiveMap.Unmarshal(buf);
+               }
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS stream message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS stream message.</param>
+               public virtual void ConvertStreamMessageBodyToNMS(Message 
message,
+            StreamMessage answer)
+               {
+                       // TODO: Implement
+            throw new NotImplementedException();
                }
 
+        #endregion
+
+        #region Destination
+
+        /// <summary>
+        /// Converts an NMS destination to the equivalent MSMQ destination
+        /// (ie. queue).
+        /// </summary>
+        /// <param name="destination">NMS destination.</param>
+        /// <result>MSMQ queue.</result>
                public MessageQueue ToMsmqDestination(IDestination destination)
                {
                        if(null == destination)
@@ -254,6 +612,12 @@ namespace Apache.NMS.MSMQ
                        return new MessageQueue((destination as 
Destination).Path);
                }
 
+        /// <summary>
+        /// Converts an MSMQ destination (ie. queue) to the equivalent NMS
+        /// destination.
+        /// </summary>
+        /// <param name="destinationQueue">MSMQ destination queue.</param>
+        /// <result>NMS destination.</result>
                protected virtual IDestination ToNmsDestination(MessageQueue 
destinationQueue)
                {
                        if(null == destinationQueue)
@@ -263,5 +627,7 @@ namespace Apache.NMS.MSMQ
 
                        return new Queue(destinationQueue.Path);
                }
+
+        #endregion
        }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/IMessageConverter.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/IMessageConverter.cs 
b/src/main/csharp/IMessageConverter.cs
index 152377b..14c6669 100644
--- a/src/main/csharp/IMessageConverter.cs
+++ b/src/main/csharp/IMessageConverter.cs
@@ -20,15 +20,27 @@ namespace Apache.NMS.MSMQ
 {
        public interface IMessageConverter
        {
-
-               /// <summary>
-               /// Method ToMSMQMessageQueue
-               /// </summary>
-               /// <param name="destination">An IDestination</param>
-               /// <returns>A  MessageQueue</returns>
-               MessageQueue ToMsmqDestination(IDestination destination);
-
+        /// <summary>
+        /// Converts the specified NMS message to an equivalent MSMQ message.
+        /// </summary>
+        /// <param name="message">NMS message to be converted.</param>
+        /// <result>Converted MSMQ message.</result>
                Message ToMsmqMessage(IMessage message);
+
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message
+        /// (including its message body).
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <result>Converted NMS message.</result>
                IMessage ToNmsMessage(Message message);
+
+        /// <summary>
+        /// Converts an NMS destination to the equivalent MSMQ destination
+        /// (ie. queue).
+        /// </summary>
+        /// <param name="destination">NMS destination.</param>
+        /// <result>MSMQ queue.</result>
+               MessageQueue ToMsmqDestination(IDestination destination);
        }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/IMessageConverterEx.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/IMessageConverterEx.cs 
b/src/main/csharp/IMessageConverterEx.cs
new file mode 100644
index 0000000..92be928
--- /dev/null
+++ b/src/main/csharp/IMessageConverterEx.cs
@@ -0,0 +1,44 @@
+using System.Messaging;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ
+{
+    /// <summary>
+    /// Extended IMessageConverter interface supporting new methods for
+    /// optimizing message selection through "selectors".
+    /// The original IMessageConverter is maintained for compatibility
+    /// reasons with existing clients implementing it.
+    /// </summary>
+       public interface IMessageConverterEx : IMessageConverter
+       {
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message.
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <param name="convertBody">true if message body should be 
converted.</param>
+        /// <result>Converted NMS message.</result>
+               IMessage ToNmsMessage(Message message, bool convertBody);
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS message body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS message.</param>
+               void ConvertMessageBodyToNMS(Message message, IMessage answer);
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs 
b/src/main/csharp/MessageConsumer.cs
index 6961298..eaaed5c 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -1,6 +1,8 @@
 using System;
 using System.Messaging;
 using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +19,11 @@ using System.Threading;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.Util;
 
 namespace Apache.NMS.MSMQ
 {
     /// <summary>
-    /// An object capable of receiving messages from some destination
+    /// An object capable of receiving messages from some destination.
     /// </summary>
     public class MessageConsumer : IMessageConsumer
     {
@@ -31,8 +32,6 @@ namespace Apache.NMS.MSMQ
         private readonly Session session;
         private readonly AcknowledgementMode acknowledgementMode;
         private MessageQueue messageQueue;
-        private event MessageListener listener;
-        private int listenerCount = 0;
         private Thread asyncDeliveryThread = null;
         private AutoResetEvent pause = new AutoResetEvent(false);
         private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
@@ -44,17 +43,46 @@ namespace Apache.NMS.MSMQ
             set { this.consumerTransformer = value; }
         }
 
-        public MessageConsumer(Session session, AcknowledgementMode 
acknowledgementMode, MessageQueue messageQueue)
+        private IMessageReader reader;
+
+        /// <summary>
+        /// Constructs a message consumer on the specified queue.
+        /// </summary>
+        /// <param name="session">The messaging session.</param>
+        /// <param name="acknowledgementMode">The message acknowledgement 
mode.</param>
+        /// <param name="messageQueue">The message queue to consume messages 
from.</param>
+        public MessageConsumer(Session session,
+            AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+            : this(session, acknowledgementMode, messageQueue, null)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a message consumer on the specified queue, using a
+        /// selector for filtering incoming messages.
+        /// </summary>
+        /// <param name="session">The messaging session.</param>
+        /// <param name="acknowledgementMode">The message acknowledgement 
mode.</param>
+        /// <param name="messageQueue">The message queue to consume messages 
from.</param>
+        /// <param name="selector">The selection criteria.</param>
+        public MessageConsumer(Session session,
+            AcknowledgementMode acknowledgementMode, MessageQueue messageQueue,
+            string selector)
         {
             this.session = session;
             this.acknowledgementMode = acknowledgementMode;
             this.messageQueue = messageQueue;
-            if(null != this.messageQueue)
+            if(this.messageQueue != null)
             {
                 this.messageQueue.MessageReadPropertyFilter.SetAll();
             }
+
+            reader = MessageReaderUtil.CreateMessageReader(
+                messageQueue, session.MessageConverter, selector);
         }
 
+        private int listenerCount = 0;
+        private event MessageListener listener;
         public event MessageListener Listener
         {
             add
@@ -85,32 +113,8 @@ namespace Apache.NMS.MSMQ
 
             if(messageQueue != null)
             {
-                Message message;
-
-                try
-                {
-                    message = messageQueue.Receive(zeroTimeout);
-                }
-                catch
-                {
-                    message = null;
-                }
-
-                if(null == message)
-                {
-                    ReceiveCompletedEventHandler receiveMsg =
-                            delegate(Object source, ReceiveCompletedEventArgs 
asyncResult) {
-                                message = 
messageQueue.EndReceive(asyncResult.AsyncResult);
-                                pause.Set();
-                            };
-
-                    messageQueue.ReceiveCompleted += receiveMsg;
-                    messageQueue.BeginReceive();
-                    pause.WaitOne();
-                    messageQueue.ReceiveCompleted -= receiveMsg;
-                }
-
-                nmsMessage = ToNmsMessage(message);
+                nmsMessage = reader.Receive();
+                nmsMessage = TransformMessage(nmsMessage);
             }
 
             return nmsMessage;
@@ -122,8 +126,8 @@ namespace Apache.NMS.MSMQ
 
             if(messageQueue != null)
             {
-                Message message = messageQueue.Receive(timeout);
-                nmsMessage = ToNmsMessage(message);
+                nmsMessage = reader.Receive(timeout);
+                nmsMessage = TransformMessage(nmsMessage);
             }
 
             return nmsMessage;
@@ -135,8 +139,8 @@ namespace Apache.NMS.MSMQ
 
             if(messageQueue != null)
             {
-                Message message = messageQueue.Receive(zeroTimeout);
-                nmsMessage = ToNmsMessage(message);
+                nmsMessage = reader.Receive(zeroTimeout);
+                nmsMessage = TransformMessage(nmsMessage);
             }
 
             return nmsMessage;
@@ -226,25 +230,20 @@ namespace Apache.NMS.MSMQ
             session.Connection.HandleException(e);
         }
 
-        protected virtual IMessage ToNmsMessage(Message message)
+        protected virtual IMessage TransformMessage(IMessage message)
         {
-            if(message == null)
-            {
-                return null;
-            }
-
-            IMessage converted = 
session.MessageConverter.ToNmsMessage(message);
+            IMessage transformed = message;
 
-            if(this.ConsumerTransformer != null)
+            if(message != null && this.ConsumerTransformer != null)
             {
-                IMessage newMessage = ConsumerTransformer(this.session, this, 
converted);
+                IMessage newMessage = ConsumerTransformer(this.session, this, 
message);
                 if(newMessage != null)
                 {
-                    converted = newMessage;
+                    transformed = newMessage;
                 }
             }
 
-            return converted;
+            return transformed;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/QueueBrowser.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/QueueBrowser.cs b/src/main/csharp/QueueBrowser.cs
index 32752c5..3ff795d 100644
--- a/src/main/csharp/QueueBrowser.cs
+++ b/src/main/csharp/QueueBrowser.cs
@@ -19,6 +19,7 @@ using System.Collections;
 using System.Messaging;
 using Apache.NMS;
 using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
 
 namespace Apache.NMS.MSMQ
 {
@@ -30,7 +31,17 @@ namespace Apache.NMS.MSMQ
         private readonly Session session;
         private MessageQueue messageQueue;
 
+        private string selector;
+
+        private IMessageReader reader;
+        
                public QueueBrowser(Session session, MessageQueue messageQueue)
+            : this(session, messageQueue, null)
+               {
+               }
+
+               public QueueBrowser(Session session, MessageQueue messageQueue,
+            string selector)
                {
             this.session = session;
             this.messageQueue = messageQueue;
@@ -39,6 +50,8 @@ namespace Apache.NMS.MSMQ
                 this.messageQueue.MessageReadPropertyFilter.SetAll();
             }
 
+            reader = MessageReaderUtil.CreateMessageReader(
+                messageQueue, session.MessageConverter, selector);
                }
 
                ~QueueBrowser()
@@ -95,7 +108,7 @@ namespace Apache.NMS.MSMQ
 
                public string MessageSelector
                {
-                       get { throw new NotSupportedException(); }
+                       get { return selector; }
                }
 
                public IQueue Queue
@@ -107,11 +120,14 @@ namespace Apache.NMS.MSMQ
                {
                        private readonly Session session;
                        private readonly MessageEnumerator innerEnumerator;
+            private readonly IMessageReader reader;
 
-                       public Enumerator(Session session, MessageQueue 
messageQueue)
+                       public Enumerator(Session session, MessageQueue 
messageQueue,
+                IMessageReader reader)
                        {
                                this.session = session;
                                this.innerEnumerator = 
messageQueue.GetMessageEnumerator2();
+                this.reader = reader;
                        }
 
                        public object Current
@@ -124,7 +140,14 @@ namespace Apache.NMS.MSMQ
 
                        public bool MoveNext()
                        {
-                               return this.innerEnumerator.MoveNext();
+                while(this.innerEnumerator.MoveNext())
+                {
+                                   
if(reader.Matches(this.innerEnumerator.Current))
+                    {
+                        return true;
+                    }
+                }
+                return false;
                        }
 
                        public void Reset()
@@ -135,7 +158,7 @@ namespace Apache.NMS.MSMQ
 
                public IEnumerator GetEnumerator()
                {
-                       return new Enumerator(this.session, this.messageQueue);
+                       return new Enumerator(this.session, this.messageQueue, 
this.reader);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/AbstractMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/AbstractMessageReader.cs 
b/src/main/csharp/Readers/AbstractMessageReader.cs
new file mode 100644
index 0000000..7874696
--- /dev/null
+++ b/src/main/csharp/Readers/AbstractMessageReader.cs
@@ -0,0 +1,126 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// Abstract MSMQ message reader. Derived classes support various
+    /// message filtering methods.
+    /// </summary>
+       public abstract class AbstractMessageReader : IMessageReader
+       {
+        protected MessageQueue messageQueue;
+        protected IMessageConverter messageConverter;
+        protected IMessageConverterEx messageConverterEx;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        public AbstractMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter)
+        {
+            this.messageQueue = messageQueue;
+
+            this.messageConverter = messageConverter;
+            this.messageConverterEx = (messageConverter as 
IMessageConverterEx);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public abstract IMessage Peek();
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public abstract IMessage Peek(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive();
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until 
either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection 
criteria.</return>
+        public abstract bool Matches(Message message);
+
+        /// <summary>
+        /// Converts an MSMQ message to an NMS message, using the converter
+        /// specified at construction time.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>NMS message.</return>
+        protected IMessage Convert(Message message)
+        {
+            return message == null ? null : 
messageConverter.ToNmsMessage(message);
+        }
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs 
b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
new file mode 100644
index 0000000..fad3d1a
--- /dev/null
+++ b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
@@ -0,0 +1,139 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// message identifier.
+    /// </summary>
+       public class ByCorrelationIdMessageReader : AbstractMessageReader
+       {
+        private string correlationId;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="correlationId">The correlation identifier of messages
+        /// to be read.</param>
+        public ByCorrelationIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string correlationId)
+            : base(messageQueue, messageConverter)
+        {
+            this.correlationId = correlationId;
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return Convert(messageQueue.PeekByCorrelationId(correlationId));
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.PeekByCorrelationId(correlationId,
+                timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until 
either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                transaction));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                timeSpan, transaction));
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection 
criteria.</return>
+        public override bool Matches(Message message)
+        {
+            // NB: case-sensitive match
+            return message.CorrelationId == correlationId;
+        }
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByIdMessageReader.cs 
b/src/main/csharp/Readers/ByIdMessageReader.cs
new file mode 100644
index 0000000..f981ca8
--- /dev/null
+++ b/src/main/csharp/Readers/ByIdMessageReader.cs
@@ -0,0 +1,136 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// message identifier.
+    /// </summary>
+       public class ByIdMessageReader : AbstractMessageReader
+       {
+        private string messageId;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="messageId">The message identifier of messages to
+        /// be read.</param>
+        public ByIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string messageId)
+            : base(messageQueue, messageConverter)
+        {
+            this.messageId = messageId;
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return Convert(messageQueue.PeekById(messageId));
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.PeekById(messageId, timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return Convert(messageQueue.ReceiveById(messageId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until 
either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.ReceiveById(messageId, timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveById(messageId, transaction));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveById(messageId, timeSpan,
+                transaction));
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection 
criteria.</return>
+        public override bool Matches(Message message)
+        {
+            // NB: case-sensitive match
+            return message.Id == messageId;
+        }
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByLookupIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByLookupIdMessageReader.cs 
b/src/main/csharp/Readers/ByLookupIdMessageReader.cs
new file mode 100644
index 0000000..421c52b
--- /dev/null
+++ b/src/main/csharp/Readers/ByLookupIdMessageReader.cs
@@ -0,0 +1,145 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// lookup identifier.
+    /// </summary>
+       public class ByLookupIdMessageReader : AbstractMessageReader
+       {
+        private Int64 lookupId;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="lookupId">The lookup identifier of the message
+        /// to be read.</param>
+        public ByLookupIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, Int64 lookupId)
+            : base(messageQueue, messageConverter)
+        {
+            this.lookupId = lookupId;
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return Convert(messageQueue.PeekByLookupId(lookupId));
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            // No time-out option for receiving messages by lookup identifiers:
+            // either the message is present in the queue, or the method throws
+            // an exception immediately if the message is not in the queue. 
+            return Convert(messageQueue.PeekByLookupId(lookupId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return Convert(messageQueue.ReceiveByLookupId(lookupId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until 
either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            // No time-out option for receiving messages by lookup identifiers:
+            // either the message is present in the queue, or the method throws
+            // an exception immediately if the message is not in the queue. 
+            return Convert(messageQueue.ReceiveByLookupId(lookupId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByLookupId(
+                MessageLookupAction.Current, lookupId, transaction));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            // No time-out option for receiving messages by lookup identifiers:
+            // either the message is present in the queue, or the method throws
+            // an exception immediately if the message is not in the queue. 
+            return Convert(messageQueue.ReceiveByLookupId(
+                MessageLookupAction.Current, lookupId, transaction));
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection 
criteria.</return>
+        public override bool Matches(Message message)
+        {
+            return message.LookupId == lookupId;
+        }
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/BySelectorMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/BySelectorMessageReader.cs 
b/src/main/csharp/Readers/BySelectorMessageReader.cs
new file mode 100644
index 0000000..e7cd5c3
--- /dev/null
+++ b/src/main/csharp/Readers/BySelectorMessageReader.cs
@@ -0,0 +1,290 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+using Apache.NMS;
+using Apache.NMS.Selector;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// selector.
+    /// </summary>
+       public class BySelectorMessageReader : AbstractMessageReader
+       {
+        private string selector;
+        private MessageEvaluationContext evaluationContext;
+        private IBooleanExpression selectionExpression;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="selector">The selector string.</param>
+        public BySelectorMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string selector)
+            : base(messageQueue, messageConverter)
+        {
+            this.selector = selector;
+
+            SelectorParser selectorParser = new SelectorParser();
+            selectionExpression = selectorParser.Parse(selector);
+
+            evaluationContext = new MessageEvaluationContext(null);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return InternalPeek(DateTime.MaxValue, true);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            DateTime maxTime = DateTime.Now + timeSpan;
+            return InternalPeek(maxTime, true);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return InternalReceive(DateTime.MaxValue, null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until 
either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return InternalReceive(DateTime.Now + timeSpan, null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return InternalReceive(DateTime.MaxValue, transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return InternalReceive(DateTime.Now + timeSpan, transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="maxTime">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public IMessage InternalReceive(DateTime maxTime,
+            MessageQueueTransaction transaction)
+        {
+            // In a shared connection / multi-consumer context, the message may
+            // have been consumed by another client, after it was peeked but
+            // before it was peeked by this client. Hence the loop.
+            // (not sure it can be shared AND transactional, though).
+            while(true)
+            {
+                IMessage peekedMessage = InternalPeek(maxTime, false);
+
+                if(peekedMessage == null)
+                {
+                    return null;
+                }
+
+                try
+                {
+                    long lookupId = 
peekedMessage.Properties.GetLong("LookupId");
+
+                    Message message = (transaction == null ?
+                        messageQueue.ReceiveByLookupId(lookupId) :
+                        messageQueue.ReceiveByLookupId(
+                            MessageLookupAction.Current, lookupId, 
transaction));
+
+                    return Convert(message);
+                }
+                catch(InvalidOperationException exc)
+                {
+                    // TODO: filter exceptions, catch only exceptions due to   
                 
+                    // unknown lookup id.
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue, matching the selection criteria.
+        /// </summary>
+        /// <param name="maxTime">Reception time-out.</param>
+        /// <param name="convertBody">true if message body should be 
converted.</param>
+        /// <returns>Peeked message.</returns>
+        private IMessage InternalPeek(DateTime maxTime, bool convertBody)
+        {
+            TimeSpan timeSpan = maxTime - DateTime.Now;
+            if(timeSpan <= TimeSpan.Zero)
+            {
+                timeSpan = TimeSpan.Zero;
+            }
+
+            Cursor cursor = messageQueue.CreateCursor();
+
+            PeekAction action = PeekAction.Current;
+
+            while(true)
+            {
+                Message msmqMessage = null;
+
+                try
+                {
+                    msmqMessage = messageQueue.Peek(timeSpan, cursor, action);
+                }
+                catch(MessageQueueException exc)
+                {
+                    if(exc.MessageQueueErrorCode != 
MessageQueueErrorCode.IOTimeout)
+                    {
+                        throw exc;
+                    }
+                }
+
+                if(msmqMessage == null)
+                {
+                    return null;
+                }
+
+                IMessage nmsMessage = InternalMatch(msmqMessage, convertBody);
+
+                if(nmsMessage != null)
+                {
+                    return nmsMessage;
+                }
+
+                action = PeekAction.Next;
+            }
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria. If 
matched
+        /// the method returns the converted NMS message. Else it returns null.
+        /// </summary>
+        /// <param name="message">The MSMQ message to check.</param>
+        /// <param name="convertBody">true if the message body should be
+        /// converted.</param>
+        /// <returns>The matching message converted to NMS, or null.</returns>
+        private IMessage InternalMatch(Message message, bool convertBody)
+        {
+            if(messageConverterEx == null)
+            {
+                IMessage nmsMessage = messageConverter.ToNmsMessage(message);
+
+                evaluationContext.Message = nmsMessage;
+
+                if(selectionExpression.Matches(evaluationContext))
+                {
+                    return nmsMessage;
+                }
+            }
+            else
+            {
+                // This version converts the message body only for those
+                // messages matching the selection criteria.
+                // Relies on MessageConverterEx for partial conversions.
+                IMessage nmsMessage = messageConverterEx.ToNmsMessage(
+                    message, false);
+
+                evaluationContext.Message = nmsMessage;
+
+                if(selectionExpression.Matches(evaluationContext))
+                {
+                    if(convertBody)
+                    {
+                        messageConverterEx.ConvertMessageBodyToNMS(
+                            message, nmsMessage);
+                    }
+
+                    return nmsMessage;
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection 
criteria.</return>
+        public override bool Matches(Message message)
+        {
+            IMessage nmsMessage = messageConverterEx == null ?
+                messageConverter.ToNmsMessage(message) :
+                messageConverterEx.ToNmsMessage(message, false);
+
+            evaluationContext.Message = nmsMessage;
+
+            return selectionExpression.Matches(evaluationContext);
+        }
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/IMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/IMessageReader.cs 
b/src/main/csharp/Readers/IMessageReader.cs
new file mode 100644
index 0000000..8168664
--- /dev/null
+++ b/src/main/csharp/Readers/IMessageReader.cs
@@ -0,0 +1,93 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader.
+    /// </summary>
+       public interface IMessageReader
+       {
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        IMessage Peek();
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        IMessage Peek(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        IMessage Receive();
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until 
either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        IMessage Receive(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of 
execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        IMessage Receive(MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        IMessage Receive(TimeSpan timeSpan, MessageQueueTransaction 
transaction);
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection 
criteria.</return>
+        bool Matches(Message message);
+       }
+}

Reply via email to