Author: tabish
Date: Wed Nov 27 00:21:59 2013
New Revision: 1545889

URL: http://svn.apache.org/r1545889
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
    
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Nov 27 00:21:59 2013
@@ -3,3 +3,4 @@ build
 lib
 vs2008-mqtt.userprefs
 bin
+test-results

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs 
Wed Nov 27 00:21:59 2013
@@ -548,7 +548,11 @@ namespace Apache.NMS.MQTT
                                                                                
else
                                                                                
{
                                                                                
        ErrorResponse error = response as ErrorResponse;
-                                                                               
        NMSException exception = error.Error;
+                                                                               
        Exception exception = error.Error;
+                                                                               
        if (!(exception is NMSException)) 
+                                                                               
        {
+                                                                               
                exception = new NMSException(exception.Message);
+                                                                               
        }
                                                                                
        // This is non-recoverable.
                                                                                
        // Shutdown the transport connection, and re-create it, but don't start 
it.
                                                                                
        // It will be started if the connection is re-attempted.

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
 Wed Nov 27 00:21:59 2013
@@ -32,6 +32,10 @@ namespace Apache.NMS.MQTT.Protocol
 
         public void Marshal(Command cmd, BinaryWriter ds)
                {
+                       Tracer.DebugFormat("MQTT Command being sent: {0}", cmd);
+
+                       Console.WriteLine("MQTT Command being sent: {0}", cmd);
+
                        MemoryStream buffer = new MemoryStream();
                        EndianBinaryWriter writer = new 
EndianBinaryWriter(buffer);
 
@@ -41,6 +45,7 @@ namespace Apache.NMS.MQTT.Protocol
                        ds.Write(fixedHeader);
                        WriteLength((int)buffer.Length, ds);
             ds.Write(buffer.GetBuffer(), 0, (int) buffer.Length);
+                       ds.Flush();
                }
 
         public Command Unmarshal(BinaryReader dis)
@@ -67,16 +72,24 @@ namespace Apache.NMS.MQTT.Protocol
                                cmd.Decode(reader);
                        }
 
-                       // A CONNACK is a response, but if it has an error 
code, then we create a suitable
-                       // ErrorResponse here with the correct NMSException in 
its payload.
-                       if (cmd.IsCONNACK && cmd.IsErrorResponse)
+                       // A CONNACK is a response and its correlationId is 
always 1, but if it has an 
+            // error code, then we create a suitable ErrorResponse here with 
the correct 
+            // NMSException in its payload.
+                       if (cmd.IsCONNACK) 
                        {
-                               CONNACK connAck = cmd as CONNACK;
-                               ErrorResponse error = new ErrorResponse();
-                               error.Error = 
MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
-                               cmd = error;
-                       }
+                   CONNACK connAck = cmd as CONNACK;
+                connAck.CorrelationId = 1;
+                               if (cmd.IsErrorResponse)
+                               {
+                                       ErrorResponse error = new 
ErrorResponse();
+                                       error.Error = 
MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
+                                       cmd = error;
+                               }
+                       } 
 
+                       Tracer.DebugFormat("MQTT Command received: {0}", cmd);
+                       Console.WriteLine("MQTT Command recieved: {0}", cmd);
+           
                        return cmd;
                }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
 Wed Nov 27 00:21:59 2013
@@ -43,11 +43,13 @@ namespace Apache.NMS.MQTT.Transport
         short CommandId
         {
                        get;
+                       set;
         }
 
         bool ResponseRequired
         {
                        get;
+                       set;
         }
 
                bool IsResponse

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
 Wed Nov 27 00:21:59 2013
@@ -21,8 +21,8 @@ namespace Apache.NMS.MQTT.Transport
 {
        public class ErrorResponse : Response
        {
-               private NMSException error;
-               public NMSException Error
+               private Exception error;
+               public Exception Error
                {
                        get { return error; }
                        set { this.error = value; }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
 Wed Nov 27 00:21:59 2013
@@ -27,7 +27,7 @@ namespace Apache.NMS.MQTT.Transport
     public class ResponseCorrelator : TransportFilter
     {
         private readonly IDictionary requestMap = Hashtable.Synchronized(new 
Hashtable());
-        private int nextCommandId;
+        private int nextCommandId = 1;  // 1 is always CONNECT -> CONNACK
                private Exception error;
 
         public ResponseCorrelator(ITransport next) : base(next)
@@ -40,24 +40,34 @@ namespace Apache.NMS.MQTT.Transport
             base.OnException(sender, command);
         }
 
-        internal int GetNextCommandId()
+        internal short GetNextCommandId()
         {
-            return Interlocked.Increment(ref nextCommandId);
+                       if (nextCommandId == UInt16.MaxValue)
+                       {
+                               nextCommandId = 1;
+                       }
+            return (short) Interlocked.Increment(ref nextCommandId);
         }
 
         public override void Oneway(Command command)
         {
-//            command.CommandId = GetNextCommandId();
-//                     command.ResponseRequired = false;
+            command.CommandId = GetNextCommandId();
+                       command.ResponseRequired = false;
             next.Oneway(command);
         }
 
         public override FutureResponse AsyncRequest(Command command)
         {
-            int commandId = GetNextCommandId();
-
-//            command.CommandId = commandId;
-//            command.ResponseRequired = true;
+                       Tracer.DebugFormat("ResponseCorrelator requesting: 
{0}", command);
+                       if (command.IsCONNECT)
+                       {
+                               command.CommandId = 1;
+                       }
+                       else
+                       {
+               command.CommandId = GetNextCommandId();
+                       }
+            command.ResponseRequired = true;
             FutureResponse future = new FutureResponse();
                Exception priorError = null;
                lock(requestMap.SyncRoot) 
@@ -65,17 +75,15 @@ namespace Apache.NMS.MQTT.Transport
                    priorError = this.error;
                    if(priorError == null) 
                                {
-                           requestMap[commandId] = future;
+                           requestMap[command.CommandId] = future;
                    }
                }
        
                if(priorError != null) 
                        {
-//                             BrokerError brError = new BrokerError();
-//                             brError.Message = priorError.Message;
-//                             ExceptionResponse response = new 
ExceptionResponse();
-//                             response.Exception = brError;
-//                 future.Response = response;
+                               ErrorResponse response = new ErrorResponse();
+                               response.Error = priorError;
+                   future.Response = response;
                 return future;
                }
                        
@@ -143,11 +151,9 @@ namespace Apache.NMS.MQTT.Transport
                        {
                                foreach(FutureResponse future in requests)
                                {
-//                                     BrokerError brError = new BrokerError();
-//                                     brError.Message = error.Message;
-//                                     ExceptionResponse response = new 
ExceptionResponse();
-//                                     response.Exception = brError;
-//                         future.Response = response;
+                                       ErrorResponse response = new 
ErrorResponse();
+                                       response.Error = error;
+                           future.Response = response;
                                }
                }
                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
 Wed Nov 27 00:21:59 2013
@@ -25,24 +25,24 @@ namespace Apache.NMS.MQTT.Test
        [TestFixture]
        public class ConnectionFactoryTest
        {
-//      [Test]
-//      [TestCase("mqtt:tcp://${activemqhost}:61613")]
-//      
[TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
-//      
[TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
-//             
[TestCase("stomp:tcp://${activemqhost}:61613?connection.asyncsend=false")]
-//             
[TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", 
ExpectedException = typeof(NMSConnectionException))]
-//             
[TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", 
ExpectedException = typeof(NMSConnectionException))]
-//             
[TestCase("stomp:(tcp://${activemqhost}:61613)?connection.asyncSend=false", 
ExpectedException = typeof(NMSConnectionException))]
-//             [TestCase("stomp:tcp://InvalidHost:61613", ExpectedException = 
typeof(NMSConnectionException))]
-//             [TestCase("stomp:tcp://InvalidHost:61613", ExpectedException = 
typeof(NMSConnectionException))]
-//             
[TestCase("stomp:tcp://InvalidHost:61613?connection.asyncsend=false", 
ExpectedException = typeof(NMSConnectionException))]
-//             [TestCase("ftp://${activemqhost}:61613", ExpectedException = 
typeof(NMSConnectionException))]
-//             [TestCase("http://${activemqhost}:61613", ExpectedException = 
typeof(NMSConnectionException))]
-//             [TestCase("discovery://${activemqhost}:6155", ExpectedException 
= typeof(NMSConnectionException))]
-//             [TestCase("sms://${activemqhost}:61613", ExpectedException = 
typeof(NMSConnectionException))]
+        [Test]
+        [TestCase("tcp://${activemqhost}:1883")]
+//      
[TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
+//      
[TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
+//             
[TestCase("stomp:tcp://${activemqhost}:1883?connection.asyncsend=false")]
+//             
[TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true", 
ExpectedException = typeof(NMSConnectionException))]
+//             
[TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true", 
ExpectedException = typeof(NMSConnectionException))]
+//             
[TestCase("stomp:(tcp://${activemqhost}:1883)?connection.asyncSend=false", 
ExpectedException = typeof(NMSConnectionException))]
+//             [TestCase("stomp:tcp://InvalidHost:1883", ExpectedException = 
typeof(NMSConnectionException))]
+//             [TestCase("stomp:tcp://InvalidHost:1883", ExpectedException = 
typeof(NMSConnectionException))]
+//             
[TestCase("stomp:tcp://InvalidHost:1883?connection.asyncsend=false", 
ExpectedException = typeof(NMSConnectionException))]
+//             [TestCase("ftp://${activemqhost}:1883", ExpectedException = 
typeof(NMSConnectionException))]
+//             [TestCase("http://${activemqhost}:1883", ExpectedException = 
typeof(NMSConnectionException))]
+//             [TestCase("discovery://${activemqhost}:1888", ExpectedException 
= typeof(NMSConnectionException))]
+//             [TestCase("sms://${activemqhost}:1883", ExpectedException = 
typeof(NMSConnectionException))]
 //             [TestCase("stomp:multicast://${activemqhost}:6155", 
ExpectedException = typeof(NMSConnectionException))]
-//             
[TestCase("(tcp://${activemqhost}:61613,tcp://${activemqhost}:61613)", 
ExpectedException = typeof(UriFormatException))]
-//             
[TestCase("tcp://${activemqhost}:61613,tcp://${activemqhost}:61613", 
ExpectedException = typeof(UriFormatException))]
+//             
[TestCase("(tcp://${activemqhost}:1883,tcp://${activemqhost}:1883)", 
ExpectedException = typeof(UriFormatException))]
+//             
[TestCase("tcp://${activemqhost}:1883,tcp://${activemqhost}:1883", 
ExpectedException = typeof(UriFormatException))]
         public void TestURI(string connectionURI)
         {
             IConnectionFactory factory = new ConnectionFactory(
@@ -53,6 +53,23 @@ namespace Apache.NMS.MQTT.Test
                 Assert.IsNotNull(connection);
             }
         }
+
+        [Test]
+        [TestCase("tcp://${activemqhost}:1883")]
+        public void TestConnectionStarts(string connectionURI)
+        {
+                       NMS.Tracer.Trace = new NmsConsoleTracer();
+            IConnectionFactory factory = new ConnectionFactory(
+                               NMSTestSupport.ReplaceEnvVar(connectionURI));
+            Assert.IsNotNull(factory);
+            using(IConnection connection = factory.CreateConnection("", ""))
+            {
+                Assert.IsNotNull(connection);
+                               // This should trigger a CONNECT frame and 
CONNACK response.
+                               connection.Start();
+            }
+        }
+
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj Wed 
Nov 27 00:21:59 2013
@@ -6,7 +6,7 @@
     <ProductVersion>10.0.0</ProductVersion>
     <SchemaVersion>2.0</SchemaVersion>
     <ProjectGuid>{186FAC43-F3B0-4B03-82DA-EEC0169307B7}</ProjectGuid>
-    <OutputType>Exe</OutputType>
+    <OutputType>Library</OutputType>
     <RootNamespace>vs2008mqtttests</RootNamespace>
     <AssemblyName>vs2008-mqtt-tests</AssemblyName>
   </PropertyGroup>

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln Wed Nov 27 
00:21:59 2013
@@ -21,6 +21,19 @@ Global
                {AEBC857B-D693-4833-9F1E-F6A22787D0C9}.Release|x86.Build.0 = 
Release|Any CPU
        EndGlobalSection
        GlobalSection(MonoDevelopProperties) = preSolution
-               StartupItem = vs2008-mqtt.csproj
+               StartupItem = vs2008-mqtt-tests.csproj
+               Policies = $0
+               $0.TextStylePolicy = $1
+               $1.inheritsSet = null
+               $1.scope = text/x-csharp
+               $0.CSharpFormattingPolicy = $2
+               $2.inheritsSet = Mono
+               $2.inheritsScope = text/x-csharp
+               $2.scope = text/x-csharp
+               $0.TextStylePolicy = $3
+               $3.FileWidth = 120
+               $3.inheritsSet = VisualStudio
+               $3.inheritsScope = text/plain
+               $3.scope = text/plain
        EndGlobalSection
 EndGlobal


Reply via email to