Author: jstrachan
Date: Wed Feb 28 11:26:05 2007
New Revision: 512915

URL: http://svn.apache.org/viewvc?view=rev&rev=512915
Log:
added support for unsubscribe and transactions; needs more testing though

Modified:
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?view=diff&rev=512915&r1=512914&r2=512915
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs Wed Feb 28 11:26:05 2007
@@ -31,10 +31,12 @@
     /// </summary>
     public class StompHelper
     {
-               public static ActiveMQDestination ToDestination(string text) 
+
+
+               public static ActiveMQDestination ToDestination(string text)
                {
                        int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
-                       if (text.StartsWith("/queue/")) 
+                       if (text.StartsWith("/queue/"))
                        {
                                text = text.Substring("/queue/".Length);
                        }
@@ -62,7 +64,7 @@
                        {
                                return null;
                        }
-                       else 
+                       else
                        {
                                switch (destination.DestinationType)
                                {
@@ -158,6 +160,20 @@
                        answer.ProducerId = ToProducerId(text);
                        return answer;
                }
+       
+               public static string ToStomp(TransactionId id)
+               {
+                       if (id is LocalTransactionId)
+                       {
+                               return ToStomp(id as LocalTransactionId);
+                       }
+                       return id.ToString();
+               }
+               
+               public static string ToStomp(LocalTransactionId transactionId)
+               {
+                       return transactionId.ConnectionId.Value + ":" + transactionId.Value;
+               }
                
                public static bool ToBool(string text, bool defaultValue)
                {
@@ -165,7 +181,7 @@
                        {
                                return defaultValue;
                        }
-                       else 
+                       else
                        {
                                return "true" == text || "TRUE" == text;
                        }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=512915&r1=512914&r2=512915
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Wed Feb 28 11:26:05 2007
@@ -69,6 +69,18 @@
                        {
                                WriteMessageAck((MessageAck) o, ds);
                        }
+                       else if (o is TransactionInfo)
+                       {
+                               WriteTransactionInfo((TransactionInfo) o, ds);
+                       }
+                       else if (o is ShutdownInfo)
+                       {
+                               WriteShutdownInfo((ShutdownInfo) o, ds);
+                       }
+                       else if (o is RemoveInfo)
+                       {
+                               WriteRemoveInfo((RemoveInfo) o, ds);
+                       }
                        else if (o is Command)
                        {
                                Command command = o as Command;
@@ -78,13 +90,15 @@
                                        response.CorrelationId = command.CommandId;
                                        SendCommand(response);
                                }
+                               Console.WriteLine("#### Ignored command: " + o);
                        }
                        else
                        {
-                               Console.WriteLine("Ignored command: " + o);
+                               Console.WriteLine("#### Ignored command: " + o);
                        }
         }
-        
+
+
         public Object Unmarshal(BinaryReader dis)
         {
                        StreamReader socketReader = new StreamReader(dis.BaseStream);
@@ -138,7 +152,7 @@
                        return answer;
         }
 
-               protected Object CreateCommand(string command, IDictionary headers, byte[] content)
+               protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
                {
                        if (command == "RECEIPT" || command == "CONNECTED")
                        {
@@ -176,7 +190,7 @@
                        }
                }
                
-               protected Command ReadMessage(string command, IDictionary headers, byte[] content)
+               protected virtual Command ReadMessage(string command, IDictionary headers, byte[] content)
                {
                        ActiveMQMessage message = null;
                        if (headers.Contains("content-length"))
@@ -241,7 +255,7 @@
                
                
                
-               protected void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
+               protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
                {
                        // lets force a receipt
                        command.ResponseRequired = true;
@@ -252,8 +266,14 @@
                        ss.WriteHeader("passcode", command.Password);
                        ss.Flush();
                }
+               
+               protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+               {
+                       ss.WriteCommand(command, "DISCONNECT");
+                       ss.Flush();
+               }
 
-               protected void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
+               protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
                {
                        ss.WriteCommand(command, "SUBSCRIBE");
                        ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
@@ -274,7 +294,49 @@
                        ss.Flush();
                }
 
-               protected void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
+               protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+               {
+                       object id = command.ObjectId;
+                       if (id is ConsumerId)
+                       {
+                               ConsumerId consumerId = id as ConsumerId;
+                               ss.WriteCommand(command, "UNSUBSCRIBE");
+                               ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+                               
+                               ss.Flush();
+                       }
+               }
+               
+               
+               protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
+               {
+                       TransactionId id = command.TransactionId;
+                       if (id is LocalTransactionId)
+                       {
+                               string type = "BEGIN";
+                               TransactionType transactionType = (TransactionType) command.Type;
+                               switch (transactionType)
+                               {
+                                       case TransactionType.CommitOnePhase:
+                                               command.ResponseRequired = true;
+                                               type = "COMMIT";
+                                               break;
+                                       case TransactionType.Rollback:
+                                               command.ResponseRequired = true;
+                                               type = "ABORT";
+                                               break;
+                               }
+                               Console.WriteLine(">>> For transaction type: " + transactionType + " we are using command type: " + type);
+                               
+                               ss.WriteCommand(command, type);
+                               
+                               ss.WriteHeader("transaction", StompHelper.ToStomp(id));
+                               
+                               ss.Flush();
+                       }
+               }
+               
+               protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
                {
                        ss.WriteCommand(command, "SEND");
                        ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
@@ -283,7 +345,7 @@
                        ss.WriteHeader("expires", command.Expiration);
                        ss.WriteHeader("priority", command.Priority);
                        ss.WriteHeader("type", command.Type);
-                       ss.WriteHeader("transaction", command.TransactionId);
+                       ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
                        ss.WriteHeader("persistent", command.Persistent);
                        
                        // lets force the content to be marshalled
@@ -308,7 +370,7 @@
                        ss.Flush();
                }
                
-               protected void WriteMessageAck(MessageAck command, StompFrameStream ss)
+               protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
                {
                        ss.WriteCommand(command, "ACK");
                        
@@ -319,7 +381,7 @@
                        ss.Flush();
                }
                
-               protected void SendCommand(Command command)
+               protected virtual void SendCommand(Command command)
                {
                        if (transport == null)
                        {
@@ -331,7 +393,7 @@
                        }
                }
                
-               protected string RemoveHeader(IDictionary headers, string name)
+               protected virtual string RemoveHeader(IDictionary headers, string name)
                {
                        object value = headers[name];
                        if (value == null)
@@ -346,7 +408,7 @@
                }
                
                
-               protected string ToString(object value)
+               protected virtual string ToString(object value)
                {
                        if (value != null)
                        {


Reply via email to