Author: tabish
Date: Wed Dec 22 16:09:26 2010
New Revision: 1051965

URL: http://svn.apache.org/viewvc?rev=1051965&view=rev
Log:
Part of fix for: https://issues.apache.org/jira/browse/AMQNET-298
Additional stale MessageAck filtering for: https://issues.apache.org/jira/browse/AMQNET-294

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1051965&r1=1051964&r2=1051965&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed Dec 22 16:09:26 2010
@@ -543,7 +543,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
                                        {
                                                // Simulate response to RemoveInfo command or a MessageAck
                         // since it would be stale at this point.
-                                               OnCommand(this, new Response() 
{ CorrelationId = command.CommandId });
+                        if(command.ResponseRequired)
+                        {
+                                                   OnCommand(this, new 
Response() { CorrelationId = command.CommandId });
+                        }
                                                return;
                                        }
                                }
@@ -553,6 +556,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
                                {
                                        try
                                        {
+                        // Any Ack that was being sent when the connection dropped is now
+                        // stale so we don't send it here as it would cause an unmatched ack
+                        // on the broker side and probably prevent a consumer from getting
+                        // any new messages.
+                        if(command.IsMessageAck && i > 0)
+                        {
+                            Tracer.Debug("Inflight MessageAck being dropped as 
stale.");
+                            if(command.ResponseRequired)
+                            {
+                                OnCommand(this, new Response() { CorrelationId 
= command.CommandId });
+                            }
+                            return;
+                        }
+
                                                // Wait for transport to be connected.
                                                ITransport transport = 
ConnectedTransport;
                                                DateTime start = DateTime.Now;
@@ -829,7 +846,14 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
                        foreach(Command command in tmpMap.Values)
                        {
-                               t.Oneway(command);
+                if(command.IsMessageAck)
+                {
+                    Tracer.Debug("Stored MessageAck being dropped as stale.");
+                    OnCommand(this, new Response() { CorrelationId = 
command.CommandId });
+                    continue;
+                }
+
+                t.Oneway(command);
                        }
                }
 
@@ -1320,6 +1344,8 @@ namespace Apache.NMS.ActiveMQ.Transport.
                                // get rid of unmanaged stuff
                        }
 
+            this.Stop();
+
                        disposed = true;
                }
 


Reply via email to