Author: tabish
Date: Mon Jan  3 19:25:03 2011
New Revision: 1054714

URL: http://svn.apache.org/viewvc?rev=1054714&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQNET-293

Consumer is not recovered using the two-step ConsumerInfo / ConsumerControl 
method if the WireFormat version is less than OpenWire v6.

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
 Mon Jan  3 19:25:03 2011
@@ -363,7 +363,7 @@ namespace Apache.NMS.ActiveMQ.OpenWire
             return null;
                }
 
-               public void renegotiateWireFormat(WireFormatInfo info)
+               public void RenegotiateWireFormat(WireFormatInfo info)
                {
                        if(info.Version < minimumVersion)
                        {

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
 Mon Jan  3 19:25:03 2011
@@ -29,7 +29,6 @@ namespace Apache.NMS.ActiveMQ.State
     /// </summary>
     public class ConnectionStateTracker : CommandVisitorAdapter
     {
-
         private static readonly Tracked TRACKED_RESPONSE_MARKER = new 
Tracked(null);
 
         protected Dictionary<ConnectionId, ConnectionState> connectionStates = 
new Dictionary<ConnectionId, ConnectionState>();
@@ -81,7 +80,7 @@ namespace Apache.NMS.ActiveMQ.State
         /// </summary>
         /// <param name="command"></param>
         /// <returns>null if the command is not state tracked.</returns>
-        public Tracked track(Command command)
+        public Tracked Track(Command command)
         {
             try
             {
@@ -97,7 +96,7 @@ namespace Apache.NMS.ActiveMQ.State
             }
         }
 
-        public void trackBack(Command command)
+        public void TrackBack(Command command)
         {
             if(TrackMessages && command != null && command.IsMessage)
             {
@@ -190,7 +189,7 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConsumerInfo infoToSend = consumerState.Info;
 
-                if(!connectionInterruptionProcessingComplete && 
infoToSend.PrefetchSize > 0)
+                if(!connectionInterruptionProcessingComplete && 
infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
                     
connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, 
consumerState.Info);

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
 Mon Jan  3 19:25:03 2011
@@ -274,6 +274,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
                        set { useExponentialBackOff = value; }
                }
 
+        public IWireFormat WireFormat
+        {
+            get
+            {
+                ITransport transport = ConnectedTransport;
+                if(transport != null)
+                {
+                    return transport.WireFormat;
+                }
+
+                return null;
+            }
+        }
+
                /// <summary>
                /// Gets or sets a value indicating whether to asynchronously 
connect to sockets
                /// </summary>
@@ -625,7 +639,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                                                // If it was a request and it 
was not being tracked by
                                                // the state tracker, then hold 
it in the requestMap so
                                                // that we can replay it later.
-                                               Tracked tracked = 
stateTracker.track(command);
+                                               Tracked tracked = 
stateTracker.Track(command);
                                                lock(((ICollection) 
requestMap).SyncRoot)
                                                {
                                                        if(tracked != null && 
tracked.WaitingForResponse)
@@ -642,7 +656,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                                                try
                                                {
                                                        
transport.Oneway(command);
-                                                       
stateTracker.trackBack(command);
+                                                       
stateTracker.TrackBack(command);
                                                }
                                                catch(Exception e)
                                                {
@@ -1220,7 +1234,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
                        }
                }
 
-
                public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
                {
                        if(IsUpdateURIsSupported)

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
 Mon Jan  3 19:25:03 2011
@@ -186,6 +186,15 @@ namespace Apache.NMS.ActiveMQ.Transport
                /// </param>
                void UpdateURIs(bool rebalance, Uri[] updatedURIs);
 
+        /// <summary>
+        /// Returns the IWireFormat object that this transport uses to marshal 
and
+        /// unmarshal Command objects.
+        /// </summary>
+        IWireFormat WireFormat
+        {
+            get;
+        }
+
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/IWireFormat.cs
 Mon Jan  3 19:25:03 2011
@@ -34,9 +34,24 @@ namespace Apache.NMS.ActiveMQ.Transport
                /// </summary>
         Object Unmarshal(BinaryReader dis);
 
-               ITransport Transport {
-                       get; set;
+        /// <summary>
+        /// Gets the Transport that own this WireFormat instnace.
+        /// </summary>
+               ITransport Transport
+        {
+                       get;
+            set;
                }
+
+        /// <summary>
+        /// Gets the current version of the protocol that this WireFormat 
instance
+        /// supports
+        /// </summary>
+        int Version
+        {
+            get;
+        }
+        
     }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
 Mon Jan  3 19:25:03 2011
@@ -433,6 +433,11 @@ namespace Apache.NMS.ActiveMQ.Transport.
                        throw new IOException();
                }
 
+        public IWireFormat WireFormat
+        {
+            get { return null; }
+        }
+
                #endregion
        }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 Mon Jan  3 19:25:03 2011
@@ -129,7 +129,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                                        seenShutdown = true;
                                }
 
-                               Wireformat.Marshal(command, socketWriter);
+                               WireFormat.Marshal(command, socketWriter);
                        }
                }
 
@@ -276,7 +276,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
                                try
                                {
-                                       command = (Command) 
Wireformat.Unmarshal(socketReader);
+                                       command = (Command) 
WireFormat.Unmarshal(socketReader);
                                }
                                catch(Exception ex)
                                {
@@ -334,7 +334,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                        set { this.resumedHandler = value; }
                }
 
-               public IWireFormat Wireformat
+               public IWireFormat WireFormat
                {
                        get { return wireformat; }
                        set { wireformat = value; }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
 Mon Jan  3 19:25:03 2011
@@ -235,7 +235,11 @@ namespace Apache.NMS.ActiveMQ.Transport
                {
                        next.UpdateURIs(rebalance, updatedURIs);
                }
-               
+
+        public IWireFormat WireFormat
+        {
+            get { return next.WireFormat; }
+        }
     }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=1054714&r1=1054713&r2=1054714&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
 Mon Jan  3 19:25:03 2011
@@ -80,7 +80,7 @@ namespace Apache.NMS.ActiveMQ.Transport
                         throw new IOException("Remote wire format magic is 
invalid");
                     }
                     wireInfoSentDownLatch.await(negotiateTimeout);
-                    wireFormat.renegotiateWireFormat(info);
+                    wireFormat.RenegotiateWireFormat(info);
                 }
                 catch (Exception e)
                 {


Reply via email to