Author: robbie
Date: Thu Oct 13 11:50:25 2011
New Revision: 1182793

URL: http://svn.apache.org/viewvc?rev=1182793&view=rev
Log:
QPID-3546: update the highestDeliveryTag marker during failover to prevent the 
stale value being used to set the rollback mark on the first rollback after 
failover.

This commit only fixes the 0-10 client path, as fixing this on the 0-8/9/9-1 
path currently would cause undesirable interaction with the issue in QPID-3521.

Applied patch from Oleksandr Rudyy <[email protected]> and myself.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1182793&r1=1182792&r2=1182793&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Oct 13 11:50:25 2011
@@ -308,7 +308,7 @@ public abstract class AMQSession<C exten
     protected final FlowControllingBlockingQueue _queue;
 
     /** Holds the highest received delivery tag. */
-    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
     
     /** All the not yet acknowledged message tags */
@@ -856,6 +856,10 @@ public abstract class AMQSession<C exten
         //Check that we are clean to commit.
         if (_failedOverDirty)
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Session " + _channelId + " was dirty whilst 
failing over. Rolling back.");
+            }
             rollback();
 
             throw new TransactionRolledBackException("Connection failover has 
occured with uncommitted transaction activity." +
@@ -1814,9 +1818,7 @@ public abstract class AMQSession<C exten
                     suspendChannel(true);
                 }
 
-                // Let the dispatcher know that all the incomming messages
-                // should be rolled back(reject/release)
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 syncDispatchQueue();
 
@@ -3202,7 +3204,7 @@ public abstract class AMQSession<C exten
                     setConnectionStopped(true);
                 }
 
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
@@ -3351,6 +3353,11 @@ public abstract class AMQSession<C exten
                 if (!(message instanceof CloseConsumerMessage)
                     && tagLE(deliveryTag, _rollbackMark.get()))
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message because delivery tag 
" + deliveryTag
+                                + " <= rollback mark " + _rollbackMark.get());
+                    }
                     rejectMessage(message, true);
                 }
                 else if (_usingDispatcherForCleanup)
@@ -3412,6 +3419,11 @@ public abstract class AMQSession<C exten
                 // Don't reject if we're already closing
                 if (!_closed.get())
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message with delivery tag " + 
message.getDeliveryTag()
+                                + " for closing consumer " + 
String.valueOf(consumer == null? null: consumer._consumerTag));
+                    }
                     rejectMessage(message, true);
                 }
             }
@@ -3542,4 +3554,15 @@ public abstract class AMQSession<C exten
     {
         return ((destination instanceof AMQDestination)  && 
((AMQDestination)destination).isBrowseOnly());
     }
+
+    private void setRollbackMark()
+    {
+        // Let the dispatcher know that all the incomming messages
+        // should be rolled back(reject/release)
+        _rollbackMark.set(_highestDeliveryTag.get());
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+        }
+    }
 }

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1182793&r1=1182792&r2=1182793&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Thu Oct 13 11:50:25 2011
@@ -1378,4 +1378,15 @@ public class AMQSession_0_10 extends AMQ
             getQpidSession().sync();
         }
     }
+
+    @Override
+    void resubscribe() throws AMQException
+    {
+        // Also reset the delivery tag tracker, to insure we dont
+        // return the first <total number of msgs received on session>
+        // messages sent by the brokers following the first rollback
+        // after failover
+        _highestDeliveryTag.set(-1);
+        super.resubscribe();
+    }
 }

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=1182793&r1=1182792&r2=1182793&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
 Thu Oct 13 11:50:25 2011
@@ -94,7 +94,7 @@ public class CancelTest extends QpidBrok
         browser.close();
 
         MessageConsumer consumer = _clientSession.createConsumer(_queue);
-        assertNotNull( consumer.receive() );
+        assertNotNull( consumer.receive(2000l) );
         consumer.close();
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to