Author: robbie Date: Thu Oct 13 11:50:25 2011 New Revision: 1182793 URL: http://svn.apache.org/viewvc?rev=1182793&view=rev Log: QPID-3546: update the highestDeliveryTag marker during failover to prevent the stale value being used to set the rollback mark on the first rollback after failover.
This commit only fixes the 0-10 client path, as fixing this on the 0-8/9/9-1 path currently would cause undesirable interaction with the issue in QPID 3521. Applied patch from Oleksandr Rudyy<[email protected]> and myself. Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1182793&r1=1182792&r2=1182793&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 13 11:50:25 2011 @@ -308,7 +308,7 @@ public abstract class AMQSession<C exten protected final FlowControllingBlockingQueue _queue; /** Holds the highest received delivery tag. */ - private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); + protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1); private final AtomicLong _rollbackMark = new AtomicLong(-1); /** All the not yet acknowledged message tags */ @@ -856,6 +856,10 @@ public abstract class AMQSession<C exten //Check that we are clean to commit. if (_failedOverDirty) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back."); + } rollback(); throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." + @@ -1814,9 +1818,7 @@ public abstract class AMQSession<C exten suspendChannel(true); } - // Let the dispatcher know that all the incomming messages - // should be rolled back(reject/release) - _rollbackMark.set(_highestDeliveryTag.get()); + setRollbackMark(); syncDispatchQueue(); @@ -3202,7 +3204,7 @@ public abstract class AMQSession<C exten setConnectionStopped(true); } - _rollbackMark.set(_highestDeliveryTag.get()); + setRollbackMark(); _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); @@ -3351,6 +3353,11 @@ public abstract class AMQSession<C exten if (!(message instanceof CloseConsumerMessage) && tagLE(deliveryTag, _rollbackMark.get())) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting message because delivery tag " + deliveryTag + + " <= rollback mark " + _rollbackMark.get()); + } rejectMessage(message, true); } else if (_usingDispatcherForCleanup) @@ -3412,6 +3419,11 @@ public abstract class AMQSession<C exten // Don't reject if we're already closing if (!_closed.get()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag() + + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag)); + } rejectMessage(message, true); } } @@ -3542,4 +3554,15 @@ public abstract class AMQSession<C exten { return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()); } + + private void setRollbackMark() + { + // Let the dispatcher know that all the incomming messages + // should be rolled back(reject/release) + _rollbackMark.set(_highestDeliveryTag.get()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rollback mark is set to " + _rollbackMark.get()); + } + } } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1182793&r1=1182792&r2=1182793&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Oct 13 11:50:25 2011 @@ -1378,4 +1378,15 @@ public class AMQSession_0_10 extends AMQ getQpidSession().sync(); } } + + @Override + void resubscribe() throws AMQException + { + // Also reset the delivery tag tracker, to insure we dont + // return the first <total number of msgs received on session> + // messages sent by the brokers following the first rollback + // after failover + _highestDeliveryTag.set(-1); + super.resubscribe(); + } } Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=1182793&r1=1182792&r2=1182793&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Thu Oct 13 11:50:25 2011 @@ -94,7 +94,7 @@ public class CancelTest extends QpidBrok browser.close(); MessageConsumer consumer = _clientSession.createConsumer(_queue); - assertNotNull( consumer.receive() ); + assertNotNull( consumer.receive(2000l) ); consumer.close(); } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
