Author: tabish
Date: Wed Nov 27 00:21:59 2013
New Revision: 1545889
URL: http://svn.apache.org/r1545889
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/ (props changed)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Nov 27 00:21:59 2013
@@ -3,3 +3,4 @@ build
lib
vs2008-mqtt.userprefs
bin
+test-results
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
Wed Nov 27 00:21:59 2013
@@ -548,7 +548,11 @@ namespace Apache.NMS.MQTT
else
{
ErrorResponse error = response as ErrorResponse;
-
NMSException exception = error.Error;
+
Exception exception = error.Error;
+
if (!(exception is NMSException))
+
{
+
exception = new NMSException(exception.Message);
+
}
// This is non-recoverable.
// Shutdown the transport connection, and re-create it, but don't start
it.
// It will be started if the connection is re-attempted.
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
Wed Nov 27 00:21:59 2013
@@ -32,6 +32,10 @@ namespace Apache.NMS.MQTT.Protocol
public void Marshal(Command cmd, BinaryWriter ds)
{
+ Tracer.DebugFormat("MQTT Command being sent: {0}", cmd);
+
+ Console.WriteLine("MQTT Command being sent: {0}", cmd);
+
MemoryStream buffer = new MemoryStream();
EndianBinaryWriter writer = new
EndianBinaryWriter(buffer);
@@ -41,6 +45,7 @@ namespace Apache.NMS.MQTT.Protocol
ds.Write(fixedHeader);
WriteLength((int)buffer.Length, ds);
ds.Write(buffer.GetBuffer(), 0, (int) buffer.Length);
+ ds.Flush();
}
public Command Unmarshal(BinaryReader dis)
@@ -67,16 +72,24 @@ namespace Apache.NMS.MQTT.Protocol
cmd.Decode(reader);
}
- // A CONNACK is a response, but if it has an error
code, then we create a suitable
- // ErrorResponse here with the correct NMSException in
its payload.
- if (cmd.IsCONNACK && cmd.IsErrorResponse)
+ // A CONNACK is a response and it's correlationId is
always 1, but if it has an
+ // error code, then we create a suitable ErrorResponse here with
the correct
+ // NMSException in its payload.
+ if (cmd.IsCONNACK)
{
- CONNACK connAck = cmd as CONNACK;
- ErrorResponse error = new ErrorResponse();
- error.Error =
MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
- cmd = error;
- }
+ CONNACK connAck = cmd as CONNACK;
+ connAck.CorrelationId = 1;
+ if (cmd.IsErrorResponse)
+ {
+ ErrorResponse error = new
ErrorResponse();
+ error.Error =
MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
+ cmd = error;
+ }
+ }
+ Tracer.DebugFormat("MQTT Command received: {0}", cmd);
+ Console.WriteLine("MQTT Command recieved: {0}", cmd);
+
return cmd;
}
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
Wed Nov 27 00:21:59 2013
@@ -43,11 +43,13 @@ namespace Apache.NMS.MQTT.Transport
short CommandId
{
get;
+ set;
}
bool ResponseRequired
{
get;
+ set;
}
bool IsResponse
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
Wed Nov 27 00:21:59 2013
@@ -21,8 +21,8 @@ namespace Apache.NMS.MQTT.Transport
{
public class ErrorResponse : Response
{
- private NMSException error;
- public NMSException Error
+ private Exception error;
+ public Exception Error
{
get { return error; }
set { this.error = value; }
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
Wed Nov 27 00:21:59 2013
@@ -27,7 +27,7 @@ namespace Apache.NMS.MQTT.Transport
public class ResponseCorrelator : TransportFilter
{
private readonly IDictionary requestMap = Hashtable.Synchronized(new
Hashtable());
- private int nextCommandId;
+ private int nextCommandId = 1; // 1 is always CONNECT -> CONNACK
private Exception error;
public ResponseCorrelator(ITransport next) : base(next)
@@ -40,24 +40,34 @@ namespace Apache.NMS.MQTT.Transport
base.OnException(sender, command);
}
- internal int GetNextCommandId()
+ internal short GetNextCommandId()
{
- return Interlocked.Increment(ref nextCommandId);
+ if (nextCommandId == UInt16.MaxValue)
+ {
+ nextCommandId = 1;
+ }
+ return (short) Interlocked.Increment(ref nextCommandId);
}
public override void Oneway(Command command)
{
-// command.CommandId = GetNextCommandId();
-// command.ResponseRequired = false;
+ command.CommandId = GetNextCommandId();
+ command.ResponseRequired = false;
next.Oneway(command);
}
public override FutureResponse AsyncRequest(Command command)
{
- int commandId = GetNextCommandId();
-
-// command.CommandId = commandId;
-// command.ResponseRequired = true;
+ Tracer.DebugFormat("ResponseCorrelator requesting:
{0}", command);
+ if (command.IsCONNECT)
+ {
+ command.CommandId = 1;
+ }
+ else
+ {
+ command.CommandId = GetNextCommandId();
+ }
+ command.ResponseRequired = true;
FutureResponse future = new FutureResponse();
Exception priorError = null;
lock(requestMap.SyncRoot)
@@ -65,17 +75,15 @@ namespace Apache.NMS.MQTT.Transport
priorError = this.error;
if(priorError == null)
{
- requestMap[commandId] = future;
+ requestMap[command.CommandId] = future;
}
}
if(priorError != null)
{
-// BrokerError brError = new BrokerError();
-// brError.Message = priorError.Message;
-// ExceptionResponse response = new
ExceptionResponse();
-// response.Exception = brError;
-// future.Response = response;
+ ErrorResponse response = new ErrorResponse();
+ response.Error = priorError;
+ future.Response = response;
return future;
}
@@ -143,11 +151,9 @@ namespace Apache.NMS.MQTT.Transport
{
foreach(FutureResponse future in requests)
{
-// BrokerError brError = new BrokerError();
-// brError.Message = error.Message;
-// ExceptionResponse response = new
ExceptionResponse();
-// response.Exception = brError;
-// future.Response = response;
+ ErrorResponse response = new
ErrorResponse();
+ response.Error = error;
+ future.Response = response;
}
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
Wed Nov 27 00:21:59 2013
@@ -25,24 +25,24 @@ namespace Apache.NMS.MQTT.Test
[TestFixture]
public class ConnectionFactoryTest
{
-// [Test]
-// [TestCase("mqtt:tcp://${activemqhost}:61613")]
-//
[TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
-//
[TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
-//
[TestCase("stomp:tcp://${activemqhost}:61613?connection.asyncsend=false")]
-//
[TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true",
ExpectedException = typeof(NMSConnectionException))]
-//
[TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true",
ExpectedException = typeof(NMSConnectionException))]
-//
[TestCase("stomp:(tcp://${activemqhost}:61613)?connection.asyncSend=false",
ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("stomp:tcp://InvalidHost:61613", ExpectedException =
typeof(NMSConnectionException))]
-// [TestCase("stomp:tcp://InvalidHost:61613", ExpectedException =
typeof(NMSConnectionException))]
-//
[TestCase("stomp:tcp://InvalidHost:61613?connection.asyncsend=false",
ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("ftp://${activemqhost}:61613", ExpectedException =
typeof(NMSConnectionException))]
-// [TestCase("http://${activemqhost}:61613", ExpectedException =
typeof(NMSConnectionException))]
-// [TestCase("discovery://${activemqhost}:6155", ExpectedException
= typeof(NMSConnectionException))]
-// [TestCase("sms://${activemqhost}:61613", ExpectedException =
typeof(NMSConnectionException))]
+ [Test]
+ [TestCase("tcp://${activemqhost}:1883")]
+//
[TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
+//
[TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
+//
[TestCase("stomp:tcp://${activemqhost}:1883?connection.asyncsend=false")]
+//
[TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true",
ExpectedException = typeof(NMSConnectionException))]
+//
[TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true",
ExpectedException = typeof(NMSConnectionException))]
+//
[TestCase("stomp:(tcp://${activemqhost}:1883)?connection.asyncSend=false",
ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("stomp:tcp://InvalidHost:1883", ExpectedException =
typeof(NMSConnectionException))]
+// [TestCase("stomp:tcp://InvalidHost:1883", ExpectedException =
typeof(NMSConnectionException))]
+//
[TestCase("stomp:tcp://InvalidHost:1883?connection.asyncsend=false",
ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("ftp://${activemqhost}:1883", ExpectedException =
typeof(NMSConnectionException))]
+// [TestCase("http://${activemqhost}:1883", ExpectedException =
typeof(NMSConnectionException))]
+// [TestCase("discovery://${activemqhost}:1888", ExpectedException
= typeof(NMSConnectionException))]
+// [TestCase("sms://${activemqhost}:1883", ExpectedException =
typeof(NMSConnectionException))]
// [TestCase("stomp:multicast://${activemqhost}:6155",
ExpectedException = typeof(NMSConnectionException))]
-//
[TestCase("(tcp://${activemqhost}:61613,tcp://${activemqhost}:61613)",
ExpectedException = typeof(UriFormatException))]
-//
[TestCase("tcp://${activemqhost}:61613,tcp://${activemqhost}:61613",
ExpectedException = typeof(UriFormatException))]
+//
[TestCase("(tcp://${activemqhost}:1883,tcp://${activemqhost}:1883)",
ExpectedException = typeof(UriFormatException))]
+//
[TestCase("tcp://${activemqhost}:1883,tcp://${activemqhost}:1883",
ExpectedException = typeof(UriFormatException))]
public void TestURI(string connectionURI)
{
IConnectionFactory factory = new ConnectionFactory(
@@ -53,6 +53,23 @@ namespace Apache.NMS.MQTT.Test
Assert.IsNotNull(connection);
}
}
+
+ [Test]
+ [TestCase("tcp://${activemqhost}:1883")]
+ public void TestConnectionStarts(string connectionURI)
+ {
+ NMS.Tracer.Trace = new NmsConsoleTracer();
+ IConnectionFactory factory = new ConnectionFactory(
+ NMSTestSupport.ReplaceEnvVar(connectionURI));
+ Assert.IsNotNull(factory);
+ using(IConnection connection = factory.CreateConnection("", ""))
+ {
+ Assert.IsNotNull(connection);
+ // This should trigger a CONNECT frame and
CONNACK response.
+ connection.Start();
+ }
+ }
+
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
(original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj Wed
Nov 27 00:21:59 2013
@@ -6,7 +6,7 @@
<ProductVersion>10.0.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{186FAC43-F3B0-4B03-82DA-EEC0169307B7}</ProjectGuid>
- <OutputType>Exe</OutputType>
+ <OutputType>Library</OutputType>
<RootNamespace>vs2008mqtttests</RootNamespace>
<AssemblyName>vs2008-mqtt-tests</AssemblyName>
</PropertyGroup>
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln Wed Nov 27
00:21:59 2013
@@ -21,6 +21,19 @@ Global
{AEBC857B-D693-4833-9F1E-F6A22787D0C9}.Release|x86.Build.0 =
Release|Any CPU
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
- StartupItem = vs2008-mqtt.csproj
+ StartupItem = vs2008-mqtt-tests.csproj
+ Policies = $0
+ $0.TextStylePolicy = $1
+ $1.inheritsSet = null
+ $1.scope = text/x-csharp
+ $0.CSharpFormattingPolicy = $2
+ $2.inheritsSet = Mono
+ $2.inheritsScope = text/x-csharp
+ $2.scope = text/x-csharp
+ $0.TextStylePolicy = $3
+ $3.FileWidth = 120
+ $3.inheritsSet = VisualStudio
+ $3.inheritsScope = text/plain
+ $3.scope = text/plain
EndGlobalSection
EndGlobal