Author: jstrachan
Date: Wed Feb 28 11:26:05 2007
New Revision: 512915
URL: http://svn.apache.org/viewvc?view=rev&rev=512915
Log:
added support for unsubscribe and transactions; needs more testing though
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?view=diff&rev=512915&r1=512914&r2=512915
==============================================================================
---
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
(original)
+++
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
Wed Feb 28 11:26:05 2007
@@ -31,10 +31,12 @@
/// </summary>
public class StompHelper
{
- public static ActiveMQDestination ToDestination(string text)
+
+
+ public static ActiveMQDestination ToDestination(string text)
{
int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
- if (text.StartsWith("/queue/"))
+ if (text.StartsWith("/queue/"))
{
text = text.Substring("/queue/".Length);
}
@@ -62,7 +64,7 @@
{
return null;
}
- else
+ else
{
switch (destination.DestinationType)
{
@@ -158,6 +160,20 @@
answer.ProducerId = ToProducerId(text);
return answer;
}
+
+ /// <summary>
+ /// Converts a transaction id into the text used as a STOMP header value.
+ /// </summary>
+ /// <param name="id">The transaction id to convert; may be null.</param>
+ /// <returns>
+ /// null when <paramref name="id"/> is null, the result of the
+ /// LocalTransactionId overload when the id is a LocalTransactionId,
+ /// otherwise <c>id.ToString()</c>.
+ /// </returns>
+ public static string ToStomp(TransactionId id)
+ {
+     // Previously a null id fell through to id.ToString() and threw a
+     // NullReferenceException. WriteMessage passes command.TransactionId here,
+     // which is presumably null for sends outside a transaction.
+     // NOTE(review): confirm StompFrameStream.WriteHeader tolerates a null value.
+     if (id == null)
+     {
+         return null;
+     }
+
+     // A single 'as' cast replaces the former 'is' test followed by 'as'.
+     LocalTransactionId localId = id as LocalTransactionId;
+     if (localId != null)
+     {
+         return ToStomp(localId);
+     }
+     return id.ToString();
+ }
+
+ /// <summary>
+ /// Formats a local transaction id as the connection id value and the
+ /// transaction value joined by a colon.
+ /// </summary>
+ /// <param name="transactionId">
+ /// The local transaction id; it and its ConnectionId are dereferenced
+ /// without a null check.
+ /// </param>
+ /// <returns>A string of the form "connectionIdValue:transactionValue".</returns>
+ public static string ToStomp(LocalTransactionId transactionId)
+ {
+     return string.Concat(transactionId.ConnectionId.Value, ":", transactionId.Value);
+ }
public static bool ToBool(string text, bool defaultValue)
{
@@ -165,7 +181,7 @@
{
return defaultValue;
}
- else
+ else
{
return "true" == text || "TRUE" == text;
}
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=512915&r1=512914&r2=512915
==============================================================================
---
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
(original)
+++
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Wed Feb 28 11:26:05 2007
@@ -69,6 +69,18 @@
{
WriteMessageAck((MessageAck) o, ds);
}
+ else if (o is TransactionInfo)
+ {
+ WriteTransactionInfo((TransactionInfo) o, ds);
+ }
+ else if (o is ShutdownInfo)
+ {
+ WriteShutdownInfo((ShutdownInfo) o, ds);
+ }
+ else if (o is RemoveInfo)
+ {
+ WriteRemoveInfo((RemoveInfo) o, ds);
+ }
else if (o is Command)
{
Command command = o as Command;
@@ -78,13 +90,15 @@
response.CorrelationId =
command.CommandId;
SendCommand(response);
}
+ Console.WriteLine("#### Ignored command: " + o);
}
else
{
- Console.WriteLine("Ignored command: " + o);
+ Console.WriteLine("#### Ignored command: " + o);
}
}
-
+
+
public Object Unmarshal(BinaryReader dis)
{
StreamReader socketReader = new
StreamReader(dis.BaseStream);
@@ -138,7 +152,7 @@
return answer;
}
- protected Object CreateCommand(string command, IDictionary
headers, byte[] content)
+ protected virtual Object CreateCommand(string command,
IDictionary headers, byte[] content)
{
if (command == "RECEIPT" || command == "CONNECTED")
{
@@ -176,7 +190,7 @@
}
}
- protected Command ReadMessage(string command, IDictionary
headers, byte[] content)
+ protected virtual Command ReadMessage(string command,
IDictionary headers, byte[] content)
{
ActiveMQMessage message = null;
if (headers.Contains("content-length"))
@@ -241,7 +255,7 @@
- protected void WriteConnectionInfo(ConnectionInfo command,
StompFrameStream ss)
+ protected virtual void WriteConnectionInfo(ConnectionInfo
command, StompFrameStream ss)
{
// lets force a receipt
command.ResponseRequired = true;
@@ -252,8 +266,14 @@
ss.WriteHeader("passcode", command.Password);
ss.Flush();
}
+
+ /// <summary>
+ /// Writes the given shutdown command to the frame stream as a
+ /// "DISCONNECT" command and flushes the stream.
+ /// </summary>
+ /// <param name="command">The shutdown command being marshalled.</param>
+ /// <param name="ss">The frame stream the command is written to.</param>
+ protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+ {
+     ss.WriteCommand(command, "DISCONNECT");
+     ss.Flush();
+ }
- protected void WriteConsumerInfo(ConsumerInfo command,
StompFrameStream ss)
+ protected virtual void WriteConsumerInfo(ConsumerInfo command,
StompFrameStream ss)
{
ss.WriteCommand(command, "SUBSCRIBE");
ss.WriteHeader("destination",
StompHelper.ToStomp(command.Destination));
@@ -274,7 +294,49 @@
ss.Flush();
}
- protected void WriteMessage(ActiveMQMessage command,
StompFrameStream ss)
+ /// <summary>
+ /// Writes an "UNSUBSCRIBE" command for a remove command whose ObjectId
+ /// is a ConsumerId. Remove commands for any other kind of id write
+ /// nothing to the stream.
+ /// </summary>
+ /// <param name="command">The remove command being marshalled.</param>
+ /// <param name="ss">The frame stream the command is written to.</param>
+ protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+ {
+     object removedId = command.ObjectId;
+     if (!(removedId is ConsumerId))
+     {
+         // Only consumer removal is translated; everything else is skipped.
+         return;
+     }
+
+     ss.WriteCommand(command, "UNSUBSCRIBE");
+     ss.WriteHeader("id", StompHelper.ToStomp((ConsumerId) removedId));
+     ss.Flush();
+ }
+
+
+ /// <summary>
+ /// Writes a transaction command to the frame stream as "BEGIN",
+ /// "COMMIT" or "ABORT", followed by a "transaction" header holding the
+ /// transaction id. Commands whose TransactionId is not a
+ /// LocalTransactionId write nothing to the stream.
+ /// </summary>
+ /// <param name="command">
+ /// The transaction command being marshalled; ResponseRequired is set to
+ /// true on it for commit and rollback.
+ /// </param>
+ /// <param name="ss">The frame stream the command is written to.</param>
+ protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
+ {
+     TransactionId id = command.TransactionId;
+     if (!(id is LocalTransactionId))
+     {
+         return;
+     }
+
+     TransactionType transactionType = (TransactionType) command.Type;
+     string frameName;
+     if (transactionType == TransactionType.CommitOnePhase)
+     {
+         command.ResponseRequired = true;
+         frameName = "COMMIT";
+     }
+     else if (transactionType == TransactionType.Rollback)
+     {
+         command.ResponseRequired = true;
+         frameName = "ABORT";
+     }
+     else
+     {
+         // NOTE(review): every other transaction type is sent as BEGIN,
+         // not only an explicit begin - confirm this is intended.
+         frameName = "BEGIN";
+     }
+
+     // Debug trace of the chosen mapping.
+     Console.WriteLine(">>> For transaction type: " + transactionType + " we are using command type: " + frameName);
+
+     ss.WriteCommand(command, frameName);
+     ss.WriteHeader("transaction", StompHelper.ToStomp(id));
+     ss.Flush();
+ }
+
+ protected virtual void WriteMessage(ActiveMQMessage command,
StompFrameStream ss)
{
ss.WriteCommand(command, "SEND");
ss.WriteHeader("destination",
StompHelper.ToStomp(command.Destination));
@@ -283,7 +345,7 @@
ss.WriteHeader("expires", command.Expiration);
ss.WriteHeader("priority", command.Priority);
ss.WriteHeader("type", command.Type);
- ss.WriteHeader("transaction", command.TransactionId);
+ ss.WriteHeader("transaction",
StompHelper.ToStomp(command.TransactionId));
ss.WriteHeader("persistent", command.Persistent);
// lets force the content to be marshalled
@@ -308,7 +370,7 @@
ss.Flush();
}
- protected void WriteMessageAck(MessageAck command,
StompFrameStream ss)
+ protected virtual void WriteMessageAck(MessageAck command,
StompFrameStream ss)
{
ss.WriteCommand(command, "ACK");
@@ -319,7 +381,7 @@
ss.Flush();
}
- protected void SendCommand(Command command)
+ protected virtual void SendCommand(Command command)
{
if (transport == null)
{
@@ -331,7 +393,7 @@
}
}
- protected string RemoveHeader(IDictionary headers, string name)
+ protected virtual string RemoveHeader(IDictionary headers,
string name)
{
object value = headers[name];
if (value == null)
@@ -346,7 +408,7 @@
}
- protected string ToString(object value)
+ protected virtual string ToString(object value)
{
if (value != null)
{