Author: tabish
Date: Tue Nov 19 16:38:19 2013
New Revision: 1543485

URL: http://svn.apache.org/r1543485
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Modified:
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
 Tue Nov 19 16:38:19 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
 [assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software 
Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3237")]
+[assembly: AssemblyVersionAttribute("1.7.0.3244")]
 [assembly: AssemblyInformationalVersionAttribute("1.7.0")]
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs 
Tue Nov 19 16:38:19 2013
@@ -302,29 +302,29 @@ namespace Apache.NMS.MQTT
                        }
                }
 
-//             internal void AddDispatcher(ConsumerId id, IDispatcher 
dispatcher)
-//             {
-//                     if(!this.closing.Value)
-//                     {
-//                             this.dispatchers.Add(id, dispatcher);
-//                     }
-//             }
-//
-//             internal void RemoveDispatcher(ConsumerId id)
-//             {
-//                     if(!this.closing.Value)
-//                     {
-//                             this.dispatchers.Remove(id);
-//                     }
-//             }
-//
-//             internal void AddProducer(ProducerId id, MessageProducer 
producer)
-//             {
-//                     if(!this.closing.Value)
-//                     {
-//                             this.producers.Add(id, producer);
-//                     }
-//             }
+               internal void AddDispatcher(int id, IDispatcher dispatcher)
+               {
+                       if(!this.closing.Value)
+                       {
+                               this.dispatchers.Add(id, dispatcher);
+                       }
+               }
+
+               internal void RemoveDispatcher(int id)
+               {
+                       if(!this.closing.Value)
+                       {
+                               this.dispatchers.Remove(id);
+                       }
+               }
+
+               internal void AddProducer(int id, MessageProducer producer)
+               {
+                       if(!this.closing.Value)
+                       {
+                               this.producers.Add(id, producer);
+                       }
+               }
 
                internal void RemoveProducer(int id)
                {
@@ -520,7 +520,6 @@ namespace Apache.NMS.MQTT
                                }
                        }
                }
-
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
 Tue Nov 19 16:38:19 2013
@@ -36,7 +36,6 @@ namespace Apache.NMS.MQTT.Messages
 
                public virtual void OnSend()
                {
-                       base.OnSend();
                        StoreContent();
                }
 
@@ -491,7 +490,8 @@ namespace Apache.NMS.MQTT.Messages
                     EndianBinaryReader reader = new EndianBinaryReader(target);
                     this.length = reader.ReadInt32();
                     
-                    target = 
this.Connection.CompressionPolicy.CreateDecompressionStream(target);
+                                       // TODO we could compress from 
+                    // target = 
this.Connection.CompressionPolicy.CreateDecompressionStream(target);
                 }
                 else
                 {
@@ -510,14 +510,14 @@ namespace Apache.NMS.MQTT.Messages
                                this.outputBuffer = new MemoryStream();
                 Stream target = this.outputBuffer;
 
-                if(this.Connection != null && this.Connection.UseCompression)
-                {
-                    this.length = 0;
-                                       this.Compressed = true;
-
-                    target = 
this.Connection.CompressionPolicy.CreateCompressionStream(target);              
      
-                    target = new LengthTrackerStream(target, this);
-                }
+//                if(this.Connection != null && this.Connection.UseCompression)
+//                {
+//                    this.length = 0;
+//                                     this.Compressed = true;
+//
+//                    target = 
this.Connection.CompressionPolicy.CreateCompressionStream(target);              
      
+//                    target = new LengthTrackerStream(target, this);
+//                }
                 
                                this.dataOut = new EndianBinaryWriter(target);
                        }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
 Tue Nov 19 16:38:19 2013
@@ -26,14 +26,19 @@ namespace Apache.NMS.MQTT.Messages
        public class MQTTMessage : IMessage, ICloneable
        {
                private readonly PUBLISH publish = new PUBLISH();
-               private MessagePropertyIntercepter propertyHelper;
-               private PrimitiveMap properties;
                private Connection connection;
                private Topic destination;
                private short messageId;
+               private byte[] content;
+        private bool compressed;
+               private int redeliveryCounter;
+        private bool persistent;
 
                public event AcknowledgeHandler Acknowledger;
 
+        private bool readOnlyMsgProperties;
+        private bool readOnlyMsgBody;
+
                public static MQTTMessage Transform(IMessage message)
                {
                        return (MQTTMessage) message;
@@ -45,17 +50,12 @@ namespace Apache.NMS.MQTT.Messages
 
         public override int GetHashCode()
         {
-            MessageId id = this.MessageId;
-
-            return id != null ? id.GetHashCode() : base.GetHashCode();
+            return messageId != 0 ? messageId : base.GetHashCode();
         }
 
                public virtual object Clone()
                {
-                       MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
-
-                       cloneMessage.propertyHelper = new 
MessagePropertyIntercepter(cloneMessage, cloneMessage.properties, 
this.ReadOnlyProperties) { AllowByteArrays = false };
-                       return cloneMessage;
+                       return this.MemberwiseClone();
                }
 
         public override bool Equals(object that)
@@ -69,10 +69,10 @@ namespace Apache.NMS.MQTT.Messages
 
         public virtual bool Equals(MQTTMessage that)
         {
-            MessageId oMsg = that.MessageId;
-            MessageId thisMsg = this.MessageId;
+            short oMsg = that.MessageId;
+            short thisMsg = this.MessageId;
             
-            return thisMsg != null && oMsg != null && oMsg.Equals(thisMsg);
+            return thisMsg != 0 && oMsg != 0 && oMsg == thisMsg;
         }
         
                public void Acknowledge()
@@ -113,21 +113,35 @@ namespace Apache.NMS.MQTT.Messages
 
                #region Properties
 
+        public byte[] Content
+        {
+            get { return content; }
+            set { this.content = value; }
+        }
+
+        public bool Compressed
+        {
+            get { return compressed; }
+            set { this.compressed = value; }
+        }
+
+        public virtual bool ReadOnlyProperties
+        {
+            get { return this.readOnlyMsgProperties; }
+            set { this.readOnlyMsgProperties = value; }
+        }
+
+        public virtual bool ReadOnlyBody
+        {
+            get { return this.readOnlyMsgBody; }
+            set { this.readOnlyMsgBody = value; }
+        }
+
                public IPrimitiveMap Properties
                {
                        get
                        {
-                               if(null == properties)
-                               {
-                                       properties = 
PrimitiveMap.Unmarshal(MarshalledProperties);
-                                       propertyHelper = new 
MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties)
-                                                            {AllowByteArrays = 
false};
-                                       
-                                       // Since JMS doesn't define a Byte 
array interface for properties we
-                                       // disable them here to prevent sending 
invalid data to the broker.
-                               }
-
-                               return propertyHelper;
+                               throw new NotSupportedException("MQTT does not 
support Message properties.");
                        }
                }
 
@@ -142,8 +156,8 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public string NMSCorrelationID
                {
-                       get { return CorrelationId; }
-                       set { CorrelationId = value; }
+                       get { return String.Empty; }
+                       set {}
                }
 
                /// <summary>
@@ -151,38 +165,17 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public IDestination NMSDestination
                {
-                       get { return Destination; }
-            set { Destination = value as ActiveMQDestination; }
+                       get { return destination; }
+            set { this.destination = value as Topic; }
                }
 
-               private TimeSpan timeToLive = TimeSpan.FromMilliseconds(0);
                /// <summary>
                /// The time in milliseconds that this message should expire in
                /// </summary>
                public TimeSpan NMSTimeToLive
                {
-                       get
-                       {
-                               if(Expiration > 0 && 
timeToLive.TotalMilliseconds <= 0.0)
-                               {
-                                       timeToLive = 
TimeSpan.FromMilliseconds(Expiration - Timestamp);
-                               }
-
-                               return timeToLive;
-                       }
-
-                       set
-                       {
-                               timeToLive = value;
-                               if(timeToLive.TotalMilliseconds > 0)
-                               {
-                                       Expiration = Timestamp + (long) 
timeToLive.TotalMilliseconds;
-                               }
-                               else
-                               {
-                                       Expiration = 0;
-                               }
-                       }
+                       get { return TimeSpan.MaxValue; }
+                       set {}
                }
 
                /// <summary>
@@ -190,33 +183,8 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public string NMSMessageId
                {
-                       get
-                       {
-                           return null != MessageId ? 
BaseDataStreamMarshaller.ToString(MessageId) : String.Empty;
-                       }
-
-                   set
-            {
-                if(value != null) 
-                {
-                    try 
-                    {
-                        MessageId id = new MessageId(value);
-                        this.MessageId = id;
-                    } 
-                    catch(FormatException) 
-                    {
-                        // we must be some foreign JMS provider or strange 
user-supplied
-                        // String so lets set the IDs to be 1
-                        MessageId id = new MessageId();
-                        this.MessageId = id;
-                    }
-                } 
-                else
-                {
-                    this.MessageId = null;
-                }
-            }
+                       get { return this.messageId.ToString(); }
+                       set { this.messageId = Int16.Parse(value); }
                }
 
                /// <summary>
@@ -224,8 +192,8 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public MsgDeliveryMode NMSDeliveryMode
                {
-                       get { return (Persistent ? MsgDeliveryMode.Persistent : 
MsgDeliveryMode.NonPersistent); }
-                       set { Persistent = (MsgDeliveryMode.Persistent == 
value); }
+                       get { return (persistent ? MsgDeliveryMode.Persistent : 
MsgDeliveryMode.NonPersistent); }
+                       set { persistent = (MsgDeliveryMode.Persistent == 
value); }
                }
 
                /// <summary>
@@ -233,8 +201,8 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public MsgPriority NMSPriority
                {
-                       get { return (MsgPriority) Priority; }
-                       set { Priority = (byte) value; }
+                       get { return MsgPriority.Normal; }
+                       set {}
                }
 
                /// <summary>
@@ -242,22 +210,22 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public bool NMSRedelivered
                {
-                       get { return (RedeliveryCounter > 0); }
+                       get { return (redeliveryCounter > 0); }
 
             set
             {
                 if(value == true)
                 {
-                    if(this.RedeliveryCounter <= 0)
+                    if(this.redeliveryCounter <= 0)
                     {
-                        this.RedeliveryCounter = 1;
+                        this.redeliveryCounter = 1;
                     }
                 }
                 else
                 {
-                    if(this.RedeliveryCounter > 0)
+                    if(this.redeliveryCounter > 0)
                     {
-                        this.RedeliveryCounter = 0;
+                        this.redeliveryCounter = 0;
                     }
                 }
             }
@@ -268,8 +236,8 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public IDestination NMSReplyTo
                {
-                       get { return ReplyTo; }
-                       set { ReplyTo = ActiveMQDestination.Transform(value); }
+                       get { return null; }
+                       set { }
                }
 
                /// <summary>
@@ -277,15 +245,8 @@ namespace Apache.NMS.MQTT.Messages
                /// </summary>
                public DateTime NMSTimestamp
                {
-                       get { return DateUtils.ToDateTime(Timestamp); }
-                       set
-                       {
-                               Timestamp = DateUtils.ToJavaTimeUtc(value);
-                               if(timeToLive.TotalMilliseconds > 0)
-                               {
-                                       Expiration = Timestamp + (long) 
timeToLive.TotalMilliseconds;
-                               }
-                       }
+                       get { return DateTime.Now; }
+                       set {}
                }
 
                /// <summary>
@@ -297,7 +258,7 @@ namespace Apache.NMS.MQTT.Messages
                        set {  }
                }
 
-               public int MessageId
+               public short MessageId
                {
                        get { return this.messageId; }
                        set { this.messageId = value; }
@@ -305,15 +266,6 @@ namespace Apache.NMS.MQTT.Messages
 
                #endregion
 
-               public object GetObjectProperty(string name)
-               {
-                       return Properties[name];
-               }
-
-               public void SetObjectProperty(string name, object value)
-               {
-                       Properties[name] = value;
-               }
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs Tue 
Nov 19 16:38:19 2013
@@ -35,22 +35,22 @@ namespace Apache.NMS.MQTT
                        get { return this.name; }
                }
 
-               DestinationType DestinationType 
+               public DestinationType DestinationType 
                { 
                        get { return DestinationType.Topic; }
                }
                
-               bool IsTopic
+               public bool IsTopic
                { 
                        get { return true; }
                }
                
-               bool IsQueue 
+               public bool IsQueue 
                { 
                        get { return false; }
                }
                
-               bool IsTemporary 
+               public bool IsTemporary 
                { 
                        get { return false; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
 Tue Nov 19 16:38:19 2013
@@ -29,6 +29,16 @@ namespace Apache.NMS.MQTT.Transport
             set { this.commandId = value; }
         }
 
+               public virtual int CommandType
+               {
+                       get { return 0; }
+               }
+
+               public virtual string CommandName
+               {
+                       get { return this.GetType().Name; }
+               }
+
         public override int GetHashCode()
         {
             return (CommandId * 37) + CommandType;
@@ -143,14 +153,20 @@ namespace Apache.NMS.MQTT.Transport
 
         public virtual Object Clone()
         {
-            // Since we are a derived class use the base's Clone()
-            // to perform the shallow copy. Since it is shallow it
-            // will include our derived class. Since we are derived,
-            // this method is an override.
-            BaseCommand o = (BaseCommand) base.Clone();
-
-            return o;
+            return this.MemberwiseClone();
         }
+
+               public int HashCode(object value)
+               {
+                       if(value != null)
+                       {
+                               return value.GetHashCode();
+                       }
+                       else
+                       {
+                               return -1;
+                       }
+               }
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Tue Nov 
19 16:38:19 2013
@@ -91,6 +91,8 @@
     <Compile 
Include="src\main\csharp\Transport\MQTTTransportFactoryAttribute.cs" />
     <Compile Include="src\main\csharp\Commands\PUBCOMP.cs" />
     <Compile Include="src\main\csharp\RequestTimedOutException.cs" />
+    <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
+    <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
   </ItemGroup>
   <ItemGroup>
     <Folder Include="keyfile\" />


Reply via email to