Author: chirino
Date: Tue Mar  6 12:17:53 2007
New Revision: 515281

URL: http://svn.apache.org/viewvc?view=rev&rev=515281
Log:
- Have the ToString of the TextMessage also dump out its text
- Improved the Stomp wireformat so that it sets the right headers on the Ack
   - We now remove all the consumers associated with a session when a session is 
closed.

Modified:
    
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
    
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs

Modified: 
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs?view=diff&rev=515281&r1=515280&r2=515281
==============================================================================
--- 
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
 (original)
+++ 
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
 Tue Mar  6 12:17:53 2007
@@ -40,9 +40,13 @@
         // TODO generate Equals method
         // TODO generate GetHashCode method
         // TODO generate ToString method
-        
-        
-        public override byte GetDataStructureType()
+
+           public override string ToString()
+           {
+               return base.ToString() + " Text="+Text;
+           }
+
+           public override byte GetDataStructureType()
         {
             return ID_ActiveMQTextMessage;
         }

Modified: 
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=515281&r1=515280&r2=515281
==============================================================================
--- 
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
 (original)
+++ 
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
 Tue Mar  6 12:17:53 2007
@@ -32,8 +32,9 @@
     public class StompWireFormat : IWireFormat
     {
                private Encoding encoding = new UTF8Encoding();
-               private ITransport transport;
-               
+               private ITransport transport;
+        private IDictionary consumers = Hashtable.Synchronized(new 
Hashtable());
+
                public StompWireFormat()
                {
                }
@@ -312,20 +313,25 @@
                {
                        ss.WriteCommand(command, "SUBSCRIBE");
                        ss.WriteHeader("destination", 
StompHelper.ToStomp(command.Destination));
-                       ss.WriteHeader("selector", command.Selector);
-                       ss.WriteHeader("id", 
StompHelper.ToStomp(command.ConsumerId));
-                       ss.WriteHeader("durable-subscriber-name", 
command.SubscriptionName);
-                       ss.WriteHeader("no-local", command.NoLocal);
+                       ss.WriteHeader("id", 
StompHelper.ToStomp(command.ConsumerId));
+                   ss.WriteHeader("durable-subscriber-name", 
command.SubscriptionName);
+            ss.WriteHeader("selector", command.Selector);
+            if ( command.NoLocal )
+                ss.WriteHeader("no-local", command.NoLocal);
                        ss.WriteHeader("ack", "client");
 
                        // ActiveMQ extensions to STOMP
-                       ss.WriteHeader("activemq.dispatchAsync", 
command.DispatchAsync);
-                       ss.WriteHeader("activemq.exclusive", command.Exclusive);
+                       ss.WriteHeader("activemq.dispatchAsync", 
command.DispatchAsync);
+            if ( command.Exclusive )
+                           ss.WriteHeader("activemq.exclusive", 
command.Exclusive);
+                   
                        ss.WriteHeader("activemq.maximumPendingMessageLimit", 
command.MaximumPendingMessageLimit);
                        ss.WriteHeader("activemq.prefetchSize", 
command.PrefetchSize);
-                       ss.WriteHeader("activemq.priority ", command.Priority);
-                       ss.WriteHeader("activemq.retroactive", 
command.Retroactive);
-                       
+                       ss.WriteHeader("activemq.priority ", command.Priority);
+            if ( command.Retroactive )
+                           ss.WriteHeader("activemq.retroactive", 
command.Retroactive);
+
+            consumers[command.ConsumerId] = command.ConsumerId;
                        ss.Flush();
                }
 
@@ -336,10 +342,35 @@
                        {
                                ConsumerId consumerId = id as ConsumerId;
                                ss.WriteCommand(command, "UNSUBSCRIBE");
-                               ss.WriteHeader("id", 
StompHelper.ToStomp(consumerId));
-                               
-                               ss.Flush();
-                       }
+                               ss.WriteHeader("id", 
StompHelper.ToStomp(consumerId));                          
+                               ss.Flush();
+                consumers.Remove(consumerId);
+            }
+                   // When a session is removed, it needs to remove it's 
consumers too.
+            if (id is SessionId)
+            {
+                
+                // Find all the consumer that were part of the session.
+                SessionId sessionId = (SessionId) id;
+                ArrayList matches = new ArrayList();
+                foreach (DictionaryEntry entry in consumers)
+                {
+                    ConsumerId t = (ConsumerId) entry.Key;
+                    if( sessionId.ConnectionId==t.ConnectionId && 
sessionId.Value==t.SessionId )
+                    {
+                        matches.Add(t);
+                    }
+                }
+
+                // Un-subscribe them.
+                foreach (ConsumerId consumerId in matches)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE");
+                    ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+                    ss.Flush();
+                    consumers.Remove(consumerId);
+                }
+            }
                }
                
                
@@ -374,13 +405,20 @@
                protected virtual void WriteMessage(ActiveMQMessage command, 
StompFrameStream ss)
                {
                        ss.WriteCommand(command, "SEND");
-                       ss.WriteHeader("destination", 
StompHelper.ToStomp(command.Destination));
-                       ss.WriteHeader("reply-to", 
StompHelper.ToStomp(command.ReplyTo));
-                       ss.WriteHeader("correlation-id", command.CorrelationId);
-                       ss.WriteHeader("expires", command.Expiration);
-                       ss.WriteHeader("priority", command.Priority);
-                       ss.WriteHeader("type", command.Type);
-                       ss.WriteHeader("transaction", 
StompHelper.ToStomp(command.TransactionId));
+                       ss.WriteHeader("destination", 
StompHelper.ToStomp(command.Destination));
+            if (command.ReplyTo != null)
+                           ss.WriteHeader("reply-to", 
StompHelper.ToStomp(command.ReplyTo));
+            if (command.CorrelationId != null )
+                ss.WriteHeader("correlation-id", command.CorrelationId);
+            if (command.Expiration != 0)
+                ss.WriteHeader("expires", command.Expiration);
+            if (command.Priority != 4)
+                           ss.WriteHeader("priority", command.Priority);
+            if (command.Type != null)
+                ss.WriteHeader("type", command.Type);            
+                   if (command.TransactionId!=null)
+                           ss.WriteHeader("transaction", 
StompHelper.ToStomp(command.TransactionId));
+                   
                        ss.WriteHeader("persistent", command.Persistent);
                        
                        // lets force the content to be marshalled
@@ -409,9 +447,10 @@
                {
                        ss.WriteCommand(command, "ACK");
                        
-                       // TODO handle bulk ACKs?
-                       ss.WriteHeader("message-id", command.FirstMessageId);
-                       ss.WriteHeader("transaction", command.TransactionId);
+                       // TODO handle bulk ACKs?
+            ss.WriteHeader("message-id", 
StompHelper.ToStomp(command.FirstMessageId));
+                       if( command.TransactionId!=null )
+                ss.WriteHeader("transaction", 
StompHelper.ToStomp(command.TransactionId));
 
                        ss.Flush();
                }


Reply via email to