Author: chirino
Date: Tue Mar 6 08:15:59 2007
New Revision: 515172
URL: http://svn.apache.org/viewvc?view=rev&rev=515172
Log:
- Use the request-id and response-id headers on the CONNECT and CONNECTED
commands so that we stay Stomp 1.0 compatible.
- Avoid using a new StreamReader since on every demarshall since it could be
buffering up bytes that would get discarded on the next frame
- Protect against NPE when converting a Destination
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?view=diff&rev=515172&r1=515171&r2=515172
==============================================================================
---
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
(original)
+++
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
Tue Mar 6 08:15:59 2007
@@ -35,6 +35,10 @@
public static ActiveMQDestination ToDestination(string text)
{
+ if( text == null )
+ {
+ return null;
+ }
int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
if (text.StartsWith("/queue/"))
{
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=515172&r1=515171&r2=515172
==============================================================================
---
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
(original)
+++
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Tue Mar 6 08:15:59 2007
@@ -90,29 +90,49 @@
response.CorrelationId =
command.CommandId;
SendCommand(response);
}
- Console.WriteLine("#### Ignored command: " +
o.GetType());
+ Console.WriteLine("#### Ignored command: " +
o.GetType());
+ Console.Out.Flush();
}
else
{
- Console.WriteLine("#### Ignored command: " +
o.GetType());
+ Console.WriteLine("#### Ignored command: " +
o.GetType());
+ Console.Out.Flush();
}
}
+ internal String ReadLine(BinaryReader dis)
+ {
+ MemoryStream ms = new MemoryStream();
+ while (true)
+ {
+ int nextChar = dis.Read();
+ if (nextChar < 0)
+ {
+ throw new IOException("Peer closed the stream.");
+ }
+ if( nextChar == 10 )
+ {
+ break;
+ }
+ ms.WriteByte((byte)nextChar);
+ }
+ return encoding.GetString(ms.ToArray());
+ }
+
public Object Unmarshal(BinaryReader dis)
- {
- StreamReader socketReader = new
StreamReader(dis.BaseStream);
+ {
string command;
- do {
- command = socketReader.ReadLine();
+ do {
+ command = ReadLine(dis);
}
while (command == "");
Console.WriteLine("<<<< command: " + command);
IDictionary headers = new Hashtable();
- string line;
- while((line = socketReader.ReadLine()) != "")
+ string line;
+ while ((line = ReadLine(dis)) != "")
{
int idx = line.IndexOf(':');
if (idx > 0)
@@ -136,15 +156,19 @@
content = dis.ReadBytes(size);
}
else
- {
- StringBuilder body = new StringBuilder();
+ {
+ MemoryStream ms = new MemoryStream();
int nextChar;
- while((nextChar = socketReader.Read()) != 0)
+ while((nextChar = dis.Read()) != 0)
{
- body.Append((char)nextChar);
- }
- string text = body.ToString().TrimEnd('\r',
'\n');
- content = encoding.GetBytes(text);
+ if( nextChar < 0 )
+ {
+ // EOF ??
+ break;
+ }
+ ms.WriteByte((byte)nextChar);
+ }
+ content = ms.ToArray();
}
Object answer = CreateCommand(command, headers,
content);
Console.WriteLine("<<<< received: " + answer);
@@ -156,13 +180,21 @@
{
if (command == "RECEIPT" || command == "CONNECTED")
{
- Response answer = new Response();
string text = RemoveHeader(headers,
"receipt-id");
if (text != null)
{
+ Response answer = new Response();
answer.CorrelationId =
Int32.Parse(text);
+ return answer;
+ } else if( command == "CONNECTED") {
+ text = RemoveHeader(headers, "response-id");
+ if (text != null)
+ {
+ Response answer = new Response();
+ answer.CorrelationId = Int32.Parse(text);
+ return answer;
+ }
}
- return answer;
}
else if (command == "ERROR")
{
@@ -183,11 +215,8 @@
{
return ReadMessage(command, headers, content);
}
- else
- {
- Console.WriteLine("Unknown command: " + command
+ " headers: " + headers);
- return null;
- }
+ Console.WriteLine("Unknown command: " + command + "
headers: " + headers);
+ return null;
}
protected virtual Command ReadMessage(string command,
IDictionary headers, byte[] content)
@@ -264,6 +293,12 @@
ss.WriteHeader("client-id", command.ClientId);
ss.WriteHeader("login", command.UserName);
ss.WriteHeader("passcode", command.Password);
+
+ if (command.ResponseRequired)
+ {
+ ss.WriteHeader("request-id", command.CommandId);
+ }
+
ss.Flush();
}