Author: tabish
Date: Thu Dec 10 20:24:36 2009
New Revision: 889412
URL: http://svn.apache.org/viewvc?rev=889412&view=rev
Log:
Fix the parsing of the MessageId to account for negative sessionIds from the
broker. fixes errors from Ack messages.
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs?rev=889412&r1=889411&r2=889412&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
Thu Dec 10 20:24:36 2009
@@ -32,7 +32,7 @@
for(int idx = 0; idx < text.Length; idx++)
{
- if(char.IsNumber(text, idx))
+ if(char.IsNumber(text, idx) || text[idx] == '-')
{
sbtext.Append(text[idx]);
}
@@ -207,7 +207,7 @@
answer.Value = ParseInt(text.Substring(idx + 1));
text = text.Substring(0, idx);
idx = text.LastIndexOf(':');
- if (idx >= 0)
+ if(idx >= 0)
{
answer.SessionId = ParseInt(text.Substring(idx + 1));
text = text.Substring(0, idx);
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=889412&r1=889411&r2=889412&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
Thu Dec 10 20:24:36 2009
@@ -148,11 +148,10 @@
else if(command == "ERROR")
{
string text = frame.RemoveProperty("receipt-id");
-
- Tracer.Debug("StompWireFormat - Received ERROR command:");
if(text != null && text.StartsWith("ignore:"))
{
+ Tracer.Debug("StompWireFormat - Received ERROR Response
command: correlationId = " + text);
Response answer = new Response();
answer.CorrelationId =
Int32.Parse(text.Substring("ignore:".Length));
return answer;
@@ -168,13 +167,14 @@
BrokerError error = new BrokerError();
error.Message = frame.RemoveProperty("message");
answer.Exception = error;
+ Tracer.Debug("StompWireFormat - Received ERROR command: "
+ error.Message);
return answer;
}
}
else if(command == "MESSAGE")
{
Tracer.Debug("StompWireFormat - Received MESSAGE command");
- return CreateMessage(frame);
+ return ReadMessage(frame);
}
Tracer.Error("Unknown command: " + frame.Command + " headers: " +
frame.Properties);
@@ -182,7 +182,7 @@
return null;
}
- protected virtual Command CreateMessage(StompFrame frame)
+ protected virtual Command ReadMessage(StompFrame frame)
{
Message message = null;
if(frame.HasProperty("content-length"))
@@ -200,6 +200,9 @@
message.ReplyTo =
StompHelper.ToDestination(frame.RemoveProperty("reply-to"));
message.TargetConsumerId =
StompHelper.ToConsumerId(frame.RemoveProperty("subscription"));
message.CorrelationId = frame.RemoveProperty("correlation-id");
+
+ Tracer.Debug("RECV - Inbound MessageId = " +
frame.GetProperty("message-id"));
+
message.MessageId =
StompHelper.ToMessageId(frame.RemoveProperty("message-id"));
message.Persistent =
StompHelper.ToBool(frame.RemoveProperty("persistent"), true);
@@ -241,6 +244,84 @@
return dispatch;
}
+ protected virtual void WriteMessage(Message command, BinaryWriter
dataOut)
+ {
+ StompFrame frame = new StompFrame("SEND");
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", command.CommandId);
+ }
+
+ frame.SetProperty("destination",
StompHelper.ToStomp(command.Destination));
+
+ if(command.ReplyTo != null)
+ {
+ frame.SetProperty("reply-to",
StompHelper.ToStomp(command.ReplyTo));
+ }
+ if(command.CorrelationId != null )
+ {
+ frame.SetProperty("correlation-id", command.CorrelationId);
+ }
+ if(command.Expiration != 0)
+ {
+ frame.SetProperty("expires", command.Expiration);
+ }
+ if(command.Priority != 4)
+ {
+ frame.SetProperty("priority", command.Priority);
+ }
+ if(command.Type != null)
+ {
+ frame.SetProperty("type", command.Type);
+ }
+ if(command.TransactionId!=null)
+ {
+ frame.SetProperty("transaction",
StompHelper.ToStomp(command.TransactionId));
+ }
+
+ frame.SetProperty("persistent", command.Persistent);
+
+ // Perform any Content Marshaling.
+ command.BeforeMarshall(this);
+
+ // Store the Marshaled Content.
+ frame.Content = command.Content;
+
+ if(command is BytesMessage && command.Content != null &&
command.Content.Length > 0)
+ {
+ frame.SetProperty("content-length", command.Content.Length);
+ }
+
+ // Marshal all properties to the Frame.
+ IPrimitiveMap map = command.Properties;
+ foreach(string key in map.Keys)
+ {
+ frame.SetProperty(key, map[key]);
+ }
+
+ frame.ToStream(dataOut);
+ }
+
+ protected virtual void WriteMessageAck(MessageAck command,
BinaryWriter dataOut)
+ {
+ StompFrame frame = new StompFrame("ACK");
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", "ignore:" + command.CommandId);
+ }
+
+ frame.SetProperty("message-id",
StompHelper.ToStomp(command.LastMessageId));
+
+ Tracer.Debug("ACK - Outbound MessageId = " +
frame.GetProperty("message-id"));
+
+ if(command.TransactionId != null)
+ {
+ frame.SetProperty("transaction",
StompHelper.ToStomp(command.TransactionId));
+ }
+
+ frame.ToStream(dataOut);
+ }
+
protected virtual void WriteConnectionInfo(ConnectionInfo command,
BinaryWriter dataOut)
{
// lets force a receipt for the Connect Frame.
@@ -276,6 +357,8 @@
frame.SetProperty("durable-subscriber-name",
command.SubscriptionName);
frame.SetProperty("selector", command.Selector);
frame.SetProperty("ack", StompHelper.ToStomp(command.AckMode));
+
+ Tracer.Debug("SUBSCRIBE : Outbound AckMode = " +
frame.GetProperty("ack"));
if(command.NoLocal)
{
@@ -356,81 +439,6 @@
frame.ToStream(dataOut);
}
- protected virtual void WriteMessage(Message command, BinaryWriter
dataOut)
- {
- StompFrame frame = new StompFrame("SEND");
- if(command.ResponseRequired)
- {
- frame.SetProperty("receipt", command.CommandId);
- }
-
- frame.SetProperty("destination",
StompHelper.ToStomp(command.Destination));
-
- if(command.ReplyTo != null)
- {
- frame.SetProperty("reply-to",
StompHelper.ToStomp(command.ReplyTo));
- }
- if(command.CorrelationId != null )
- {
- frame.SetProperty("correlation-id", command.CorrelationId);
- }
- if(command.Expiration != 0)
- {
- frame.SetProperty("expires", command.Expiration);
- }
- if(command.Priority != 4)
- {
- frame.SetProperty("priority", command.Priority);
- }
- if(command.Type != null)
- {
- frame.SetProperty("type", command.Type);
- }
- if(command.TransactionId!=null)
- {
- frame.SetProperty("transaction",
StompHelper.ToStomp(command.TransactionId));
- }
-
- frame.SetProperty("persistent", command.Persistent);
-
- // Perform any Content Marshaling.
- command.BeforeMarshall(this);
-
- // Store the Marshaled Content.
- frame.Content = command.Content;
-
- if(command is BytesMessage && command.Content != null &&
command.Content.Length > 0)
- {
- frame.SetProperty("content-length", command.Content.Length);
- }
-
- // Marshal all properties to the Frame.
- IPrimitiveMap map = command.Properties;
- foreach(string key in map.Keys)
- {
- frame.SetProperty(key, map[key]);
- }
-
- frame.ToStream(dataOut);
- }
-
- protected virtual void WriteMessageAck(MessageAck command,
BinaryWriter dataOut)
- {
- StompFrame frame = new StompFrame("ACK");
- if(command.ResponseRequired)
- {
- frame.SetProperty("receipt", "ignore:" + command.CommandId);
- }
-
- frame.SetProperty("message-id",
StompHelper.ToStomp(command.LastMessageId));
- if(command.TransactionId != null)
- {
- frame.SetProperty("transaction",
StompHelper.ToStomp(command.TransactionId));
- }
-
- frame.ToStream(dataOut);
- }
-
protected virtual void SendCommand(Command command)
{
if(transport == null)