Author: tabish
Date: Tue Nov 19 18:32:02 2013
New Revision: 1543529

URL: http://svn.apache.org/r1543529
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Added:
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
   (with props)
Modified:
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
 Tue Nov 19 18:32:02 2013
@@ -30,12 +30,12 @@ namespace Apache.NMS.MQTT.Commands
                        set { this.returnCode = value; }
                }
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "CONNACK"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
 Tue Nov 19 18:32:02 2013
@@ -32,12 +32,12 @@ namespace Apache.NMS.MQTT.Commands
                public const byte TYPE = 1;
                public const String PROTOCOL_NAME = "MQIsdp";
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "CONNECT"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 14;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "DISCONNECT"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 11;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PINGREQ"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 13;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PINGRESP"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 4;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PUBACK"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 7;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PUBCOMP"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
 Tue Nov 19 18:32:02 2013
@@ -32,12 +32,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 3;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PUBLISH"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 5;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PUBREC"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 6;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "PUBREL"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
 Tue Nov 19 18:32:02 2013
@@ -29,12 +29,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 8;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "SUBACK"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
 Tue Nov 19 18:32:02 2013
@@ -27,12 +27,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 7;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "SUBSCRIBE"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 10;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "UNSUBACK"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
 Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
        {
                public const byte TYPE = 9;
 
-               public int CommandType
+               public override int CommandType
                {
                        get { return TYPE; }
                }
 
-               public string CommandName
+               public override string CommandName
                {
                        get { return "UNSUBSCRIBE"; }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs 
Tue Nov 19 18:32:02 2013
@@ -28,7 +28,6 @@ namespace Apache.NMS.MQTT
 {
        public class Connection : IConnection
        {
-               private static readonly IdGenerator CONNECTION_ID_GENERATOR = 
new IdGenerator();
                private static readonly TimeSpan InfiniteTimeSpan = 
TimeSpan.FromMilliseconds(Timeout.Infinite);
 
                private AcknowledgementMode acknowledgementMode = 
AcknowledgementMode.AutoAcknowledge;
@@ -38,14 +37,12 @@ namespace Apache.NMS.MQTT
         private readonly IList sessions = ArrayList.Synchronized(new 
ArrayList());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new 
Hashtable());
                private readonly IDictionary producers = 
Hashtable.Synchronized(new Hashtable());
-        private readonly object myLock = new object();
         private readonly Atomic<bool> connected = new Atomic<bool>(false);
         private readonly Atomic<bool> closed = new Atomic<bool>(false);
         private readonly Atomic<bool> closing = new Atomic<bool>(false);
         private readonly Atomic<bool> transportFailed = new 
Atomic<bool>(false);
                private readonly object connectedLock = new object();
         private Exception firstFailureError = null;
-               private bool userSpecifiedClientID;
         private int sessionCounter = 0;
         private readonly Atomic<bool> started = new Atomic<bool>(false);
         private ConnectionMetaData metaData = null;
@@ -55,6 +52,8 @@ namespace Apache.NMS.MQTT
         private readonly ThreadPoolExecutor executor = new 
ThreadPoolExecutor();
                private readonly IdGenerator clientIdGenerator;
                private IRedeliveryPolicy redeliveryPolicy;
+               private Scheduler scheduler = null;
+               private bool userSpecifiedClientID;
 
                public Connection(Uri connectionUri, ITransport transport, 
IdGenerator clientIdGenerator)
                {
@@ -63,6 +62,7 @@ namespace Apache.NMS.MQTT
 
                        SetTransport(transport);
 
+                       this.messageTransformation = new 
MQTTMessageTransformation(this);
                        this.info = new CONNECT();
                }
 
@@ -121,21 +121,21 @@ namespace Apache.NMS.MQTT
             get { return info.ClientId; }
             set
             {
-                if(this.connected.Value)
-                {
-                    throw new NMSException("You cannot change the ClientId 
once the Connection is connected");
-                }
-
-                this.info.ClientId = value;
-                this.userSpecifiedClientID = true;
-                CheckConnected();
+                               if(this.connected.Value)
+                               {
+                                       throw new NMSException("You cannot 
change the ClientId once the Connection is connected");
+                               }
+
+                               this.info.ClientId = value;
+                               this.userSpecifiedClientID = true;
+                               CheckConnected();
             }
         }
 
                /// <summary>
                /// The Default Client Id used if the ClientId property is not 
set explicitly.
                /// </summary>
-               public string DefaultClientId
+               internal string DefaultClientId
                {
                        set
                        {
@@ -282,7 +282,8 @@ namespace Apache.NMS.MQTT
                protected virtual Session CreateMQTTSession(AcknowledgementMode 
ackMode)
                {
                        CheckConnected();
-                       return new Session(this, ackMode);
+                       int sessionId = Interlocked.Increment(ref 
sessionCounter);
+                       return new Session(this, ackMode, sessionId);
                }
 
                internal void AddSession(Session session)
@@ -340,7 +341,81 @@ namespace Apache.NMS.MQTT
 
                public void Close()
                {
-                       // TODO
+                       if(!this.closed.Value && !transportFailed.Value)
+                       {
+                               this.Stop();
+                       }
+
+                       lock(connectedLock)
+                       {
+                               if(this.closed.Value)
+                               {
+                                       return;
+                               }
+
+                               try
+                               {
+                                       Tracer.InfoFormat("Connection[{0}]: 
Closing Connection Now.", this.ClientId);
+                                       this.closing.Value = true;
+
+                    Scheduler scheduler = this.scheduler;
+                    if (scheduler != null) 
+                                       {
+                        try 
+                                               {
+                            scheduler.Stop();
+                        } 
+                                               catch (Exception e) 
+                                               {
+                            throw NMSExceptionSupport.Create(e);
+                        }
+                    }
+
+                                       lock(sessions.SyncRoot)
+                                       {
+                                               foreach(Session session in 
sessions)
+                                               {
+                                                       session.Shutdown();
+                                               }
+                                       }
+                                       sessions.Clear();
+
+                                       // Connected is true only when we've 
successfully sent our CONNECT
+                                       // to the broker, so if we haven't 
announced ourselves there's no need to
+                                       // inform the broker of a remove, and 
if the transport is failed, why bother.
+                                       if(connected.Value && 
!transportFailed.Value)
+                                       {
+                                               DISCONNECT disconnect = new 
DISCONNECT();
+                                               transport.Oneway(disconnect);
+                                       }
+
+                                       executor.Shutdown();
+                                       if 
(!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+                                       {
+                                               
Tracer.DebugFormat("Connection[{0}]: Failed to properly shutdown its executor", 
this.ClientId);
+                                       }
+
+                                       Tracer.DebugFormat("Connection[{0}]: 
Disposing of the Transport.", this.ClientId);
+                                       transport.Stop();
+                                       transport.Dispose();
+                               }
+                               catch(Exception ex)
+                               {
+                                       Tracer.ErrorFormat("Connection[{0}]: 
Error during connection close: {1}", ClientId, ex);
+                               }
+                               finally
+                               {
+                                       if(executor != null)
+                                       {
+                                               executor.Shutdown();
+                                       }
+
+                                       this.transport = null;
+                                       this.closed.Value = true;
+                                       this.connected.Value = false;
+                                       this.closing.Value = false;
+                               }
+                       }
                }
 
                public void Dispose()
@@ -520,6 +595,36 @@ namespace Apache.NMS.MQTT
                                }
                        }
                }
+
+           internal Scheduler Scheduler
+               {
+                       get
+                       {
+                       Scheduler result = this.scheduler;
+                       if (result == null) 
+                               {
+                           lock (this) 
+                                       {
+                               result = scheduler;
+                               if (result == null) 
+                                               {
+                                   CheckClosed();
+                                   try 
+                                                       {
+                                       result = scheduler = new Scheduler(
+                                                                       
"MQTTConnection["+this.info.ClientId+"] Scheduler");
+                                       scheduler.Start();
+                                   }
+                                                       catch(Exception e)
+                                                       {
+                                       throw NMSExceptionSupport.Create(e);
+                                   }
+                               }
+                           }
+                       }
+                       return result;
+                       }
+           }
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
 Tue Nov 19 18:32:02 2013
@@ -107,6 +107,8 @@ namespace Apache.NMS.MQTT
 
                 if(this.clientId != null)
                 {
+                                       // Set the connection factory version 
as the default, the user can
+                                       // still override this via a call to 
Connection.ClientId = XXX
                     connection.DefaultClientId = this.clientId;
                 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
 Tue Nov 19 18:32:02 2013
@@ -38,11 +38,39 @@ namespace Apache.NMS.MQTT
                private ThreadPoolExecutor executor;
                private int consumerId;
                protected bool disposed = false;
+               private Topic destination = null;
 
                private event MessageListener listener;
 
-               public MessageConsumer()
+               public MessageConsumer(Session session, Topic destination, int 
consumerId)
                {
+                       if(destination == null)
+                       {
+                               throw new InvalidDestinationException("Consumer 
cannot receive on Null Destinations.");
+            }
+            else if(destination.TopicName == null)
+            {
+                throw new InvalidDestinationException("The destination object 
was not given a physical name.");
+            }
+
+                       this.session = session;
+                       this.consumerId = consumerId;
+                       this.destination = destination;
+                       this.messageTransformation = 
this.session.Connection.MessageTransformation;
+                       this.unconsumedMessages = new 
FifoMessageDispatchChannel();
+
+                       // If the destination contained a URI query, then use 
it to set public properties
+                       // on the ConsumerInfo
+                       if(destination.Options != null)
+                       {
+                               // Get options prefixed with "consumer.*"
+                               StringDictionary options = 
URISupport.GetProperties(destination.Options, "consumer.");
+                               // Extract out custom extension options 
"consumer.nms.*"
+                               StringDictionary customConsumerOptions = 
URISupport.ExtractProperties(options, "nms.");
+
+                               URISupport.SetProperties(this, options);
+                               URISupport.SetProperties(this, 
customConsumerOptions, "nms.");
+                       }
                }
 
                #region Property Accessors
@@ -94,6 +122,11 @@ namespace Apache.NMS.MQTT
                        get { return this.consumerId; }
                }
 
+               public Topic Destination
+               {
+                       get { return this.destination; }
+               }
+
                #endregion
 
                public void Start()

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
 Tue Nov 19 18:32:02 2013
@@ -36,11 +36,20 @@ namespace Apache.NMS.MQTT
 
                private readonly MessageTransformation messageTransformation;
 
-               public MessageProducer(Session session, TimeSpan requestTimeout)
+               public MessageProducer(Session session, Topic destination, 
TimeSpan requestTimeout, int producerId)
                {
                        this.session = session;
                        this.RequestTimeout = requestTimeout;
+                       this.producerId = producerId;
+                       this.destination = destination;
                        this.messageTransformation = 
session.Connection.MessageTransformation;
+
+                       // If the destination contained a URI query, then use 
it to set public
+                       // properties on the ProducerInfo
+                       if (destination != null && destination.Options != null)
+                       {
+                               URISupport.SetProperties(this, 
destination.Options, "producer.");
+                       }
                }
 
                ~MessageProducer()

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs 
Tue Nov 19 18:32:02 2013
@@ -16,6 +16,7 @@
 //
 using System;
 using System.Collections;
+using System.Threading;
 using Apache.NMS.MQTT.Messages;
 
 namespace Apache.NMS.MQTT
@@ -43,9 +44,10 @@ namespace Apache.NMS.MQTT
 
         private readonly AcknowledgementMode acknowledgementMode;
         private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+        private TimeSpan closeStopTimeout = 
TimeSpan.FromMilliseconds(Timeout.Infinite);
         private TimeSpan requestTimeout;
 
-               public Session(Connection connection, AcknowledgementMode 
acknowledgementMode)
+               public Session(Connection connection, AcknowledgementMode 
acknowledgementMode, int sessionId)
                {
             this.connection = connection;
             this.acknowledgementMode = acknowledgementMode;
@@ -54,6 +56,7 @@ namespace Apache.NMS.MQTT
             this.ProducerTransformer = connection.ProducerTransformer;
 
             this.executor = new SessionExecutor(this, this.consumers);
+                       this.sessionId = sessionId;
 
             if(connection.IsStarted)
             {
@@ -146,6 +149,11 @@ namespace Apache.NMS.MQTT
             set { this.requestTimeout = value; }
         }
 
+               public int SessionId
+               {
+                       get { return this.sessionId; }
+               }
+
         private ConsumerTransformerDelegate consumerTransformer;
         /// <summary>
         /// A Delegate that is called each time a Message is dispatched to 
allow the client to do
@@ -221,8 +229,65 @@ namespace Apache.NMS.MQTT
 
                internal void DoClose()
                {
+                       Shutdown();
                }
 
+        internal void Shutdown()
+        {
+            Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", 
this.SessionId);
+
+            if(this.closed)
+            {
+                return;
+            }
+
+            lock(myLock)
+            {
+                if(this.closed || this.closing)
+                {
+                    return;
+                }
+
+                try
+                {
+                    this.closing = true;
+
+                    // Stop all message deliveries from this Session
+                    this.executor.Stop(this.closeStopTimeout);
+
+                    lock(consumers.SyncRoot)
+                    {
+                        foreach(MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.FailureError = 
this.connection.FirstFailureError;
+                            consumer.Shutdown();
+                        }
+                    }
+                    consumers.Clear();
+
+                    lock(producers.SyncRoot)
+                    {
+                        foreach(MessageProducer producer in producers.Values)
+                        {
+                            producer.Shutdown();
+                        }
+                    }
+                    producers.Clear();
+
+                    Connection.RemoveSession(this);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+                finally
+                {
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
         public IMessageProducer CreateProducer()
         {
             return CreateProducer(null);
@@ -476,12 +541,22 @@ namespace Apache.NMS.MQTT
         /// if the message is in a transaction.
         /// </summary>
         /// <param name="message">
-        /// A <see cref="ActiveMQMessage"/>
+        /// A <see cref="MQTTMessage"/>
         /// </param>
         private static void DoNothingAcknowledge(MQTTMessage message)
         {
         }
 
+               private int NextConsumerId
+               {
+                       get { return Interlocked.Increment(ref 
this.consumerCounter); }
+               }
+
+               private int NextProducerId
+               {
+                       get { return Interlocked.Increment(ref 
this.producerCounter); }
+               }
+
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs Tue 
Nov 19 18:32:02 2013
@@ -15,6 +15,7 @@
 // limitations under the License.
 //
 using System;
+using System.Collections.Specialized;
 
 namespace Apache.NMS.MQTT
 {
@@ -23,7 +24,8 @@ namespace Apache.NMS.MQTT
        /// </summary>
        public class Topic : ITopic
        {
-               string name;
+               private string name;
+               private StringDictionary options = null;
 
                public Topic(string name)
                {
@@ -54,6 +56,16 @@ namespace Apache.NMS.MQTT
                { 
                        get { return false; }
                }
+
+               /// <summary>
+               /// Dictionary of name/value pairs representing option values 
specified
+               /// in the URI used to create this Destination.  A null value 
is returned
+               /// if no options were specified.
+               /// </summary>
+               internal StringDictionary Options
+               {
+                       get { return this.options; }
+               }
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
 Tue Nov 19 18:32:02 2013
@@ -17,7 +17,7 @@
 
 using System;
 using Apache.NMS.Util;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
 
 namespace Apache.NMS.MQTT.Util
 {
@@ -39,7 +39,9 @@ namespace Apache.NMS.MQTT.Util
 
         protected override IBytesMessage DoCreateBytesMessage()
         {
-                       return null;
+                       BytesMessage message = new BytesMessage();
+                       message.Connection = connection;
+                       return message;
         }
 
         protected override ITextMessage DoCreateTextMessage()

Added: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=1543529&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
 Tue Nov 19 18:32:02 2013
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// 
------------------------------------------------------------------------------
+//  <autogenerated>
+//      This code was generated by a tool.
+//      Mono Runtime Version: 4.0.30319.1
+// 
+//      Changes to this file may cause incorrect behavior and will be lost if 
+//      the code is regenerated.
+//  </autogenerated>
+// 
------------------------------------------------------------------------------
+
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyDescriptionAttribute("Apache NMS for MQTT Class Library 
(.Net Messaging Library Implementation): An implementation of the NMS API for 
MQTT")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
+[assembly: AssemblyProductAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software 
Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
+

Propchange: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to